diff --git a/applications/test/parallel-chunks/Make/files b/applications/test/parallel-chunks/Make/files
new file mode 100644
index 0000000000..55e0fad2c4
--- /dev/null
+++ b/applications/test/parallel-chunks/Make/files
@@ -0,0 +1,3 @@
+Test-parallel-chunks.C
+
+EXE = $(FOAM_USER_APPBIN)/Test-parallel-chunks
diff --git a/applications/test/parallel-chunks/Make/options b/applications/test/parallel-chunks/Make/options
new file mode 100644
index 0000000000..18e6fe47af
--- /dev/null
+++ b/applications/test/parallel-chunks/Make/options
@@ -0,0 +1,2 @@
+/* EXE_INC = */
+/* EXE_LIBS = */
diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C
new file mode 100644
index 0000000000..1dc4cb7380
--- /dev/null
+++ b/applications/test/parallel-chunks/Test-parallel-chunks.C
@@ -0,0 +1,389 @@
+/*---------------------------------------------------------------------------*\
+  =========                 |
+  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
+   \\    /   O peration     |
+    \\  /    A nd           | www.openfoam.com
+     \\/     M anipulation  |
+-------------------------------------------------------------------------------
+    Copyright (C) 2022 OpenCFD Ltd.
+-------------------------------------------------------------------------------
+License
+    This file is part of OpenFOAM.
+
+    OpenFOAM is free software: you can redistribute it and/or modify it
+    under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
+    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+    FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+    for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
+ +Application + Test-parallel-chunks + +Description + Test for sending contiguous data in chunk-wise. + Largely mirrors Pstream::exchange or vice versa + +\*---------------------------------------------------------------------------*/ + +#include "List.H" +#include "argList.H" +#include "Time.H" +#include "IPstream.H" +#include "OPstream.H" +#include "IOstreams.H" + +using namespace Foam; + + +// Looks like Pstream::exchangeBuf +template +void do_exchangeBuf +( + const label sendSize, + const char* sendData, + const label recvSize, + char* recvData, + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + // forAll(recvSizes, proci) + { + // if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0) + if (!Pstream::master(comm) && recvSize > 0) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::myProcNo(comm), // proci, + recvData, + recvSize*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + // forAll(sendBufs, proci) + for (const int proci : Pstream::subProcs(comm)) + { + if (sendSize > 0) + // if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + sendData, + sendSize*sizeof(T), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. 
" + << "to:" << proci << " nBytes:" + << label(sendSize*sizeof(T)) + << Foam::abort(FatalError); + } + } + } + + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +// Looks like Pstream::exchangeContainer +template +void do_exchangeContainer +( + const Container& sendData, + const label recvSize, + Container& recvData, + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = Pstream::nRequests(); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + // for (const int proci : Pstream::allProcs(comm)) + { + if (!Pstream::master(comm) && recvSize > 0) + // if (proci != Pstream::myProcNo(comm) && recvSize > 0) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::myProcNo(comm), // proci, + recvData.data_bytes(), + recvSize*sizeof(T), + tag, + comm + ); + } + } + + + // Set up sends + // ~~~~~~~~~~~~ + + if (Pstream::master(comm) && sendData.size() > 0) + { + for (const int proci : Pstream::subProcs(comm)) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + sendData.cdata_bytes(), + sendData.size_bytes(), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message. " + << "to:" << proci << " nBytes:" + << label(sendData.size_bytes()) + << Foam::abort(FatalError); + } + } + } + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +template +void broadcast_chunks +( + Container& sendData, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true +) +{ + // OR static_assert(is_contiguous::value, "Contiguous data only!") + if (!is_contiguous::value) + { + FatalErrorInFunction + << "Contiguous data only." 
<< sizeof(T) << Foam::abort(FatalError); + } + + if (UPstream::maxCommsSize <= int(sizeof(T))) + { + // Do in one go + Info<< "send " << sendData.size() << " elements in one go" << endl; + Pstream::broadcast(sendData, comm); + return; + } + + label sendSize(sendData.size()); + Pstream::broadcast(sendSize, comm); + + label recvSize(sendSize); + + sendData.resize_nocopy(recvSize); // A no-op on master + + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. Also we can ignore any local comms. + + // We need to send bytes so the number of iterations: + // maxChunkSize iterations + // ------------ ---------- + // 0 0 + // 1..maxChunkSize 1 + // maxChunkSize+1..2*maxChunkSize 2 + // etc. + + const label maxChunkSize(UPstream::maxCommsSize/sizeof(T)); + + label nIter(0); + { + label nSendMax = 0; + // forAll(sendBufs, proci) + // { + // if (proci != Pstream::myProcNo(comm)) + // { + // nSendMax = max(nSendMax, sendBufs[proci].size()); + // } + // } + nSendMax = sendSize; + + if (nSendMax) + { + nIter = 1 + ((nSendMax-1)/maxChunkSize); + } + reduce(nIter, maxOp