ENH: additional gather/scatter modes for PstreamBuffers

- gather/scatter types of operations can avoid AllToAll communication
  and use simple MPI gather (or scatter) to establish the receive sizes.

  New methods: finishedGathers() / finishedScatters()
This commit is contained in:
Mark Olesen
2022-03-11 12:15:59 +01:00
parent a674c9d373
commit de8ef5332d
11 changed files with 237 additions and 71 deletions

View File

@ -353,18 +353,15 @@ int main(int argc, char *argv[])
{ {
sendData = identity(500); sendData = identity(500);
for (const int proci : Pstream::allProcs()) for (const int proci : Pstream::subProcs())
{ {
if (proci != Pstream::myProcNo()) UOPstream os(proci, pBufs);
{ os << sendData;
UOPstream os(proci, pBufs);
os << sendData;
}
} }
} }
Info<< "call finishedSends()" << endl; Info<< "call finishedSends()" << endl;
pBufs.finishedSends(); pBufs.finishedScatters();
if (!Pstream::master()) if (!Pstream::master())
{ {

View File

@ -71,14 +71,12 @@ int main(int argc, char *argv[])
if (!Pstream::master()) if (!Pstream::master())
{ {
Perr<< "slave sending to master " Perr<< "sending to master" << endl;
<< Pstream::masterNo() << endl;
UOPstream toMaster(Pstream::masterNo(), pBufs); UOPstream toMaster(Pstream::masterNo(), pBufs);
toMaster << data; toMaster << data;
} }
// Start sending and receiving and block pBufs.finishedGathers();
pBufs.finishedSends();
// Consume // Consume
DynamicList<vector> allData; DynamicList<vector> allData;
@ -87,36 +85,34 @@ int main(int argc, char *argv[])
// Collect my own data // Collect my own data
allData.append(data); allData.append(data);
for (const int slave : Pstream::subProcs()) for (const int proci : Pstream::subProcs())
{ {
Perr << "master receiving from slave " << slave << endl; Perr << "master receiving from " << proci << endl;
UIPstream fromSlave(slave, pBufs); UIPstream fromProc(proci, pBufs);
allData.append(vector(fromSlave)); allData.append(vector(fromProc));
} }
} }
// Send allData back // Send allData back
PstreamBuffers pBufs2(Pstream::commsTypes::nonBlocking); pBufs.clear();
if (Pstream::master()) if (Pstream::master())
{ {
for (const int slave : Pstream::subProcs()) for (const int proci : Pstream::subProcs())
{ {
Perr << "master sending to slave " << slave << endl; Perr << "master sending to " << proci << endl;
UOPstream toSlave(slave, pBufs2); UOPstream toProc(proci, pBufs);
toSlave << allData; toSlave << allData;
} }
} }
// Start sending and receiving and block pBufs.finishedScatters();
pBufs2.finishedSends();
// Consume // Consume
if (!Pstream::master()) if (!Pstream::master())
{ {
Perr<< "slave receiving from master " Perr<< "receive from master" << endl;
<< Pstream::masterNo() << endl; UIPstream fromMaster(Pstream::masterNo(), pBufs);
UIPstream fromMaster(Pstream::masterNo(), pBufs2);
fromMaster >> allData; fromMaster >> allData;
Perr<< allData << endl; Perr<< allData << endl;
} }

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020-2021 OpenCFD Ltd. Copyright (C) 2020-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -394,7 +394,7 @@ bool Foam::decomposedBlockData::readBlocks
} }
} }
pBufs.finishedSends(); pBufs.finishedScatters();
if (!UPstream::master(comm)) if (!UPstream::master(comm))
{ {
@ -403,7 +403,7 @@ bool Foam::decomposedBlockData::readBlocks
} }
} }
Pstream::scatter(ok, Pstream::msgType(), comm); Pstream::broadcast(ok, comm);
return ok; return ok;
} }
@ -523,7 +523,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
ok = is.good(); ok = is.good();
} }
pBufs.finishedSends(); pBufs.finishedScatters();
if (!UPstream::master(comm)) if (!UPstream::master(comm))
{ {
@ -535,7 +535,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
} }
} }
Pstream::scatter(ok, Pstream::msgType(), comm); Pstream::broadcast(ok, comm);
//- Set stream properties from realIsPtr on master //- Set stream properties from realIsPtr on master
@ -916,7 +916,7 @@ bool Foam::decomposedBlockData::writeBlocks
{ {
//- Enable to get synchronised error checking. Is the one that keeps //- Enable to get synchronised error checking. Is the one that keeps
// slaves as slow as the master (which does all the writing) // slaves as slow as the master (which does all the writing)
Pstream::scatter(ok, Pstream::msgType(), comm); Pstream::broadcast(ok, comm);
} }
return ok; return ok;

