From 341d9c402dbb8e83c7699b0f4d64406bd3e66d1a Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Thu, 3 Mar 2022 13:46:14 +0100 Subject: [PATCH] BUG: incorrect chunk handling in Pstream::exchange (fixes #2375) - used Pstream::maxCommsSize (bytes) for the lower limit when sending. This would have sent more data on each iteration than expected based on maxCommsSize and finish with a number of useless iterations. Was generally not a serious bug since maxCommsSize (if used) was likely still far away from the MPI limits and exchange() is primarily harnessed by PstreamBuffers, which is sending character data (ie, number of elements and number of bytes is identical). --- applications/test/parallel-chunks/Make/files | 3 + .../test/parallel-chunks/Make/options | 2 + .../parallel-chunks/Test-parallel-chunks.C | 389 ++++++++++++++++++ .../db/IOstreams/Pstreams/PstreamExchange.C | 92 +++-- 4 files changed, 443 insertions(+), 43 deletions(-) create mode 100644 applications/test/parallel-chunks/Make/files create mode 100644 applications/test/parallel-chunks/Make/options create mode 100644 applications/test/parallel-chunks/Test-parallel-chunks.C diff --git a/applications/test/parallel-chunks/Make/files b/applications/test/parallel-chunks/Make/files new file mode 100644 index 0000000000..55e0fad2c4 --- /dev/null +++ b/applications/test/parallel-chunks/Make/files @@ -0,0 +1,3 @@ +Test-parallel-chunks.C + +EXE = $(FOAM_USER_APPBIN)/Test-parallel-chunks diff --git a/applications/test/parallel-chunks/Make/options b/applications/test/parallel-chunks/Make/options new file mode 100644 index 0000000000..18e6fe47af --- /dev/null +++ b/applications/test/parallel-chunks/Make/options @@ -0,0 +1,2 @@ +/* EXE_INC = */ +/* EXE_LIBS = */ diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C new file mode 100644 index 0000000000..1dc4cb7380 --- /dev/null +++ b/applications/test/parallel-chunks/Test-parallel-chunks.C @@ -0,0 
+1,389 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2022 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +Application + Test-parallel-chunks + +Description + Test for sending contiguous data in chunk-wise. 
+ Largely mirrors Pstream::exchange or vice versa + +\*---------------------------------------------------------------------------*/ + +#include "List.H" +#include "argList.H" +#include "Time.H" +#include "IPstream.H" +#include "OPstream.H" +#include "IOstreams.H" + +using namespace Foam; + + +// Looks like Pstream::exchangeBuf +template +void do_exchangeBuf +( + const label sendSize, + const char* sendData, + const label recvSize, + char* recvData, + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + // forAll(recvSizes, proci) + { + // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + if (!Pstream::master(comm) && recvSize > 0) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::myProcNo(comm), // proci, + recvData, + recvSize*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + // forAll(sendBufs, proci) + for (const int proci : Pstream::subProcs(comm)) + { + if (sendSize > 0) + // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + sendData, + sendSize*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. 
" + << "to:" << proci << " nBytes:" + << label(sendSize*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +// Looks like Pstream::exchangeContainer +template +void do_exchangeContainer +( + const Container& sendData, + const label recvSize, + Container& recvData, + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + // for (const int proci : Pstream::allProcs(comm)) + { + if (!Pstream::master(comm) && recvSize > 0) + // if (proci != Pstream::myProcNo(comm) && recvSize > 0) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::myProcNo(comm), // proci, + recvData.data_bytes(), + recvSize*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + if (Pstream::master(comm) && sendData.size() > 0) + { + for (const int proci : Pstream::subProcs(comm)) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + sendData.cdata_bytes(), + sendData.size_bytes(), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendData.size_bytes()) + << Foam::abort(FatalError); + } + } + } + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +template +void broadcast_chunks +( + Container& sendData, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true +) +{ + // OR static_assert(is_contiguous::value, "Contiguous data only!") + if (!is_contiguous::value) + { + FatalErrorInFunction + << "Contiguous data only." 
<< sizeof(T) << Foam::abort(FatalError); + } + + if (UPstream::maxCommsSize <= int(sizeof(T))) + { + // Do in one go + Info<< "send " << sendData.size() << " elements in one go" << endl; + Pstream::broadcast(sendData, comm); + return; + } + + label sendSize(sendData.size()); + Pstream::broadcast(sendSize, comm); + + label recvSize(sendSize); + + sendData.resize_nocopy(recvSize); // A no-op on master + + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. Also we can ignore any local comms. + + // We need to send bytes so the number of iterations: + // maxChunkSize iterations + // ------------ ---------- + // 0 0 + // 1..maxChunkSize 1 + // maxChunkSize+1..2*maxChunkSize 2 + // etc. + + const label maxChunkSize(UPstream::maxCommsSize/sizeof(T)); + + label nIter(0); + { + label nSendMax = 0; + // forAll(sendBufs, proci) + // { + // if (proci != Pstream::myProcNo(comm)) + // { + // nSendMax = max(nSendMax, sendBufs[proci].size()); + // } + // } + nSendMax = sendSize; + + if (nSendMax) + { + nIter = 1 + ((nSendMax-1)/maxChunkSize); + } + reduce(nIter, maxOp