ENH: reduce overhead of decomposedBlockData IO

- OCharStream for serializing
- skip intermediate blocks without reading
- support string_view
- read and distribute with direct non-blocking send/recv
  instead of PstreamBuffers or with IPstream/OPstream streaming
  operators.
- non-blocking gather/write when using intermediate buffer space
This commit is contained in:
Mark Olesen
2023-09-05 19:23:14 +02:00
committed by Mark Olesen
parent 23542cabc8
commit c5b9d9b532
3 changed files with 723 additions and 498 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020-2023 OpenCFD Ltd. Copyright (C) 2020-2025 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -51,6 +51,8 @@ FoamFile
class decomposedBlockData; class decomposedBlockData;
location "constant/polyMesh"; location "constant/polyMesh";
object points; object points;
data.format ascii; // optional
data.class vectorField; // optional
} }
// processor0 // processor0
@ -135,23 +137,15 @@ protected:
// Protected Member Functions // Protected Member Functions
//- Helper: determine number of processors whose recvSizes fits //- Read data (on master) and transmit.
//- into maxBufferSize
static label calcNumProcs
(
const label comm,
const off_t maxBufferSize,
const labelUList& recvSizes,
const label startProci
);
//- Read data into *this. ISstream is only valid on master.
static bool readBlocks static bool readBlocks
( (
const label comm, const label comm,
// [in] The input stream (only valid on master)
autoPtr<ISstream>& isPtr, autoPtr<ISstream>& isPtr,
List<char>& contentChars, // [out] The processor local data
const UPstream::commsTypes commsType List<char>& localData,
const UPstream::commsTypes commsType /* unused */
); );
//- Helper: skip a block of (binary) character data //- Helper: skip a block of (binary) character data
@ -277,14 +271,15 @@ public:
} }
//- Helper: write block of (binary) character content //- Helper: write block of (binary) character content
// Housekeeping
static std::streamoff writeBlockEntry static std::streamoff writeBlockEntry
( (
OSstream& os, OSstream& os,
const label blocki, const label blocki,
const std::string& s std::string_view sv
) )
{ {
return writeBlockEntry(os, blocki, s.data(), s.size()); return writeBlockEntry(os, blocki, sv.data(), sv.size());
} }
//- Helper: write block of (binary) character data //- Helper: write block of (binary) character data
@ -307,41 +302,37 @@ public:
); );
//- Read master header information (into headerIO) and return //- Read master header information (into headerIO) and return
//- data in stream. Note: isPtr is only valid on master. //- data in stream.
static autoPtr<ISstream> readBlocks static autoPtr<ISstream> readBlocks
( (
const label comm, const label comm,
const fileName& fName, const fileName& fName,
//! [in] The input stream (only valid on master)
autoPtr<ISstream>& isPtr, autoPtr<ISstream>& isPtr,
//! [out] header information
IOobject& headerIO, IOobject& headerIO,
const UPstream::commsTypes commsType const UPstream::commsTypes commsType /* unused */
); );
//- Helper: gather single label. Note: using native Pstream. //- Helper: gather data from (subset of) sub-ranks.
// datas sized with num procs but undefined contents on // In non-blocking mode it sets up send/recv for non-empty content.
// slaves // In blocking/scheduled mode it uses MPI_Gatherv to collect data.
static void gather
(
const label comm,
const label data,
labelList& datas
);
//- Helper: gather data from (subset of) slaves.
// //
// Returns: // Returns:
// - recvData : received data // - recvData : the received data
// - recvOffsets : offset in data. recvOffsets is nProcs+1 // - recvOffsets : offset in data. recvOffsets is nProcs+1
static void gatherSlaveData static void gatherProcData
( (
const label comm, const label comm,
const UList<char>& data, const UList<char>& localData, //!< [in] required on all procs
const labelUList& recvSizes, const labelUList& recvSizes, //!< [in] only required on master
const labelRange& fromProcs, const labelRange& whichProcs, //!< [in] required on all procs
List<int>& recvOffsets, List<int>& recvOffsets, //!< [out] only relevant on master
DynamicList<char>& recvData DynamicList<char>& recvData, //!< [out] only relevant on master
const UPstream::commsTypes commsType
); );
//- Write *this. Ostream only valid on master. //- Write *this. Ostream only valid on master.
@ -349,19 +340,96 @@ public:
static bool writeBlocks static bool writeBlocks
( (
const label comm, const label comm,
//! [in] output stream (relevant on master)
autoPtr<OSstream>& osPtr, autoPtr<OSstream>& osPtr,
//! [out] start offsets to each block (relevant on master),
//! ignored if List::null() type
List<std::streamoff>& blockOffset, List<std::streamoff>& blockOffset,
const UList<char>& masterData, const UList<char>& localData, //!< [in] required on all procs
const labelUList& recvSizes, //!< [in] only required on master
const labelUList& recvSizes, //! Optional proc data (only written on master)
//! but \b must also be symmetrically defined (empty/non-empty)
// Optional slave data (on master) //! on all ranks
const UPtrList<SubList<char>>& slaveData, const UList<std::string_view>& procData,
const UPstream::commsTypes commsType, const UPstream::commsTypes commsType,
const bool syncReturnState = true const bool syncReturnState = true
); );
// Housekeeping
//- Write *this. Ostream only valid on master.
//  Returns offsets of processor blocks in blockOffset.
//
//  Compatibility overload: accepts the optional proc data as a list of
//  SubList<char> pointers, converts each usable entry into a
//  std::string_view and forwards to the std::string_view overload.
FOAM_DEPRECATED_FOR(2023-09, "write with std::string_view instead")
static bool writeBlocks
(
    const label comm,
    autoPtr<OSstream>& osPtr,
    List<std::streamoff>& blockOffset,
    const UList<char>& localData,   // [in] required on all procs
    const labelUList& recvSizes,    // [in] only required on master

    // Optional proc data (only written on master)
    // but \b must also be symmetrically defined (empty/non-empty)
    // on all ranks
    const UPtrList<SubList<char>>& procData,
    const UPstream::commsTypes commsType,
    const bool syncReturnState = true
)
{
    // One view per procData slot. Slots that are unset or refer to
    // empty content keep their default-constructed view.
    const label nEntries = procData.size();

    List<std::string_view> views(nEntries);

    for (label i = 0; i < nEntries; ++i)
    {
        const auto* sub = procData.get(i);

        if (!sub || sub->empty())
        {
            continue;
        }

        views[i] = std::string_view(sub->cdata(), sub->size());
    }

    // Delegate to the std::string_view overload, all other arguments
    // passed through unchanged
    return decomposedBlockData::writeBlocks
    (
        comm,
        osPtr,
        blockOffset,
        localData,
        recvSizes,
        views,
        commsType,
        syncReturnState
    );
}
//- Gather a single label from each rank of the communicator.
//  The only difference to UPstream::listGatherValue is that this
//  gather also resizes the output on the non-master procs
//  (the output list is resized to nProcs on every calling rank).
//  \deprecated(2023-09) - consider UPstream::listGatherValue
FOAM_DEPRECATED_FOR(2023-09, "consider UPstream::listGatherValue()")
static void gather
(
    const label comm,
    const label localValue,
    labelList& allValues
)
{
    // Size the receive list for one value per rank (old content discarded)
    const label numProcs = UPstream::nProcs(comm);
    allValues.resize_nocopy(numProcs);

    // Raw byte access for the low-level gather
    const char* sendBytes = reinterpret_cast<const char*>(&localValue);
    auto* recvBytes = allValues.data_bytes();

    UPstream::mpiGather
    (
        sendBytes,
        recvBytes,
        sizeof(label),  // The send/recv size per rank
        comm
    );
}
}; };

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2021-2022 OpenCFD Ltd. Copyright (C) 2021-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -189,10 +189,9 @@ void Foam::decomposedBlockData::writeExtraHeaderContent
dict.set("data.class", io.type()); dict.set("data.class", io.type());
// Deep-copy of meta-data (if any) // Deep-copy of meta-data (if any)
const dictionary* metaDataDict = io.findMetaData(); if (const auto* meta = io.findMetaData(); meta && !meta->empty())
if (metaDataDict && !metaDataDict->empty())
{ {
dict.add("meta", *metaDataDict); dict.add("meta", *meta);
} }
} }
@ -221,16 +220,16 @@ void Foam::decomposedBlockData::writeHeader
io.name() io.name()
); );
// Same as writeExtraHeaderContent
{ {
writeHeaderEntry(os, "data.format", streamOptData.format()); writeHeaderEntry(os, "data.format", streamOptData.format());
writeHeaderEntry(os, "data.class", io.type()); writeHeaderEntry(os, "data.class", io.type());
} }
// Meta-data (if any) // Meta-data (if any)
const dictionary* metaDataDict = io.findMetaData(); if (const auto* meta = io.findMetaData(); meta && !meta->empty())
if (metaDataDict && !metaDataDict->empty())
{ {
metaDataDict->writeEntry("meta", os); meta->writeEntry("meta", os);
} }
os.endBlock(); os.endBlock();