diff --git a/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C b/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C index 91a4f68534..58f1caddf3 100644 --- a/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C +++ b/src/OpenFOAM/db/IOstreams/Fstreams/masterOFstream.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017 OpenFOAM Foundation - Copyright (C) 2020-2022 OpenCFD Ltd. + Copyright (C) 2020-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -31,7 +31,6 @@ License #include "OSspecific.H" #include "PstreamBuffers.H" #include "masterUncollatedFileOperation.H" -#include // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // @@ -111,49 +110,45 @@ void Foam::masterOFstream::commit() return; } - boolList procValid(UPstream::listGatherValues(valid_)); - // Different files PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); - // Send my buffer to master + // Send my (valid) buffer to master if (!Pstream::master()) { - UOPstream os(Pstream::masterNo(), pBufs); - string s(this->str()); - this->reset(); + if (valid_) + { + string s(this->str()); - os.write(s.data(), s.length()); + UOPstream os(Pstream::masterNo(), pBufs); + os.write(s.data(), s.length()); + } + this->reset(); } - labelList recvSizes; - pBufs.finishedGathers(recvSizes); + pBufs.finishedGathers(); if (Pstream::master()) { - // Write master data - if (procValid[Pstream::masterNo()]) + // Write (valid) master data + if (valid_) { checkWrite(filePaths[Pstream::masterNo()], this->str()); } this->reset(); - // Find the max receive size - recvSizes[Pstream::masterNo()] = 0; - List buf - ( - *std::max_element(recvSizes.cbegin(), recvSizes.cend()) - ); + // Allocate large enough to read without resizing + List buf(pBufs.maxRecvCount()); for (const int proci : Pstream::subProcs()) { - UIPstream is(proci, pBufs); + const std::streamsize count(pBufs.recvDataCount(proci)); - const std::streamsize count(recvSizes[proci]); - is.read(buf.data(), count); - - if (procValid[proci]) + if (count) { + UIPstream is(proci, pBufs); + + is.read(buf.data(), count); checkWrite(filePaths[proci], buf.cdata(), count); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 880ec1cd6e..ff37820401 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -33,13 +33,15 @@ License void Foam::PstreamBuffers::finalExchange ( - labelList& recvSizes, - const bool wait + const bool wait, + const bool needSizes, + labelList& recvSizes ) { // Could also check that it is not called twice // but that is used for overlapping send/recv (eg, overset) finishedSendsCalled_ = true; + recvPositions_ = Zero; if (commsType_ == UPstream::commsTypes::nonBlocking) { @@ -60,10 +62,13 @@ void Foam::PstreamBuffers::finalExchange ); // Copy back out - recvSizes.resize_nocopy(recvBuffers_.size()); - forAll(recvBuffers_, proci) + if (needSizes) { - recvSizes[proci] = recvBuffers_[proci].size(); + recvSizes.resize_nocopy(recvBuffers_.size()); + forAll(recvBuffers_, proci) + { + recvSizes[proci] = recvBuffers_[proci].size(); + } } return; @@ -89,13 +94,15 @@ void Foam::PstreamBuffers::finalExchange ( const labelUList& sendProcs, const labelUList& recvProcs, - labelList& recvSizes, - const bool wait + const bool wait, + const bool needSizes, // unused + labelList& recvSizes ) { // Could also check that it is not called twice // but that is used for overlapping send/recv (eg, overset) finishedSendsCalled_ = true; + recvPositions_ = Zero; if (commsType_ == UPstream::commsTypes::nonBlocking) { @@ -125,17 +132,18 @@ void Foam::PstreamBuffers::finalExchange void Foam::PstreamBuffers::finalExchangeGatherScatter ( const bool isGather, - const bool wait + const bool wait, + const bool needSizes, + labelList& recvSizes ) { // Could also check that it is not called twice // but that is used for overlapping send/recv (eg, overset) finishedSendsCalled_ = true; + recvPositions_ = Zero; if (commsType_ == UPstream::commsTypes::nonBlocking) { - labelList recvSizes; - if (isGather) { // gather mode (all-to-one): master [0] <- everyone @@ -145,7 +153,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter if (!UPstream::master(comm_)) { - recvSizes.resize_nocopy(recvBuffers_.size()); + recvSizes.resize_nocopy(nProcs_); recvSizes = Zero; } } @@ -153,7 +161,7 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter { // scatter mode (one-to-all): master [0] -> everyone - recvSizes.resize_nocopy(sendBuffers_.size()); + recvSizes.resize_nocopy(nProcs_); if (UPstream::master(comm_)) { @@ -187,9 +195,9 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter Foam::PstreamBuffers::PstreamBuffers ( - const UPstream::commsTypes commsType, - const int tag, - const label comm, + UPstream::commsTypes commsType, + int tag, + label communicator, IOstreamOption::streamFormat fmt ) : @@ -198,30 +206,11 @@ Foam::PstreamBuffers::PstreamBuffers format_(fmt), commsType_(commsType), tag_(tag), - comm_(comm), - sendBuffers_(UPstream::nProcs(comm)), - recvBuffers_(UPstream::nProcs(comm)), - recvPositions_(UPstream::nProcs(comm), Zero) -{} - - -Foam::PstreamBuffers::PstreamBuffers -( - const label comm, - const UPstream::commsTypes commsType, - const int tag, - IOstreamOption::streamFormat fmt -) -: - finishedSendsCalled_(false), - allowClearRecv_(true), - format_(fmt), - commsType_(commsType), - tag_(tag), - comm_(comm), - sendBuffers_(UPstream::nProcs(comm)), - recvBuffers_(UPstream::nProcs(comm)), - recvPositions_(UPstream::nProcs(comm), Zero) + comm_(communicator), + nProcs_(UPstream::nProcs(comm_)), + sendBuffers_(nProcs_), + recvBuffers_(nProcs_), + recvPositions_(nProcs_, Zero) {} @@ -284,7 +273,7 @@ void Foam::PstreamBuffers::clear() { buf.clear(); } - recvPositions_ = 0; + recvPositions_ = Zero; finishedSendsCalled_ = false; } @@ -309,7 +298,7 @@ void Foam::PstreamBuffers::clearStorage() { buf.clearStorage(); } - recvPositions_ = 0; + recvPositions_ = Zero; finishedSendsCalled_ = false; } @@ -409,6 +398,46 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const } +Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const +{ + label maxLen = 0; + + if (finishedSendsCalled_) + { + forAll(recvPositions_, i) + { + if (i != proci) + { + const label len(recvBuffers_[i].size() - recvPositions_[i]); + maxLen = max(maxLen, len); + } + } + } + #ifdef FULLDEBUG + else + { + FatalErrorInFunction + << "Call finishedSends first" << exit(FatalError); + } + #endif + + return maxLen; +} + + +Foam::label Foam::PstreamBuffers::maxRecvCount() const +{ + // Use out-of-range proci to avoid excluding any processor + return maxNonLocalRecvCount(-1); +} + + +Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount() const +{ + return maxNonLocalRecvCount(UPstream::myProcNo(comm_)); +} + + const Foam::UList Foam::PstreamBuffers::peekRecvData(const label proci) const { @@ -449,7 +478,7 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept void Foam::PstreamBuffers::finishedSends(const bool wait) { labelList recvSizes; - finalExchange(recvSizes, wait); + finalExchange(wait, false, recvSizes); } @@ -459,7 +488,7 @@ void Foam::PstreamBuffers::finishedSends const bool wait ) { - finalExchange(recvSizes, wait); + finalExchange(wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -483,7 +512,7 @@ void Foam::PstreamBuffers::finishedSends ) { labelList recvSizes; - finalExchange(sendProcs, recvProcs, recvSizes, wait); + finalExchange(sendProcs, recvProcs, wait, false, recvSizes); } @@ -495,7 +524,7 @@ void Foam::PstreamBuffers::finishedSends const bool wait ) { - finalExchange(sendProcs, recvProcs, recvSizes, wait); + finalExchange(sendProcs, recvProcs, wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -555,14 +584,14 @@ bool Foam::PstreamBuffers::finishedSends } } - finishedSends(wait); // All-to-all + labelList recvSizes; + finishedSends(recvSizes, wait); // All-to-all // The recv ranks recvProcs.clear(); - forAll(recvBuffers_, proci) + forAll(recvSizes, proci) { - // ie, recvDataCount(proci) - if (!recvBuffers_[proci].empty()) + if (recvSizes[proci] > 0) { recvProcs.push_back(proci); } @@ -581,13 +610,15 @@ bool Foam::PstreamBuffers::finishedSends void Foam::PstreamBuffers::finishedGathers(const bool wait) { - finalExchangeGatherScatter(true, wait); + labelList recvSizes; + finalExchangeGatherScatter(true, wait, false, recvSizes); } void Foam::PstreamBuffers::finishedScatters(const bool wait) { - finalExchangeGatherScatter(false, wait); + labelList recvSizes; + finalExchangeGatherScatter(false, wait, false, recvSizes); } @@ -597,7 +628,7 @@ void Foam::PstreamBuffers::finishedGathers const bool wait ) { - finalExchangeGatherScatter(true, wait); + finalExchangeGatherScatter(true, wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -610,11 +641,6 @@ void Foam::PstreamBuffers::finishedGathers // Note: maybe possible only if using different tag from write started // by ~UOPstream. Needs some work. } - - // For nonBlocking mode, simply recover received sizes - // from the buffers themselves. - - recvSizes = recvDataCounts(); } @@ -624,7 +650,7 @@ void Foam::PstreamBuffers::finishedScatters const bool wait ) { - finalExchangeGatherScatter(false, wait); + finalExchangeGatherScatter(false, wait, true, recvSizes); if (commsType_ != UPstream::commsTypes::nonBlocking) { @@ -637,11 +663,6 @@ void Foam::PstreamBuffers::finishedScatters // Note: maybe possible only if using different tag from write started // by ~UOPstream. Needs some work. } - - // For nonBlocking mode, simply recover received sizes - // from the buffers themselves. - - recvSizes = recvDataCounts(); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index b9f5b04b1b..c3912569c9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -137,8 +137,8 @@ class PstreamBuffers //- Communicator const label comm_; - // Number of ranks associated with PstreamBuffers - // currently obtained from recvPositions_ + //- Number of ranks associated with PstreamBuffers (at construction) + const label nProcs_; // Buffer storage @@ -157,7 +157,12 @@ class PstreamBuffers //- Mark all sends as having been done. // This will start receives (nonBlocking comms). - void finalExchange(labelList& recvSizes, const bool wait); + void finalExchange + ( + const bool wait, + const bool needSizes, // If recvSizes needed or scratch + labelList& recvSizes + ); //- Mark sends as done. // Only exchange sizes using the sendProcs/recvProcs subset @@ -166,12 +171,19 @@ class PstreamBuffers ( const labelUList& sendProcs, const labelUList& recvProcs, - labelList& recvSizes, - const bool wait + const bool wait, + const bool needSizes, // If recvSizes needed or scratch + labelList& recvSizes ); //- For all-to-one or one-to-all - void finalExchangeGatherScatter(const bool isGather, const bool wait); + void finalExchangeGatherScatter + ( + const bool isGather, + const bool wait, + const bool needSizes, // If recvSizes needed or scratch + labelList& recvSizes + ); // Friendship Access @@ -194,23 +206,41 @@ public: // Constructors - //- Construct given comms type, message tag, communicator, IO format + //- Construct given communication type (default: nonBlocking), message + //- tag, communicator (default: worldComm), IO format (default: binary) explicit PstreamBuffers ( - const UPstream::commsTypes commsType, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm, + UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking, + int tag = UPstream::msgType(), + label communicator = UPstream::worldComm, IOstreamOption::streamFormat fmt = IOstreamOption::BINARY ); - //- Construct given communicator, comms type, message tag, IO format + //- Construct given communicator, communication type + //- (default: nonBlocking), message tag, IO format (default: binary) explicit PstreamBuffers ( - const label comm, - const UPstream::commsTypes commsType, - const int tag = UPstream::msgType(), + label communicator, + UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking, + int tag = UPstream::msgType(), IOstreamOption::streamFormat fmt = IOstreamOption::BINARY - ); + ) + : + PstreamBuffers(commsType, tag, communicator, fmt) + {} + + //- Construct given communicator, message tag, communication type + //- (default: nonBlocking), IO format (default: binary) + PstreamBuffers + ( + label communicator, + int tag, + UPstream::commsTypes commsType = UPstream::commsTypes::nonBlocking, + IOstreamOption::streamFormat fmt = IOstreamOption::BINARY + ) + : + PstreamBuffers(commsType, tag, communicator, fmt) + {} //- Destructor - checks that all data have been consumed @@ -248,7 +278,7 @@ public: //- Number of ranks associated with PstreamBuffers label nProcs() const noexcept { - return recvPositions_.size(); + return nProcs_; } @@ -258,14 +288,14 @@ public: UPstream::rangeType allProcs() const noexcept { // Proc 0 -> nProcs (int value) - return UPstream::rangeType(static_cast(nProcs())); + return UPstream::rangeType(static_cast(nProcs_)); } //- Range of sub-processes indices associated with PstreamBuffers UPstream::rangeType subProcs() const noexcept { // Proc 1 -> nProcs (int value) - return UPstream::rangeType(1, static_cast(nProcs()-1)); + return UPstream::rangeType(1, static_cast(nProcs_-1)); } @@ -302,6 +332,18 @@ public: //- Must call finishedSends() or other finished.. method first! labelList recvDataCounts() const; + //- Maximum receive size from any rocessor rank. + //- Must call finishedSends() or other finished.. method first! + label maxRecvCount() const; + + //- Maximum receive size, excluding current processor rank + //- Must call finishedSends() or other finished.. method first! + label maxNonLocalRecvCount() const; + + //- Maximum receive size, excluding the specified processor rank + //- Must call finishedSends() or other finished.. method first! + label maxNonLocalRecvCount(const label proci) const; + //- Number of unconsumed receive bytes for the specified processor. //- Must call finishedSends() or other finished.. method first! // The method is only useful in limited situations, such as when @@ -447,7 +489,7 @@ public: //- Mark all sends to master as done. // // Non-blocking mode: populates receive buffers. - // Can use recvDataCounts() method to recover sizes received. + // Can use recvDataCount, maxRecvCount etc to recover sizes received. // // \param wait wait for requests to complete (in nonBlocking mode) // @@ -467,7 +509,7 @@ public: //- Mark all sends to sub-procs as done. // // Non-blocking mode: populates receive buffers. - // Can use recvDataCounts() method to recover sizes received. + // Can use recvDataCount, maxRecvCount etc to recover sizes received. // // \param wait wait for requests to complete (in nonBlocking mode) // diff --git a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C index a6030560f0..7f3563edb5 100644 --- a/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C +++ b/src/OpenFOAM/global/fileOperations/masterUncollatedFileOperation/masterUncollatedFileOperation.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2019-2022 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -641,8 +641,7 @@ Foam::fileOperations::masterUncollatedFileOperation::read } } - labelList recvSizes; - pBufs.finishedSends(recvSizes); + pBufs.finishedSends(); // isPtr will be valid on master and will be the unbuffered // IFstream. Else the information is in the PstreamBuffers (and @@ -653,12 +652,11 @@ Foam::fileOperations::masterUncollatedFileOperation::read if (procValid[Pstream::myProcNo(comm)]) { // This processor needs to return something + List buf(pBufs.recvDataCount(Pstream::masterNo())); - UIPstream is(Pstream::masterNo(), pBufs); - - List buf(recvSizes[Pstream::masterNo()]); if (!buf.empty()) { + UIPstream is(Pstream::masterNo(), pBufs); is.read(buf.data(), buf.size()); } @@ -2353,8 +2351,7 @@ Foam::fileOperations::masterUncollatedFileOperation::NewIFstream } - labelList recvSizes; - pBufs.finishedSends(recvSizes); + pBufs.finishedSends(); if (Pstream::master(Pstream::worldComm)) { @@ -2370,10 +2367,13 @@ Foam::fileOperations::masterUncollatedFileOperation::NewIFstream << " from processor " << Pstream::masterNo() << endl; } - UIPstream is(Pstream::masterNo(), pBufs); + List buf(pBufs.recvDataCount(Pstream::masterNo())); - List buf(recvSizes[Pstream::masterNo()]); - is.read(buf.data(), buf.size()); + if (!buf.empty()) + { + UIPstream is(Pstream::masterNo(), pBufs); + is.read(buf.data(), buf.size()); + } if (debug) { diff --git a/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C b/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C index 5751e519c3..41462314aa 100644 --- a/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C +++ b/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C @@ -2159,8 +2159,8 @@ Foam::autoPtr Foam::fvMeshDistribute::distribute // ~~~~~~~~~~~~~~~~~ labelList nSendCells(countCells(distribution)); - labelList nRevcCells(Pstream::nProcs()); - Pstream::allToAll(nSendCells, nRevcCells); + labelList nRecvCells(Pstream::nProcs()); + UPstream::allToAll(nSendCells, nRecvCells); // Allocate buffers PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking); @@ -2382,22 +2382,17 @@ Foam::autoPtr Foam::fvMeshDistribute::distribute UPstream::parRun(oldParRun); // Restore parallel state - - // Start sending&receiving from buffers + if (debug) { - if (debug) - { - Pout<< "Starting sending" << endl; - } + Pout<< "Starting sending" << endl; + } - labelList recvSizes; - pBufs.finishedSends(recvSizes); + pBufs.finishedSends(); - if (debug) - { - Pout<< "Finished sending and receiving : " << flatOutput(recvSizes) - << endl; - } + if (debug) + { + Pout<< "Finished sending and receiving : " + << flatOutput(pBufs.recvDataCounts()) << endl; } @@ -2547,17 +2542,17 @@ Foam::autoPtr Foam::fvMeshDistribute::distribute ); PtrList> dtfs(Pstream::nProcs()); - forAll(nRevcCells, sendProc) + forAll(nRecvCells, sendProc) { // Did processor sendProc send anything to me? - if (sendProc != Pstream::myProcNo() && nRevcCells[sendProc] > 0) + if (sendProc != Pstream::myProcNo() && nRecvCells[sendProc] > 0) { if (debug) { Pout<< nl << "RECEIVING FROM DOMAIN " << sendProc << " cells to receive:" - << nRevcCells[sendProc] + << nRecvCells[sendProc] << nl << endl; }