diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index cb8e6642c4..1e4dfe4bee 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -44,13 +44,13 @@ void Foam::PstreamBuffers::finalExchange if (commsType_ == UPstream::commsTypes::nonBlocking) { // all-to-all - Pstream::exchangeSizes(sendBuf_, recvSizes, comm_); + Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_); Pstream::exchange, char> ( - sendBuf_, + sendBuffers_, recvSizes, - recvBuf_, + recvBuffers_, tag_, comm_, wait @@ -77,7 +77,7 @@ void Foam::PstreamBuffers::finalExchange ( sendProcs, recvProcs, - sendBuf_, + sendBuffers_, recvSizes, tag_, comm_ @@ -85,9 +85,9 @@ void Foam::PstreamBuffers::finalExchange Pstream::exchange, char> ( - sendBuf_, + sendBuffers_, recvSizes, - recvBuf_, + recvBuffers_, tag_, comm_, wait @@ -114,11 +114,12 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter { // gather mode (all-to-one): master [0] <- everyone - recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_); + recvSizes = + UPstream::listGatherValues(sendBuffers_[0].size(), comm_); if (!UPstream::master(comm_)) { - recvSizes.resize_nocopy(recvBuf_.size()); + recvSizes.resize_nocopy(recvBuffers_.size()); recvSizes = Zero; } } @@ -126,13 +127,13 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter { // scatter mode (one-to-all): master [0] -> everyone - recvSizes.resize_nocopy(sendBuf_.size()); + recvSizes.resize_nocopy(sendBuffers_.size()); if (UPstream::master(comm_)) { - forAll(sendBuf_, proci) + forAll(sendBuffers_, proci) { - recvSizes[proci] = sendBuf_[proci].size(); + recvSizes[proci] = sendBuffers_[proci].size(); } } @@ -145,9 +146,9 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter Pstream::exchange, char> ( - sendBuf_, + sendBuffers_, recvSizes, - recvBuf_, + recvBuffers_, tag_, comm_, wait @@ -172,9 +173,9 @@ Foam::PstreamBuffers::PstreamBuffers commsType_(commsType), tag_(tag), comm_(comm), - sendBuf_(UPstream::nProcs(comm_)), - recvBuf_(UPstream::nProcs(comm_)), - recvBufPos_(UPstream::nProcs(comm_), Zero) + sendBuffers_(UPstream::nProcs(comm)), + recvBuffers_(UPstream::nProcs(comm)), + recvPositions_(UPstream::nProcs(comm), Zero) {} @@ -192,9 +193,9 @@ Foam::PstreamBuffers::PstreamBuffers commsType_(commsType), tag_(tag), comm_(comm), - sendBuf_(UPstream::nProcs(comm_)), - recvBuf_(UPstream::nProcs(comm_)), - recvBufPos_(UPstream::nProcs(comm_), Zero) + sendBuffers_(UPstream::nProcs(comm)), + recvBuffers_(UPstream::nProcs(comm)), + recvPositions_(UPstream::nProcs(comm), Zero) {} @@ -203,33 +204,61 @@ Foam::PstreamBuffers::PstreamBuffers Foam::PstreamBuffers::~PstreamBuffers() { // Check that all data has been consumed. - forAll(recvBufPos_, proci) + forAll(recvPositions_, proci) { - if (recvBufPos_[proci] < recvBuf_[proci].size()) + const label pos = recvPositions_[proci]; + const label len = recvBuffers_[proci].size(); + + if (pos < len) { FatalErrorInFunction << "Message from processor " << proci - << " Only consumed " << recvBufPos_[proci] << " of " - << recvBuf_[proci].size() << " bytes" << nl + << " Only consumed " << pos << " of " << len << " bytes" << nl << Foam::abort(FatalError); } } } +// * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * // + +Foam::DynamicList& Foam::PstreamBuffers::accessSendBuffer +( + const label proci +) +{ + return sendBuffers_[proci]; +} + + +Foam::DynamicList& Foam::PstreamBuffers::accessRecvBuffer +( + const label proci +) +{ + return recvBuffers_[proci]; +} + + +Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci) +{ + return recvPositions_[proci]; +} + + // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // void Foam::PstreamBuffers::clear() { - for (DynamicList& buf : sendBuf_) + for (DynamicList& buf : sendBuffers_) { buf.clear(); } - for (DynamicList& buf : recvBuf_) + for (DynamicList& buf : recvBuffers_) { buf.clear(); } - recvBufPos_ = 0; + recvPositions_ = 0; finishedSendsCalled_ = false; } @@ -237,24 +266,24 @@ void Foam::PstreamBuffers::clear() void Foam::PstreamBuffers::clearRecv(const label proci) { - recvBuf_[proci].clear(); - recvBufPos_[proci] = 0; + recvBuffers_[proci].clear(); + recvPositions_[proci] = 0; } void Foam::PstreamBuffers::clearStorage() { - // Could also clear out entire sendBuf_, recvBuf_ and reallocate. + // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate. // Not sure if it makes much difference - for (DynamicList& buf : sendBuf_) + for (DynamicList& buf : sendBuffers_) { buf.clearStorage(); } - for (DynamicList& buf : recvBuf_) + for (DynamicList& buf : recvBuffers_) { buf.clearStorage(); } - recvBufPos_ = 0; + recvPositions_ = 0; finishedSendsCalled_ = false; } @@ -262,7 +291,7 @@ void Foam::PstreamBuffers::clearStorage() bool Foam::PstreamBuffers::hasSendData() const { - for (const DynamicList& buf : sendBuf_) + for (const DynamicList& buf : sendBuffers_) { if (!buf.empty()) { @@ -277,9 +306,9 @@ bool Foam::PstreamBuffers::hasRecvData() const { if (finishedSendsCalled_) { - forAll(recvBufPos_, proci) + forAll(recvPositions_, proci) { - if (recvBuf_[proci].size() > recvBufPos_[proci]) + if (recvPositions_[proci] < recvBuffers_[proci].size()) { return true; } @@ -299,7 +328,7 @@ bool Foam::PstreamBuffers::hasRecvData() const Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const { - return sendBuf_[proci].size(); + return sendBuffers_[proci].size(); } @@ -307,7 +336,7 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const { if (finishedSendsCalled_) { - const label len(recvBuf_[proci].size() > recvBufPos_[proci]); + const label len(recvBuffers_[proci].size() - recvPositions_[proci]); if (len > 0) { @@ -328,13 +357,13 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const Foam::labelList Foam::PstreamBuffers::recvDataCounts() const { - labelList counts(recvBuf_.size(), Zero); + labelList counts(recvPositions_.size(), Zero); if (finishedSendsCalled_) { - forAll(recvBufPos_, proci) + forAll(recvPositions_, proci) { - const label len(recvBuf_[proci].size() - recvBufPos_[proci]); + const label len(recvBuffers_[proci].size() - recvPositions_[proci]); if (len > 0) { @@ -359,14 +388,15 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const { if (finishedSendsCalled_) { - const label len(recvBuf_[proci].size() - recvBufPos_[proci]); + const label pos = recvPositions_[proci]; + const label len = recvBuffers_[proci].size(); - if (len > 0) + if (pos < len) { return UList ( - const_cast(&recvBuf_[proci][recvBufPos_[proci]]), - len + const_cast(recvBuffers_[proci].cdata()) + pos, + (len - pos) ); } } @@ -472,17 +502,17 @@ bool Foam::PstreamBuffers::finishedSends // Update send connections // - reasonable to assume there are no self-sends on UPstream::myProcNo - forAll(sendBuf_, proci) + forAll(sendBuffers_, proci) { // ie, sendDataCount(proci) != 0 - if (sendConnections.set(proci, !sendBuf_[proci].empty())) + if (sendConnections.set(proci, !sendBuffers_[proci].empty())) { // The state changed changed = true; } } - UPstream::reduceOr(changed); + UPstream::reduceOr(changed, comm_); if (changed) { @@ -490,12 +520,12 @@ bool Foam::PstreamBuffers::finishedSends // The send ranks sendProcs.clear(); - forAll(sendBuf_, proci) + forAll(sendBuffers_, proci) { // ie, sendDataCount(proci) != 0 - if (!sendBuf_[proci].empty()) + if (!sendBuffers_[proci].empty()) { - sendProcs.append(proci); + sendProcs.push_back(proci); } } @@ -503,12 +533,12 @@ bool Foam::PstreamBuffers::finishedSends // The recv ranks recvProcs.clear(); - forAll(recvBuf_, proci) + forAll(recvBuffers_, proci) { // ie, recvDataCount(proci) - if (!recvBuf_[proci].empty()) + if (!recvBuffers_[proci].empty()) { - recvProcs.append(proci); + recvProcs.push_back(proci); } } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 44bfea19c6..b9f5b04b1b 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -117,11 +117,6 @@ class bitSet; class PstreamBuffers { - // Friendship - friend class UOPstreamBase; // Access to sendBuf_ - friend class UIPstreamBase; // Access to recvBuf_, recvBufPos_ - - // Private Data //- Track if sends are complete @@ -142,14 +137,20 @@ class PstreamBuffers //- Communicator const label comm_; - //- Send buffer. Size is nProcs() - List> sendBuf_; + // Number of ranks associated with PstreamBuffers + // currently obtained from recvPositions_ - //- Receive buffer. Size is nProcs() - List> recvBuf_; - //- Current read positions within recvBuf_. Size is nProcs() - labelList recvBufPos_; + // Buffer storage + + //- Send buffers. Size is nProcs() + List> sendBuffers_; + + //- Receive buffers. Size is nProcs() + List> recvBuffers_; + + //- Current read positions within recvBuffers_. Size is nProcs() + labelList recvPositions_; // Private Member Functions @@ -173,6 +174,22 @@ class PstreamBuffers void finalExchangeGatherScatter(const bool isGather, const bool wait); + // Friendship Access + + //- Access a send buffer for given proc (in range 0-nProcs) + DynamicList& accessSendBuffer(const label proci); + + //- Access a recv buffer for given proc (in range 0-nProcs). + DynamicList& accessRecvBuffer(const label proci); + + //- Access the recv position within recv buffer for given proc + //- (in range 0-nProcs). + label& accessRecvPosition(const label proci); + + friend class UOPstreamBase; // accessSendBuffer() + friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition() + + public: // Constructors @@ -202,7 +219,7 @@ public: // Member Functions - // Access + // Attributes //- The associated buffer format (ascii | binary) IOstreamOption::streamFormat format() const noexcept @@ -216,27 +233,27 @@ public: return commsType_; } - //- The transfer message type + //- The transfer message tag int tag() const noexcept { return tag_; } - //- Communicator + //- The communicator index label comm() const noexcept { return comm_; } - - // Sizing - //- Number of ranks associated with PstreamBuffers label nProcs() const noexcept { - return recvBufPos_.size(); + return recvPositions_.size(); } + + // Sizing + //- Range of ranks indices associated with PstreamBuffers UPstream::rangeType allProcs() const noexcept { diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C index 21adb26831..228332528f 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C @@ -180,8 +180,8 @@ Foam::UIPstreamBase::UIPstreamBase UPstream(buffers.commsType()), Istream(buffers.format()), fromProcNo_(fromProcNo), - recvBuf_(buffers.recvBuf_[fromProcNo]), - recvBufPos_(buffers.recvBufPos_[fromProcNo]), + recvBuf_(buffers.accessRecvBuffer(fromProcNo)), + recvBufPos_(buffers.accessRecvPosition(fromProcNo)), tag_(buffers.tag()), comm_(buffers.comm()), clearAtEnd_(buffers.allowClearRecv()), diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C index 631818b772..73d4c83939 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C @@ -160,7 +160,7 @@ Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers) UPstream(buffers.commsType()), Ostream(buffers.format()), toProcNo_(toProcNo), - sendBuf_(buffers.sendBuf_[toProcNo]), + sendBuf_(buffers.accessSendBuffer(toProcNo)), tag_(buffers.tag()), comm_(buffers.comm()), sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking)