ENH: reduce overhead of masterOFstream

- use OCharStream instead of OStringStream to avoid copying char data.

- replace PstreamBuffers mechanism with a direct non-blocking PEX
  algorithm, which avoids unnecessary serialization/de-serialization
  of char data. Also reduces memory footprint and allocations somewhat.

- polling dispatch to write file contents as they become available
This commit is contained in:
Mark Olesen
2023-09-05 18:23:58 +02:00
committed by Mark Olesen
parent 278ad6fb44
commit 23542cabc8
2 changed files with 157 additions and 74 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017 OpenFOAM Foundation
Copyright (C) 2020-2023 OpenCFD Ltd.
Copyright (C) 2020-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -29,7 +29,7 @@ License
#include "masterOFstream.H"
#include "OFstream.H"
#include "OSspecific.H"
#include "PstreamBuffers.H"
#include "Pstream.H"
#include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -41,9 +41,9 @@ void Foam::masterOFstream::checkWrite
std::streamsize len
)
{
if (!len)
if (!str || !(len > 0))
{
// Can probably skip all of this if there is nothing to write
// Can skip everything if there is nothing to write
return;
}
@ -63,9 +63,7 @@ void Foam::masterOFstream::checkWrite
<< exit(FatalIOError);
}
// Use writeRaw() instead of writeQuoted(string,false) to output
// characters directly.
// Write characters directly to std::ostream
os.writeRaw(str, len);
if (!os.good())
@ -77,24 +75,29 @@ void Foam::masterOFstream::checkWrite
}
void Foam::masterOFstream::checkWrite
(
const fileName& fName,
const std::string& s
)
{
checkWrite(fName, s.data(), s.length());
}
void Foam::masterOFstream::commit()
{
if (UPstream::parRun())
// Take ownership of serialized content
DynamicList<char> charData(OCharStream::release());
if (!UPstream::parRun())
{
// Write (non-empty) data
checkWrite(pathName_, charData);
}
else
{
// Ignore content if not writing
if (!writeOnProc_)
{
charData.clear();
}
List<fileName> filePaths(UPstream::nProcs(comm_));
filePaths[UPstream::myProcNo(comm_)] = pathName_;
Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
// Test for identical output paths
bool uniform =
(
UPstream::master(comm_)
@ -105,69 +108,136 @@ void Foam::masterOFstream::commit()
if (uniform)
{
// Identical file paths - write on master
if (UPstream::master(comm_) && writeOnProc_)
{
checkWrite(pathName_, this->str());
checkWrite(pathName_, charData);
}
this->reset();
return;
}
// Different files
PstreamBuffers pBufs(comm_);
// ---------------
//
// Non-sparse (most ranks have writeOnProc_ == true),
// so gather sizes first and use PEX-like handling,
// with polling for when data becomes available.
//
// Could also consider double buffering + write to reduce
// memory overhead.
if (!UPstream::master(comm_))
{
if (writeOnProc_)
{
// Send buffer to master
string s(this->str());
// Or int64_t
const label dataSize =
(
(UPstream::is_subrank(comm_) && writeOnProc_)
? charData.size()
: 0
);
UOPstream os(UPstream::masterNo(), pBufs);
os.write(s.data(), s.length());
}
this->reset(); // Done with contents
}
pBufs.finishedGathers();
const labelList recvSizes
(
UPstream::listGatherValues<label>(dataSize, comm_)
);
// Receive from these procs
DynamicList<int> recvProcs;
if (UPstream::master(comm_))
{
if (writeOnProc_)
// Sorted by message size
labelList order(Foam::sortedOrder(recvSizes));
recvProcs.reserve_exact(order.size());
// Want to receive large messages first. Ignore empty slots
forAllReverse(order, i)
{
// Write master data
checkWrite(filePaths[UPstream::masterNo()], this->str());
}
this->reset(); // Done with contents
const label proci = order[i];
// Allocate large enough to read without resizing
List<char> buf(pBufs.maxRecvCount());
for (const int proci : UPstream::subProcs(comm_))
{
const std::streamsize count(pBufs.recvDataCount(proci));
if (count)
// Ignore empty slots
if (recvSizes[proci] > 0)
{
UIPstream is(proci, pBufs);
is.read(buf.data(), count);
checkWrite(filePaths[proci], buf.cdata(), count);
recvProcs.push_back(proci);
}
}
}
}
else
{
checkWrite(pathName_, this->str());
this->reset();
}
// This method is only called once (internally)
// so no need to clear/flush old buffered data
// Non-blocking communication
const label startOfRequests = UPstream::nRequests();
// Some unique tag for this read/write grouping (extra precaution)
const int messageTag = (UPstream::msgType() + 256);
if (UPstream::is_subrank(comm_) && dataSize > 0)
{
// Send to content to master
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
UPstream::masterNo(),
charData.cdata_bytes(),
charData.size_bytes(),
messageTag,
comm_
);
}
else if (UPstream::master(comm_))
{
// The receive slots
List<List<char>> recvBuffers(UPstream::nProcs(comm_));
// Receive from these procs (non-empty slots)
for (const int proci : recvProcs)
{
auto& slot = recvBuffers[proci];
slot.resize_nocopy(recvSizes[proci]);
// Receive content
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
slot.data_bytes(),
slot.size_bytes(),
messageTag,
comm_
);
}
if (writeOnProc_)
{
// Write non-empty master data
checkWrite(pathName_, charData);
charData.clear();
}
// Poll for completed receive requests and dispatch
DynamicList<int> indices(recvProcs.size());
while
(
UPstream::waitSomeRequests
(
startOfRequests,
recvProcs.size(),
&indices
)
)
{
for (const int i : indices)
{
const int proci = recvProcs[i];
auto& slot = recvBuffers[proci];
// Write non-empty sub-proc data
checkWrite(filePaths[proci], slot);
// Eager cleanup
slot.clear();
}
}
}
UPstream::waitRequests(startOfRequests);
}
}
@ -176,21 +246,24 @@ void Foam::masterOFstream::commit()
Foam::masterOFstream::masterOFstream
(
IOstreamOption::atomicType atomic,
const label comm,
const int communicator,
const fileName& pathName,
IOstreamOption streamOpt,
IOstreamOption::appendType append,
const bool writeOnProc
)
:
OStringStream(streamOpt),
OCharStream(streamOpt),
pathName_(pathName),
atomic_(atomic),
compression_(streamOpt.compression()),
append_(append),
writeOnProc_(writeOnProc),
comm_(comm)
{}
comm_(communicator < 0 ? UPstream::worldComm : communicator)
{
// Start with a slightly larger buffer
OCharStream::reserve(4*1024);
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017 OpenFOAM Foundation
Copyright (C) 2020-2023 OpenCFD Ltd.
Copyright (C) 2020-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -41,7 +41,7 @@ SourceFiles
#ifndef Foam_masterOFstream_H
#define Foam_masterOFstream_H
#include "StringStream.H"
#include "SpanStream.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -55,7 +55,7 @@ namespace Foam
class masterOFstream
:
public OStringStream
public OCharStream
{
// Private Data
@ -80,7 +80,8 @@ class masterOFstream
// Private Member Functions
//- Open file with checking and write append contents
//- Open file with checking and write append contents.
// A no-op if str is null or len is zero
void checkWrite
(
const fileName& fName,
@ -89,9 +90,16 @@ class masterOFstream
);
//- Open file with checking and write append contents
void checkWrite(const fileName& fName, const std::string& s);
void checkWrite
(
const fileName& fName,
const UList<char>& charData
)
{
checkWrite(fName, charData.cdata(), charData.size_bytes());
}
//- Commit buffered information, including parallel gather as required
//- Commit buffered information, including communication as required
void commit();
@ -104,7 +112,8 @@ public:
masterOFstream
(
IOstreamOption::atomicType atomic,
const label comm,
//! The communicator number (-1 == worldComm)
const int communicator,
const fileName& pathname,
IOstreamOption streamOpt = IOstreamOption(),
IOstreamOption::appendType append = IOstreamOption::NO_APPEND,
@ -115,7 +124,8 @@ public:
//- from pathname, stream option, optional append
masterOFstream
(
const label comm,
//! The communicator number (-1 == worldComm)
const int communicator,
const fileName& pathname,
IOstreamOption streamOpt = IOstreamOption(),
IOstreamOption::appendType append = IOstreamOption::NO_APPEND,
@ -125,7 +135,7 @@ public:
masterOFstream
(
IOstreamOption::NON_ATOMIC,
comm,
communicator,
pathname,
streamOpt,
append,