ENH: add PstreamBuffers access routines for friend classes

- allows the possibility of using demand-driven internal buffers
  and/or different storage mechanisms.

  Changes:

    * old: sendBuf_[proci]    -> accessSendBuffer(proci)
    * old: recvBuf_[proci]    -> accessRecvBuffer(proci)
    * old: recvBufPos_[proci] -> accessRecvPosition(proci)

  only affects internals of UIPstreamBase and UOPstreamBase

BUG: reduceOr in PstreamBuffers uses world communicator

- should respect the value of the communicator defined within
  PstreamBuffers
This commit is contained in:
Mark Olesen
2023-02-03 09:35:44 +01:00
parent 173c9ac163
commit ab4c5f25ac
4 changed files with 123 additions and 76 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd. Copyright (C) 2021-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -44,13 +44,13 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking) if (commsType_ == UPstream::commsTypes::nonBlocking)
{ {
// all-to-all // all-to-all
Pstream::exchangeSizes(sendBuf_, recvSizes, comm_); Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
Pstream::exchange<DynamicList<char>, char> Pstream::exchange<DynamicList<char>, char>
( (
sendBuf_, sendBuffers_,
recvSizes, recvSizes,
recvBuf_, recvBuffers_,
tag_, tag_,
comm_, comm_,
wait wait
@ -77,7 +77,7 @@ void Foam::PstreamBuffers::finalExchange
( (
sendProcs, sendProcs,
recvProcs, recvProcs,
sendBuf_, sendBuffers_,
recvSizes, recvSizes,
tag_, tag_,
comm_ comm_
@ -85,9 +85,9 @@ void Foam::PstreamBuffers::finalExchange
Pstream::exchange<DynamicList<char>, char> Pstream::exchange<DynamicList<char>, char>
( (
sendBuf_, sendBuffers_,
recvSizes, recvSizes,
recvBuf_, recvBuffers_,
tag_, tag_,
comm_, comm_,
wait wait
@ -114,11 +114,12 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
{ {
// gather mode (all-to-one): master [0] <- everyone // gather mode (all-to-one): master [0] <- everyone
recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_); recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
if (!UPstream::master(comm_)) if (!UPstream::master(comm_))
{ {
recvSizes.resize_nocopy(recvBuf_.size()); recvSizes.resize_nocopy(recvBuffers_.size());
recvSizes = Zero; recvSizes = Zero;
} }
} }
@ -126,13 +127,13 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
{ {
// scatter mode (one-to-all): master [0] -> everyone // scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(sendBuf_.size()); recvSizes.resize_nocopy(sendBuffers_.size());
if (UPstream::master(comm_)) if (UPstream::master(comm_))
{ {
forAll(sendBuf_, proci) forAll(sendBuffers_, proci)
{ {
recvSizes[proci] = sendBuf_[proci].size(); recvSizes[proci] = sendBuffers_[proci].size();
} }
} }
@ -145,9 +146,9 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
Pstream::exchange<DynamicList<char>, char> Pstream::exchange<DynamicList<char>, char>
( (
sendBuf_, sendBuffers_,
recvSizes, recvSizes,
recvBuf_, recvBuffers_,
tag_, tag_,
comm_, comm_,
wait wait
@ -172,9 +173,9 @@ Foam::PstreamBuffers::PstreamBuffers
commsType_(commsType), commsType_(commsType),
tag_(tag), tag_(tag),
comm_(comm), comm_(comm),
sendBuf_(UPstream::nProcs(comm_)), sendBuffers_(UPstream::nProcs(comm)),
recvBuf_(UPstream::nProcs(comm_)), recvBuffers_(UPstream::nProcs(comm)),
recvBufPos_(UPstream::nProcs(comm_), Zero) recvPositions_(UPstream::nProcs(comm), Zero)
{} {}
@ -192,9 +193,9 @@ Foam::PstreamBuffers::PstreamBuffers
commsType_(commsType), commsType_(commsType),
tag_(tag), tag_(tag),
comm_(comm), comm_(comm),
sendBuf_(UPstream::nProcs(comm_)), sendBuffers_(UPstream::nProcs(comm)),
recvBuf_(UPstream::nProcs(comm_)), recvBuffers_(UPstream::nProcs(comm)),
recvBufPos_(UPstream::nProcs(comm_), Zero) recvPositions_(UPstream::nProcs(comm), Zero)
{} {}
@ -203,33 +204,61 @@ Foam::PstreamBuffers::PstreamBuffers
Foam::PstreamBuffers::~PstreamBuffers() Foam::PstreamBuffers::~PstreamBuffers()
{ {
// Check that all data has been consumed. // Check that all data has been consumed.
forAll(recvBufPos_, proci) forAll(recvPositions_, proci)
{ {
if (recvBufPos_[proci] < recvBuf_[proci].size()) const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
if (pos < len)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Message from processor " << proci << "Message from processor " << proci
<< " Only consumed " << recvBufPos_[proci] << " of " << " Only consumed " << pos << " of " << len << " bytes" << nl
<< recvBuf_[proci].size() << " bytes" << nl
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
} }
} }
// * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
(
const label proci
)
{
return sendBuffers_[proci];
}
Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
(
const label proci
)
{
return recvBuffers_[proci];
}
Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
{
return recvPositions_[proci];
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::clear() void Foam::PstreamBuffers::clear()
{ {
for (DynamicList<char>& buf : sendBuf_) for (DynamicList<char>& buf : sendBuffers_)
{ {
buf.clear(); buf.clear();
} }
for (DynamicList<char>& buf : recvBuf_) for (DynamicList<char>& buf : recvBuffers_)
{ {
buf.clear(); buf.clear();
} }
recvBufPos_ = 0; recvPositions_ = 0;
finishedSendsCalled_ = false; finishedSendsCalled_ = false;
} }
@ -237,24 +266,24 @@ void Foam::PstreamBuffers::clear()
void Foam::PstreamBuffers::clearRecv(const label proci) void Foam::PstreamBuffers::clearRecv(const label proci)
{ {
recvBuf_[proci].clear(); recvBuffers_[proci].clear();
recvBufPos_[proci] = 0; recvPositions_[proci] = 0;
} }
void Foam::PstreamBuffers::clearStorage() void Foam::PstreamBuffers::clearStorage()
{ {
// Could also clear out entire sendBuf_, recvBuf_ and reallocate. // Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
// Not sure if it makes much difference // Not sure if it makes much difference
for (DynamicList<char>& buf : sendBuf_) for (DynamicList<char>& buf : sendBuffers_)
{ {
buf.clearStorage(); buf.clearStorage();
} }
for (DynamicList<char>& buf : recvBuf_) for (DynamicList<char>& buf : recvBuffers_)
{ {
buf.clearStorage(); buf.clearStorage();
} }
recvBufPos_ = 0; recvPositions_ = 0;
finishedSendsCalled_ = false; finishedSendsCalled_ = false;
} }
@ -262,7 +291,7 @@ void Foam::PstreamBuffers::clearStorage()
bool Foam::PstreamBuffers::hasSendData() const bool Foam::PstreamBuffers::hasSendData() const
{ {
for (const DynamicList<char>& buf : sendBuf_) for (const DynamicList<char>& buf : sendBuffers_)
{ {
if (!buf.empty()) if (!buf.empty())
{ {
@ -277,9 +306,9 @@ bool Foam::PstreamBuffers::hasRecvData() const
{ {
if (finishedSendsCalled_) if (finishedSendsCalled_)
{ {
forAll(recvBufPos_, proci) forAll(recvPositions_, proci)
{ {
if (recvBuf_[proci].size() > recvBufPos_[proci]) if (recvPositions_[proci] < recvBuffers_[proci].size())
{ {
return true; return true;
} }
@ -299,7 +328,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
{ {
return sendBuf_[proci].size(); return sendBuffers_[proci].size();
} }
@ -307,7 +336,7 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
{ {
if (finishedSendsCalled_) if (finishedSendsCalled_)
{ {
const label len(recvBuf_[proci].size() > recvBufPos_[proci]); const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
if (len > 0) if (len > 0)
{ {
@ -328,13 +357,13 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
Foam::labelList Foam::PstreamBuffers::recvDataCounts() const Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
{ {
labelList counts(recvBuf_.size(), Zero); labelList counts(recvPositions_.size(), Zero);
if (finishedSendsCalled_) if (finishedSendsCalled_)
{ {
forAll(recvBufPos_, proci) forAll(recvPositions_, proci)
{ {
const label len(recvBuf_[proci].size() - recvBufPos_[proci]); const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
if (len > 0) if (len > 0)
{ {
@ -359,14 +388,15 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
{ {
if (finishedSendsCalled_) if (finishedSendsCalled_)
{ {
const label len(recvBuf_[proci].size() - recvBufPos_[proci]); const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
if (len > 0) if (pos < len)
{ {
return UList<char> return UList<char>
( (
const_cast<char*>(&recvBuf_[proci][recvBufPos_[proci]]), const_cast<char*>(recvBuffers_[proci].cdata()) + pos,
len (len - pos)
); );
} }
} }
@ -472,17 +502,17 @@ bool Foam::PstreamBuffers::finishedSends
// Update send connections // Update send connections
// - reasonable to assume there are no self-sends on UPstream::myProcNo // - reasonable to assume there are no self-sends on UPstream::myProcNo
forAll(sendBuf_, proci) forAll(sendBuffers_, proci)
{ {
// ie, sendDataCount(proci) != 0 // ie, sendDataCount(proci) != 0
if (sendConnections.set(proci, !sendBuf_[proci].empty())) if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
{ {
// The state changed // The state changed
changed = true; changed = true;
} }
} }
UPstream::reduceOr(changed); UPstream::reduceOr(changed, comm_);
if (changed) if (changed)
{ {
@ -490,12 +520,12 @@ bool Foam::PstreamBuffers::finishedSends
// The send ranks // The send ranks
sendProcs.clear(); sendProcs.clear();
forAll(sendBuf_, proci) forAll(sendBuffers_, proci)
{ {
// ie, sendDataCount(proci) != 0 // ie, sendDataCount(proci) != 0
if (!sendBuf_[proci].empty()) if (!sendBuffers_[proci].empty())
{ {
sendProcs.append(proci); sendProcs.push_back(proci);
} }
} }
@ -503,12 +533,12 @@ bool Foam::PstreamBuffers::finishedSends
// The recv ranks // The recv ranks
recvProcs.clear(); recvProcs.clear();
forAll(recvBuf_, proci) forAll(recvBuffers_, proci)
{ {
// ie, recvDataCount(proci) // ie, recvDataCount(proci)
if (!recvBuf_[proci].empty()) if (!recvBuffers_[proci].empty())
{ {
recvProcs.append(proci); recvProcs.push_back(proci);
} }
} }
} }

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd. Copyright (C) 2021-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -117,11 +117,6 @@ class bitSet;
class PstreamBuffers class PstreamBuffers
{ {
// Friendship
friend class UOPstreamBase; // Access to sendBuf_
friend class UIPstreamBase; // Access to recvBuf_, recvBufPos_
// Private Data // Private Data
//- Track if sends are complete //- Track if sends are complete
@ -142,14 +137,20 @@ class PstreamBuffers
//- Communicator //- Communicator
const label comm_; const label comm_;
//- Send buffer. Size is nProcs() // Number of ranks associated with PstreamBuffers
List<DynamicList<char>> sendBuf_; // currently obtained from recvPositions_
//- Receive buffer. Size is nProcs()
List<DynamicList<char>> recvBuf_;
//- Current read positions within recvBuf_. Size is nProcs() // Buffer storage
labelList recvBufPos_;
//- Send buffers. Size is nProcs()
List<DynamicList<char>> sendBuffers_;
//- Receive buffers. Size is nProcs()
List<DynamicList<char>> recvBuffers_;
//- Current read positions within recvBuffers_. Size is nProcs()
labelList recvPositions_;
// Private Member Functions // Private Member Functions
@ -173,6 +174,22 @@ class PstreamBuffers
void finalExchangeGatherScatter(const bool isGather, const bool wait); void finalExchangeGatherScatter(const bool isGather, const bool wait);
// Friendship Access
//- Access a send buffer for given proc (in range 0-nProcs)
DynamicList<char>& accessSendBuffer(const label proci);
//- Access a recv buffer for given proc (in range 0-nProcs).
DynamicList<char>& accessRecvBuffer(const label proci);
//- Access the recv position within recv buffer for given proc
//- (in range 0-nProcs).
label& accessRecvPosition(const label proci);
friend class UOPstreamBase; // accessSendBuffer()
friend class UIPstreamBase; // accessRecvBuffer(), accessRecvPosition()
public: public:
// Constructors // Constructors
@ -202,7 +219,7 @@ public:
// Member Functions // Member Functions
// Access // Attributes
//- The associated buffer format (ascii | binary) //- The associated buffer format (ascii | binary)
IOstreamOption::streamFormat format() const noexcept IOstreamOption::streamFormat format() const noexcept
@ -216,27 +233,27 @@ public:
return commsType_; return commsType_;
} }
//- The transfer message type //- The transfer message tag
int tag() const noexcept int tag() const noexcept
{ {
return tag_; return tag_;
} }
//- Communicator //- The communicator index
label comm() const noexcept label comm() const noexcept
{ {
return comm_; return comm_;
} }
// Sizing
//- Number of ranks associated with PstreamBuffers //- Number of ranks associated with PstreamBuffers
label nProcs() const noexcept label nProcs() const noexcept
{ {
return recvBufPos_.size(); return recvPositions_.size();
} }
// Sizing
//- Range of ranks indices associated with PstreamBuffers //- Range of ranks indices associated with PstreamBuffers
UPstream::rangeType allProcs() const noexcept UPstream::rangeType allProcs() const noexcept
{ {

View File

@ -180,8 +180,8 @@ Foam::UIPstreamBase::UIPstreamBase
UPstream(buffers.commsType()), UPstream(buffers.commsType()),
Istream(buffers.format()), Istream(buffers.format()),
fromProcNo_(fromProcNo), fromProcNo_(fromProcNo),
recvBuf_(buffers.recvBuf_[fromProcNo]), recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
recvBufPos_(buffers.recvBufPos_[fromProcNo]), recvBufPos_(buffers.accessRecvPosition(fromProcNo)),
tag_(buffers.tag()), tag_(buffers.tag()),
comm_(buffers.comm()), comm_(buffers.comm()),
clearAtEnd_(buffers.allowClearRecv()), clearAtEnd_(buffers.allowClearRecv()),

View File

@ -160,7 +160,7 @@ Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers)
UPstream(buffers.commsType()), UPstream(buffers.commsType()),
Ostream(buffers.format()), Ostream(buffers.format()),
toProcNo_(toProcNo), toProcNo_(toProcNo),
sendBuf_(buffers.sendBuf_[toProcNo]), sendBuf_(buffers.accessSendBuffer(toProcNo)),
tag_(buffers.tag()), tag_(buffers.tag()),
comm_(buffers.comm()), comm_(buffers.comm()),
sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking) sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking)