diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index 47d22fa427..3120361ea0 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2017-2018 OpenFOAM Foundation - Copyright (C) 2020-2023 OpenCFD Ltd. + Copyright (C) 2020-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -27,18 +27,13 @@ License \*---------------------------------------------------------------------------*/ #include "decomposedBlockData.H" -#include "OPstream.H" -#include "IPstream.H" -#include "PstreamBuffers.H" #include "Fstream.H" +#include "IPstream.H" +#include "OPstream.H" +#include "SpanStream.H" #include "dictionary.H" #include "objectRegistry.H" -#include "SubList.H" -#include "charList.H" -#include "labelPair.H" #include "masterUncollatedFileOperation.H" -#include "SpanStream.H" -#include "StringStream.H" // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -50,24 +45,17 @@ namespace Foam // * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // -bool Foam::decomposedBlockData::isCollatedType -( - const word& objectType -) +bool Foam::decomposedBlockData::isCollatedType(const word& objectType) { - return - ( - objectType == decomposedBlockData::typeName - ); + return (objectType == decomposedBlockData::typeName); } -bool Foam::decomposedBlockData::isCollatedType -( - const IOobject& io -) +bool Foam::decomposedBlockData::isCollatedType(const IOobject& io) { - return decomposedBlockData::isCollatedType(io.headerClassName()); + // same: return io.isHeaderClass(decomposedBlockData::typeName); + // same: return isCollatedType(io.headerClassName()); + return io.isHeaderClass(); } @@ -82,8 +70,7 @@ Foam::decomposedBlockData::decomposedBlockData : regIOobject(io), commsType_(commsType), - comm_(comm), - contentData_() + comm_(comm) { // Temporary warning if (readOpt() == IOobjectOption::READ_MODIFIED) @@ -119,55 +106,41 @@ std::streamoff Foam::decomposedBlockData::writeBlockEntry const word procName("processor" + Foam::name(blocki)); - // Write as commented content - // ---------------- - // // processorN - // NCHARS - // (...) - // ---------------- + // Write as primitiveEntry or commented content + constexpr bool isDictFormat = false; + + if constexpr (isDictFormat) { + // Like writeKeyword() + os << nl << procName << nl; + } + else + { + // Human-readable comments os << nl << "// " << procName << nl; - - if (str && len > 0) - { - // Special treatment for char data (binary I/O only) - const auto oldFmt = os.format(IOstreamOption::BINARY); - - os << label(len) << nl; - os.write(str, len); - os << nl; - - os.format(oldFmt); - } - else - { - os << label(0) << nl; - } } - // Write as primitiveEntry - // { - // // Like writeKeyword() - // os << nl << procName << nl; - // - // if (str && len > 0) - // { - // // Special treatment for char data (binary I/O only) - // const auto oldFmt = os.format(IOstreamOption::BINARY); - // - // os << label(len) << nl; - // os.write(str, len); - // os << nl; - // - // os.format(oldFmt); - // } - // else - // { - // os << label(0) << nl; - // } - // - // os.endEntry(); - // } + // Data + if (str && len > 0) + { + // Special treatment for char data (binary I/O only) + const auto oldFmt = os.format(IOstreamOption::BINARY); + + os << label(len) << nl; + os.write(str, len); + os << nl; + + os.format(oldFmt); + } + else + { + os << label(0) << nl; + } + + if constexpr (isDictFormat) + { + os.endEntry(); + } return blockOffset; } @@ -190,7 +163,8 @@ bool Foam::decomposedBlockData::readBlockEntry token tok(is); is.fatalCheck(FUNCTION_NAME); - // Dictionary format has primitiveEntry keyword: + // Dictionary format? + // - has a keyword, which is not a compound token: const bool isDictFormat = (tok.isWord() && !tok.isCompound()); if (!isDictFormat && tok.good()) @@ -231,7 +205,8 @@ bool Foam::decomposedBlockData::skipBlockEntry(Istream& is) token tok(is); if (!is.good()) return false; - // Dictionary format has primitiveEntry keyword: + // Dictionary format? + // - has a keyword, which is not a compound token: const bool isDictFormat = (tok.isWord() && !tok.isCompound()); if (isDictFormat) @@ -365,10 +340,11 @@ std::streamoff Foam::decomposedBlockData::writeBlockEntry const bool withLocalHeader ) { - // String of all data to write - string contentChars; + // Serialize content to write + DynamicList serialized; { - OStringStream buf(streamOptData); + OCharStream buf(streamOptData); + buf.reserve(4*1024); // Start with a slightly larger buffer bool ok = true; @@ -382,7 +358,7 @@ std::streamoff Foam::decomposedBlockData::writeBlockEntry IOobject::bannerEnabled(old); } - // Write the data to the Ostream + // Serialize the output ok = ok && io.writeData(buf); if (!ok) @@ -390,10 +366,11 @@ std::streamoff Foam::decomposedBlockData::writeBlockEntry return std::streamoff(-1); } - contentChars = buf.str(); + // Take ownership of serialized content + serialized = buf.release(); } - return decomposedBlockData::writeBlockEntry(os, blocki, contentChars); + return decomposedBlockData::writeBlockEntry(os, blocki, serialized); } @@ -412,28 +389,29 @@ Foam::decomposedBlockData::readBlock << endl; } + // The character input stream for the specified block + autoPtr blockIsPtr; + // Extracted header information IOstreamOption streamOptData; unsigned labelWidth = is.labelByteSize(); unsigned scalarWidth = is.scalarByteSize(); - autoPtr realIsPtr; - // Read master for header List data; decomposedBlockData::readBlockEntry(is, data); - if (blocki == 0) + if (blocki == 0) // ie, UPstream::masterNo() { - realIsPtr.reset(new ICharStream(std::move(data))); - realIsPtr->name() = is.name(); + blockIsPtr.reset(new ICharStream(std::move(data))); + blockIsPtr->name() = is.name(); { // Read header from first block, // advancing the stream position - if (!headerIO.readHeader(*realIsPtr)) + if (!headerIO.readHeader(*blockIsPtr)) { - FatalIOErrorInFunction(*realIsPtr) + FatalIOErrorInFunction(*blockIsPtr) << "Problem while reading object header " << is.relativeName() << nl << exit(FatalIOError); @@ -443,7 +421,8 @@ Foam::decomposedBlockData::readBlock else { { - // Read header from first block + // Read header from first block, + // without advancing the stream position ISpanStream headerStream(data); if (!headerIO.readHeader(headerStream)) { @@ -457,22 +436,29 @@ Foam::decomposedBlockData::readBlock scalarWidth = headerStream.scalarByteSize(); } - for (label i = 1; i < blocki+1; i++) + // Skip intermediate blocks + for (label i = 1; i < blocki; ++i) { - // Read and discard data, only retain the last one - decomposedBlockData::readBlockEntry(is, data); + decomposedBlockData::skipBlockEntry(is); } - realIsPtr.reset(new ICharStream(std::move(data))); - realIsPtr->name() = is.name(); + + // Read the block of interest + decomposedBlockData::readBlockEntry(is, data); + + blockIsPtr.reset(new ICharStream(std::move(data))); + blockIsPtr->name() = is.name(); // Apply stream settings - realIsPtr().format(streamOptData.format()); - realIsPtr().version(streamOptData.version()); - realIsPtr().setLabelByteSize(labelWidth); - realIsPtr().setScalarByteSize(scalarWidth); + { + auto& iss = blockIsPtr(); + iss.format(streamOptData.format()); + iss.version(streamOptData.version()); + iss.setLabelByteSize(labelWidth); + iss.setScalarByteSize(scalarWidth); + } } - return realIsPtr; + return blockIsPtr; } @@ -480,99 +466,96 @@ bool Foam::decomposedBlockData::readBlocks ( const label comm, autoPtr& isPtr, - List& data, - const UPstream::commsTypes commsType + List& localData, + const UPstream::commsTypes /* unused */ ) { if (debug) { Pout<< "decomposedBlockData::readBlocks:" - << " stream:" << (isPtr ? isPtr->name() : "invalid") - << " commsType:" << Pstream::commsTypeNames[commsType] - << " comm:" << comm << endl; + << " stream:" << (isPtr ? isPtr->name() : "") + << " non-blocking comm:" << comm << endl; } + // Read data on master and transmit. Always non-blocking bool ok = false; + // The send buffers + List> procBuffers; + + // Some unique tag for this read/write grouping (as extra safety) + const int messageTag = (UPstream::msgType() + 256); + + const label startOfRequests = UPstream::nRequests(); + if (UPstream::master(comm)) { - auto& is = *isPtr; + auto& is = isPtr(); is.fatalCheck(FUNCTION_NAME); // Read master data - decomposedBlockData::readBlockEntry(is, data); - } + decomposedBlockData::readBlockEntry(is, localData); - if (commsType == UPstream::commsTypes::scheduled) - { - if (UPstream::master(comm)) + // Read proc data and setup non-blocking sends + procBuffers.resize(UPstream::nProcs(comm)); + for (const int proci : UPstream::subProcs(comm)) { - // Master data already read ... - auto& is = *isPtr; - is.fatalCheck(FUNCTION_NAME); + auto& slot = procBuffers[proci]; - // Read and transmit slave data - for (const int proci : UPstream::subProcs(comm)) - { - List elems; - decomposedBlockData::readBlockEntry(is, elems); + decomposedBlockData::readBlockEntry(is, slot); - OPstream os - ( - UPstream::commsTypes::scheduled, - proci, - 0, - UPstream::msgType(), - comm - ); - os << elems; - } - - ok = is.good(); - } - else - { - IPstream is + // Send content (non-blocking) + UOPstream::write ( - UPstream::commsTypes::scheduled, - UPstream::masterNo(), - 0, - UPstream::msgType(), + UPstream::commsTypes::nonBlocking, + proci, + slot.cdata_bytes(), + slot.size_bytes(), + messageTag, comm ); - is >> data; } + + ok = is.good(); } - else + else if (UPstream::is_subrank(comm)) { - PstreamBuffers pBufs(comm); + List& slot = localData; - if (UPstream::master(comm)) + // Probe for the message size + const auto [fromProci, numBytes] = + UPstream::probeMessage + ( + UPstream::commsTypes::scheduled, // blocking call + UPstream::masterNo(), + messageTag, + comm + ); + + slot.resize_nocopy(numBytes); + + if (debug) { - // Master data already read ... - auto& is = *isPtr; - is.fatalCheck(FUNCTION_NAME); - - // Read and transmit slave data - for (const int proci : UPstream::subProcs(comm)) - { - List elems; - decomposedBlockData::readBlockEntry(is, elems); - - UOPstream os(proci, pBufs); - os << elems; - } + Pout<< "probed to receive " << label(numBytes) << " from " + << fromProci << endl; } - pBufs.finishedScatters(); - - if (!UPstream::master(comm)) - { - UIPstream is(UPstream::masterNo(), pBufs); - is >> data; - } + // Receive content (can also be zero-sized) + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::masterNo(), + slot.data_bytes(), + slot.size_bytes(), + messageTag, + comm + ); } + UPstream::waitRequests(startOfRequests); + procBuffers.clear(); + + // Sync the status Pstream::broadcast(ok, comm); return ok; @@ -585,19 +568,26 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks const fileName& fName, autoPtr& isPtr, IOobject& headerIO, - const UPstream::commsTypes commsType + const UPstream::commsTypes /* unused */ ) { if (debug) { Pout<< "decomposedBlockData::readBlocks:" - << " stream:" << (isPtr ? isPtr->name() : "invalid") - << " commsType:" << Pstream::commsTypeNames[commsType] << endl; + << " stream:" << (isPtr ? isPtr->name() : "") + << " non-blocking" << endl; } + // Read data on master and transmit. Always non-blocking bool ok = false; - List data; - autoPtr realIsPtr; + List localData; + List> procBuffers; + autoPtr blockIsPtr; + + // Some unique tag for this read/write/probe grouping + const int messageTag = (UPstream::msgType() + 256); + + const label startOfRequests = UPstream::nRequests(); if (UPstream::master(comm)) { @@ -605,123 +595,120 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks is.fatalCheck(FUNCTION_NAME); // Read master data - decomposedBlockData::readBlockEntry(is, data); + decomposedBlockData::readBlockEntry(is, localData); - realIsPtr.reset(new ICharStream(std::move(data))); - realIsPtr->name() = fName; + // Move block data into a stream + blockIsPtr.reset(new ICharStream(std::move(localData))); + blockIsPtr->name() = fName; { // Read header from first block, // advancing the stream position - if (!headerIO.readHeader(*realIsPtr)) + if (!headerIO.readHeader(*blockIsPtr)) { - FatalIOErrorInFunction(*realIsPtr) + FatalIOErrorInFunction(*blockIsPtr) << "Problem while reading object header " << is.relativeName() << nl << exit(FatalIOError); } } - } - if (commsType == UPstream::commsTypes::scheduled) - { - if (UPstream::master(comm)) + // Read proc data and setup non-blocking sends + procBuffers.resize(UPstream::nProcs(comm)); + for (const int proci : UPstream::subProcs(comm)) { - // Master data already read ... - auto& is = *isPtr; - is.fatalCheck(FUNCTION_NAME); + auto& slot = procBuffers[proci]; - // Read and transmit slave data - for (const int proci : UPstream::subProcs(comm)) - { - decomposedBlockData::readBlockEntry(is, data); + decomposedBlockData::readBlockEntry(is, slot); - OPstream os - ( - UPstream::commsTypes::scheduled, - proci, - 0, - UPstream::msgType(), - comm - ); - os << data; - } - - ok = is.good(); - } - else - { - IPstream is + // Send content - non-blocking mode + UOPstream::write ( - UPstream::commsTypes::scheduled, - UPstream::masterNo(), - 0, - UPstream::msgType(), + UPstream::commsTypes::nonBlocking, + proci, + slot.cdata_bytes(), + slot.size_bytes(), + messageTag, comm ); - is >> data; - - realIsPtr.reset(new ICharStream(std::move(data))); - realIsPtr->name() = fName; } + + ok = is.good(); } - else + else if (UPstream::is_subrank(comm)) { - PstreamBuffers pBufs(comm); + List& slot = localData; - if (UPstream::master(comm)) + // Probe for the message size + const auto [fromProci, numBytes] = + UPstream::probeMessage + ( + UPstream::commsTypes::scheduled, // blocking call + UPstream::masterNo(), + messageTag, + comm + ); + + slot.resize_nocopy(numBytes); + + if (debug) { - // Master data already read ... - auto& is = *isPtr; - is.fatalCheck(FUNCTION_NAME); - - // Read and transmit slave data - for (const int proci : UPstream::subProcs(comm)) - { - List elems; - decomposedBlockData::readBlockEntry(is, elems); - - UOPstream os(proci, pBufs); - os << elems; - } - - ok = is.good(); + Pout<< "probed to receive " << label(numBytes) << " from " + << fromProci << endl; } - pBufs.finishedScatters(); - - if (!UPstream::master(comm)) - { - UIPstream is(UPstream::masterNo(), pBufs); - is >> data; - - realIsPtr.reset(new ICharStream(std::move(data))); - realIsPtr->name() = fName; - } + // Receive content (can also be zero-sized) + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + UPstream::masterNo(), + slot.data_bytes(), + slot.size_bytes(), + messageTag, + comm + ); } - Pstream::broadcast(ok, comm); + UPstream::waitRequests(startOfRequests); + procBuffers.clear(); + + if (UPstream::is_subrank(comm)) + { + // Move block data into a stream + blockIsPtr.reset(new ICharStream(std::move(localData))); + blockIsPtr->name() = fName; + } // Broadcast master header info, - // set stream properties from realIsPtr on master + // set stream properties from blockIsPtr on master - int verValue; - int fmtValue; - unsigned labelWidth; - unsigned scalarWidth; + int verValue(0); + int fmtValue(0); + unsigned labelWidth(0); + unsigned scalarWidth(0); word headerName(headerIO.name()); + // The stream characteristics + // + // unsigned formatSizes + // ( + // ((static_cast(iss.format()) & 0xFF) << 16) + // | ((iss.labelByteSize() & 0xFF) << 8) + // | ((iss.scalarByteSize() & 0xFF)) + // ); + if (UPstream::master(comm)) { - verValue = realIsPtr().version().canonical(); - fmtValue = static_cast(realIsPtr().format()); - labelWidth = realIsPtr().labelByteSize(); - scalarWidth = realIsPtr().scalarByteSize(); + auto& iss = blockIsPtr(); + verValue = iss.version().canonical(); + fmtValue = static_cast(iss.format()); + labelWidth = iss.labelByteSize(); + scalarWidth = iss.scalarByteSize(); } Pstream::broadcasts ( - UPstream::worldComm, // Future? comm, + comm, verValue, fmtValue, labelWidth, @@ -733,141 +720,155 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks // Unneeded: headerIO.local() ); - realIsPtr().version(IOstreamOption::versionNumber::canonical(verValue)); - realIsPtr().format(IOstreamOption::streamFormat(fmtValue)); - realIsPtr().setLabelByteSize(labelWidth); - realIsPtr().setScalarByteSize(scalarWidth); + if (blockIsPtr) + { + auto& iss = *blockIsPtr; + iss.version(IOstreamOption::versionNumber::canonical(verValue)); + iss.format(IOstreamOption::streamFormat(fmtValue)); + iss.setLabelByteSize(labelWidth); + iss.setScalarByteSize(scalarWidth); + } headerIO.rename(headerName); - return realIsPtr; -} - - -void Foam::decomposedBlockData::gather -( - const label comm, - const label data, - labelList& datas -) -{ - const label nProcs = UPstream::nProcs(comm); - datas.resize(nProcs); - - char* data0Ptr = datas.data_bytes(); - - List recvOffsets; - List recvSizes; - if (UPstream::master(comm)) + if (debug) { - recvOffsets.setSize(nProcs); - forAll(recvOffsets, proci) - { - // Note: truncating long int to int since - // UPstream::mpiGatherv is limited to ints - recvOffsets[proci] = - int(reinterpret_cast(&datas[proci]) - data0Ptr); - } - recvSizes.setSize(nProcs, sizeof(label)); + Info<< "reading ok:" << ok << endl; } - UPstream::mpiGatherv - ( - reinterpret_cast(&data), - sizeof(label), - data0Ptr, - recvSizes, - recvOffsets, - comm - ); + return blockIsPtr; } -void Foam::decomposedBlockData::gatherSlaveData +void Foam::decomposedBlockData::gatherProcData ( const label comm, - const UList& data, + const UList& localData, const labelUList& recvSizes, - const labelRange& fromProcs, + const labelRange& whichProcs, List& sliceOffsets, - DynamicList& recvData + DynamicList& recvData, + const UPstream::commsTypes commsType ) { - const label myProci = UPstream::myProcNo(comm); - const label numProcs = UPstream::nProcs(comm); + const label myRank = UPstream::myProcNo(comm); + const label nProcs = UPstream::nProcs(comm); + + // Some unique tag for this read/write grouping (as extra safety) + const int messageTag = (UPstream::msgType() + 256); int nSendBytes = 0; recvData.clear(); - // Calculate master data + // On master, calculate sizing/offsets and resize the recv buffer. + // Do not need sliceSizes when nonBlocking List sliceSizes; if (UPstream::master(comm)) { - sliceSizes.resize(numProcs, 0); - sliceOffsets.resize(numProcs+1, 0); - - // Offset 1 beyond the end of the range - const label endProci = fromProcs.end_value(); + sliceSizes.resize_nocopy(nProcs); + sliceSizes = 0; + sliceOffsets.resize_nocopy(nProcs+1); + sliceOffsets = 0; int totalSize = 0; - for (const label proci : fromProcs) + for (const label proci : whichProcs) { - sliceSizes[proci] = int(recvSizes[proci]); + const auto nRecvBytes = static_cast(recvSizes[proci]); + sliceOffsets[proci] = totalSize; - totalSize += sliceSizes[proci]; + totalSize += nRecvBytes; + + sliceSizes[proci] = nRecvBytes; } + // One beyond the end of the range + const label endProci = whichProcs.end_value(); + sliceOffsets[endProci] = totalSize; - recvData.resize(totalSize); + recvData.resize_nocopy(totalSize); } - else if (fromProcs.contains(myProci)) + else if (whichProcs.contains(myRank) && !localData.empty()) { - // Note: UPstream::mpiGatherv limited to int - nSendBytes = int(data.size_bytes()); + // Note: UPstream::gather limited to int + nSendBytes = static_cast(localData.size_bytes()); } - UPstream::mpiGatherv - ( - data.cdata(), - nSendBytes, - - recvData.data(), - sliceSizes, - sliceOffsets, - comm - ); -} - - -Foam::label Foam::decomposedBlockData::calcNumProcs -( - const label comm, - const off_t maxBufferSize, - const labelUList& recvSizes, - const label startProci -) -{ - const label nProcs = UPstream::nProcs(comm); - - label nSendProcs = 0; - if (UPstream::master(comm)) + if (UPstream::commsTypes::nonBlocking == commsType) { - off_t totalSize = recvSizes[startProci]; - label proci = startProci+1; - while (proci < nProcs && (totalSize+recvSizes[proci] < maxBufferSize)) + if (UPstream::master(comm)) { - totalSize += recvSizes[proci]; - proci++; + for (const label proci : whichProcs) + { + SubList procSlice + ( + recvData, + sliceOffsets[proci+1]-sliceOffsets[proci], + sliceOffsets[proci] + ); + + if (procSlice.empty()) + { + continue; + } + else if (proci == UPstream::masterNo()) + { + // No self-communication, although masterNo is normally + // not contained in whichProcs range anyhow. + std::copy + ( + localData.cbegin(), + localData.cbegin(procSlice.size()), + procSlice.begin() + ); + } + else + { + // Receive non-zero content + UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + proci, + procSlice.data_bytes(), + procSlice.size_bytes(), + messageTag, + comm + ); + } + } + } + else if (whichProcs.contains(myRank) && !localData.empty()) + { + // Send non-zero content + UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + UPstream::masterNo(), + localData.cdata_bytes(), + localData.size_bytes(), + messageTag, + comm + ); } - nSendProcs = proci-startProci; + // Waiting is done by the caller } + else + { + // This is MPI_Gatherv() !! - but this path is unlikely to be used - Pstream::broadcast(nSendProcs, comm); + UPstream::mpiGatherv + ( + localData.cdata(), + nSendBytes, - return nSendProcs; + recvData.data(), + sliceSizes, + sliceOffsets, + comm + ); + } } @@ -876,65 +877,82 @@ bool Foam::decomposedBlockData::writeBlocks const label comm, autoPtr& osPtr, List& blockOffset, - const UList& masterData, + const UList& localData, const labelUList& recvSizes, - const UPtrList>& slaveData, + const UList& procData, const UPstream::commsTypes commsType, const bool syncReturnState ) { - if (debug) - { - Pout<< "decomposedBlockData::writeBlocks:" - << " stream:" << (osPtr ? osPtr->name() : "none") - << " data:" << masterData.size() - << " (master only) slaveData:" << slaveData.size() - << " commsType:" << Pstream::commsTypeNames[commsType] << endl; - } - const label nProcs = UPstream::nProcs(comm); bool ok = true; - // Write master data - if (UPstream::master(comm)) + // Recovery of blockOffset is optional + if (UPstream::master(comm) && notNull(blockOffset)) { blockOffset.resize(nProcs); - - OSstream& os = osPtr(); - - blockOffset[UPstream::masterNo()] = - decomposedBlockData::writeBlockEntry - ( - os, - UPstream::masterNo(), - masterData - ); - - ok = os.good(); } - if (slaveData.size()) + // Max proc data size to be received + label maxNonLocalSize = 0; + if (UPstream::master(comm) && procData.empty()) { - // Already have gathered the slave data. + for (label proci = 1; proci < nProcs; ++proci) + { + maxNonLocalSize = Foam::max(maxNonLocalSize, recvSizes[proci]); + } + } + + if (debug) + { + Pout<< " stream:" << (osPtr ? osPtr->name() : "") + << " data:" << localData.size() + << " proc-data:" << procData.size() + << " max-size:" << maxNonLocalSize + << " " << UPstream::commsTypeNames[commsType] << endl; + } + + if (procData.size()) + { + // -------- + // With pre-gathered proc data + // -------- if (UPstream::master(comm)) { - // Master data already written ... - OSstream& os = osPtr(); + auto& os = osPtr(); - // Write slaves + std::streamoff currOffset = + decomposedBlockData::writeBlockEntry + ( + os, + UPstream::masterNo(), + localData + ); + + if (blockOffset.size() > UPstream::masterNo()) + { + blockOffset[UPstream::masterNo()] = currOffset; + } + + // Write all pre-gathered proc data. for (label proci = 1; proci < nProcs; ++proci) { - blockOffset[proci] = + currOffset = decomposedBlockData::writeBlockEntry ( os, proci, - slaveData[proci] + procData[proci] ); + + if (blockOffset.size() > proci) + { + blockOffset[proci] = currOffset; + } } ok = os.good(); @@ -942,142 +960,282 @@ bool Foam::decomposedBlockData::writeBlocks } else if (commsType == UPstream::commsTypes::scheduled) { + // -------- + // Gather/write each rank, one at a time. + // Note: This is often associated with maxMasterFileBufferSize == 0 + // -------- + + // Some unique tag for this read/write grouping (as extra safety) + const int messageTag = (UPstream::msgType() + 256); + if (UPstream::master(comm)) { - // Master data already written ... - OSstream& os = osPtr(); + auto& os = osPtr(); - // Receive and write slaves - label maxNonLocalSize = 0; - for (label proci = 1; proci < nProcs; ++proci) - { - maxNonLocalSize = max(maxNonLocalSize, recvSizes[proci]); - } - - DynamicList elems(maxNonLocalSize); - for (label proci = 1; proci < nProcs; ++proci) - { - elems.resize_nocopy(recvSizes[proci]); - UIPstream::read + std::streamoff currOffset = + decomposedBlockData::writeBlockEntry ( - UPstream::commsTypes::scheduled, - proci, - elems.data(), - elems.size_bytes(), - UPstream::msgType(), - comm + os, + UPstream::masterNo(), + localData ); - blockOffset[proci] = + if (blockOffset.size() > UPstream::masterNo()) + { + blockOffset[UPstream::masterNo()] = currOffset; + } + + // Could discard/recycle localData on master + // (if we had taken ownership...) + + DynamicList recvData(maxNonLocalSize); + for (label proci = 1; proci < nProcs; ++proci) + { + recvData.resize_nocopy(recvSizes[proci]); + + if (!recvData.empty()) + { + UIPstream::read + ( + UPstream::commsTypes::scheduled, + proci, + recvData.data_bytes(), + recvData.size_bytes(), + messageTag, + comm + ); + } + + currOffset = decomposedBlockData::writeBlockEntry ( os, proci, - elems + recvData ); + + if (blockOffset.size() > proci) + { + blockOffset[proci] = currOffset; + } } ok = os.good(); } - else + else if (UPstream::is_subrank(comm) && !localData.empty()) { UOPstream::write ( UPstream::commsTypes::scheduled, UPstream::masterNo(), - masterData.cdata(), - masterData.size_bytes(), - UPstream::msgType(), + localData.cdata_bytes(), + localData.size_bytes(), + messageTag, comm ); } } else { - // Master data already written ... - - // Find out how many processor can be received into - // maxMasterFileBufferSize - - // Starting slave processor and number of processors - label startProc = 1; - label nSendProcs = nProcs-1; + // -------- + // Gather/write ranks, packing together several smaller gathers + // into a single buffer space + // -------- DynamicList recvData; + List recvOffsets; // Offsets into recvData - while (nSendProcs > 0 && startProc < nProcs) + // Offsets of combined ranks for communication. + // Never includes master rank (handled separately) + labelList procOffsets(nProcs, Foam::zero{}); + + // Max combined data to be received (master only) + label maxRecvCount = 0; + + if (UPstream::master(comm)) { - nSendProcs = calcNumProcs + // Find out how many ranks can be received into + // maxMasterFileBufferSize and the corresponding schedule + + off_t maxBufferSize ( - comm, - off_t - ( - fileOperations::masterUncollatedFileOperation:: - maxMasterFileBufferSize - ), - recvSizes, - startProc + fileOperations::masterUncollatedFileOperation:: + maxMasterFileBufferSize ); - if (nSendProcs == 0) + // Buffer must fit the largest off-processor size + if (maxBufferSize < off_t(maxNonLocalSize)) + { + maxBufferSize = off_t(maxNonLocalSize); + } + + // Max combined proc data size to be received + off_t maxCollected = 0; + + for (label proci = 1, nChunks = 0; proci < nProcs; /*nil*/) + { + procOffsets[nChunks] = proci; + + // At least one proc, regardless of maxBufferSize. + // Also handles the corner case when the first proc has + // size 0, but the next one is too large. + + for + ( + off_t total = 0; + ( + proci < nProcs + && (!total || (total + recvSizes[proci] < maxBufferSize)) + ); + ++proci + ) + { + total += recvSizes[proci]; + + if (maxCollected < total) + { + maxCollected = total; + } + } + + procOffsets[++nChunks] = proci; + } + + maxRecvCount = static_cast