diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 4a293d80bf..07399cf3bd 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -63,42 +63,6 @@ class Pstream : public UPstream { - // Private Static Functions - - //- Exchange \em contiguous data. - //- Sends sendBufs, receives into recvBufs. - // Data provided and received as container. - // - // No internal guards or resizing. - template - static void exchangeContainer - ( - const UList& sendBufs, - const labelUList& recvSizes, //!< Num of recv elements (not bytes) - List& recvBufs, - const int tag, - const label comm, - const bool wait //!< Wait for requests to complete - ); - - //- Exchange \em contiguous data. - //- Sends sendBufs, receives into recvBufs. - // Data provided and received as pointers. - // - // No internal guards or resizing. - template - static void exchangeBuf - ( - const labelUList& sendSizes, //!< Num of send elements (not bytes) - const UList& sendBufs, - const labelUList& recvSizes, //!< Num of recv elements (not bytes) - List& recvBufs, - const int tag, - const label comm, - const bool wait //!< Wait for requests to complete - ); - - protected: // Protected Data @@ -504,62 +468,62 @@ public: ); - // Exchange + // Exchange - //- Helper: exchange sizes of sendData for specified - //- set of send/receive processes. - template - static void exchangeSizes - ( - const labelUList& sendProcs, - const labelUList& recvProcs, - const Container& sendData, - labelList& sizes, - const label tag = UPstream::msgType(), - const label comm = UPstream::worldComm - ); + //- Helper: exchange sizes of sendBufs for specified + //- set of send/receive processes. + template + static void exchangeSizes + ( + const labelUList& sendProcs, + const labelUList& recvProcs, + const Container& sendBufs, + labelList& sizes, + const label tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); - //- Helper: exchange sizes of sendData. - //- The sendData is the data per processor (in the communicator). - // Returns sizes of sendData on the sending processor. - // \n - // For \b non-parallel : copy sizes from sendData directly. - template - static void exchangeSizes - ( - const Container& sendData, - labelList& sizes, - const label comm = UPstream::worldComm - ); + //- Helper: exchange sizes of sendBufs. + //- The sendBufs is the data per processor (in the communicator). + // Returns sizes of sendBufs on the sending processor. + // \n + // For \b non-parallel : copy sizes from sendBufs directly. + template + static void exchangeSizes + ( + const Container& sendBufs, + labelList& recvSizes, + const label comm = UPstream::worldComm + ); + //- Helper: exchange \em contiguous data. + //- Sends sendBufs, receives into recvBufs using predetermined receive + //- sizing. + // If wait=true will wait for all transfers to finish. + template + static void exchange + ( + const UList& sendBufs, + const labelUList& recvSizes, + List& recvBufs, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true //!< Wait for requests to complete + ); - //- Helper: exchange \em contiguous data. - //- Sends sendData, receives into recvData. - // If wait=true will wait for all transfers to finish. - template - static void exchange - ( - const UList& sendData, - const labelUList& recvSizes, - List& recvData, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true //!< Wait for requests to complete - ); - - //- Exchange \em contiguous data. - //- Sends sendData, receives into recvData. - //- Determines sizes to receive. - // If wait=true will wait for all transfers to finish. - template - static void exchange - ( - const UList& sendData, - List& recvData, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true //!< Wait for requests to complete - ); + //- Exchange \em contiguous data. + //- Sends sendBufs, receives into recvBufs. + //- Determines sizes to receive. + // If wait=true will wait for all transfers to finish. + template + static void exchange + ( + const UList& sendBufs, + List& recvBufs, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true //!< Wait for requests to complete + ); // Non-blocking exchange @@ -636,7 +600,6 @@ public: #include "PstreamGather.C" #include "PstreamCombineGather.C" #include "PstreamGatherList.C" - #include "PstreamExchangeConsensus.C" #include "PstreamExchange.C" #endif diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C index 5e4aa2130f..9979998a8c 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2022 OpenCFD Ltd. + Copyright (C) 2016-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,43 +24,300 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see . -Description - Exchange data. - \*---------------------------------------------------------------------------*/ #include "Pstream.H" #include "contiguous.H" #include "PstreamReduceOps.H" -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // +// * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * // +namespace Foam +{ +namespace PstreamDetail +{ + +//- Setup sends and receives, each specified as [rank, span] tuple +// The serial list of tuples can be populated from other lists, from maps +// of data or subsets of lists/maps etc. +template +void exchangeBuf +( + const UList>>& sends, + const UList>>& recvs, + + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = UPstream::nRequests(); + const int myProci = UPstream::myProcNo(comm); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + for (auto& slot : recvs) + { + // [rank, span] + const auto proci = slot.first; + auto& payload = slot.second; + + if (proci != myProci && !payload.empty()) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.data_bytes(), + payload.size_bytes(), + tag, + comm + ); + } + } + + // Set up sends + // ~~~~~~~~~~~~ + + for (const auto& slot : sends) + { + // [rank, span] + const auto proci = slot.first; + const auto& payload = slot.second; + + if (proci != myProci && !payload.empty()) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message to:" + << proci << " nBytes:" + << label(payload.size_bytes()) + << Foam::abort(FatalError); + } + } + } + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +//- Chunked exchange of \em contiguous data. +//- Setup sends and receives, each specified as [rank, span] tuple. +// The serial list of tuples can be populated from other lists, from +// maps of data or subsets of lists/maps etc. +template +void exchangeChunkedBuf +( + const UList>>& sends, + const UList>>& recvs, + + const int tag, + const label comm, + const bool wait +) +{ + typedef std::pair> sendTuple; + typedef std::pair> recvTuple; + + // Caller already checked for parRun and maxChunkSize > 0 + + { + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. Also we can ignore any local comms. + // + // We need to send chunks so the number of iterations: + // maxChunkSize iterations + // ------------ ---------- + // 0 0 + // 1..maxChunkSize 1 + // maxChunkSize+1..2*maxChunkSize 2 + // ... + + const label maxChunkSize = + ( + max + ( + static_cast