Compare commits

...

5 Commits

Author SHA1 Message Date
11964a4731 ENH: reduce overhead and clearer data ownership for OFstreamCollator
- local data to be written is now transferable into the
  OFstreamCollator. This avoids making a full copy when threading is
  active.

- use plain lists for managing proc data
  * storage: List<List<char>> instead of PtrList<List<char>>
  * views:   List<stdFoam::span<char>> instead of PtrList<SubList<char>>

- use gather/write (unthreaded) as backstop if the output is too big
  to fit in the buffer size. Emit warning instead of FatalError
2024-01-23 17:16:02 +01:00
c82facdc8e ENH: reduce overhead of decomposedBlockData IO
- OCharStream for serializing
- skip intermediate blocks without reading
- support character spans
- read and distribute with direct non-blocking send/recv
  instead of PstreamBuffers or with IPstream/OPstream streaming
  operators.
- non-blocking gather/write when using intermediate buffer space
2024-01-23 17:16:02 +01:00
da560be4df ENH: reduce overhead of masterOFstream
- use OCharStream instead of OStringStream to avoid copying char data.

- direct non-blocking send/recv with probing instead of PstreamBuffers
  to avoid serialization/de-serialization of char data and reduce the
  memory footprint somewhat.

- polling dispatch to write file content as it becomes available,
  which should improve communication and IO overlap
2024-01-23 17:16:02 +01:00
cf6e3a1acf CONFIG: adjust compile-time value of maxThreadFileBufferSize to 0
- consistent with etc/controlDict default

STYLE: update banner message for collated
2024-01-23 17:16:02 +01:00
0a51a4848c TEST: add standalone test application: Test-checkIOspeed 2024-01-23 17:16:02 +01:00
15 changed files with 1537 additions and 710 deletions

View File

@ -0,0 +1,3 @@
Test-checkIOspeed.cxx
EXE = $(FOAM_USER_APPBIN)/Test-checkIOspeed

View File

@ -0,0 +1,7 @@
EXE_INC = \
-I$(LIB_SRC)/finiteVolume/lnInclude \
-I$(LIB_SRC)/mesh/blockMesh/lnInclude
EXE_LIBS = \
-lfiniteVolume \
-lblockMesh

View File

@ -0,0 +1,400 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-checkIOspeed
Description
Simple test of file writing, including timings
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "profiling.H"
#include "clockTime.H"
#include "fileName.H"
#include "fileOperation.H"
#include "IOstreams.H"
#include "OSspecific.H"
#include "globalIndex.H"
#include "volFields.H"
#include "IOField.H"
#include "PDRblock.H"
// Not really great since CoherentMesh only works with reading!
#ifdef USE_COHERENT
#include "OFCstream.H"
#include "SliceStreamRepo.H"
#endif
#include <numeric>
using namespace Foam;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Main program:
int main(int argc, char *argv[])
{
argList::addNote("Rewrites fields multiple times");
argList::noFunctionObjects(); // Disallow function objects
argList::noCheckProcessorDirectories();
argList::addVerboseOption("additional verbosity");
argList::addOption
(
"output",
"N",
"Begin output iteration (default: 10000)"
);
argList::addOption
(
"count",
"N",
"Number of writes (default: 1)"
);
argList::addOption
(
"fields",
"N",
"Number of fields to write (default: 1)"
);
argList::addOption
(
"global",
"N",
"Global field size"
);
argList::addOption
(
"local",
"N",
"Local fields size (default: 1000)"
);
argList::addOption
(
"mesh",
"(nx ny nz)",
"Create with a mesh"
);
argList::addOption
(
"exclude",
"(int ... )",
"zero-sized on ranks with specified modulo"
);
argList::addBoolOption("coherent", "Force coherent output");
#include "setRootCase.H"
#include "createTime.H"
const label firstOutput = args.getOrDefault("output", 10000);
const label nOutput = args.getOrDefault("count", 1);
const label nFields = args.getOrDefault("fields", 1);
labelVector meshCells(0, 0, 0);
const int verbose = args.verbose();
const bool useCoherent = args.found("coherent");
labelList excludes;
args.readListIfPresent("exclude", excludes);
bool writeOnProc = true;
const label myProci = UPstream::myProcNo();
for (const label excl : excludes)
{
if (excl > 1 && myProci > 0 && (myProci % excl) == 0)
{
writeOnProc = false;
break;
}
}
const label nProcsEff =
returnReduce((writeOnProc ? 1 : 0), sumOp<label>());
Info<< "Output " << nProcsEff
<< "/" << UPstream::nProcs()
<< " ranks" << nl;
if (args.readIfPresent("mesh", meshCells))
{
if (!writeOnProc)
{
meshCells = Zero;
}
PDRblock block(boundBox(point::zero, point::one), meshCells);
// Advance time
// - coherent currently still needs to read the mesh itself!
runTime.setTime(firstOutput, firstOutput);
IOobject meshIO
(
polyMesh::defaultRegion,
runTime.timeName(),
runTime,
IOobject::NO_READ,
IOobject::NO_WRITE,
IOobject::NO_REGISTER
);
autoPtr<polyMesh> pmeshPtr(block.innerMesh(meshIO));
fvMesh mesh
(
meshIO,
pointField(pmeshPtr->points()),
faceList(pmeshPtr->faces()),
labelList(pmeshPtr->faceOwner()),
labelList(pmeshPtr->faceNeighbour())
);
pmeshPtr.reset(nullptr);
const label fieldSize = mesh.nCells();
const globalIndex giCells(fieldSize);
// Create fields
Info<< nl << "Create " << nFields << " fields" << nl
<< "field-size:" << fieldSize
<< " total-size:" << giCells.totalSize() << nl;
// Dimensioned field (no proc boundaries)
PtrList<volScalarField::Internal> fields(nFields);
{
IOobject io
(
"field",
runTime.timeName(),
runTime,
IOobject::NO_READ,
IOobject::NO_WRITE,
IOobject::NO_REGISTER
);
forAll(fields, fieldi)
{
io.resetHeader("field" + Foam::name(fieldi));
fields.set
(
fieldi,
new volScalarField::Internal(io, mesh, dimless)
);
auto& fld = fields[fieldi];
std::iota(fld.begin(), fld.end(), scalar(0));
}
}
IOstreamOption streamOpt(IOstreamOption::BINARY);
if (useCoherent)
{
#ifdef USE_COHERENT
streamOpt.format(IOstreamOption::COHERENT);
runTime.writeFormat(IOstreamOption::COHERENT);
mesh.writeObject(streamOpt, true);
Info<< nl
<< "Specified -coherent (instance: "
<< mesh.pointsInstance() << ")" << endl;
const auto& coherent = CoherentMesh::New(mesh);
Info<< " points = "
<< coherent.globalPointOffsets().totalSize() << nl
<< " cells = "
<< coherent.globalCellOffsets().totalSize() << nl
<< " patches = "
<< coherent.nNonProcessorPatches() << nl;
#else
Info<< "Warning: -coherent ignored" << nl;
#endif
}
Info<< nl
<< "Writing " << nOutput << " times starting at "
<< firstOutput << nl;
clockTime timing;
if (verbose) Info<< "Time:";
for
(
label timeIndex = firstOutput, count = 0;
count < nOutput;
++timeIndex, ++count
)
{
runTime.setTime(timeIndex, timeIndex);
if (verbose) Info<< ' ' << runTime.timeName() << flush;
runTime.writeNow();
for (const auto& fld : fields)
{
fld.regIOobject::writeObject(streamOpt, writeOnProc);
}
}
if (useCoherent)
{
#ifdef USE_COHERENT
SliceStreamRepo::closeInstance();
#endif
}
if (verbose) Info<< nl;
Info<< nl << "Writing took "
<< timing.timeIncrement() << "s" << endl;
Info<< nl
<< "Cleanup newly generated files with" << nl << nl
<< " foamListTimes -rm -time "
<< firstOutput << ":" << nl
<< " foamListTimes -processor -rm -time "
<< firstOutput << ":" << nl;
}
else
{
label fieldSize = 1000;
if (args.readIfPresent("global", fieldSize))
{
fieldSize /= nProcsEff;
}
else
{
args.readIfPresent("local", fieldSize);
}
if (!writeOnProc)
{
fieldSize = 0;
}
const globalIndex giCells(fieldSize);
// Create fields
Info<< nl << "Create " << nFields << " fields" << nl
<< "field-size:" << fieldSize
<< " total-size:" << giCells.totalSize() << nl;
PtrList<IOField<scalar>> fields(nFields);
{
IOobject io
(
"field",
runTime.timeName(),
runTime,
IOobject::NO_READ,
IOobject::NO_WRITE,
IOobject::NO_REGISTER
);
forAll(fields, fieldi)
{
io.resetHeader("field" + Foam::name(fieldi));
fields.set
(
fieldi,
new IOField<scalar>(io, fieldSize)
);
auto& fld = fields[fieldi];
std::iota(fld.begin(), fld.end(), scalar(0));
}
}
IOstreamOption streamOpt(IOstreamOption::BINARY);
if (useCoherent)
{
Info<< "Warning: -coherent ignored" << nl;
}
Info<< nl
<< "Writing " << nOutput << " times starting at "
<< firstOutput << nl;
clockTime timing;
if (verbose) Info<< "Time:";
for
(
label timeIndex = firstOutput, count = 0;
count < nOutput;
++timeIndex, ++count
)
{
runTime.setTime(timeIndex, timeIndex);
if (verbose) Info<< ' ' << runTime.timeName() << flush;
runTime.writeNow();
for (const auto& fld : fields)
{
fld.regIOobject::writeObject(streamOpt, writeOnProc);
}
}
if (verbose) Info<< nl;
Info<< nl << "Writing took "
<< timing.timeIncrement() << "s" << endl;
Info<< nl
<< "Cleanup newly generated files with" << nl << nl
<< " foamListTimes -rm -time "
<< firstOutput << ":" << nl
<< " foamListTimes -processor -rm -time "
<< firstOutput << ":" << nl;
}
Info<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //

View File

@ -1,3 +1,3 @@
Test-fileHandler-writing.C Test-fileHandler-writing.cxx
EXE = $(FOAM_USER_APPBIN)/Test-fileHandler-writing EXE = $(FOAM_USER_APPBIN)/Test-fileHandler-writing

View File

@ -56,8 +56,18 @@ int main(int argc, char *argv[])
argList::noFunctionObjects(); // Disallow function objects argList::noFunctionObjects(); // Disallow function objects
argList::addVerboseOption("additional verbosity"); argList::addVerboseOption("additional verbosity");
argList::addOption("output", "Begin output iteration (default: 10000)"); argList::addOption
argList::addOption("count", "Number of writes (default: 1)"); (
"output",
"N",
"Begin output iteration (default: 10000)"
);
argList::addOption
(
"count",
"N",
"Number of writes (default: 1)"
);
#include "setRootCase.H" #include "setRootCase.H"
#include "createTime.H" #include "createTime.H"

View File

@ -114,7 +114,7 @@ OptimisationSwitches
// (sized with magnitude of value) is large enough to hold all // (sized with magnitude of value) is large enough to hold all
// outstanding writes so will not try to initialise the Pstream with // outstanding writes so will not try to initialise the Pstream with
// threading support. // threading support.
// Default: 1e9 // Default: 0
maxThreadFileBufferSize 0; maxThreadFileBufferSize 0;
//- masterUncollated: non-blocking buffer size. //- masterUncollated: non-blocking buffer size.

View File

@ -135,23 +135,15 @@ protected:
// Protected Member Functions // Protected Member Functions
//- Helper: determine number of processors whose recvSizes fits //- Read data (on master) and transmit.
//- into maxBufferSize
static label calcNumProcs
(
const label comm,
const off_t maxBufferSize,
const labelUList& recvSizes,
const label startProci
);
//- Read data into *this. ISstream is only valid on master.
static bool readBlocks static bool readBlocks
( (
const label comm, const label comm,
// [in] The input stream (only valid on master)
autoPtr<ISstream>& isPtr, autoPtr<ISstream>& isPtr,
List<char>& contentChars, // [out] The processor local data
const UPstream::commsTypes commsType List<char>& localData,
const UPstream::commsTypes commsType /* unused */
); );
//- Helper: skip a block of (binary) character data //- Helper: skip a block of (binary) character data
@ -277,6 +269,19 @@ public:
} }
//- Helper: write block of (binary) character content //- Helper: write block of (binary) character content
// Housekeeping
static std::streamoff writeBlockEntry
(
OSstream& os,
const label blocki,
const stdFoam::span<char>& s
)
{
return writeBlockEntry(os, blocki, s.data(), s.size());
}
//- Helper: write block of (binary) character content
// Housekeeping
static std::streamoff writeBlockEntry static std::streamoff writeBlockEntry
( (
OSstream& os, OSstream& os,
@ -307,41 +312,37 @@ public:
); );
//- Read master header information (into headerIO) and return //- Read master header information (into headerIO) and return
//- data in stream. Note: isPtr is only valid on master. //- data in stream.
static autoPtr<ISstream> readBlocks static autoPtr<ISstream> readBlocks
( (
const label comm, const label comm,
const fileName& fName, const fileName& fName,
//! [in] The input stream (only valid on master)
autoPtr<ISstream>& isPtr, autoPtr<ISstream>& isPtr,
//! [out] header information
IOobject& headerIO, IOobject& headerIO,
const UPstream::commsTypes commsType const UPstream::commsTypes commsType /* unused */
); );
//- Helper: gather single label. Note: using native Pstream. //- Helper: gather data from (subset of) sub-ranks.
// datas sized with num procs but undefined contents on // In non-blocking mode it sets up send/recv for non-empty content.
// slaves // In blocking/scheduled mode it uses MPI_Gatherv to collect data.
static void gather
(
const label comm,
const label data,
labelList& datas
);
//- Helper: gather data from (subset of) slaves.
// //
// Returns: // Returns:
// - recvData : received data // - recvData : the received data
// - recvOffsets : offset in data. recvOffsets is nProcs+1 // - recvOffsets : offset in data. recvOffsets is nProcs+1
static void gatherSlaveData static void gatherProcData
( (
const label comm, const label comm,
const UList<char>& data, const UList<char>& localData, //!< [in] required on all procs
const labelUList& recvSizes, const labelUList& recvSizes, //!< [in] only required on master
const labelRange& fromProcs, const labelRange& whichProcs, //!< [in] required on all procs
List<int>& recvOffsets, List<int>& recvOffsets, //!< [out] only relevant on master
DynamicList<char>& recvData DynamicList<char>& recvData, //!< [out] only relevant on master
const UPstream::commsTypes commsType
); );
//- Write *this. Ostream only valid on master. //- Write *this. Ostream only valid on master.
@ -349,19 +350,98 @@ public:
static bool writeBlocks static bool writeBlocks
( (
const label comm, const label comm,
//! [in] output stream (relevant on master)
autoPtr<OSstream>& osPtr, autoPtr<OSstream>& osPtr,
//! [out] start offsets to each block (relevant on master),
//! ignored if List::null() type
List<std::streamoff>& blockOffset, List<std::streamoff>& blockOffset,
const UList<char>& masterData, const UList<char>& localData, //!< [in] required on all procs
const labelUList& recvSizes, //!< [in] only required on master
const labelUList& recvSizes, //! Optional proc data (only written on master)
//! but \b must also be symmetrically defined (empty/non-empty)
// Optional slave data (on master) //! on all ranks
const UPtrList<SubList<char>>& slaveData, const UList<stdFoam::span<char>>& procData,
const UPstream::commsTypes commsType, const UPstream::commsTypes commsType,
const bool syncReturnState = true const bool syncReturnState = true
); );
// Housekeeping
//- Write *this. Ostream only valid on master.
// Returns offsets of processor blocks in blockOffset
FOAM_DEPRECATED_FOR(2023-09, "write with char span instead")
static bool writeBlocks
(
const label comm,
autoPtr<OSstream>& osPtr,
List<std::streamoff>& blockOffset,
const UList<char>& localData, // [in] required on all procs
const labelUList& recvSizes, // [in] only required on master
// Optional proc data (only written on master)
// but \b must also be symmetrically defined (empty/non-empty)
// on all ranks
const UPtrList<SubList<char>>& procData,
const UPstream::commsTypes commsType,
const bool syncReturnState = true
)
{
// Transcribe to span<char>
List<stdFoam::span<char>> spans(procData.size());
forAll(procData, proci)
{
if (procData.test(proci))
{
spans[proci] = stdFoam::span<char>
(
const_cast<char*>(procData[proci].cdata()),
procData[proci].size()
);
}
}
return decomposedBlockData::writeBlocks
(
comm,
osPtr,
blockOffset,
localData,
recvSizes,
spans,
commsType,
syncReturnState
);
}
//- Deprecated(2023-09) - consider UPstream::listGatherValue
// The only difference is that this gather also resizes the output
// on the non-master procs
// \deprecated(2023-09) - consider UPstream::listGatherValue
FOAM_DEPRECATED_FOR(2023-09, "consider UPstream::listGatherValue()")
static void gather
(
const label comm,
const label localValue,
labelList& allValues
)
{
allValues.resize_nocopy(UPstream::nProcs(comm));
UPstream::mpiGather
(
reinterpret_cast<const char*>(&localValue),
allValues.data_bytes(),
sizeof(label), // The send/recv size per rank
comm
);
}
}; };

