diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C index 36a6cc63c8..8c4360cbd9 100644 --- a/applications/test/parallel-chunks/Test-parallel-chunks.C +++ b/applications/test/parallel-chunks/Test-parallel-chunks.C @@ -353,18 +353,15 @@ int main(int argc, char *argv[]) { sendData = identity(500); - for (const int proci : Pstream::allProcs()) + for (const int proci : Pstream::subProcs()) { - if (proci != Pstream::myProcNo()) - { - UOPstream os(proci, pBufs); - os << sendData; - } + UOPstream os(proci, pBufs); + os << sendData; } } Info<< "call finishedSends()" << endl; - pBufs.finishedSends(); + pBufs.finishedScatters(); if (!Pstream::master()) { diff --git a/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C index 8d80b9bab5..a1885891d8 100644 --- a/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C +++ b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C @@ -71,14 +71,12 @@ int main(int argc, char *argv[]) if (!Pstream::master()) { - Perr<< "slave sending to master " - << Pstream::masterNo() << endl; + Perr<< "sending to master" << endl; UOPstream toMaster(Pstream::masterNo(), pBufs); toMaster << data; } - // Start sending and receiving and block - pBufs.finishedSends(); + pBufs.finishedGathers(); // Consume DynamicList allData; @@ -87,36 +85,34 @@ int main(int argc, char *argv[]) // Collect my own data allData.append(data); - for (const int slave : Pstream::subProcs()) + for (const int proci : Pstream::subProcs()) { - Perr << "master receiving from slave " << slave << endl; - UIPstream fromSlave(slave, pBufs); - allData.append(vector(fromSlave)); + Perr << "master receiving from " << proci << endl; + UIPstream fromProc(proci, pBufs); + allData.append(vector(fromProc)); } } // Send allData back - PstreamBuffers pBufs2(Pstream::commsTypes::nonBlocking); + pBufs.clear(); if (Pstream::master()) { - for (const int slave : Pstream::subProcs()) + for (const int proci : Pstream::subProcs()) { - Perr << "master sending to slave " << slave << endl; - UOPstream toSlave(slave, pBufs2); + Perr << "master sending to " << proci << endl; + UOPstream toProc(proci, pBufs); toSlave << allData; } } - // Start sending and receiving and block - pBufs2.finishedSends(); + pBufs.finishedScatters(); // Consume if (!Pstream::master()) { - Perr<< "slave receiving from master " - << Pstream::masterNo() << endl; - UIPstream fromMaster(Pstream::masterNo(), pBufs2); + Perr<< "receive from master" << endl; + UIPstream fromMaster(Pstream::masterNo(), pBufs); fromMaster >> allData; Perr<< allData << endl; } diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index 0c05da957a..be7b9dbee8 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2020-2021 OpenCFD Ltd. + Copyright (C) 2020-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -394,7 +394,7 @@ bool Foam::decomposedBlockData::readBlocks } } - pBufs.finishedSends(); + pBufs.finishedScatters(); if (!UPstream::master(comm)) { @@ -403,7 +403,7 @@ bool Foam::decomposedBlockData::readBlocks } } - Pstream::scatter(ok, Pstream::msgType(), comm); + Pstream::broadcast(ok, comm); return ok; } @@ -523,7 +523,7 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks ok = is.good(); } - pBufs.finishedSends(); + pBufs.finishedScatters(); if (!UPstream::master(comm)) { @@ -535,7 +535,7 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks } } - Pstream::scatter(ok, Pstream::msgType(), comm); + Pstream::broadcast(ok, comm); //- Set stream properties from realIsPtr on master @@ -916,7 +916,7 @@ bool Foam::decomposedBlockData::writeBlocks { //- Enable to get synchronised error checking. Is the one that keeps // slaves as slow as the master (which does all the writing) - Pstream::scatter(ok, Pstream::msgType(), comm); + Pstream::broadcast(ok, comm); } return ok; diff --git a/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C b/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C index ec3d964cf5..cc2d5a472a 100644 --- a/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C +++ b/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C @@ -31,7 +31,6 @@ License #include "OSspecific.H" #include "PstreamBuffers.H" #include "masterUncollatedFileOperation.H" -#include "boolList.H" #include // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // @@ -115,7 +114,7 @@ void Foam::masterOFstream::commit() return; } - boolList valid(UPstream::listGatherValues(valid_)); + boolList procValid(UPstream::listGatherValues(valid_)); // Different files PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); @@ -131,18 +130,18 @@ void Foam::masterOFstream::commit() } labelList recvSizes; - pBufs.finishedSends(recvSizes); + pBufs.finishedGathers(recvSizes); if (Pstream::master()) { // Write master data - if (valid[Pstream::masterNo()]) + if (procValid[Pstream::masterNo()]) { checkWrite(filePaths[Pstream::masterNo()], this->str()); } this->reset(); - // Find the max slave size + // Find the max receive size recvSizes[Pstream::masterNo()] = 0; List buf ( @@ -156,7 +155,7 @@ void Foam::masterOFstream::commit() const std::streamsize count(recvSizes[proci]); is.read(buf.data(), count); - if (valid[proci]) + if (procValid[proci]) { checkWrite(filePaths[proci], buf.cdata(), count); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index d7e4c3d87e..55d892b9c4 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -96,6 +96,66 @@ void Foam::PstreamBuffers::finalExchange } +void Foam::PstreamBuffers::finalExchangeGatherScatter +( + const bool isGather, + const bool wait +) +{ + // Could also check that it is not called twice + // but that is used for overlapping send/recv (eg, overset) + finishedSendsCalled_ = true; + + if (commsType_ == UPstream::commsTypes::nonBlocking) + { + labelList recvSizes; + + if (isGather) + { + // gather mode (all-to-one): master [0] <- everyone + + recvSizes = UPstream::listGatherValues(sendBuf_[0].size(), comm_); + + if (!UPstream::master(comm_)) + { + recvSizes.resize_nocopy(recvBuf_.size()); + recvSizes = Zero; + } + } + else + { + // scatter mode (one-to-all): master [0] -> everyone + + recvSizes.resize_nocopy(sendBuf_.size()); + + if (UPstream::master(comm_)) + { + forAll(sendBuf_, proci) + { + recvSizes[proci] = sendBuf_[proci].size(); + } + } + + const label myRecv(UPstream::listScatterValues(recvSizes, comm_)); + + recvSizes = Zero; + recvSizes[0] = myRecv; + } + + + Pstream::exchange, char> + ( + sendBuf_, + recvSizes, + recvBuf_, + tag_, + comm_, + wait + ); + } +} + + // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // Foam::PstreamBuffers::PstreamBuffers @@ -375,4 +435,48 @@ bool Foam::PstreamBuffers::finishedSends } +void Foam::PstreamBuffers::finishedGathers(const bool wait) +{ + finalExchangeGatherScatter(true, wait); +} + + +void Foam::PstreamBuffers::finishedScatters(const bool wait) +{ + finalExchangeGatherScatter(false, wait); +} + + +void Foam::PstreamBuffers::finishedGathers +( + labelList& recvSizes, + const bool wait +) +{ + finalExchangeGatherScatter(true, wait); + + if (commsType_ != UPstream::commsTypes::nonBlocking) + { + FatalErrorInFunction + << "Obtaining sizes not supported in " + << UPstream::commsTypeNames[commsType_] << endl + << " since transfers already in progress. Use non-blocking instead." + << exit(FatalError); + + // Note: maybe possible only if using different tag from write started + // by ~UOPstream. Needs some work. + } + + // For nonBlocking mode, simply recover received sizes + // from the buffers themselves. + + recvSizes.resize_nocopy(recvBuf_.size()); + + forAll(recvBuf_, proci) + { + recvSizes[proci] = recvBuf_[proci].size(); + } +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 81666658e6..75209b559d 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -63,6 +63,32 @@ Description } \endcode + There are additional special versions of finishedSends() for + restricted neighbour communication as well as for special + one-to-all and all-to-one communication patterns. + For example, + \code + PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); + + if (Pstream::master()) + { + someObject vals; + for (const int proci : Pstream::subProcs()) + { + UOPstream send(proci, pBufs); + send << vals; + } + } + + pBufs.finishedScatters(); + + if (!Pstream::master()) + { + UIPstream recv(Pstream::masterNo(), pBufs); + someObject vals(recv); + } + \endcode + SourceFiles PstreamBuffers.C @@ -143,6 +169,9 @@ class PstreamBuffers const bool wait ); + //- For all-to-one or one-to-all + void finalExchangeGatherScatter(const bool isGather, const bool wait); + public: @@ -206,6 +235,13 @@ public: return UPstream::rangeType(static_cast(nProcs())); } + //- Range of sub-processes indices associated with PstreamBuffers + UPstream::rangeType subProcs() const noexcept + { + // Proc 1 -> nProcs (int value) + return UPstream::rangeType(1, static_cast(nProcs()-1)); + } + // Queries @@ -250,7 +286,7 @@ public: bool allowClearRecv(bool on) noexcept; - // Functions + // Regular Functions //- Mark sends as done // @@ -268,6 +304,9 @@ public: // \warning currently only valid for nonBlocking comms. void finishedSends(labelList& recvSizes, const bool wait = true); + + // Functions with restricted neighbours + //- Mark sends as done using subset of send/recv ranks //- to exchange data on. // @@ -360,6 +399,37 @@ public: { finishedSends(neighProcs, neighProcs, recvSizes, wait); } + + + // Gather/scatter modes + + //- Mark all sends to master as done. + // + // Non-blocking mode: populates receive buffers. + // + // \param wait wait for requests to complete (in nonBlocking mode) + // + // \warning currently only valid for nonBlocking comms. + void finishedGathers(const bool wait = true); + + //- Mark all sends to master as done. + //- Recovers the sizes (bytes) received. + // + // Non-blocking mode: populates receive buffers (all-to-one). + // \param[out] recvSizes the sizes (bytes) received + // \param wait wait for requests to complete (in nonBlocking mode) + // + // \warning currently only valid for nonBlocking comms. + void finishedGathers(labelList& recvSizes, const bool wait = true); + + //- Mark all sends to sub-procs as done. + // + // Non-blocking mode: populates receive buffers. + // + // \param wait wait for requests to complete (in nonBlocking mode) + // + // \warning currently only valid for nonBlocking comms. + void finishedScatters(const bool wait = true); }; diff --git a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperationTemplates.C b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperationTemplates.C index 9b6228f3ab..8fc8fed149 100644 --- a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperationTemplates.C +++ b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperationTemplates.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2020 OpenCFD Ltd. + Copyright (C) 2020-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -34,7 +34,7 @@ License template Type Foam::fileOperations::masterUncollatedFileOperation::scatterList ( - const UList& masterLst, + const UList& allValues, const int tag, const label comm ) const @@ -46,23 +46,23 @@ Type Foam::fileOperations::masterUncollatedFileOperation::scatterList for (const int proci : Pstream::subProcs(comm)) { UOPstream os(proci, pBufs); - os << masterLst[proci]; + os << allValues[proci]; } } - pBufs.finishedSends(); + pBufs.finishedScatters(); - Type myResult; + Type value; if (Pstream::master(comm)) { - myResult = masterLst[Pstream::myProcNo(comm)]; + value = allValues[0]; } else { UIPstream is(Pstream::masterNo(), pBufs); - is >> myResult; + is >> value; } - return myResult; + return value; } diff --git a/src/functionObjects/field/externalCoupled/externalCoupled.C b/src/functionObjects/field/externalCoupled/externalCoupled.C index 07c712fbde..0feb198e42 100644 --- a/src/functionObjects/field/externalCoupled/externalCoupled.C +++ b/src/functionObjects/field/externalCoupled/externalCoupled.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2015-2021 OpenCFD Ltd. + Copyright (C) 2015-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -108,9 +108,10 @@ void Foam::functionObjects::externalCoupled::readColumns ) const { // Get sizes for all processors - const globalIndex globalFaces(nRows); + const globalIndex globalFaces(nRows, globalIndex::gatherOnly{}); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); + if (Pstream::master()) { string line; @@ -158,15 +159,15 @@ void Foam::functionObjects::externalCoupled::readColumns } // Send to proci - UOPstream str(proci, pBufs); - str << values; + UOPstream toProc(proci, pBufs); + toProc << values; } } - pBufs.finishedSends(); + pBufs.finishedScatters(); - // Read from PstreamBuffers - UIPstream str(Pstream::masterNo(), pBufs); - str >> data; + // Get scattered data from PstreamBuffers + UIPstream fromMaster(UPstream::masterNo(), pBufs); + fromMaster >> data; } @@ -178,7 +179,7 @@ void Foam::functionObjects::externalCoupled::readLines ) const { // Get sizes for all processors - const globalIndex globalFaces(nRows); + const globalIndex globalFaces(nRows, globalIndex::gatherOnly{}); PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); @@ -220,14 +221,13 @@ void Foam::functionObjects::externalCoupled::readLines } } + pBufs.finishedScatters(); - pBufs.finishedSends(); - - // Read lines from PstreamBuffers - UIPstream str(Pstream::masterNo(), pBufs); + // Get scattered data from PstreamBuffers + UIPstream fromMaster(UPstream::masterNo(), pBufs); for (label rowi = 0; rowi < nRows; ++rowi) { - string line(str); + string line(fromMaster); lines << line.c_str() << nl; } } diff --git a/src/functionObjects/randomProcesses/energySpectrum/energySpectrum.C b/src/functionObjects/randomProcesses/energySpectrum/energySpectrum.C index d918de3e4b..fe7072831c 100644 --- a/src/functionObjects/randomProcesses/energySpectrum/energySpectrum.C +++ b/src/functionObjects/randomProcesses/energySpectrum/energySpectrum.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2018-2020 OpenCFD Ltd. + Copyright (C) 2018-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -223,11 +223,12 @@ bool Foam::functionObjects::energySpectrum::write() { PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); - UOPstream toProc(Pstream::masterNo(), pBufs); + { + UOPstream toMaster(Pstream::masterNo(), pBufs); + toMaster << Uc << C << cellAddr_; + } - toProc << Uc << C << cellAddr_; - - pBufs.finishedSends(); + pBufs.finishedGathers(); if (Pstream::master()) { diff --git a/src/parallel/reconstruct/faReconstruct/faMeshReconstructor.C b/src/parallel/reconstruct/faReconstruct/faMeshReconstructor.C index 73d90d8362..4f0e22e40a 100644 --- a/src/parallel/reconstruct/faReconstruct/faMeshReconstructor.C +++ b/src/parallel/reconstruct/faReconstruct/faMeshReconstructor.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2021 OpenCFD Ltd. + Copyright (C) 2021-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -156,7 +156,7 @@ void Foam::faMeshReconstructor::calcAddressing faFaceProcAddr_ = labelList(faFaceProcAddr_, localOrder); } - pBufs.finishedSends(); + pBufs.finishedScatters(); if (!Pstream::master()) { diff --git a/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C b/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C index b6129ad0b0..e914399718 100644 --- a/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C +++ b/src/randomProcesses/noise/noiseModels/surfaceNoise/surfaceNoise.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2015-2021 OpenCFD Ltd. + Copyright (C) 2015-2022 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -179,11 +179,10 @@ void surfaceNoise::readSurfaceData } } - pBufs.finishedSends(); + pBufs.finishedScatters(); // Receive data from the master UIPstream fromProc(Pstream::masterNo(), pBufs); - scalarList pSlice(fromProc); forAll(pSlice, faceI) @@ -264,7 +263,7 @@ scalar surfaceNoise::writeSurfaceData toProc << data; } - pBufs.finishedSends(); + pBufs.finishedGathers(); scalar areaAverage = 0; if (Pstream::master()) @@ -379,7 +378,7 @@ scalar surfaceNoise::surfaceAverage toProc << data; } - pBufs.finishedSends(); + pBufs.finishedGathers(); scalar areaAverage = 0; if (Pstream::master())