From b6af124b80733287c9bfa0c639de193c3cc8675e Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Fri, 10 Feb 2023 20:49:44 +0100 Subject: [PATCH] ENH: add Pstream chunked transfer detail - to service both List and Map exchanges with limited message sizes (termed 'data chunking' here) add a PstreamDetail for walking and dispatching. Like other Detail components, the API is subject to (possibly breaking) changes in the future at any time. The regular exchangeBuf detail has this type of signature: PstreamDetail::exchangeBuf ( const UList<std::pair<int, stdFoam::span<const Type>>>& sends, const UList<std::pair<int, stdFoam::span<Type>>>& recvs, ... ) Where [rank, span] is the tuple pack. The basic idea is to pre-process the send/receive buffers and marshall them into a flat list of [rank, span] tuples. The originating buffers could be any type of container (List or Map) which is then marshalled into this given sequence that can be processed in source-agnostic fashion. If data chunking is required (when UPstream::maxCommsSize > 0) it is possible to make a cheap copy of the rank/address information and then walk different slices or views. ENH: replace private static methods with PstreamDetail functions - simpler to update locally. --- src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 141 ++--- .../db/IOstreams/Pstreams/PstreamExchange.C | 565 +++++++++++------- .../Pstreams/PstreamExchangeConsensus.C | 12 - 3 files changed, 397 insertions(+), 321 deletions(-) diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 4a293d80bf..07399cf3bd 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -63,42 +63,6 @@ class Pstream : public UPstream { - // Private Static Functions - - //- Exchange \em contiguous data. - //- Sends sendBufs, receives into recvBufs. - // Data provided and received as container. - // - // No internal guards or resizing. 
- template - static void exchangeContainer - ( - const UList& sendBufs, - const labelUList& recvSizes, //!< Num of recv elements (not bytes) - List& recvBufs, - const int tag, - const label comm, - const bool wait //!< Wait for requests to complete - ); - - //- Exchange \em contiguous data. - //- Sends sendBufs, receives into recvBufs. - // Data provided and received as pointers. - // - // No internal guards or resizing. - template - static void exchangeBuf - ( - const labelUList& sendSizes, //!< Num of send elements (not bytes) - const UList& sendBufs, - const labelUList& recvSizes, //!< Num of recv elements (not bytes) - List& recvBufs, - const int tag, - const label comm, - const bool wait //!< Wait for requests to complete - ); - - protected: // Protected Data @@ -504,62 +468,62 @@ public: ); - // Exchange + // Exchange - //- Helper: exchange sizes of sendData for specified - //- set of send/receive processes. - template - static void exchangeSizes - ( - const labelUList& sendProcs, - const labelUList& recvProcs, - const Container& sendData, - labelList& sizes, - const label tag = UPstream::msgType(), - const label comm = UPstream::worldComm - ); + //- Helper: exchange sizes of sendBufs for specified + //- set of send/receive processes. + template + static void exchangeSizes + ( + const labelUList& sendProcs, + const labelUList& recvProcs, + const Container& sendBufs, + labelList& sizes, + const label tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); - //- Helper: exchange sizes of sendData. - //- The sendData is the data per processor (in the communicator). - // Returns sizes of sendData on the sending processor. - // \n - // For \b non-parallel : copy sizes from sendData directly. - template - static void exchangeSizes - ( - const Container& sendData, - labelList& sizes, - const label comm = UPstream::worldComm - ); + //- Helper: exchange sizes of sendBufs. + //- The sendBufs is the data per processor (in the communicator). 
+ // Returns sizes of sendBufs on the sending processor. + // \n + // For \b non-parallel : copy sizes from sendBufs directly. + template + static void exchangeSizes + ( + const Container& sendBufs, + labelList& recvSizes, + const label comm = UPstream::worldComm + ); + //- Helper: exchange \em contiguous data. + //- Sends sendBufs, receives into recvBufs using predetermined receive + //- sizing. + // If wait=true will wait for all transfers to finish. + template + static void exchange + ( + const UList& sendBufs, + const labelUList& recvSizes, + List& recvBufs, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true //!< Wait for requests to complete + ); - //- Helper: exchange \em contiguous data. - //- Sends sendData, receives into recvData. - // If wait=true will wait for all transfers to finish. - template - static void exchange - ( - const UList& sendData, - const labelUList& recvSizes, - List& recvData, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true //!< Wait for requests to complete - ); - - //- Exchange \em contiguous data. - //- Sends sendData, receives into recvData. - //- Determines sizes to receive. - // If wait=true will wait for all transfers to finish. - template - static void exchange - ( - const UList& sendData, - List& recvData, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, - const bool wait = true //!< Wait for requests to complete - ); + //- Exchange \em contiguous data. + //- Sends sendBufs, receives into recvBufs. + //- Determines sizes to receive. + // If wait=true will wait for all transfers to finish. 
+ template + static void exchange + ( + const UList& sendBufs, + List& recvBufs, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const bool wait = true //!< Wait for requests to complete + ); // Non-blocking exchange @@ -636,7 +600,6 @@ public: #include "PstreamGather.C" #include "PstreamCombineGather.C" #include "PstreamGatherList.C" - #include "PstreamExchangeConsensus.C" #include "PstreamExchange.C" #endif diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C index 5e4aa2130f..9979998a8c 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2022 OpenCFD Ltd. + Copyright (C) 2016-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,43 +24,300 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see . -Description - Exchange data. - \*---------------------------------------------------------------------------*/ #include "Pstream.H" #include "contiguous.H" #include "PstreamReduceOps.H" -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // +// * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * // +namespace Foam +{ +namespace PstreamDetail +{ + +//- Setup sends and receives, each specified as [rank, span] tuple +// The serial list of tuples can be populated from other lists, from maps +// of data or subsets of lists/maps etc. 
+template +void exchangeBuf +( + const UList>>& sends, + const UList>>& recvs, + + const int tag, + const label comm, + const bool wait +) +{ + const label startOfRequests = UPstream::nRequests(); + const int myProci = UPstream::myProcNo(comm); + + // Set up receives + // ~~~~~~~~~~~~~~~ + + for (auto& slot : recvs) + { + // [rank, span] + const auto proci = slot.first; + auto& payload = slot.second; + + if (proci != myProci && !payload.empty()) + { + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.data_bytes(), + payload.size_bytes(), + tag, + comm + ); + } + } + + // Set up sends + // ~~~~~~~~~~~~ + + for (const auto& slot : sends) + { + // [rank, span] + const auto proci = slot.first; + const auto& payload = slot.second; + + if (proci != myProci && !payload.empty()) + { + if + ( + !UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + proci, + payload.cdata_bytes(), + payload.size_bytes(), + tag, + comm + ) + ) + { + FatalErrorInFunction + << "Cannot send outgoing message to:" + << proci << " nBytes:" + << label(payload.size_bytes()) + << Foam::abort(FatalError); + } + } + } + + // Wait for all to finish + // ~~~~~~~~~~~~~~~~~~~~~~ + + if (wait) + { + UPstream::waitRequests(startOfRequests); + } +} + + +//- Chunked exchange of \em contiguous data. +//- Setup sends and receives, each specified as [rank, span] tuple. +// The serial list of tuples can be populated from other lists, from +// maps of data or subsets of lists/maps etc. +template +void exchangeChunkedBuf +( + const UList>>& sends, + const UList>>& recvs, + + const int tag, + const label comm, + const bool wait +) +{ + typedef std::pair> sendTuple; + typedef std::pair> recvTuple; + + // Caller already checked for parRun and maxChunkSize > 0 + + { + // Determine the number of chunks to send. Note that we + // only have to look at the sending data since we are + // guaranteed that some processor's sending size is some other + // processor's receive size. 
Also we can ignore any local comms. + // + // We need to send chunks so the number of iterations: + // maxChunkSize iterations + // ------------ ---------- + // 0 0 + // 1..maxChunkSize 1 + // maxChunkSize+1..2*maxChunkSize 2 + // ... + + const label maxChunkSize = + ( + max + ( + static_cast