View File

@ -29,7 +29,7 @@ License
#include "masterOFstream.H" #include "masterOFstream.H"
#include "OFstream.H" #include "OFstream.H"
#include "OSspecific.H" #include "OSspecific.H"
#include "PstreamBuffers.H" #include "Pstream.H"
#include "masterUncollatedFileOperation.H" #include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -38,10 +38,10 @@ void Foam::masterOFstream::checkWrite
( (
const fileName& fName, const fileName& fName,
const char* str, const char* str,
std::streamsize len const std::streamsize len
) )
{ {
if (!len) if (!str || !len)
{ {
// Can probably skip all of this if there is nothing to write // Can probably skip all of this if there is nothing to write
return; return;
@ -63,9 +63,7 @@ void Foam::masterOFstream::checkWrite
<< exit(FatalIOError); << exit(FatalIOError);
} }
// Use writeRaw() instead of writeQuoted(string,false) to output // Write characters directly to std::ostream
// characters directly.
os.writeRaw(str, len); os.writeRaw(str, len);
if (!os.good()) if (!os.good())
@ -77,97 +75,159 @@ void Foam::masterOFstream::checkWrite
} }
void Foam::masterOFstream::checkWrite
(
const fileName& fName,
const std::string& s
)
{
checkWrite(fName, s.data(), s.length());
}
void Foam::masterOFstream::commit() void Foam::masterOFstream::commit()
{ {
// Take ownership of serialized content, without copying or reallocation
DynamicList<char> charData(OCharStream::release());
if (UPstream::parRun()) if (UPstream::parRun())
{ {
// Ignore content if not writing (reduces communication)
if (!writeOnProc_)
{
charData.clear();
}
List<fileName> filePaths(UPstream::nProcs(comm_)); List<fileName> filePaths(UPstream::nProcs(comm_));
filePaths[UPstream::myProcNo(comm_)] = pathName_; filePaths[UPstream::myProcNo(comm_)] = pathName_;
Pstream::gatherList(filePaths, UPstream::msgType(), comm_); Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
// Test for identical output paths
bool uniform = bool uniform =
( (
UPstream::master(comm_) UPstream::master(comm_)
&& fileOperation::uniformFile(filePaths) ? fileOperation::uniformFile(filePaths)
: true
); );
Pstream::broadcast(uniform, comm_); Pstream::broadcast(uniform, comm_);
if (uniform) if (uniform)
{ {
// Identical file paths - write on master
if (UPstream::master(comm_) && writeOnProc_) if (UPstream::master(comm_) && writeOnProc_)
{ {
checkWrite(pathName_, this->str()); checkWrite(pathName_, charData);
} }
this->reset();
return; return;
} }
// Different files // Different files
PstreamBuffers pBufs(comm_, UPstream::commsTypes::nonBlocking); // ---------------
// Current strategy is to setup all non-blocking send/recv
// using the probed message size to establish the recv size
// (to avoid an additional communication of the sizes).
//
// For ranks with writeOnProc=false, the message size is 0.
if (!UPstream::master(comm_)) // An alternative approach would be to gather recv sizes
// to avoid zero-sized messages and/or use double buffering
// to recv into a buffer and write.
//
// const labelList recvSizes
// (
// UPstream::listGatherValues<label>
// (
// (UPstream::is_subrank(comm_) ? charData.size() : label(0)),
// comm_
// )
// );
const label startOfRequests = UPstream::nRequests();
// Some unique tag for this read/write/probe grouping
const int messageTag = UPstream::msgType() + 256;
if (UPstream::is_subrank(comm_))
{ {
if (writeOnProc_) // Send to master. When (!writeOnProc_) it is zero-sized.
{ UOPstream::write
// Send buffer to master (
string s(this->str()); UPstream::commsTypes::nonBlocking,
UPstream::masterNo(),
UOPstream os(UPstream::masterNo(), pBufs); charData.cdata_bytes(),
os.write(s.data(), s.length()); charData.size_bytes(),
} messageTag,
this->reset(); // Done with contents comm_
);
} }
else if (UPstream::master(comm_))
pBufs.finishedGathers();
if (UPstream::master(comm_))
{ {
// The receive slots
List<List<char>> procBuffers(UPstream::nProcs(comm_));
const auto recvProcs = UPstream::subProcs(comm_);
for (const int proci : recvProcs)
{
auto& procSlice = procBuffers[proci];
// Probe the message size
std::pair<int, int> probed =
UPstream::probeMessage
(
UPstream::commsTypes::blocking,
proci,
messageTag,
comm_
);
procSlice.resize_nocopy(probed.second);
// Receive content (can also be zero-sized)
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
procSlice.data_bytes(),
procSlice.size_bytes(),
messageTag,
comm_
);
}
if (writeOnProc_) if (writeOnProc_)
{ {
// Write master data // Write non-empty master data
checkWrite(filePaths[UPstream::masterNo()], this->str()); checkWrite(pathName_, charData);
} }
this->reset(); // Done with contents
// Poll for completed receive requests and dispatch
// Allocate large enough to read without resizing DynamicList<int> indices(recvProcs.size());
List<char> buf(pBufs.maxRecvCount()); while
(
for (const int proci : UPstream::subProcs(comm_)) UPstream::waitSomeRequests
(
startOfRequests,
recvProcs.size(),
&indices
)
)
{ {
const std::streamsize count(pBufs.recvDataCount(proci)); for (const int idx : indices)
if (count)
{ {
UIPstream is(proci, pBufs); const int proci = recvProcs[idx];
auto& procSlice = procBuffers[proci];
is.read(buf.data(), count); if (!procSlice.empty())
checkWrite(filePaths[proci], buf.cdata(), count); {
// Write non-empty sub-proc data
checkWrite(filePaths[proci], procSlice);
}
// Eager cleanup?
// TBD: procSlice.clear();
} }
} }
} }
UPstream::waitRequests(startOfRequests);
} }
else else
{ {
checkWrite(pathName_, this->str()); // Write (non-empty) data
this->reset(); checkWrite(pathName_, charData);
} }
// This method is only called once (internally)
// so no need to clear/flush old buffered data
} }
@ -183,7 +243,7 @@ Foam::masterOFstream::masterOFstream
const bool writeOnProc const bool writeOnProc
) )
: :
OStringStream(streamOpt), OCharStream(streamOpt),
pathName_(pathName), pathName_(pathName),
atomic_(atomic), atomic_(atomic),
compression_(streamOpt.compression()), compression_(streamOpt.compression()),

