ENH: PstreamBuffers consistency improvements

- make nProcs() independent of internal storage mechanism.
- reset receive positions with finished sends

- use size of received buffers to manage validity instead of
  a separate additional gather operation.
This commit is contained in:
Mark Olesen
2023-02-08 09:44:49 +01:00
parent 4ec75d538f
commit 70d310329c
5 changed files with 189 additions and 136 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017 OpenFOAM Foundation
Copyright (C) 2020-2022 OpenCFD Ltd.
Copyright (C) 2020-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -31,7 +31,6 @@ License
#include "OSspecific.H"
#include "PstreamBuffers.H"
#include "masterUncollatedFileOperation.H"
#include <algorithm>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -111,49 +110,45 @@ void Foam::masterOFstream::commit()
return;
}
boolList procValid(UPstream::listGatherValues<bool>(valid_));
// Different files
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send my buffer to master
// Send my (valid) buffer to master
if (!Pstream::master())
{
UOPstream os(Pstream::masterNo(), pBufs);
string s(this->str());
this->reset();
if (valid_)
{
string s(this->str());
os.write(s.data(), s.length());
UOPstream os(Pstream::masterNo(), pBufs);
os.write(s.data(), s.length());
}
this->reset();
}
labelList recvSizes;
pBufs.finishedGathers(recvSizes);
pBufs.finishedGathers();
if (Pstream::master())
{
// Write master data
if (procValid[Pstream::masterNo()])
// Write (valid) master data
if (valid_)
{
checkWrite(filePaths[Pstream::masterNo()], this->str());
}
this->reset();
// Find the max receive size
recvSizes[Pstream::masterNo()] = 0;
List<char> buf
(
*std::max_element(recvSizes.cbegin(), recvSizes.cend())
);
// Allocate large enough to read without resizing
List<char> buf(pBufs.maxRecvCount());
for (const int proci : Pstream::subProcs())
{
UIPstream is(proci, pBufs);
const std::streamsize count(pBufs.recvDataCount(proci));
const std::streamsize count(recvSizes[proci]);
is.read(buf.data(), count);
if (procValid[proci])
if (count)
{
UIPstream is(proci, pBufs);
is.read(buf.data(), count);
checkWrite(filePaths[proci], buf.cdata(), count);
}
}

View File

@ -33,13 +33,15 @@ License
void Foam::PstreamBuffers::finalExchange
(
labelList& recvSizes,
const bool wait
const bool wait,
const bool needSizes,
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
recvPositions_ = Zero;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
@ -60,10 +62,13 @@ void Foam::PstreamBuffers::finalExchange
);
// Copy back out
recvSizes.resize_nocopy(recvBuffers_.size());
forAll(recvBuffers_, proci)
if (needSizes)
{
recvSizes[proci] = recvBuffers_[proci].size();
recvSizes.resize_nocopy(recvBuffers_.size());
forAll(recvBuffers_, proci)
{
recvSizes[proci] = recvBuffers_[proci].size();
}
}
return;
@ -89,13 +94,15 @@ void Foam::PstreamBuffers::finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool wait
const bool wait,
const bool needSizes, // unused
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
recvPositions_ = Zero;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
@ -125,17 +132,18 @@ void Foam::PstreamBuffers::finalExchange
void Foam::PstreamBuffers::finalExchangeGatherScatter
(
const bool isGather,
const bool wait
const bool wait,
const bool needSizes,
labelList& recvSizes
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
recvPositions_ = Zero;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
labelList recvSizes;
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
@ -145,7 +153,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(recvBuffers_.size());
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
}
@ -153,7 +161,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(sendBuffers_.size());
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
@ -187,9 +195,9 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
Foam::PstreamBuffers::PstreamBuffers
(
const UPstream::commsTypes commsType,
const int tag,
const label comm,
UPstream::commsTypes commsType,
int tag,
label communicator,
IOstreamOption::streamFormat fmt
)
:
@ -198,30 +206,11 @@ Foam::PstreamBuffers::PstreamBuffers
format_(fmt),
commsType_(commsType),
tag_(tag),
comm_(comm),
sendBuffers_(UPstream::nProcs(comm)),
recvBuffers_(UPstream::nProcs(comm)),
recvPositions_(UPstream::nProcs(comm), Zero)
{}
Foam::PstreamBuffers::PstreamBuffers
(
const label comm,
const UPstream::commsTypes commsType,
const int tag,
IOstreamOption::streamFormat fmt
)
:
finishedSendsCalled_(false),
allowClearRecv_(true),
format_(fmt),
commsType_(commsType),
tag_(tag),
comm_(comm),
sendBuffers_(UPstream::nProcs(comm)),
recvBuffers_(UPstream::nProcs(comm)),
recvPositions_(UPstream::nProcs(comm), Zero)
comm_(communicator),
nProcs_(UPstream::nProcs(comm_)),
sendBuffers_(nProcs_),
recvBuffers_(nProcs_),
recvPositions_(nProcs_, Zero)
{}
@ -284,7 +273,7 @@ void Foam::PstreamBuffers::clear()
{
buf.clear();
}
recvPositions_ = 0;
recvPositions_ = Zero;
finishedSendsCalled_ = false;
}
@ -309,7 +298,7 @@ void Foam::PstreamBuffers::clearStorage()
{
buf.clearStorage();
}
recvPositions_ = 0;
recvPositions_ = Zero;
finishedSendsCalled_ = false;
}
@ -409,6 +398,46 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
}
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
{
    // Largest unconsumed receive size (buffer size minus current read
    // position) over all ranks except proci.
    // Remains zero when the sends have not yet been marked as finished.
    label largest = 0;

    if (!finishedSendsCalled_)
    {
        #ifdef FULLDEBUG
        FatalErrorInFunction
            << "Call finishedSends first" << exit(FatalError);
        #endif

        return largest;
    }

    const label nRanks = recvPositions_.size();

    for (label ranki = 0; ranki < nRanks; ++ranki)
    {
        if (ranki == proci)
        {
            // Skip the excluded rank
            continue;
        }

        const label remaining
        (
            recvBuffers_[ranki].size() - recvPositions_[ranki]
        );

        if (largest < remaining)
        {
            largest = remaining;
        }
    }

    return largest;
}
Foam::label Foam::PstreamBuffers::maxRecvCount() const
{
    // A negative rank never matches a buffer index,
    // so no processor is excluded from the maximum
    const label excludeNone = -1;

    return maxNonLocalRecvCount(excludeNone);
}
Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount() const
{
    // Exclude the rank of the calling process (on this communicator)
    const label myRank = UPstream::myProcNo(comm_);

    return maxNonLocalRecvCount(myRank);
}
const Foam::UList<char>
Foam::PstreamBuffers::peekRecvData(const label proci) const
{
@ -449,7 +478,7 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
void Foam::PstreamBuffers::finishedSends(const bool wait)
{
labelList recvSizes;
finalExchange(recvSizes, wait);
finalExchange(wait, false, recvSizes);
}
@ -459,7 +488,7 @@ void Foam::PstreamBuffers::finishedSends
const bool wait
)
{
finalExchange(recvSizes, wait);
finalExchange(wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -483,7 +512,7 @@ void Foam::PstreamBuffers::finishedSends
)
{
labelList recvSizes;
finalExchange(sendProcs, recvProcs, recvSizes, wait);
finalExchange(sendProcs, recvProcs, wait, false, recvSizes);
}
@ -495,7 +524,7 @@ void Foam::PstreamBuffers::finishedSends
const bool wait
)
{
finalExchange(sendProcs, recvProcs, recvSizes, wait);
finalExchange(sendProcs, recvProcs, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -555,14 +584,14 @@ bool Foam::PstreamBuffers::finishedSends
}
}
finishedSends(wait); // All-to-all
labelList recvSizes;
finishedSends(recvSizes, wait); // All-to-all
// The recv ranks
recvProcs.clear();
forAll(recvBuffers_, proci)
forAll(recvSizes, proci)
{
// ie, recvDataCount(proci)
if (!recvBuffers_[proci].empty())
if (recvSizes[proci] > 0)
{
recvProcs.push_back(proci);
}
@ -581,13 +610,15 @@ bool Foam::PstreamBuffers::finishedSends
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
finalExchangeGatherScatter(true, wait);
labelList recvSizes;
finalExchangeGatherScatter(true, wait, false, recvSizes);
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
finalExchangeGatherScatter(false, wait);
labelList recvSizes;
finalExchangeGatherScatter(false, wait, false, recvSizes);
}
@ -597,7 +628,7 @@ void Foam::PstreamBuffers::finishedGathers
const bool wait
)
{
finalExchangeGatherScatter(true, wait);
finalExchangeGatherScatter(true, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -610,11 +641,6 @@ void Foam::PstreamBuffers::finishedGathers
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
// For nonBlocking mode, simply recover received sizes
// from the buffers themselves.
recvSizes = recvDataCounts();
}
@ -624,7 +650,7 @@ void Foam::PstreamBuffers::finishedScatters
const bool wait
)
{
finalExchangeGatherScatter(false, wait);
finalExchangeGatherScatter(false, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -637,11 +663,6 @@ void Foam::PstreamBuffers::finishedScatters
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
// For nonBlocking mode, simply recover received sizes
// from the buffers themselves.
recvSizes = recvDataCounts();
}

View File

@ -137,8 +137,8 @@ class PstreamBuffers
//- Communicator
const label comm_;
// Number of ranks associated with PstreamBuffers
// currently obtained from recvPositions_
//- Number of ranks associated with PstreamBuffers (at construction)
const label nProcs_;
// Buffer storage
@ -157,7 +157,12 @@ class PstreamBuffers
//- Mark all sends as having been done.
// This will start receives (nonBlocking comms).
void finalExchange(labelList& recvSizes, const bool wait);
void finalExchange
(
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
//- Mark sends as done.
// Only exchange sizes using the sendProcs/recvProcs subset
@ -166,12 +171,19 @@ class PstreamBuffers
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool wait
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
//- For all-to-one or one-to-all
void finalExchangeGatherScatter(const bool isGather, const bool wait);
void finalExchangeGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
// Friendship Access
@ -194,23 +206,41 @@ public:
// Constructors
//- Construct given comms type, message tag, communicator, IO format
//- Construct given communication type (default: nonBlocking), message
//- tag, communicator (default: worldComm), IO format (default: binary)
explicit PstreamBuffers
(
const UPstream::commsTypes commsType,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking,
int tag = UPstream::msgType(),
label communicator = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct given communicator, comms type, message tag, IO format
//- Construct given communicator, communication type
//- (default: nonBlocking), message tag, IO format (default: binary)
explicit PstreamBuffers
(
const label comm,
const UPstream::commsTypes commsType,
const int tag = UPstream::msgType(),
label communicator,
UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking,
int tag = UPstream::msgType(),
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
)
:
PstreamBuffers(commsType, tag, communicator, fmt)
{}
//- Construct given communicator, message tag, communication type
//- (default: nonBlocking), IO format (default: binary)
//  Convenience overload taking the communicator and tag first.
//  It holds no logic of its own: the arguments are reordered and
//  forwarded to the (commsType, tag, communicator, fmt) constructor.
PstreamBuffers
(
label communicator,
int tag,
UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
)
:
// Delegate to the primary constructor (note the argument order)
PstreamBuffers(commsType, tag, communicator, fmt)
{}
//- Destructor - checks that all data have been consumed
@ -248,7 +278,7 @@ public:
//- Number of ranks associated with PstreamBuffers
label nProcs() const noexcept
{
return recvPositions_.size();
return nProcs_;
}
@ -258,14 +288,14 @@ public:
UPstream::rangeType allProcs() const noexcept
{
// Proc 0 -> nProcs (int value)
return UPstream::rangeType(static_cast<int>(nProcs()));
return UPstream::rangeType(static_cast<int>(nProcs_));
}
//- Range of sub-processes indices associated with PstreamBuffers
UPstream::rangeType subProcs() const noexcept
{
// Proc 1 -> nProcs (int value)
return UPstream::rangeType(1, static_cast<int>(nProcs()-1));
return UPstream::rangeType(1, static_cast<int>(nProcs_-1));
}
@ -302,6 +332,18 @@ public:
//- Must call finishedSends() or other finished.. method first!
labelList recvDataCounts() const;
//- Maximum receive size from any processor rank.
//- Must call finishedSends() or other finished.. method first!
label maxRecvCount() const;
//- Maximum receive size, excluding current processor rank
//- Must call finishedSends() or other finished.. method first!
label maxNonLocalRecvCount() const;
//- Maximum receive size, excluding the specified processor rank
//- Must call finishedSends() or other finished.. method first!
label maxNonLocalRecvCount(const label proci) const;
//- Number of unconsumed receive bytes for the specified processor.
//- Must call finishedSends() or other finished.. method first!
// The method is only useful in limited situations, such as when
@ -447,7 +489,7 @@ public:
//- Mark all sends to master as done.
//
// Non-blocking mode: populates receive buffers.
// Can use recvDataCounts() method to recover sizes received.
// Can use recvDataCount, maxRecvCount etc to recover sizes received.
//
// \param wait wait for requests to complete (in nonBlocking mode)
//
@ -467,7 +509,7 @@ public:
//- Mark all sends to sub-procs as done.
//
// Non-blocking mode: populates receive buffers.
// Can use recvDataCounts() method to recover sizes received.
// Can use recvDataCount, maxRecvCount etc to recover sizes received.
//
// \param wait wait for requests to complete (in nonBlocking mode)
//

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -641,8 +641,7 @@ Foam::fileOperations::masterUncollatedFileOperation::read
}
}
labelList recvSizes;
pBufs.finishedSends(recvSizes);
pBufs.finishedSends();
// isPtr will be valid on master and will be the unbuffered
// IFstream. Else the information is in the PstreamBuffers (and
@ -653,12 +652,11 @@ Foam::fileOperations::masterUncollatedFileOperation::read
if (procValid[Pstream::myProcNo(comm)])
{
// This processor needs to return something
List<char> buf(pBufs.recvDataCount(Pstream::masterNo()));
UIPstream is(Pstream::masterNo(), pBufs);
List<char> buf(recvSizes[Pstream::masterNo()]);
if (!buf.empty())
{
UIPstream is(Pstream::masterNo(), pBufs);
is.read(buf.data(), buf.size());
}
@ -2353,8 +2351,7 @@ Foam::fileOperations::masterUncollatedFileOperation::NewIFstream
}
labelList recvSizes;
pBufs.finishedSends(recvSizes);
pBufs.finishedSends();
if (Pstream::master(Pstream::worldComm))
{
@ -2370,10 +2367,13 @@ Foam::fileOperations::masterUncollatedFileOperation::NewIFstream
<< " from processor " << Pstream::masterNo() << endl;
}
UIPstream is(Pstream::masterNo(), pBufs);
List<char> buf(pBufs.recvDataCount(Pstream::masterNo()));
List<char> buf(recvSizes[Pstream::masterNo()]);
is.read(buf.data(), buf.size());
if (!buf.empty())
{
UIPstream is(Pstream::masterNo(), pBufs);
is.read(buf.data(), buf.size());
}
if (debug)
{

View File

@ -2159,8 +2159,8 @@ Foam::autoPtr<Foam::mapDistributePolyMesh> Foam::fvMeshDistribute::distribute
// ~~~~~~~~~~~~~~~~~
labelList nSendCells(countCells(distribution));
labelList nRevcCells(Pstream::nProcs());
Pstream::allToAll(nSendCells, nRevcCells);
labelList nRecvCells(Pstream::nProcs());
UPstream::allToAll(nSendCells, nRecvCells);
// Allocate buffers
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
@ -2382,22 +2382,17 @@ Foam::autoPtr<Foam::mapDistributePolyMesh> Foam::fvMeshDistribute::distribute
UPstream::parRun(oldParRun); // Restore parallel state
// Start sending&receiving from buffers
if (debug)
{
if (debug)
{
Pout<< "Starting sending" << endl;
}
Pout<< "Starting sending" << endl;
}
labelList recvSizes;
pBufs.finishedSends(recvSizes);
pBufs.finishedSends();
if (debug)
{
Pout<< "Finished sending and receiving : " << flatOutput(recvSizes)
<< endl;
}
if (debug)
{
Pout<< "Finished sending and receiving : "
<< flatOutput(pBufs.recvDataCounts()) << endl;
}
@ -2547,17 +2542,17 @@ Foam::autoPtr<Foam::mapDistributePolyMesh> Foam::fvMeshDistribute::distribute
);
PtrList<PtrList<volTensorField::Internal>> dtfs(Pstream::nProcs());
forAll(nRevcCells, sendProc)
forAll(nRecvCells, sendProc)
{
// Did processor sendProc send anything to me?
if (sendProc != Pstream::myProcNo() && nRevcCells[sendProc] > 0)
if (sendProc != Pstream::myProcNo() && nRecvCells[sendProc] > 0)
{
if (debug)
{
Pout<< nl
<< "RECEIVING FROM DOMAIN " << sendProc
<< " cells to receive:"
<< nRevcCells[sendProc]
<< nRecvCells[sendProc]
<< nl << endl;
}