View File

@ -31,7 +31,6 @@ License
#include "OSspecific.H" #include "OSspecific.H"
#include "PstreamBuffers.H" #include "PstreamBuffers.H"
#include "masterUncollatedFileOperation.H" #include "masterUncollatedFileOperation.H"
#include "boolList.H"
#include <algorithm> #include <algorithm>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -115,7 +114,7 @@ void Foam::masterOFstream::commit()
return; return;
} }
boolList valid(UPstream::listGatherValues<bool>(valid_)); boolList procValid(UPstream::listGatherValues<bool>(valid_));
// Different files // Different files
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
@ -131,18 +130,18 @@ void Foam::masterOFstream::commit()
} }
labelList recvSizes; labelList recvSizes;
pBufs.finishedSends(recvSizes); pBufs.finishedGathers(recvSizes);
if (Pstream::master()) if (Pstream::master())
{ {
// Write master data // Write master data
if (valid[Pstream::masterNo()]) if (procValid[Pstream::masterNo()])
{ {
checkWrite(filePaths[Pstream::masterNo()], this->str()); checkWrite(filePaths[Pstream::masterNo()], this->str());
} }
this->reset(); this->reset();
// Find the max slave size // Find the max receive size
recvSizes[Pstream::masterNo()] = 0; recvSizes[Pstream::masterNo()] = 0;
List<char> buf List<char> buf
( (
@ -156,7 +155,7 @@ void Foam::masterOFstream::commit()
const std::streamsize count(recvSizes[proci]); const std::streamsize count(recvSizes[proci]);
is.read(buf.data(), count); is.read(buf.data(), count);
if (valid[proci]) if (procValid[proci])
{ {
checkWrite(filePaths[proci], buf.cdata(), count); checkWrite(filePaths[proci], buf.cdata(), count);
} }

View File

@ -96,6 +96,66 @@ void Foam::PstreamBuffers::finalExchange
} }
void Foam::PstreamBuffers::finalExchangeGatherScatter
(
const bool isGather,
const bool wait
)
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
labelList recvSizes;
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_);
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(recvBuf_.size());
recvSizes = Zero;
}
}
else
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(sendBuf_.size());
if (UPstream::master(comm_))
{
forAll(sendBuf_, proci)
{
recvSizes[proci] = sendBuf_[proci].size();
}
}
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
recvSizes = Zero;
recvSizes[0] = myRecv;
}
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvSizes,
recvBuf_,
tag_,
comm_,
wait
);
}
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers Foam::PstreamBuffers::PstreamBuffers
@ -375,4 +435,48 @@ bool Foam::PstreamBuffers::finishedSends
} }
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
finalExchangeGatherScatter(true, wait);
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
finalExchangeGatherScatter(false, wait);
}
void Foam::PstreamBuffers::finishedGathers
(
labelList& recvSizes,
const bool wait
)
{
finalExchangeGatherScatter(true, wait);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
FatalErrorInFunction
<< "Obtaining sizes not supported in "
<< UPstream::commsTypeNames[commsType_] << endl
<< " since transfers already in progress. Use non-blocking instead."
<< exit(FatalError);
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
// For nonBlocking mode, simply recover received sizes
// from the buffers themselves.
recvSizes.resize_nocopy(recvBuf_.size());
forAll(recvBuf_, proci)
{
recvSizes[proci] = recvBuf_[proci].size();
}
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -63,6 +63,32 @@ Description
} }
\endcode \endcode
There are additional special versions of finishedSends() for
restricted neighbour communication as well as for special
one-to-all and all-to-one communication patterns.
For example,
\code
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
if (Pstream::master())
{
someObject vals;
for (const int proci : Pstream::subProcs())
{
UOPstream send(proci, pBufs);
send << vals;
}
}
pBufs.finishedScatters();
if (!Pstream::master())
{
UIPstream recv(Pstream::masterNo(), pBufs);
someObject vals(recv);
}
\endcode
SourceFiles SourceFiles
PstreamBuffers.C PstreamBuffers.C
@ -143,6 +169,9 @@ class PstreamBuffers
const bool wait const bool wait
); );
//- For all-to-one or one-to-all
void finalExchangeGatherScatter(const bool isGather, const bool wait);
public: public:
@ -206,6 +235,13 @@ public:
return UPstream::rangeType(static_cast<int>(nProcs())); return UPstream::rangeType(static_cast<int>(nProcs()));
} }
//- Range of sub-processes indices associated with PstreamBuffers
UPstream::rangeType subProcs() const noexcept
{
// Proc 1 -> nProcs (int value)
return UPstream::rangeType(1, static_cast<int>(nProcs()-1));
}
// Queries // Queries
@ -250,7 +286,7 @@ public:
bool allowClearRecv(bool on) noexcept; bool allowClearRecv(bool on) noexcept;
// Functions // Regular Functions
//- Mark sends as done //- Mark sends as done
// //
@ -268,6 +304,9 @@ public:
// \warning currently only valid for nonBlocking comms. // \warning currently only valid for nonBlocking comms.
void finishedSends(labelList& recvSizes, const bool wait = true); void finishedSends(labelList& recvSizes, const bool wait = true);
// Functions with restricted neighbours
//- Mark sends as done using subset of send/recv ranks //- Mark sends as done using subset of send/recv ranks
//- to exchange data on. //- to exchange data on.
// //
@ -360,6 +399,37 @@ public:
{ {
finishedSends(neighProcs, neighProcs, recvSizes, wait); finishedSends(neighProcs, neighProcs, recvSizes, wait);
} }
// Gather/scatter modes
//- Mark all sends to master as done.
//
// Non-blocking mode: populates receive buffers.
//
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedGathers(const bool wait = true);
//- Mark all sends to master as done.
//- Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers (all-to-one).
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedGathers(labelList& recvSizes, const bool wait = true);
//- Mark all sends to sub-procs as done.
//
// Non-blocking mode: populates receive buffers.
//
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedScatters(const bool wait = true);
}; };

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020 OpenCFD Ltd. Copyright (C) 2020-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -34,7 +34,7 @@ License
template<class Type> template<class Type>
Type Foam::fileOperations::masterUncollatedFileOperation::scatterList Type Foam::fileOperations::masterUncollatedFileOperation::scatterList
( (
const UList<Type>& masterLst, const UList<Type>& allValues,
const int tag, const int tag,
const label comm const label comm
) const ) const
@ -46,23 +46,23 @@ Type Foam::fileOperations::masterUncollatedFileOperation::scatterList
for (const int proci : Pstream::subProcs(comm)) for (const int proci : Pstream::subProcs(comm))
{ {
UOPstream os(proci, pBufs); UOPstream os(proci, pBufs);
os << masterLst[proci]; os << allValues[proci];
} }
} }
pBufs.finishedSends(); pBufs.finishedScatters();
Type myResult; Type value;
if (Pstream::master(comm)) if (Pstream::master(comm))
{ {
myResult = masterLst[Pstream::myProcNo(comm)]; value = allValues[0];
} }
else else
{ {
UIPstream is(Pstream::masterNo(), pBufs); UIPstream is(Pstream::masterNo(), pBufs);
is >> myResult; is >> value;
} }
return myResult; return value;
} }

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2015-2021 OpenCFD Ltd. Copyright (C) 2015-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -108,9 +108,10 @@ void Foam::functionObjects::externalCoupled::readColumns
) const ) const
{ {
// Get sizes for all processors // Get sizes for all processors
const globalIndex globalFaces(nRows); const globalIndex globalFaces(nRows, globalIndex::gatherOnly{});
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
if (Pstream::master()) if (Pstream::master())
{ {
string line; string line;
@ -158,15 +159,15 @@ void Foam::functionObjects::externalCoupled::readColumns
} }
// Send to proci // Send to proci
UOPstream str(proci, pBufs); UOPstream toProc(proci, pBufs);
str << values; toProc << values;
} }
} }
pBufs.finishedSends(); pBufs.finishedScatters();
// Read from PstreamBuffers // Get scattered data from PstreamBuffers
UIPstream str(Pstream::masterNo(), pBufs); UIPstream fromMaster(UPstream::masterNo(), pBufs);
str >> data; fromMaster >> data;
} }
@ -178,7 +179,7 @@ void Foam::functionObjects::externalCoupled::readLines
) const ) const
{ {
// Get sizes for all processors // Get sizes for all processors
const globalIndex globalFaces(nRows); const globalIndex globalFaces(nRows, globalIndex::gatherOnly{});
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
@ -220,14 +221,13 @@ void Foam::functionObjects::externalCoupled::readLines
} }
} }
pBufs.finishedScatters();
pBufs.finishedSends(); // Get scattered data from PstreamBuffers
UIPstream fromMaster(UPstream::masterNo(), pBufs);
// Read lines from PstreamBuffers
UIPstream str(Pstream::masterNo(), pBufs);
for (label rowi = 0; rowi < nRows; ++rowi) for (label rowi = 0; rowi < nRows; ++rowi)
{ {
string line(str); string line(fromMaster);
lines << line.c_str() << nl; lines << line.c_str() << nl;
} }
} }

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2018-2020 OpenCFD Ltd. Copyright (C) 2018-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -223,11 +223,12 @@ bool Foam::functionObjects::energySpectrum::write()
{ {
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
UOPstream toProc(Pstream::masterNo(), pBufs); {
UOPstream toMaster(Pstream::masterNo(), pBufs);
toMaster << Uc << C << cellAddr_;
}
toProc << Uc << C << cellAddr_; pBufs.finishedGathers();
pBufs.finishedSends();
if (Pstream::master()) if (Pstream::master())
{ {

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -156,7 +156,7 @@ void Foam::faMeshReconstructor::calcAddressing
faFaceProcAddr_ = labelList(faFaceProcAddr_, localOrder); faFaceProcAddr_ = labelList(faFaceProcAddr_, localOrder);
} }
pBufs.finishedSends(); pBufs.finishedScatters();
if (!Pstream::master()) if (!Pstream::master())
{ {

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2015-2021 OpenCFD Ltd. Copyright (C) 2015-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -179,11 +179,10 @@ void surfaceNoise::readSurfaceData
} }
} }
pBufs.finishedSends(); pBufs.finishedScatters();
// Receive data from the master // Receive data from the master
UIPstream fromProc(Pstream::masterNo(), pBufs); UIPstream fromProc(Pstream::masterNo(), pBufs);
scalarList pSlice(fromProc); scalarList pSlice(fromProc);
forAll(pSlice, faceI) forAll(pSlice, faceI)
@ -264,7 +263,7 @@ scalar surfaceNoise::writeSurfaceData
toProc << data; toProc << data;
} }
pBufs.finishedSends(); pBufs.finishedGathers();
scalar areaAverage = 0; scalar areaAverage = 0;
if (Pstream::master()) if (Pstream::master())
@ -379,7 +378,7 @@ scalar surfaceNoise::surfaceAverage
toProc << data; toProc << data;
} }
pBufs.finishedSends(); pBufs.finishedGathers();
scalar areaAverage = 0; scalar areaAverage = 0;
if (Pstream::master()) if (Pstream::master())