View File

@ -41,7 +41,7 @@ SourceFiles
#ifndef Foam_masterOFstream_H #ifndef Foam_masterOFstream_H
#define Foam_masterOFstream_H #define Foam_masterOFstream_H
#include "StringStream.H" #include "SpanStream.H"
#include "UPstream.H" #include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -55,7 +55,7 @@ namespace Foam
class masterOFstream class masterOFstream
: :
public OStringStream public OCharStream
{ {
// Private Data // Private Data
@ -85,13 +85,20 @@ class masterOFstream
( (
const fileName& fName, const fileName& fName,
const char* str, const char* str,
std::streamsize len const std::streamsize len
); );
//- Open file with checking and write append contents //- Open file with checking and write append contents
void checkWrite(const fileName& fName, const std::string& s); void checkWrite
(
const fileName& fName,
const UList<char>& charData
)
{
checkWrite(fName, charData.cdata(), charData.size_bytes());
}
//- Commit buffered information, including parallel gather as required //- Commit buffered information, including communication as required
void commit(); void commit();

View File

@ -47,9 +47,9 @@ bool Foam::OFstreamCollator::writeFile
const label comm, const label comm,
const word& objectType, const word& objectType,
const fileName& fName, const fileName& fName,
const string& masterData, const UList<char>& localData,
const labelUList& recvSizes, const labelUList& recvSizes,
const UPtrList<SubList<char>>& slaveData, // optional slave data const UList<stdFoam::span<char>>& procData, // optional proc data
IOstreamOption streamOpt, IOstreamOption streamOpt,
IOstreamOption::atomicType atomic, IOstreamOption::atomicType atomic,
IOstreamOption::appendType append, IOstreamOption::appendType append,
@ -58,18 +58,14 @@ bool Foam::OFstreamCollator::writeFile
{ {
if (debug) if (debug)
{ {
Pout<< "OFstreamCollator : Writing master " << label(masterData.size()) Pout<< "OFstreamCollator : Writing local " << localData.size()
<< " bytes to " << fName << " using comm " << comm << " bytes to " << fName << " using comm " << comm
<< " and " << slaveData.size() << " sub-ranks" << endl; << " and " << procData.size() << " sub-ranks" << endl;
forAll(slaveData, proci) forAll(procData, proci)
{ {
if (slaveData.set(proci)) Pout<< " " << proci << " size:"
{ << label(procData[proci].size()) << nl;
Pout<< " " << proci
<< " size:" << slaveData[proci].size()
<< endl;
}
} }
} }
@ -104,32 +100,27 @@ bool Foam::OFstreamCollator::writeFile
// for some mpi so default is non-blocking. // for some mpi so default is non-blocking.
const UPstream::commsTypes myCommsType const UPstream::commsTypes myCommsType
( (
mag
( (
fileOperations::masterUncollatedFileOperation:: fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize == 0 maxMasterFileBufferSize
) ) < 1
? UPstream::commsTypes::scheduled ? UPstream::commsTypes::scheduled
: UPstream::commsTypes::nonBlocking : UPstream::commsTypes::nonBlocking
); );
UList<char> slice List<std::streamoff> blockOffsets; // Optional
(
const_cast<char*>(masterData.data()),
label(masterData.size())
);
List<std::streamoff> blockOffset;
decomposedBlockData::writeBlocks decomposedBlockData::writeBlocks
( (
comm, comm,
osPtr, osPtr,
blockOffset, blockOffsets, // or List<std::streamoff>::null()
slice, localData,
recvSizes, recvSizes,
slaveData, procData,
myCommsType, myCommsType,
false // do not reduce return state false // do not sync return state
); );
if (osPtr && !osPtr->good()) if (osPtr && !osPtr->good())
@ -140,17 +131,18 @@ bool Foam::OFstreamCollator::writeFile
if (debug) if (debug)
{ {
Pout<< "OFstreamCollator : Finished writing " << masterData.size() Pout<< "OFstreamCollator : Finished writing "
<< " bytes"; << localData.size() << " bytes";
if (UPstream::master(comm)) if (UPstream::master(comm))
{ {
off_t sum = 0; off_t total = 0;
for (const label recv : recvSizes) for (const label recv : recvSizes)
{ {
sum += recv; total += recv;
} }
// Use std::to_string to display long int // Use std::to_string to display long int
Pout<< " (overall " << std::to_string(sum) << ')'; Pout<< " (overall " << std::to_string(total) << ')';
} }
Pout<< " to " << fName Pout<< " to " << fName
<< " using comm " << comm << endl; << " using comm " << comm << endl;
@ -167,13 +159,16 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
// Consume stack // Consume stack
while (true) while (true)
{ {
writeData* ptr = nullptr; std::unique_ptr<writeData> ptr;
{ {
std::lock_guard<std::mutex> guard(handler.mutex_); std::lock_guard<std::mutex> guard(handler.mutex_);
if (handler.objects_.size()) if (handler.objects_.size())
{ {
ptr = handler.objects_.pop(); // FIFO
ptr.reset(handler.objects_.front());
handler.objects_.pop_front();
} }
} }
@ -181,51 +176,39 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
{ {
break; break;
} }
else
writeData& obj = *ptr;
// Obtain spans from storage
List<stdFoam::span<char>> procData(obj.procData_.size());
forAll(procData, proci)
{ {
// Convert storage to pointers procData[proci] = stdFoam::span<char>
PtrList<SubList<char>> slaveData;
if (ptr->slaveData_.size())
{
slaveData.resize(ptr->slaveData_.size());
forAll(slaveData, proci)
{
if (ptr->slaveData_.set(proci))
{
slaveData.set
(
proci,
new SubList<char>
(
ptr->slaveData_[proci],
ptr->sizes_[proci]
)
);
}
}
}
bool ok = writeFile
( (
ptr->comm_, const_cast<char*>(obj.procData_[proci].cdata()),
ptr->objectType_, obj.procData_[proci].size()
ptr->pathName_,
ptr->data_,
ptr->sizes_,
slaveData,
ptr->streamOpt_,
ptr->atomic_,
ptr->append_,
ptr->headerEntries_
); );
if (!ok) }
{
FatalIOErrorInFunction(ptr->pathName_)
<< "Failed writing " << ptr->pathName_
<< exit(FatalIOError);
}
delete ptr; bool ok = writeFile
(
obj.comm_,
obj.objectType_,
obj.pathName_,
obj.localData_,
obj.sizes_,
procData,
obj.streamOpt_,
obj.atomic_,
obj.append_,
obj.headerEntries_
);
if (!ok)
{
FatalIOErrorInFunction(obj.pathName_)
<< "Failed writing " << obj.pathName_
<< exit(FatalIOError);
} }
//sleep(1); //sleep(1);
} }
@ -248,14 +231,14 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
{ {
while (true) while (true)
{ {
// Count files to be written // The pending output size(s)
off_t totalSize = 0; off_t totalSize = 0;
{ {
std::lock_guard<std::mutex> guard(mutex_); std::lock_guard<std::mutex> guard(mutex_);
forAllConstIters(objects_, iter) for (const writeData* ptr : objects_)
{ {
totalSize += iter()->size(); if (ptr) totalSize += ptr->size();
} }
} }
@ -287,17 +270,7 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize) Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
: :
maxBufferSize_(maxBufferSize), OFstreamCollator(maxBufferSize, UPstream::worldComm)
threadRunning_(false),
localComm_(UPstream::worldComm),
threadComm_
(
UPstream::allocateCommunicator
(
localComm_,
labelRange(UPstream::nProcs(localComm_))
)
)
{} {}
@ -312,6 +285,7 @@ Foam::OFstreamCollator::OFstreamCollator
localComm_(comm), localComm_(comm),
threadComm_ threadComm_
( (
// dupComm
UPstream::allocateCommunicator UPstream::allocateCommunicator
( (
localComm_, localComm_,
@ -345,7 +319,7 @@ bool Foam::OFstreamCollator::write
( (
const word& objectType, const word& objectType,
const fileName& fName, const fileName& fName,
const string& data, DynamicList<char>&& localData,
IOstreamOption streamOpt, IOstreamOption streamOpt,
IOstreamOption::atomicType atomic, IOstreamOption::atomicType atomic,
IOstreamOption::appendType append, IOstreamOption::appendType append,
@ -355,78 +329,109 @@ bool Foam::OFstreamCollator::write
{ {
// Determine (on master) sizes to receive. Note: do NOT use thread // Determine (on master) sizes to receive. Note: do NOT use thread
// communicator // communicator
labelList recvSizes; const labelList recvSizes
decomposedBlockData::gather(localComm_, label(data.size()), recvSizes); (
UPstream::listGatherValues<label>(localData.size(), localComm_)
);
off_t totalSize = 0; off_t totalSize = 0;
label maxLocalSize = 0; label maxLocalSize = 0;
if (UPstream::master(localComm_))
{ {
if (UPstream::master(localComm_)) for (const label recvSize : recvSizes)
{ {
for (const label recvSize : recvSizes) totalSize += recvSize;
{ maxLocalSize = max(maxLocalSize, recvSize);
totalSize += recvSize;
maxLocalSize = max(maxLocalSize, recvSize);
}
} }
Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
} }
Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
// Determine how things will be gathered and written...
enum class dispatchModes { GATHER_WRITE, PREFETCH_THREADED, FULL_THREADED };
dispatchModes dispatch(dispatchModes::GATHER_WRITE);
if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
{
dispatch = dispatchModes::GATHER_WRITE;
}
else if (totalSize <= maxBufferSize_)
{
// Total size can be stored locally
// - gather all data now and only do the writing in the thread
dispatch = dispatchModes::PREFETCH_THREADED;
}
else
{
// Gather data and write in the thread
dispatch = dispatchModes::FULL_THREADED;
if (!UPstream::haveThreads())
{
WarningInFunction
<< "MPI not initialized with thread support." << nl
<< " maxThreadFileBufferSize = 0 to disable threading" << nl
<< " or maxThreadFileBufferSize > " << totalSize
<< " to collate before threaded writing." << nl << nl;
dispatch = dispatchModes::GATHER_WRITE;
}
}
// -----------
// Dispatching
// -----------
if (dispatch == dispatchModes::GATHER_WRITE)
{ {
if (debug) if (debug)
{ {
Pout<< "OFstreamCollator : non-thread gather and write of " << fName Pout<< "OFstreamCollator : non-thread gather/write "
<< " using local comm " << localComm_ << endl; << "(local comm: " << localComm_ << ") of "
<< fName << endl;
} }
// Direct collating and writing (so master blocks until all written!) // Direct collating and writing (so master blocks until all written!)
const PtrList<SubList<char>> dummySlaveData;
return writeFile return writeFile
( (
localComm_, localComm_,
objectType, objectType,
fName, fName,
data, localData,
recvSizes, recvSizes,
dummySlaveData, UList<stdFoam::span<char>>(), // dummy proc data
streamOpt, streamOpt,
atomic, atomic,
append, append,
headerEntries headerEntries
); );
} }
else if (totalSize <= maxBufferSize_) else if (dispatch == dispatchModes::PREFETCH_THREADED)
{ {
// Total size can be stored locally so receive all data now and only
// do the writing in the thread
if (debug) if (debug)
{ {
Pout<< "OFstreamCollator : non-thread gather; thread write of " Pout<< "OFstreamCollator : non-thread gather; thread write of "
<< fName << endl; << fName << endl;
} }
if (Pstream::master(localComm_)) if (UPstream::master(localComm_))
{ {
waitForBufferSpace(totalSize); waitForBufferSpace(totalSize);
} }
std::unique_ptr<writeData> fileAndDataPtr
// Receive in chunks of labelMax (2^31-1) since this is the maximum
// size that a List can be
autoPtr<writeData> fileAndDataPtr
( (
new writeData new writeData
( (
threadComm_, // Note: comm not actually used anymore threadComm_, // Note: comm not actually used anymore
objectType, objectType,
fName, fName,
(
Pstream::master(localComm_)
? data // Only used on master
: string::null
),
recvSizes, recvSizes,
streamOpt, streamOpt,
atomic, atomic,
@ -434,63 +439,81 @@ bool Foam::OFstreamCollator::write
headerEntries headerEntries
) )
); );
writeData& fileAndData = fileAndDataPtr(); auto& fileAndData = *fileAndDataPtr;
PtrList<List<char>>& slaveData = fileAndData.slaveData_; List<List<char>>& procData = fileAndData.procData_;
if (UPstream::master(localComm_))
{
// Move in local data (master only!)
fileAndData.transfer(localData);
UList<char> slice(const_cast<char*>(data.data()), label(data.size())); // Storage for receive data
procData.resize(UPstream::nProcs(localComm_));
for (const int proci : UPstream::subProcs(localComm_))
{
procData[proci].resize(recvSizes[proci]);
}
}
else if (UPstream::is_subrank(localComm_))
{
// Requires a size for decomposedBlockData::writeBlocks() logic
procData.resize(UPstream::nProcs(localComm_));
}
slaveData.setSize(recvSizes.size());
// Gather all data onto master. Is done in local communicator since // Gather all data onto master. Is done in local communicator since
// not in write thread. Note that we do not store in contiguous // not in write thread.
// buffer since that would limit to 2G chars.
const label startOfRequests = UPstream::nRequests(); const label startOfRequests = UPstream::nRequests();
if (Pstream::master(localComm_)) if (UPstream::master(localComm_))
{ {
for (label proci = 1; proci < slaveData.size(); proci++) for (const int proci : UPstream::subProcs(localComm_))
{ {
slaveData.set(proci, new List<char>(recvSizes[proci])); List<char>& procSlice = procData[proci];
if (procSlice.empty()) continue;
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::nonBlocking, UPstream::commsTypes::nonBlocking,
proci, proci,
slaveData[proci].data(), procSlice.data_bytes(),
slaveData[proci].size_bytes(), procSlice.size_bytes(),
Pstream::msgType(), UPstream::msgType(),
localComm_ localComm_
); );
} }
} }
else else if (UPstream::is_subrank(localComm_) && !localData.empty())
{ {
if if
( (
!UOPstream::write !UOPstream::write
( (
UPstream::commsTypes::nonBlocking, UPstream::commsTypes::nonBlocking,
0, UPstream::masterNo(),
slice.cdata(), localData.cdata_bytes(),
slice.size_bytes(), localData.size_bytes(),
Pstream::msgType(), UPstream::msgType(),
localComm_ localComm_
) )
) )
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Cannot send outgoing message. " << "Cannot send outgoing message (size: "
<< "to:" << 0 << " nBytes:" << localData.size() << ") to master" << nl
<< label(slice.size_bytes())
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
} }
UPstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
// The localData has been moved (master) or communicated
localData.clearStorage();
{ {
std::lock_guard<std::mutex> guard(mutex_); std::lock_guard<std::mutex> guard(mutex_);
// Append to thread buffer // Append to thread buffer (as FIFO), take ownership
objects_.push(fileAndDataPtr.ptr()); objects_.push_back(fileAndDataPtr.release());
// Start thread if not running // Start thread if not running
if (!threadRunning_) if (!threadRunning_)
@ -517,49 +540,46 @@ bool Foam::OFstreamCollator::write
return true; return true;
} }
else else if (dispatch == dispatchModes::FULL_THREADED)
{ {
if (debug) if (debug)
{ {
Pout<< "OFstreamCollator : thread gather and write of " << fName Pout<< "OFstreamCollator : thread gather and write "
<< " using communicator " << threadComm_ << endl; << "(thread comm: " << threadComm_
<< ") of " << fName << endl;
} }
if (!UPstream::haveThreads()) if (UPstream::master(localComm_))
{ {
FatalErrorInFunction waitForBufferSpace(localData.size());
<< "mpi does not seem to have thread support."
<< " Make sure to set buffer size 'maxThreadFileBufferSize'"
<< " to at least " << totalSize
<< " to be able to do the collating before threading."
<< exit(FatalError);
} }
if (Pstream::master(localComm_)) std::unique_ptr<writeData> fileAndDataPtr
{ (
waitForBufferSpace(data.size()); new writeData
} (
threadComm_,
objectType,
fName,
recvSizes,
streamOpt,
atomic,
append,
headerEntries
)
);
// Move in local data (all procs)
fileAndDataPtr->transfer(localData);
{ {
std::lock_guard<std::mutex> guard(mutex_); std::lock_guard<std::mutex> guard(mutex_);
// Push all file info on buffer. Note that no slave data provided // Append to thread buffer (as FIFO), take ownership
objects_.push_back(fileAndDataPtr.release());
// Note: no proc data provided
// so it will trigger communication inside the thread // so it will trigger communication inside the thread
objects_.push
(
new writeData
(
threadComm_,
objectType,
fName,
data,
recvSizes,
streamOpt,
atomic,
append,
headerEntries
)
);
if (!threadRunning_) if (!threadRunning_)
{ {
@ -584,6 +604,12 @@ bool Foam::OFstreamCollator::write
return true; return true;
} }
FatalErrorInFunction
<< "Unknown dispatch mode: " << int(dispatch)
<< " - programming error?" << abort(FatalError);
return false;
} }
@ -591,7 +617,7 @@ void Foam::OFstreamCollator::waitAll()
{ {
// Wait for all buffer space to be available i.e. wait for all jobs // Wait for all buffer space to be available i.e. wait for all jobs
// to finish // to finish
if (Pstream::master(localComm_)) if (UPstream::master(localComm_))
{ {
if (debug) if (debug)
{ {

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd. Copyright (C) 2019-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -51,14 +51,14 @@ SourceFiles
#ifndef Foam_OFstreamCollator_H #ifndef Foam_OFstreamCollator_H
#define Foam_OFstreamCollator_H #define Foam_OFstreamCollator_H
#include <thread>
#include <mutex>
#include "IOstream.H" #include "IOstream.H"
#include "labelList.H" #include "List.H"
#include "FIFOStack.H" #include "CircularBuffer.H" // As FIFO
#include "SubList.H"
#include "dictionary.H" #include "dictionary.H"
#include <mutex>
#include <thread>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam namespace Foam
@ -72,26 +72,33 @@ class OFstreamCollator
{ {
// Private Class // Private Class
//- Holds data to be written
struct writeData struct writeData
{ {
const label comm_; const label comm_;
const word objectType_; const word objectType_;
const fileName pathName_; const fileName pathName_;
const string data_; DynamicList<char> localData_;
const labelList sizes_; const labelList sizes_;
PtrList<List<char>> slaveData_; List<List<char>> procData_;
const IOstreamOption streamOpt_; const IOstreamOption streamOpt_;
IOstreamOption::atomicType atomic_; IOstreamOption::atomicType atomic_;
IOstreamOption::appendType append_; IOstreamOption::appendType append_;
const dictionary headerEntries_; const dictionary headerEntries_;
writeData() = delete; // No default construct
writeData(const writeData&) = delete; // No copy construct
writeData(writeData&&) = delete; // No move construct
void operator=(const writeData&) = delete; // No copy assign
void operator=(writeData&&) = delete; // No move assign
//- Construct without local data
writeData writeData
( (
const label comm, const label comm,
const word& objectType, const word& objectType,
const fileName& pathName, const fileName& pathName,
const string& data, const labelUList& sizes,
const labelList& sizes,
IOstreamOption streamOpt, IOstreamOption streamOpt,
IOstreamOption::atomicType atomic, IOstreamOption::atomicType atomic,
IOstreamOption::appendType append, IOstreamOption::appendType append,
@ -101,27 +108,30 @@ class OFstreamCollator
comm_(comm), comm_(comm),
objectType_(objectType), objectType_(objectType),
pathName_(pathName), pathName_(pathName),
data_(data), localData_(),
sizes_(sizes), sizes_(sizes),
slaveData_(), procData_(),
streamOpt_(streamOpt), streamOpt_(streamOpt),
atomic_(atomic), atomic_(atomic),
append_(append), append_(append),
headerEntries_(headerEntries) headerEntries_(headerEntries)
{} {}
//- The (approximate) size of master + any optional slave data //- Move reset local data
void transfer(DynamicList<char>& localData)
{
localData_.transfer(localData);
}
//- The (approximate) size of local + any optional proc data
off_t size() const off_t size() const
{ {
off_t totalSize = data_.size(); off_t total = localData_.size();
forAll(slaveData_, i) for (const auto& data : procData_)
{ {
if (slaveData_.set(i)) total += data.size();
{
totalSize += slaveData_[i].size();
}
} }
return totalSize; return total;
} }
}; };
@ -135,8 +145,8 @@ class OFstreamCollator
std::unique_ptr<std::thread> thread_; std::unique_ptr<std::thread> thread_;
//- Stack of files to write + contents //- FIFO of files to write and their contents
FIFOStack<writeData*> objects_; CircularBuffer<writeData*> objects_;
//- Whether thread is running (and not exited) //- Whether thread is running (and not exited)
bool threadRunning_; bool threadRunning_;
@ -156,9 +166,9 @@ class OFstreamCollator
const label comm, const label comm,
const word& objectType, const word& objectType,
const fileName& fName, const fileName& fName,
const string& masterData, const UList<char>& localData,
const labelUList& recvSizes, const labelUList& recvSizes,
const UPtrList<SubList<char>>& slaveData, const UList<stdFoam::span<char>>& procData,
IOstreamOption streamOpt, IOstreamOption streamOpt,
IOstreamOption::atomicType atomic, IOstreamOption::atomicType atomic,
IOstreamOption::appendType append, IOstreamOption::appendType append,
@ -181,7 +191,8 @@ public:
// Constructors // Constructors
//- Construct from buffer size. 0 = do not use thread //- Construct from buffer size (0 = do not use thread)
//- and with worldComm
explicit OFstreamCollator(const off_t maxBufferSize); explicit OFstreamCollator(const off_t maxBufferSize);
//- Construct from buffer size (0 = do not use thread) //- Construct from buffer size (0 = do not use thread)
@ -195,14 +206,15 @@ public:
// Member Functions // Member Functions
//- Write file with contents. //- Write file with contents, possibly taking ownership of the
// Blocks until writethread has space available //- content.
// Blocks until write-thread has space available
// (total file sizes < maxBufferSize) // (total file sizes < maxBufferSize)
bool write bool write
( (
const word& objectType, const word& objectType,
const fileName&, const fileName& fName,
const string& data, DynamicList<char>&& localData,
IOstreamOption streamOpt, IOstreamOption streamOpt,
IOstreamOption::atomicType atomic, IOstreamOption::atomicType atomic,
IOstreamOption::appendType append, IOstreamOption::appendType append,
@ -210,6 +222,37 @@ public:
const dictionary& headerEntries = dictionary::null const dictionary& headerEntries = dictionary::null
); );
//- Write file with contents.
FOAM_DEPRECATED_FOR(2023-09, "use write with movable content")
bool write
(
const word& objectType,
const fileName& fName,
const std::string& s,
IOstreamOption streamOpt,
IOstreamOption::atomicType atomic,
IOstreamOption::appendType append,
const bool useThread = true,
const dictionary& headerEntries = dictionary::null
)
{
DynamicList<char> charData;
charData.setCapacity(s.size());
std::copy(s.begin(), s.end(), charData.begin());
return write
(
objectType,
fName,
std::move(charData),
streamOpt,
atomic,
append,
useThread,
headerEntries
);
}
//- Wait for all thread actions to have finished //- Wait for all thread actions to have finished
void waitAll(); void waitAll();
}; };

View File

@ -59,7 +59,7 @@ namespace fileOperations
float collatedFileOperation::maxThreadFileBufferSize float collatedFileOperation::maxThreadFileBufferSize
( (
debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9) debug::floatOptimisationSwitch("maxThreadFileBufferSize", 0)
); );
registerOptSwitch registerOptSwitch
( (
@ -90,26 +90,41 @@ void Foam::fileOperations::collatedFileOperation::printBanner
DetailInfo DetailInfo
<< "I/O : " << this->type(); << "I/O : " << this->type();
if (maxThreadFileBufferSize == 0) if (mag(maxThreadFileBufferSize) > 1)
{
DetailInfo
<< " [unthreaded] (maxThreadFileBufferSize = 0)." << nl
<< " Writing may be slow for large file sizes."
<< endl;
}
else
{ {
// FUTURE: deprecate or remove threading?
DetailInfo DetailInfo
<< " [threaded] (maxThreadFileBufferSize = " << " [threaded] (maxThreadFileBufferSize = "
<< maxThreadFileBufferSize << ")." << nl << maxThreadFileBufferSize << ")." << nl
<< " Requires buffer large enough to collect all data" << " Requires buffer large enough to collect all data"
" or thread support" << nl " or MPI thread support." << nl
<< " enabled in MPI. If MPI thread support cannot be" << " To avoid MPI threading [slow], set"
" enabled, deactivate" << nl " (maxThreadFileBufferSize = 0) in" << nl
<< " threading by setting maxThreadFileBufferSize"
" to 0 in" << nl
<< " OpenFOAM etc/controlDict" << endl; << " OpenFOAM etc/controlDict" << endl;
} }
else
{
DetailInfo
<< " [unthreaded] (maxThreadFileBufferSize = 0)." << nl;
if (mag(maxMasterFileBufferSize) < 1)
{
DetailInfo
<< " With scheduled transfer" << nl;
}
else if (maxMasterFileBufferSize >= 1)
{
DetailInfo
<< " With non-blocking transfer,"
" buffer-size = " << maxMasterFileBufferSize << nl;
}
else
{
DetailInfo
<< " With non-blocking transfer,"
" minimal buffer size" << nl;
}
}
if (withRanks) if (withRanks)
{ {
@ -455,7 +470,7 @@ bool Foam::fileOperations::collatedFileOperation::writeObject
{ {
// Re-check static maxThreadFileBufferSize variable to see // Re-check static maxThreadFileBufferSize variable to see
// if needs to use threading // if needs to use threading
const bool useThread = (maxThreadFileBufferSize != 0); const bool useThread = (mag(maxThreadFileBufferSize) > 1);
if (debug) if (debug)
{ {

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2020-2022 OpenCFD Ltd. Copyright (C) 2020-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -41,7 +41,7 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream
const bool useThread const bool useThread
) )
: :
OStringStream(streamOpt), OCharStream(streamOpt),
writer_(writer), writer_(writer),
pathName_(pathName), pathName_(pathName),
atomic_(atomic), atomic_(atomic),
@ -74,11 +74,22 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream
Foam::threadedCollatedOFstream::~threadedCollatedOFstream() Foam::threadedCollatedOFstream::~threadedCollatedOFstream()
{ {
commit();
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
void Foam::threadedCollatedOFstream::commit()
{
// Take ownership of serialized content, without copying or reallocation
DynamicList<char> charData(OCharStream::release());
writer_.write writer_.write
( (
decomposedBlockData::typeName, decomposedBlockData::typeName,
pathName_, pathName_,
str(), std::move(charData),
IOstreamOption(IOstreamOption::BINARY, version(), compression_), IOstreamOption(IOstreamOption::BINARY, version(), compression_),
atomic_, atomic_,
IOstreamOption::NON_APPEND, IOstreamOption::NON_APPEND,

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2021-2022 OpenCFD Ltd. Copyright (C) 2021-2023 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -39,7 +39,7 @@ SourceFiles
#define Foam_threadedCollatedOFstream_H #define Foam_threadedCollatedOFstream_H
#include "dictionary.H" #include "dictionary.H"
#include "StringStream.H" #include "SpanStream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -55,7 +55,7 @@ class OFstreamCollator;
class threadedCollatedOFstream class threadedCollatedOFstream
: :
public OStringStream public OCharStream
{ {
// Private Data // Private Data
@ -78,6 +78,11 @@ class threadedCollatedOFstream
dictionary headerEntries_; dictionary headerEntries_;
// Private Member Functions
//- Commit buffered information
void commit();
public: public:
// Constructors // Constructors
@ -102,12 +107,14 @@ public:
); );
//- Destructor //- Destructor - commits buffered information to file
~threadedCollatedOFstream(); ~threadedCollatedOFstream();
// Member Functions // Member Functions
// -> using OCharStream::rewind
//- Define the header entries for the data block(s) //- Define the header entries for the data block(s)
void setHeaderEntries(const dictionary& dict); void setHeaderEntries(const dictionary& dict);
}; };