Compare commits
4 Commits
develop
...
reworked-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 6f21313466 | |||
| 5bb3ad052f | |||
| c5b9d9b532 | |||
| 23542cabc8 |
@ -1,7 +1,7 @@
|
||||
/*--------------------------------*- C++ -*----------------------------------*\
|
||||
| ========= | |
|
||||
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
|
||||
| \\ / O peration | Version: v2506 |
|
||||
| \\ / O peration | Version: v2512 |
|
||||
| \\ / A nd | Website: www.openfoam.com |
|
||||
| \\/ M anipulation | |
|
||||
\*---------------------------------------------------------------------------*/
|
||||
@ -108,6 +108,11 @@ OptimisationSwitches
|
||||
// uncollated (default), collated, masterUncollated etc.
|
||||
fileHandler uncollated;
|
||||
|
||||
//- Preferred backend for collated format. Default: 0
|
||||
// 0: legacy writer
|
||||
// 1: MPI/IO writer
|
||||
collated.backend 0;
|
||||
|
||||
//- collated: thread buffer size for queued file writes.
|
||||
// If set to 0 or not sufficient for the file size, threading is not used.
|
||||
// A special setting is a negative value which assumes the buffer
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017-2018 OpenFOAM Foundation
|
||||
Copyright (C) 2020-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -51,6 +51,8 @@ FoamFile
|
||||
class decomposedBlockData;
|
||||
location "constant/polyMesh";
|
||||
object points;
|
||||
data.format ascii; // optional
|
||||
data.class vectorField; // optional
|
||||
}
|
||||
|
||||
// processor0
|
||||
@ -135,23 +137,15 @@ protected:
|
||||
|
||||
// Protected Member Functions
|
||||
|
||||
//- Helper: determine number of processors whose recvSizes fits
|
||||
//- into maxBufferSize
|
||||
static label calcNumProcs
|
||||
(
|
||||
const label comm,
|
||||
const off_t maxBufferSize,
|
||||
const labelUList& recvSizes,
|
||||
const label startProci
|
||||
);
|
||||
|
||||
//- Read data into *this. ISstream is only valid on master.
|
||||
//- Read data (on master) and transmit.
|
||||
static bool readBlocks
|
||||
(
|
||||
const label comm,
|
||||
// [in] The input stream (only valid on master)
|
||||
autoPtr<ISstream>& isPtr,
|
||||
List<char>& contentChars,
|
||||
const UPstream::commsTypes commsType
|
||||
// [out] The processor local data
|
||||
List<char>& localData,
|
||||
const UPstream::commsTypes commsType /* unused */
|
||||
);
|
||||
|
||||
//- Helper: skip a block of (binary) character data
|
||||
@ -277,14 +271,15 @@ public:
|
||||
}
|
||||
|
||||
//- Helper: write block of (binary) character content
|
||||
// Housekeeping
|
||||
static std::streamoff writeBlockEntry
|
||||
(
|
||||
OSstream& os,
|
||||
const label blocki,
|
||||
const std::string& s
|
||||
std::string_view sv
|
||||
)
|
||||
{
|
||||
return writeBlockEntry(os, blocki, s.data(), s.size());
|
||||
return writeBlockEntry(os, blocki, sv.data(), sv.size());
|
||||
}
|
||||
|
||||
//- Helper: write block of (binary) character data
|
||||
@ -307,41 +302,37 @@ public:
|
||||
);
|
||||
|
||||
//- Read master header information (into headerIO) and return
|
||||
//- data in stream. Note: isPtr is only valid on master.
|
||||
//- data in stream.
|
||||
static autoPtr<ISstream> readBlocks
|
||||
(
|
||||
const label comm,
|
||||
const fileName& fName,
|
||||
//! [in] The input stream (only valid on master)
|
||||
autoPtr<ISstream>& isPtr,
|
||||
//! [out] header information
|
||||
IOobject& headerIO,
|
||||
const UPstream::commsTypes commsType
|
||||
const UPstream::commsTypes commsType /* unused */
|
||||
);
|
||||
|
||||
//- Helper: gather single label. Note: using native Pstream.
|
||||
// datas sized with num procs but undefined contents on
|
||||
// slaves
|
||||
static void gather
|
||||
(
|
||||
const label comm,
|
||||
const label data,
|
||||
labelList& datas
|
||||
);
|
||||
|
||||
//- Helper: gather data from (subset of) slaves.
|
||||
//- Helper: gather data from (subset of) sub-ranks.
|
||||
// In non-blocking mode it sets up send/recv for non-empty content.
|
||||
// In blocking/scheduled mode it uses MPI_Gatherv to collect data.
|
||||
//
|
||||
// Returns:
|
||||
// - recvData : received data
|
||||
// - recvData : the received data
|
||||
// - recvOffsets : offset in data. recvOffsets is nProcs+1
|
||||
static void gatherSlaveData
|
||||
static void gatherProcData
|
||||
(
|
||||
const label comm,
|
||||
const UList<char>& data,
|
||||
const labelUList& recvSizes,
|
||||
const UList<char>& localData, //!< [in] required on all procs
|
||||
const labelUList& recvSizes, //!< [in] only required on master
|
||||
|
||||
const labelRange& fromProcs,
|
||||
const labelRange& whichProcs, //!< [in] required on all procs
|
||||
|
||||
List<int>& recvOffsets,
|
||||
DynamicList<char>& recvData
|
||||
List<int>& recvOffsets, //!< [out] only relevant on master
|
||||
DynamicList<char>& recvData, //!< [out] only relevant on master
|
||||
|
||||
const UPstream::commsTypes commsType
|
||||
);
|
||||
|
||||
//- Write *this. Ostream only valid on master.
|
||||
@ -349,19 +340,96 @@ public:
|
||||
static bool writeBlocks
|
||||
(
|
||||
const label comm,
|
||||
|
||||
//! [in] output stream (relevant on master)
|
||||
autoPtr<OSstream>& osPtr,
|
||||
//! [out] start offsets to each block (relevant on master),
|
||||
//! ignored if List::null() type
|
||||
List<std::streamoff>& blockOffset,
|
||||
|
||||
const UList<char>& masterData,
|
||||
const UList<char>& localData, //!< [in] required on all procs
|
||||
const labelUList& recvSizes, //!< [in] only required on master
|
||||
|
||||
const labelUList& recvSizes,
|
||||
|
||||
// Optional slave data (on master)
|
||||
const UPtrList<SubList<char>>& slaveData,
|
||||
//! Optional proc data (only written on master)
|
||||
//! but \b must also be symmetrically defined (empty/non-empty)
|
||||
//! on all ranks
|
||||
const UList<std::string_view>& procData,
|
||||
|
||||
const UPstream::commsTypes commsType,
|
||||
const bool syncReturnState = true
|
||||
);
|
||||
|
||||
|
||||
// Housekeeping
|
||||
|
||||
//- Write *this. Ostream only valid on master.
|
||||
// Returns offsets of processor blocks in blockOffset
|
||||
FOAM_DEPRECATED_FOR(2023-09, "write with std::string_view instead")
|
||||
static bool writeBlocks
|
||||
(
|
||||
const label comm,
|
||||
autoPtr<OSstream>& osPtr,
|
||||
List<std::streamoff>& blockOffset,
|
||||
|
||||
const UList<char>& localData, // [in] required on all procs
|
||||
const labelUList& recvSizes, // [in] only required on master
|
||||
|
||||
// Optional proc data (only written on master)
|
||||
// but \b must also be symmetrically defined (empty/non-empty)
|
||||
// on all ranks
|
||||
const UPtrList<SubList<char>>& procData,
|
||||
|
||||
const UPstream::commsTypes commsType,
|
||||
const bool syncReturnState = true
|
||||
)
|
||||
{
|
||||
// Transcribe to string_view
|
||||
List<std::string_view> spans(procData.size());
|
||||
forAll(procData, proci)
|
||||
{
|
||||
const auto* ptr = procData.get(proci);
|
||||
if (ptr && !ptr->empty())
|
||||
{
|
||||
spans[proci] = std::string_view(ptr->cdata(), ptr->size());
|
||||
}
|
||||
}
|
||||
|
||||
bool ok = decomposedBlockData::writeBlocks
|
||||
(
|
||||
comm,
|
||||
osPtr,
|
||||
blockOffset,
|
||||
localData,
|
||||
recvSizes,
|
||||
spans,
|
||||
commsType,
|
||||
syncReturnState
|
||||
);
|
||||
return ok;
|
||||
}
|
||||
|
||||
//- Deprecated(2023-09) - consider UPstream::listGatherValue
|
||||
// The only difference is that this gather also resizes the output
|
||||
// on the non-master procs
|
||||
// \deprecated(2023-09) - consider UPstream::listGatherValue
|
||||
FOAM_DEPRECATED_FOR(2023-09, "consider UPstream::listGatherValue()")
|
||||
static void gather
|
||||
(
|
||||
const label comm,
|
||||
const label localValue,
|
||||
labelList& allValues
|
||||
)
|
||||
{
|
||||
allValues.resize_nocopy(UPstream::nProcs(comm));
|
||||
|
||||
UPstream::mpiGather
|
||||
(
|
||||
reinterpret_cast<const char*>(&localValue),
|
||||
allValues.data_bytes(),
|
||||
sizeof(label), // The send/recv size per rank
|
||||
comm
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2021-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2021-2023 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -189,10 +189,9 @@ void Foam::decomposedBlockData::writeExtraHeaderContent
|
||||
dict.set("data.class", io.type());
|
||||
|
||||
// Deep-copy of meta-data (if any)
|
||||
const dictionary* metaDataDict = io.findMetaData();
|
||||
if (metaDataDict && !metaDataDict->empty())
|
||||
if (const auto* meta = io.findMetaData(); meta && !meta->empty())
|
||||
{
|
||||
dict.add("meta", *metaDataDict);
|
||||
dict.add("meta", *meta);
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,16 +220,16 @@ void Foam::decomposedBlockData::writeHeader
|
||||
io.name()
|
||||
);
|
||||
|
||||
// Same as writeExtraHeaderContent
|
||||
{
|
||||
writeHeaderEntry(os, "data.format", streamOptData.format());
|
||||
writeHeaderEntry(os, "data.class", io.type());
|
||||
}
|
||||
|
||||
// Meta-data (if any)
|
||||
const dictionary* metaDataDict = io.findMetaData();
|
||||
if (metaDataDict && !metaDataDict->empty())
|
||||
if (const auto* meta = io.findMetaData(); meta && !meta->empty())
|
||||
{
|
||||
metaDataDict->writeEntry("meta", os);
|
||||
meta->writeEntry("meta", os);
|
||||
}
|
||||
|
||||
os.endBlock();
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017 OpenFOAM Foundation
|
||||
Copyright (C) 2020-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -29,7 +29,7 @@ License
|
||||
#include "masterOFstream.H"
|
||||
#include "OFstream.H"
|
||||
#include "OSspecific.H"
|
||||
#include "PstreamBuffers.H"
|
||||
#include "Pstream.H"
|
||||
#include "masterUncollatedFileOperation.H"
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
@ -41,9 +41,9 @@ void Foam::masterOFstream::checkWrite
|
||||
std::streamsize len
|
||||
)
|
||||
{
|
||||
if (!len)
|
||||
if (!str || !(len > 0))
|
||||
{
|
||||
// Can probably skip all of this if there is nothing to write
|
||||
// Can skip everything if there is nothing to write
|
||||
return;
|
||||
}
|
||||
|
||||
@ -63,9 +63,7 @@ void Foam::masterOFstream::checkWrite
|
||||
<< exit(FatalIOError);
|
||||
}
|
||||
|
||||
// Use writeRaw() instead of writeQuoted(string,false) to output
|
||||
// characters directly.
|
||||
|
||||
// Write characters directly to std::ostream
|
||||
os.writeRaw(str, len);
|
||||
|
||||
if (!os.good())
|
||||
@ -77,24 +75,29 @@ void Foam::masterOFstream::checkWrite
|
||||
}
|
||||
|
||||
|
||||
void Foam::masterOFstream::checkWrite
|
||||
(
|
||||
const fileName& fName,
|
||||
const std::string& s
|
||||
)
|
||||
{
|
||||
checkWrite(fName, s.data(), s.length());
|
||||
}
|
||||
|
||||
|
||||
void Foam::masterOFstream::commit()
|
||||
{
|
||||
if (UPstream::parRun())
|
||||
// Take ownership of serialized content
|
||||
DynamicList<char> charData(OCharStream::release());
|
||||
|
||||
if (!UPstream::parRun())
|
||||
{
|
||||
// Write (non-empty) data
|
||||
checkWrite(pathName_, charData);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Ignore content if not writing
|
||||
if (!writeOnProc_)
|
||||
{
|
||||
charData.clear();
|
||||
}
|
||||
|
||||
List<fileName> filePaths(UPstream::nProcs(comm_));
|
||||
filePaths[UPstream::myProcNo(comm_)] = pathName_;
|
||||
Pstream::gatherList(filePaths, UPstream::msgType(), comm_);
|
||||
|
||||
// Test for identical output paths
|
||||
bool uniform =
|
||||
(
|
||||
UPstream::master(comm_)
|
||||
@ -105,69 +108,136 @@ void Foam::masterOFstream::commit()
|
||||
|
||||
if (uniform)
|
||||
{
|
||||
// Identical file paths - write on master
|
||||
if (UPstream::master(comm_) && writeOnProc_)
|
||||
{
|
||||
checkWrite(pathName_, this->str());
|
||||
checkWrite(pathName_, charData);
|
||||
}
|
||||
|
||||
this->reset();
|
||||
return;
|
||||
}
|
||||
|
||||
// Different files
|
||||
PstreamBuffers pBufs(comm_);
|
||||
// ---------------
|
||||
//
|
||||
// Non-sparse (most ranks have writeOnProc_ == true),
|
||||
// so gather sizes first and use PEX-like handling,
|
||||
// with polling for when data becomes available.
|
||||
//
|
||||
// Could also consider double buffering + write to reduce
|
||||
// memory overhead.
|
||||
|
||||
if (!UPstream::master(comm_))
|
||||
{
|
||||
if (writeOnProc_)
|
||||
{
|
||||
// Send buffer to master
|
||||
string s(this->str());
|
||||
// Or int64_t
|
||||
const label dataSize =
|
||||
(
|
||||
(UPstream::is_subrank(comm_) && writeOnProc_)
|
||||
? charData.size()
|
||||
: 0
|
||||
);
|
||||
|
||||
UOPstream os(UPstream::masterNo(), pBufs);
|
||||
os.write(s.data(), s.length());
|
||||
}
|
||||
this->reset(); // Done with contents
|
||||
}
|
||||
|
||||
pBufs.finishedGathers();
|
||||
const labelList recvSizes
|
||||
(
|
||||
UPstream::listGatherValues<label>(dataSize, comm_)
|
||||
);
|
||||
|
||||
// Receive from these procs
|
||||
DynamicList<int> recvProcs;
|
||||
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
if (writeOnProc_)
|
||||
// Sorted by message size
|
||||
labelList order(Foam::sortedOrder(recvSizes));
|
||||
recvProcs.reserve_exact(order.size());
|
||||
|
||||
// Want to receive large messages first. Ignore empty slots
|
||||
forAllReverse(order, i)
|
||||
{
|
||||
// Write master data
|
||||
checkWrite(filePaths[UPstream::masterNo()], this->str());
|
||||
}
|
||||
this->reset(); // Done with contents
|
||||
const label proci = order[i];
|
||||
|
||||
|
||||
// Allocate large enough to read without resizing
|
||||
List<char> buf(pBufs.maxRecvCount());
|
||||
|
||||
for (const int proci : UPstream::subProcs(comm_))
|
||||
{
|
||||
const std::streamsize count(pBufs.recvDataCount(proci));
|
||||
|
||||
if (count)
|
||||
// Ignore empty slots
|
||||
if (recvSizes[proci] > 0)
|
||||
{
|
||||
UIPstream is(proci, pBufs);
|
||||
|
||||
is.read(buf.data(), count);
|
||||
checkWrite(filePaths[proci], buf.cdata(), count);
|
||||
recvProcs.push_back(proci);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
checkWrite(pathName_, this->str());
|
||||
this->reset();
|
||||
}
|
||||
|
||||
// This method is only called once (internally)
|
||||
// so no need to clear/flush old buffered data
|
||||
// Non-blocking communication
|
||||
const label startOfRequests = UPstream::nRequests();
|
||||
|
||||
// Some unique tag for this read/write grouping (extra precaution)
|
||||
const int messageTag = (UPstream::msgType() + 256);
|
||||
|
||||
if (UPstream::is_subrank(comm_) && dataSize > 0)
|
||||
{
|
||||
// Send to content to master
|
||||
UOPstream::write
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
UPstream::masterNo(),
|
||||
charData.cdata_bytes(),
|
||||
charData.size_bytes(),
|
||||
messageTag,
|
||||
comm_
|
||||
);
|
||||
}
|
||||
else if (UPstream::master(comm_))
|
||||
{
|
||||
// The receive slots
|
||||
List<List<char>> recvBuffers(UPstream::nProcs(comm_));
|
||||
|
||||
// Receive from these procs (non-empty slots)
|
||||
for (const int proci : recvProcs)
|
||||
{
|
||||
auto& slot = recvBuffers[proci];
|
||||
slot.resize_nocopy(recvSizes[proci]);
|
||||
|
||||
// Receive content
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
proci,
|
||||
slot.data_bytes(),
|
||||
slot.size_bytes(),
|
||||
messageTag,
|
||||
comm_
|
||||
);
|
||||
}
|
||||
|
||||
if (writeOnProc_)
|
||||
{
|
||||
// Write non-empty master data
|
||||
checkWrite(pathName_, charData);
|
||||
charData.clear();
|
||||
}
|
||||
|
||||
// Poll for completed receive requests and dispatch
|
||||
DynamicList<int> indices(recvProcs.size());
|
||||
while
|
||||
(
|
||||
UPstream::waitSomeRequests
|
||||
(
|
||||
startOfRequests,
|
||||
recvProcs.size(),
|
||||
&indices
|
||||
)
|
||||
)
|
||||
{
|
||||
for (const int i : indices)
|
||||
{
|
||||
const int proci = recvProcs[i];
|
||||
auto& slot = recvBuffers[proci];
|
||||
|
||||
// Write non-empty sub-proc data
|
||||
checkWrite(filePaths[proci], slot);
|
||||
|
||||
// Eager cleanup
|
||||
slot.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UPstream::waitRequests(startOfRequests);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -176,21 +246,24 @@ void Foam::masterOFstream::commit()
|
||||
Foam::masterOFstream::masterOFstream
|
||||
(
|
||||
IOstreamOption::atomicType atomic,
|
||||
const label comm,
|
||||
const int communicator,
|
||||
const fileName& pathName,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::appendType append,
|
||||
const bool writeOnProc
|
||||
)
|
||||
:
|
||||
OStringStream(streamOpt),
|
||||
OCharStream(streamOpt),
|
||||
pathName_(pathName),
|
||||
atomic_(atomic),
|
||||
compression_(streamOpt.compression()),
|
||||
append_(append),
|
||||
writeOnProc_(writeOnProc),
|
||||
comm_(comm)
|
||||
{}
|
||||
comm_(communicator < 0 ? UPstream::worldComm : communicator)
|
||||
{
|
||||
// Start with a slightly larger buffer
|
||||
OCharStream::reserve(4*1024);
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017 OpenFOAM Foundation
|
||||
Copyright (C) 2020-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -41,7 +41,7 @@ SourceFiles
|
||||
#ifndef Foam_masterOFstream_H
|
||||
#define Foam_masterOFstream_H
|
||||
|
||||
#include "StringStream.H"
|
||||
#include "SpanStream.H"
|
||||
#include "UPstream.H"
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
@ -55,7 +55,7 @@ namespace Foam
|
||||
|
||||
class masterOFstream
|
||||
:
|
||||
public OStringStream
|
||||
public OCharStream
|
||||
{
|
||||
// Private Data
|
||||
|
||||
@ -80,7 +80,8 @@ class masterOFstream
|
||||
|
||||
// Private Member Functions
|
||||
|
||||
//- Open file with checking and write append contents
|
||||
//- Open file with checking and write append contents.
|
||||
// A no-op if str is null or len is zero
|
||||
void checkWrite
|
||||
(
|
||||
const fileName& fName,
|
||||
@ -89,9 +90,16 @@ class masterOFstream
|
||||
);
|
||||
|
||||
//- Open file with checking and write append contents
|
||||
void checkWrite(const fileName& fName, const std::string& s);
|
||||
void checkWrite
|
||||
(
|
||||
const fileName& fName,
|
||||
const UList<char>& charData
|
||||
)
|
||||
{
|
||||
checkWrite(fName, charData.cdata(), charData.size_bytes());
|
||||
}
|
||||
|
||||
//- Commit buffered information, including parallel gather as required
|
||||
//- Commit buffered information, including communication as required
|
||||
void commit();
|
||||
|
||||
|
||||
@ -104,7 +112,8 @@ public:
|
||||
masterOFstream
|
||||
(
|
||||
IOstreamOption::atomicType atomic,
|
||||
const label comm,
|
||||
//! The communicator number (-1 == worldComm)
|
||||
const int communicator,
|
||||
const fileName& pathname,
|
||||
IOstreamOption streamOpt = IOstreamOption(),
|
||||
IOstreamOption::appendType append = IOstreamOption::NO_APPEND,
|
||||
@ -115,7 +124,8 @@ public:
|
||||
//- from pathname, stream option, optional append
|
||||
masterOFstream
|
||||
(
|
||||
const label comm,
|
||||
//! The communicator number (-1 == worldComm)
|
||||
const int communicator,
|
||||
const fileName& pathname,
|
||||
IOstreamOption streamOpt = IOstreamOption(),
|
||||
IOstreamOption::appendType append = IOstreamOption::NO_APPEND,
|
||||
@ -125,7 +135,7 @@ public:
|
||||
masterOFstream
|
||||
(
|
||||
IOstreamOption::NON_ATOMIC,
|
||||
comm,
|
||||
communicator,
|
||||
pathname,
|
||||
streamOpt,
|
||||
append,
|
||||
|
||||
@ -47,9 +47,9 @@ bool Foam::OFstreamCollator::writeFile
|
||||
const label comm,
|
||||
const word& objectType,
|
||||
const fileName& fName,
|
||||
const string& masterData,
|
||||
const UList<char>& localData,
|
||||
const labelUList& recvSizes,
|
||||
const UPtrList<SubList<char>>& slaveData, // optional slave data
|
||||
const UList<std::string_view>& procData, // optional proc data
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
@ -58,18 +58,14 @@ bool Foam::OFstreamCollator::writeFile
|
||||
{
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "OFstreamCollator : Writing master " << label(masterData.size())
|
||||
Pout<< "OFstreamCollator : Writing local " << localData.size()
|
||||
<< " bytes to " << fName << " using comm " << comm
|
||||
<< " and " << slaveData.size() << " sub-ranks" << endl;
|
||||
<< " and " << procData.size() << " sub-ranks" << endl;
|
||||
|
||||
forAll(slaveData, proci)
|
||||
forAll(procData, proci)
|
||||
{
|
||||
if (slaveData.set(proci))
|
||||
{
|
||||
Pout<< " " << proci
|
||||
<< " size:" << slaveData[proci].size()
|
||||
<< endl;
|
||||
}
|
||||
Pout<< " " << proci << " size:"
|
||||
<< label(procData[proci].size()) << nl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,7 +87,7 @@ bool Foam::OFstreamCollator::writeFile
|
||||
streamOpt, // streamOpt for container
|
||||
objectType,
|
||||
"", // note
|
||||
"", // location (leave empty instead inaccurate)
|
||||
"", // location (leave empty, otherwise inaccurate)
|
||||
fName.name(), // object name
|
||||
headerEntries
|
||||
);
|
||||
@ -104,32 +100,28 @@ bool Foam::OFstreamCollator::writeFile
|
||||
// for some mpi so default is non-blocking.
|
||||
const UPstream::commsTypes myCommsType
|
||||
(
|
||||
// Blocking when buffer size is 0
|
||||
Foam::mag
|
||||
(
|
||||
fileOperations::masterUncollatedFileOperation::
|
||||
maxMasterFileBufferSize == 0
|
||||
)
|
||||
maxMasterFileBufferSize
|
||||
) < 1
|
||||
? UPstream::commsTypes::scheduled
|
||||
: UPstream::commsTypes::nonBlocking
|
||||
);
|
||||
|
||||
|
||||
UList<char> slice
|
||||
(
|
||||
const_cast<char*>(masterData.data()),
|
||||
label(masterData.size())
|
||||
);
|
||||
|
||||
List<std::streamoff> blockOffset;
|
||||
List<std::streamoff> blockOffsets; // Optional
|
||||
decomposedBlockData::writeBlocks
|
||||
(
|
||||
comm,
|
||||
osPtr,
|
||||
blockOffset,
|
||||
slice,
|
||||
blockOffsets, // or List<std::streamoff>::null()
|
||||
localData,
|
||||
recvSizes,
|
||||
slaveData,
|
||||
procData,
|
||||
myCommsType,
|
||||
false // do not reduce return state
|
||||
false // do not sync return state
|
||||
);
|
||||
|
||||
if (osPtr && !osPtr->good())
|
||||
@ -140,17 +132,18 @@ bool Foam::OFstreamCollator::writeFile
|
||||
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "OFstreamCollator : Finished writing " << masterData.size()
|
||||
<< " bytes";
|
||||
Pout<< "OFstreamCollator : Finished writing "
|
||||
<< localData.size() << " bytes";
|
||||
|
||||
if (UPstream::master(comm))
|
||||
{
|
||||
off_t sum = 0;
|
||||
off_t total = 0;
|
||||
for (const label recv : recvSizes)
|
||||
{
|
||||
sum += recv;
|
||||
total += recv;
|
||||
}
|
||||
// Use std::to_string to display long int
|
||||
Pout<< " (overall " << std::to_string(sum) << ')';
|
||||
Pout<< " (overall " << std::to_string(total) << ')';
|
||||
}
|
||||
Pout<< " to " << fName
|
||||
<< " using comm " << comm << endl;
|
||||
@ -167,13 +160,16 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
|
||||
// Consume stack
|
||||
while (true)
|
||||
{
|
||||
writeData* ptr = nullptr;
|
||||
std::unique_ptr<writeData> ptr;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(handler.mutex_);
|
||||
|
||||
if (handler.objects_.size())
|
||||
{
|
||||
ptr = handler.objects_.pop();
|
||||
// FIFO
|
||||
ptr.reset(handler.objects_.front());
|
||||
handler.objects_.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,51 +177,35 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
|
||||
writeData& obj = *ptr;
|
||||
|
||||
// Obtain views from storage
|
||||
List<std::string_view> procData(obj.procData_.size());
|
||||
forAll(procData, proci)
|
||||
{
|
||||
// Convert storage to pointers
|
||||
PtrList<SubList<char>> slaveData;
|
||||
if (ptr->slaveData_.size())
|
||||
{
|
||||
slaveData.resize(ptr->slaveData_.size());
|
||||
forAll(slaveData, proci)
|
||||
{
|
||||
if (ptr->slaveData_.set(proci))
|
||||
{
|
||||
slaveData.set
|
||||
(
|
||||
proci,
|
||||
new SubList<char>
|
||||
(
|
||||
ptr->slaveData_[proci],
|
||||
ptr->sizes_[proci]
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
procData[proci] = obj.procData_[proci].view();
|
||||
}
|
||||
|
||||
bool ok = writeFile
|
||||
(
|
||||
ptr->comm_,
|
||||
ptr->objectType_,
|
||||
ptr->pathName_,
|
||||
ptr->data_,
|
||||
ptr->sizes_,
|
||||
slaveData,
|
||||
ptr->streamOpt_,
|
||||
ptr->atomic_,
|
||||
ptr->append_,
|
||||
ptr->headerEntries_
|
||||
);
|
||||
if (!ok)
|
||||
{
|
||||
FatalIOErrorInFunction(ptr->pathName_)
|
||||
<< "Failed writing " << ptr->pathName_
|
||||
<< exit(FatalIOError);
|
||||
}
|
||||
bool ok = writeFile
|
||||
(
|
||||
obj.comm_,
|
||||
obj.objectType_,
|
||||
obj.pathName_,
|
||||
obj.localData_,
|
||||
obj.sizes_,
|
||||
procData,
|
||||
obj.streamOpt_,
|
||||
obj.atomic_,
|
||||
obj.append_,
|
||||
obj.headerEntries_
|
||||
);
|
||||
|
||||
delete ptr;
|
||||
if (!ok)
|
||||
{
|
||||
FatalIOErrorInFunction(obj.pathName_)
|
||||
<< "Failed writing " << obj.pathName_
|
||||
<< exit(FatalIOError);
|
||||
}
|
||||
//sleep(1);
|
||||
}
|
||||
@ -248,14 +228,14 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
// Count files to be written
|
||||
// The pending output size(s)
|
||||
off_t totalSize = 0;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
forAllConstIters(objects_, iter)
|
||||
for (const writeData* ptr : objects_)
|
||||
{
|
||||
totalSize += iter()->size();
|
||||
if (ptr) totalSize += ptr->size();
|
||||
}
|
||||
}
|
||||
|
||||
@ -287,10 +267,7 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
|
||||
|
||||
Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
|
||||
:
|
||||
maxBufferSize_(maxBufferSize),
|
||||
threadRunning_(false),
|
||||
localComm_(UPstream::worldComm),
|
||||
threadComm_(UPstream::dupCommunicator(localComm_))
|
||||
OFstreamCollator(maxBufferSize, UPstream::worldComm)
|
||||
{}
|
||||
|
||||
|
||||
@ -331,7 +308,7 @@ bool Foam::OFstreamCollator::write
|
||||
(
|
||||
const word& objectType,
|
||||
const fileName& fName,
|
||||
const string& data,
|
||||
DynamicList<char>&& localData,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
@ -341,78 +318,112 @@ bool Foam::OFstreamCollator::write
|
||||
{
|
||||
// Determine (on master) sizes to receive. Note: do NOT use thread
|
||||
// communicator
|
||||
labelList recvSizes;
|
||||
decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
|
||||
const labelList recvSizes
|
||||
(
|
||||
UPstream::listGatherValues<label>(localData.size(), localComm_)
|
||||
);
|
||||
|
||||
off_t totalSize = 0;
|
||||
label maxLocalSize = 0;
|
||||
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
if (UPstream::master(localComm_))
|
||||
for (const label recvSize : recvSizes)
|
||||
{
|
||||
for (const label recvSize : recvSizes)
|
||||
{
|
||||
totalSize += recvSize;
|
||||
maxLocalSize = max(maxLocalSize, recvSize);
|
||||
}
|
||||
totalSize += recvSize;
|
||||
maxLocalSize = Foam::max(maxLocalSize, recvSize);
|
||||
}
|
||||
Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
|
||||
}
|
||||
Pstream::broadcasts(localComm_, totalSize, maxLocalSize);
|
||||
|
||||
|
||||
// Determine how things will be gathered and written...
|
||||
|
||||
enum class dispatchModes { DIRECT_WRITE, THREADED_WRITE, FULL_THREADED };
|
||||
|
||||
dispatchModes dispatch(dispatchModes::DIRECT_WRITE);
|
||||
|
||||
if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
|
||||
{
|
||||
dispatch = dispatchModes::DIRECT_WRITE;
|
||||
}
|
||||
else if (totalSize <= maxBufferSize_)
|
||||
{
|
||||
// Total size can be stored locally
|
||||
// - gather all data now and only do the writing in the thread
|
||||
|
||||
dispatch = dispatchModes::THREADED_WRITE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Gather data and write in the thread
|
||||
|
||||
dispatch = dispatchModes::FULL_THREADED;
|
||||
|
||||
if (!UPstream::haveThreads())
|
||||
{
|
||||
WarningInFunction
|
||||
<< "MPI not initialized with thread support." << nl
|
||||
<< " maxThreadFileBufferSize = 0 to disable threading" << nl
|
||||
<< " or maxThreadFileBufferSize > " << totalSize
|
||||
<< " to collate before threaded writing." << nl << nl;
|
||||
|
||||
dispatch = dispatchModes::DIRECT_WRITE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// -----------
|
||||
// Dispatching
|
||||
// -----------
|
||||
|
||||
if (dispatch == dispatchModes::DIRECT_WRITE)
|
||||
{
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "OFstreamCollator : non-thread gather and write of " << fName
|
||||
<< " using local comm " << localComm_ << endl;
|
||||
Pout<< "OFstreamCollator : non-thread gather "
|
||||
<< "(local comm: " << localComm_
|
||||
<< "); non-thread write of "
|
||||
<< fName << endl;
|
||||
}
|
||||
|
||||
// Direct collating and writing (so master blocks until all written!)
|
||||
const PtrList<SubList<char>> dummySlaveData;
|
||||
return writeFile
|
||||
(
|
||||
localComm_,
|
||||
objectType,
|
||||
fName,
|
||||
data,
|
||||
localData,
|
||||
recvSizes,
|
||||
dummySlaveData,
|
||||
UList<std::string_view>::null(), // dummy proc data
|
||||
streamOpt,
|
||||
atomic,
|
||||
append,
|
||||
headerEntries
|
||||
);
|
||||
}
|
||||
else if (totalSize <= maxBufferSize_)
|
||||
else if (dispatch == dispatchModes::THREADED_WRITE)
|
||||
{
|
||||
// Total size can be stored locally so receive all data now and only
|
||||
// do the writing in the thread
|
||||
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "OFstreamCollator : non-thread gather; thread write of "
|
||||
Pout<< "OFstreamCollator : non-thread gather "
|
||||
<< "(local comm: " << localComm_
|
||||
<< "); thread write of "
|
||||
<< fName << endl;
|
||||
}
|
||||
|
||||
if (Pstream::master(localComm_))
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
waitForBufferSpace(totalSize);
|
||||
}
|
||||
|
||||
|
||||
// Receive in chunks of labelMax (2^31-1) since this is the maximum
|
||||
// size that a List can be
|
||||
|
||||
autoPtr<writeData> fileAndDataPtr
|
||||
std::unique_ptr<writeData> fileAndDataPtr
|
||||
(
|
||||
new writeData
|
||||
(
|
||||
threadComm_, // Note: comm not actually used anymore
|
||||
threadComm_,
|
||||
objectType,
|
||||
fName,
|
||||
(
|
||||
Pstream::master(localComm_)
|
||||
? data // Only used on master
|
||||
: string::null
|
||||
),
|
||||
recvSizes,
|
||||
streamOpt,
|
||||
atomic,
|
||||
@ -420,63 +431,104 @@ bool Foam::OFstreamCollator::write
|
||||
headerEntries
|
||||
)
|
||||
);
|
||||
writeData& fileAndData = fileAndDataPtr();
|
||||
auto& fileAndData = *fileAndDataPtr;
|
||||
|
||||
PtrList<List<char>>& slaveData = fileAndData.slaveData_;
|
||||
List<List<char>>& procData = fileAndData.procData_;
|
||||
|
||||
UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
|
||||
// Receive from these procs
|
||||
DynamicList<int> recvProcs;
|
||||
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
// Move in local data (master only!)
|
||||
fileAndData.transfer(localData);
|
||||
|
||||
// Storage for receive data
|
||||
procData.resize_nocopy(UPstream::nProcs(localComm_));
|
||||
|
||||
// Sorted by message size
|
||||
labelList order(Foam::sortedOrder(recvSizes));
|
||||
recvProcs.reserve_exact(order.size());
|
||||
|
||||
// Want to receive large messages first. Ignore empty slots
|
||||
forAllReverse(order, i)
|
||||
{
|
||||
const label proci = order[i];
|
||||
|
||||
// Ignore empty slots
|
||||
if (recvSizes[proci] > 0)
|
||||
{
|
||||
recvProcs.push_back(proci);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (UPstream::is_subrank(localComm_))
|
||||
{
|
||||
// Requires a size for decomposedBlockData::writeBlocks() logic
|
||||
procData.resize_nocopy(UPstream::nProcs(localComm_));
|
||||
}
|
||||
|
||||
slaveData.setSize(recvSizes.size());
|
||||
|
||||
// Gather all data onto master. Is done in local communicator since
|
||||
// not in write thread. Note that we do not store in contiguous
|
||||
// buffer since that would limit to 2G chars.
|
||||
// not in write thread.
|
||||
const label startOfRequests = UPstream::nRequests();
|
||||
if (Pstream::master(localComm_))
|
||||
|
||||
const int messageTag = (UPstream::msgType() + 256);
|
||||
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
for (label proci = 1; proci < slaveData.size(); proci++)
|
||||
// Receive from these procs (non-empty slots)
|
||||
for (const int proci : recvProcs)
|
||||
{
|
||||
slaveData.set(proci, new List<char>(recvSizes[proci]));
|
||||
auto& slot = procData[proci];
|
||||
slot.resize_nocopy(recvSizes[proci]);
|
||||
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
proci,
|
||||
slaveData[proci].data(),
|
||||
slaveData[proci].size_bytes(),
|
||||
Pstream::msgType(),
|
||||
slot.data_bytes(),
|
||||
slot.size_bytes(),
|
||||
messageTag,
|
||||
localComm_
|
||||
);
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (UPstream::is_subrank(localComm_) && !localData.empty())
|
||||
{
|
||||
// Send to content to master
|
||||
if
|
||||
(
|
||||
!UOPstream::write
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
0,
|
||||
slice.cdata(),
|
||||
slice.size_bytes(),
|
||||
Pstream::msgType(),
|
||||
UPstream::masterNo(),
|
||||
localData.cdata_bytes(),
|
||||
localData.size_bytes(),
|
||||
messageTag,
|
||||
localComm_
|
||||
)
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Cannot send outgoing message. "
|
||||
<< "to:" << 0 << " nBytes:"
|
||||
<< label(slice.size_bytes())
|
||||
<< "Failure to send message (size: "
|
||||
<< localData.size() << ") to master" << nl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
}
|
||||
|
||||
UPstream::waitRequests(startOfRequests);
|
||||
|
||||
// The localData has been moved (master) or communicated
|
||||
localData.clearStorage();
|
||||
|
||||
|
||||
// Queue up for threading
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
|
||||
// Append to thread buffer
|
||||
objects_.push(fileAndDataPtr.ptr());
|
||||
// Add to thread buffer (as FIFO), take ownership
|
||||
objects_.push_back(fileAndDataPtr.release());
|
||||
|
||||
// Start thread if not running
|
||||
if (!threadRunning_)
|
||||
@ -503,49 +555,48 @@ bool Foam::OFstreamCollator::write
|
||||
|
||||
return true;
|
||||
}
|
||||
else
|
||||
else if (dispatch == dispatchModes::FULL_THREADED)
|
||||
{
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "OFstreamCollator : thread gather and write of " << fName
|
||||
<< " using communicator " << threadComm_ << endl;
|
||||
Pout<< "OFstreamCollator : thread gather; thread write "
|
||||
<< "(thread comm: " << threadComm_
|
||||
<< ") of " << fName << endl;
|
||||
}
|
||||
|
||||
if (!UPstream::haveThreads())
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "mpi does not seem to have thread support."
|
||||
<< " Make sure to set buffer size 'maxThreadFileBufferSize'"
|
||||
<< " to at least " << totalSize
|
||||
<< " to be able to do the collating before threading."
|
||||
<< exit(FatalError);
|
||||
waitForBufferSpace(localData.size());
|
||||
}
|
||||
|
||||
if (Pstream::master(localComm_))
|
||||
{
|
||||
waitForBufferSpace(data.size());
|
||||
}
|
||||
std::unique_ptr<writeData> fileAndDataPtr
|
||||
(
|
||||
new writeData
|
||||
(
|
||||
threadComm_,
|
||||
objectType,
|
||||
fName,
|
||||
recvSizes,
|
||||
streamOpt,
|
||||
atomic,
|
||||
append,
|
||||
headerEntries
|
||||
)
|
||||
);
|
||||
|
||||
// Move in local data (all procs)
|
||||
fileAndDataPtr->transfer(localData);
|
||||
|
||||
|
||||
// Queue up for threading
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
|
||||
// Push all file info on buffer. Note that no slave data provided
|
||||
// so it will trigger communication inside the thread
|
||||
objects_.push
|
||||
(
|
||||
new writeData
|
||||
(
|
||||
threadComm_,
|
||||
objectType,
|
||||
fName,
|
||||
data,
|
||||
recvSizes,
|
||||
streamOpt,
|
||||
atomic,
|
||||
append,
|
||||
headerEntries
|
||||
)
|
||||
);
|
||||
// Add to thread buffer (as FIFO), take ownership
|
||||
objects_.push_back(fileAndDataPtr.release());
|
||||
|
||||
// Note: no proc data provided
|
||||
// so it will trigger communication inside the thread!!!
|
||||
|
||||
if (!threadRunning_)
|
||||
{
|
||||
@ -570,14 +621,21 @@ bool Foam::OFstreamCollator::write
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
FatalErrorInFunction
|
||||
<< "Unknown dispatch mode: " << int(dispatch)
|
||||
<< " - programming error?" << abort(FatalError);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void Foam::OFstreamCollator::waitAll()
|
||||
{
|
||||
// Wait for all buffer space to be available i.e. wait for all jobs
|
||||
// to finish
|
||||
if (Pstream::master(localComm_))
|
||||
// Wait for all buffer space to be available
|
||||
// - ie, wait for all jobs to finish
|
||||
|
||||
if (UPstream::master(localComm_))
|
||||
{
|
||||
if (debug)
|
||||
{
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017-2018 OpenFOAM Foundation
|
||||
Copyright (C) 2021-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2019-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -51,14 +51,14 @@ SourceFiles
|
||||
#ifndef Foam_OFstreamCollator_H
|
||||
#define Foam_OFstreamCollator_H
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include "IOstream.H"
|
||||
#include "labelList.H"
|
||||
#include "FIFOStack.H"
|
||||
#include "SubList.H"
|
||||
#include "List.H"
|
||||
#include "CircularBuffer.H" // As FIFO
|
||||
#include "dictionary.H"
|
||||
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
namespace Foam
|
||||
@ -72,26 +72,33 @@ class OFstreamCollator
|
||||
{
|
||||
// Private Class
|
||||
|
||||
//- Holds data to be written
|
||||
struct writeData
|
||||
{
|
||||
const label comm_;
|
||||
const word objectType_;
|
||||
const fileName pathName_;
|
||||
const string data_;
|
||||
DynamicList<char> localData_;
|
||||
const labelList sizes_;
|
||||
PtrList<List<char>> slaveData_;
|
||||
List<List<char>> procData_;
|
||||
const IOstreamOption streamOpt_;
|
||||
IOstreamOption::atomicType atomic_;
|
||||
IOstreamOption::appendType append_;
|
||||
const dictionary headerEntries_;
|
||||
|
||||
writeData() = delete; // No default construct
|
||||
writeData(const writeData&) = delete; // No copy construct
|
||||
writeData(writeData&&) = delete; // No move construct
|
||||
void operator=(const writeData&) = delete; // No copy assign
|
||||
void operator=(writeData&&) = delete; // No move assign
|
||||
|
||||
//- Construct without local or proc data
|
||||
writeData
|
||||
(
|
||||
const label comm,
|
||||
const word& objectType,
|
||||
const fileName& pathName,
|
||||
const string& data,
|
||||
const labelList& sizes,
|
||||
const labelUList& sizes,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
@ -101,27 +108,28 @@ class OFstreamCollator
|
||||
comm_(comm),
|
||||
objectType_(objectType),
|
||||
pathName_(pathName),
|
||||
data_(data),
|
||||
sizes_(sizes),
|
||||
slaveData_(),
|
||||
streamOpt_(streamOpt),
|
||||
atomic_(atomic),
|
||||
append_(append),
|
||||
headerEntries_(headerEntries)
|
||||
{}
|
||||
|
||||
//- The (approximate) size of master + any optional slave data
|
||||
//- Move reset local data
|
||||
void transfer(DynamicList<char>& localData)
|
||||
{
|
||||
localData_.transfer(localData);
|
||||
}
|
||||
|
||||
//- The (approximate) size of local + any optional proc data
|
||||
off_t size() const
|
||||
{
|
||||
off_t totalSize = data_.size();
|
||||
forAll(slaveData_, i)
|
||||
off_t total = localData_.size();
|
||||
for (const auto& data : procData_)
|
||||
{
|
||||
if (slaveData_.set(i))
|
||||
{
|
||||
totalSize += slaveData_[i].size();
|
||||
}
|
||||
total += data.size();
|
||||
}
|
||||
return totalSize;
|
||||
return total;
|
||||
}
|
||||
};
|
||||
|
||||
@ -135,8 +143,8 @@ class OFstreamCollator
|
||||
|
||||
std::unique_ptr<std::thread> thread_;
|
||||
|
||||
//- Stack of files to write + contents
|
||||
FIFOStack<writeData*> objects_;
|
||||
//- FIFO of files to write and their contents
|
||||
CircularBuffer<writeData*> objects_;
|
||||
|
||||
//- Whether thread is running (and not exited)
|
||||
bool threadRunning_;
|
||||
@ -156,9 +164,9 @@ class OFstreamCollator
|
||||
const label comm,
|
||||
const word& objectType,
|
||||
const fileName& fName,
|
||||
const string& masterData,
|
||||
const UList<char>& localData,
|
||||
const labelUList& recvSizes,
|
||||
const UPtrList<SubList<char>>& slaveData,
|
||||
const UList<std::string_view>& procData,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
@ -181,7 +189,8 @@ public:
|
||||
|
||||
// Constructors
|
||||
|
||||
//- Construct from buffer size. 0 = do not use thread
|
||||
//- Construct from buffer size (0 = do not use thread)
|
||||
//- and with worldComm
|
||||
explicit OFstreamCollator(const off_t maxBufferSize);
|
||||
|
||||
//- Construct from buffer size (0 = do not use thread)
|
||||
@ -195,14 +204,15 @@ public:
|
||||
|
||||
// Member Functions
|
||||
|
||||
//- Write file with contents.
|
||||
// Blocks until writethread has space available
|
||||
//- Write file with contents, possibly taking ownership of the
|
||||
//- content.
|
||||
// Blocks until write-thread has space available
|
||||
// (total file sizes < maxBufferSize)
|
||||
bool write
|
||||
(
|
||||
const word& objectType,
|
||||
const fileName&,
|
||||
const string& data,
|
||||
const fileName& fName,
|
||||
DynamicList<char>&& localData,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
@ -210,6 +220,37 @@ public:
|
||||
const dictionary& headerEntries = dictionary::null
|
||||
);
|
||||
|
||||
//- Write file with contents.
|
||||
FOAM_DEPRECATED_FOR(2023-09, "use write with movable content")
|
||||
bool write
|
||||
(
|
||||
const word& objectType,
|
||||
const fileName& fName,
|
||||
const std::string& s,
|
||||
IOstreamOption streamOpt,
|
||||
IOstreamOption::atomicType atomic,
|
||||
IOstreamOption::appendType append,
|
||||
const bool useThread = true,
|
||||
const dictionary& headerEntries = dictionary::null
|
||||
)
|
||||
{
|
||||
DynamicList<char> charData(label(s.size()));
|
||||
charData.resize(s.size());
|
||||
std::copy(s.begin(), s.end(), charData.begin());
|
||||
|
||||
return write
|
||||
(
|
||||
objectType,
|
||||
fName,
|
||||
std::move(charData),
|
||||
streamOpt,
|
||||
atomic,
|
||||
append,
|
||||
useThread,
|
||||
headerEntries
|
||||
);
|
||||
}
|
||||
|
||||
//- Wait for all thread actions to have finished
|
||||
void waitAll();
|
||||
};
|
||||
|
||||
@ -36,6 +36,7 @@ License
|
||||
#include "masterOFstream.H"
|
||||
#include "OFstream.H"
|
||||
#include "foamVersion.H"
|
||||
#include "UPstreamFile.H"
|
||||
|
||||
/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
|
||||
|
||||
@ -80,6 +81,117 @@ namespace fileOperations
|
||||
}
|
||||
|
||||
|
||||
int Foam::fileOperations::collatedFileOperation::backend_
|
||||
(
|
||||
Foam::debug::optimisationSwitch("collated.backend", 0)
|
||||
);
|
||||
registerOptSwitch
|
||||
(
|
||||
"collated.backend",
|
||||
int,
|
||||
Foam::fileOperations::collatedFileOperation::backend_
|
||||
);
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
// A fixed-width 0-padded integer
|
||||
template<class IntType>
|
||||
void zeropadded(std::ostream& os, IntType val)
|
||||
{
|
||||
// set fill char and width
|
||||
os.setf(std::ios_base::right, std::ios_base::adjustfield);
|
||||
char fillch = os.fill('0');
|
||||
os.width(std::numeric_limits<IntType>::digits10+1);
|
||||
os << val;
|
||||
// restore fill char
|
||||
os.fill(fillch);
|
||||
}
|
||||
|
||||
// Some fancy rewriting of the header content to
|
||||
// include block.start, block.count, block.sizes
|
||||
template<class IntType>
|
||||
void rewriteBlockHeaderInfo
|
||||
(
|
||||
Foam::OCharStream& header,
|
||||
const Foam::UList<IntType>& blockSizes
|
||||
)
|
||||
{
|
||||
using namespace Foam;
|
||||
|
||||
// An int32 (or even smaller) is large enough for the
|
||||
// size of the header content, which is the offset to the first block
|
||||
|
||||
typedef int32_t headerOffsetType;
|
||||
|
||||
if
|
||||
(
|
||||
const auto paste = header.view().rfind('}');
|
||||
paste != std::string::npos
|
||||
)
|
||||
{
|
||||
// Keep everything in ASCII
|
||||
const auto oldFmt = header.format(IOstreamOption::ASCII);
|
||||
|
||||
// Fixed-width label entry
|
||||
Foam::ocharstream labelbuf;
|
||||
labelbuf.reserve_exact(32);
|
||||
|
||||
// Everything trailing after the last '}' from 'FoamFile {}'
|
||||
std::string trailing(header.view().substr(paste));
|
||||
header.seek(paste);
|
||||
|
||||
// <block.start< entry
|
||||
header.append(" block.start ");
|
||||
|
||||
// Position before writing the label
|
||||
const auto labelBegin = header.tellp();
|
||||
|
||||
// fixed-length integer
|
||||
{
|
||||
labelbuf.rewind();
|
||||
zeropadded(labelbuf, headerOffsetType(0));
|
||||
|
||||
header.append(labelbuf.view());
|
||||
header.endEntry();
|
||||
}
|
||||
|
||||
// block.count, block.sizes entries
|
||||
if (!blockSizes.empty())
|
||||
{
|
||||
// <block.count>
|
||||
header.append(" block.count ");
|
||||
header << blockSizes.size();
|
||||
header.endEntry();
|
||||
|
||||
// <block.sizes> : writeList for flatOutput
|
||||
header.append(" block.sizes\n");
|
||||
blockSizes.writeList(header);
|
||||
header.endEntry();
|
||||
}
|
||||
|
||||
// reattach old content
|
||||
header.append(trailing);
|
||||
|
||||
// update block.start information
|
||||
{
|
||||
labelbuf.rewind();
|
||||
zeropadded(labelbuf, headerOffsetType(header.view().size()));
|
||||
|
||||
header.overwrite(labelBegin, labelbuf.view());
|
||||
}
|
||||
|
||||
// Restore format
|
||||
header.format(oldFmt);
|
||||
}
|
||||
}
|
||||
|
||||
} // End anonymous namespace
|
||||
|
||||
|
||||
// * * * * * * * * * * * * Protected Member Functions * * * * * * * * * * * //
|
||||
|
||||
void Foam::fileOperations::collatedFileOperation::printBanner
|
||||
@ -90,7 +202,15 @@ void Foam::fileOperations::collatedFileOperation::printBanner
|
||||
DetailInfo
|
||||
<< "I/O : " << this->type();
|
||||
|
||||
if (mag(maxThreadFileBufferSize) > 1)
|
||||
if
|
||||
(
|
||||
collatedFileOperation::backend_ == backendType::MPIIO_BACKEND
|
||||
&& UPstream::File::supported()
|
||||
)
|
||||
{
|
||||
DetailInfo<< " [mpi/io]" << nl;
|
||||
}
|
||||
else if (Foam::mag(maxThreadFileBufferSize) > 1)
|
||||
{
|
||||
// FUTURE: deprecate or remove threading?
|
||||
DetailInfo
|
||||
@ -107,7 +227,7 @@ void Foam::fileOperations::collatedFileOperation::printBanner
|
||||
DetailInfo
|
||||
<< " [unthreaded] (maxThreadFileBufferSize = 0)." << nl;
|
||||
|
||||
if (mag(maxMasterFileBufferSize) < 1)
|
||||
if (Foam::mag(maxMasterFileBufferSize) < 1)
|
||||
{
|
||||
DetailInfo
|
||||
<< " With scheduled transfer" << nl;
|
||||
@ -350,6 +470,367 @@ Foam::fileName Foam::fileOperations::collatedFileOperation::objectPath
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
bool Foam::fileOperations::collatedFileOperation::writeObject_legacy
|
||||
(
|
||||
const fileName& pathName,
|
||||
const regIOobject& io,
|
||||
IOstreamOption streamOpt,
|
||||
const bool writeOnProc
|
||||
) const
|
||||
{
|
||||
const Time& tm = io.time();
|
||||
const fileName& inst = io.instance();
|
||||
|
||||
if
|
||||
(
|
||||
(inst.isAbsolute() || !tm.processorCase())
|
||||
|| (io.global() || io.globalObject())
|
||||
|| (!UPstream::parRun())
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Should not have been called for any of these conditions:"
|
||||
<< " - isAbsolute" << nl
|
||||
<< " - not processorCase" << nl
|
||||
<< " - global or globalObject" << nl
|
||||
<< " - not parRun" << nl
|
||||
<< abort(FatalError);
|
||||
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Re-check static maxThreadFileBufferSize variable to see
|
||||
// if needs to use threading
|
||||
const bool useThread = (Foam::mag(maxThreadFileBufferSize) > 1);
|
||||
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "collatedFileOperation::writeObject :"
|
||||
<< " For object : " << io.name()
|
||||
<< " starting collating output to " << pathName
|
||||
<< " useThread:" << useThread << endl;
|
||||
}
|
||||
|
||||
if (!useThread)
|
||||
{
|
||||
writer_.waitAll();
|
||||
}
|
||||
|
||||
// Note: currently still NON_ATOMIC (Dec-2022)
|
||||
threadedCollatedOFstream os
|
||||
(
|
||||
writer_,
|
||||
pathName,
|
||||
streamOpt,
|
||||
useThread
|
||||
);
|
||||
|
||||
bool ok = os.good();
|
||||
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
// Suppress comment banner
|
||||
const bool old = IOobject::bannerEnabled(false);
|
||||
|
||||
ok = ok && io.writeHeader(os);
|
||||
|
||||
IOobject::bannerEnabled(old);
|
||||
|
||||
// Additional header content
|
||||
dictionary dict;
|
||||
decomposedBlockData::writeExtraHeaderContent
|
||||
(
|
||||
dict,
|
||||
streamOpt,
|
||||
io
|
||||
);
|
||||
os.setHeaderEntries(dict);
|
||||
}
|
||||
|
||||
ok = ok && io.writeData(os);
|
||||
// No end divider for collated output
|
||||
|
||||
return ok;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool Foam::fileOperations::collatedFileOperation::writeObject_mpiio
|
||||
(
|
||||
const fileName& pathName,
|
||||
const regIOobject& io,
|
||||
IOstreamOption streamOpt,
|
||||
const bool writeOnProc
|
||||
) const
|
||||
{
|
||||
const Time& tm = io.time();
|
||||
const fileName& inst = io.instance();
|
||||
|
||||
if
|
||||
(
|
||||
(inst.isAbsolute() || !tm.processorCase())
|
||||
|| (io.global() || io.globalObject())
|
||||
|| (!UPstream::parRun())
|
||||
)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Should not have been called for any of these conditions:"
|
||||
<< " - isAbsolute" << nl
|
||||
<< " - not processorCase" << nl
|
||||
<< " - global or globalObject" << nl
|
||||
<< " - not parRun" << nl
|
||||
<< abort(FatalError);
|
||||
|
||||
return false;
|
||||
}
|
||||
else if (!UPstream::File::supported())
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Should not have been called without MPI/IO support" << nl
|
||||
<< abort(FatalError);
|
||||
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Stream to memory and then write with MPI/IO
|
||||
|
||||
// Fixed-width label entry
|
||||
ocharstream labelbuf;
|
||||
labelbuf.reserve_exact(32);
|
||||
|
||||
const label blocki = UPstream::myProcNo(comm_);
|
||||
const label nblock = UPstream::nProcs(comm_);
|
||||
|
||||
|
||||
// Overall header - most flexible to keep separate from block content
|
||||
OCharStream header(streamOpt);
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
// Need binary for the overall content
|
||||
const auto oldFmt = header.format(IOstreamOption::BINARY);
|
||||
|
||||
decomposedBlockData::writeHeader
|
||||
(
|
||||
header,
|
||||
streamOpt,
|
||||
io
|
||||
);
|
||||
|
||||
header.format(oldFmt);
|
||||
}
|
||||
|
||||
// Overall footer.
|
||||
// Will be written by the last block, but format for everyone
|
||||
// so that the size is known
|
||||
OCharStream footer(streamOpt);
|
||||
{
|
||||
IOobject::writeEndDivider(footer);
|
||||
}
|
||||
|
||||
// The content buffer
|
||||
OCharStream os(streamOpt);
|
||||
|
||||
|
||||
bool ok = true;
|
||||
|
||||
// Generate content
|
||||
{
|
||||
const word procName("processor" + Foam::name(blocki));
|
||||
|
||||
// Write as primitiveEntry or commented content
|
||||
constexpr bool isDictFormat = false;
|
||||
|
||||
if constexpr (isDictFormat)
|
||||
{
|
||||
// Like writeKeyword()
|
||||
os << nl << procName << nl;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Human-readable comments
|
||||
os << nl << "// " << procName << nl;
|
||||
}
|
||||
|
||||
|
||||
// Begin of block content LABEL(...)
|
||||
|
||||
// Position before writing the label
|
||||
const auto labelBegin = os.tellp();
|
||||
|
||||
// Replace: os << label(len) << nl;
|
||||
// with a fixed-length version
|
||||
{
|
||||
labelbuf.rewind();
|
||||
zeropadded(labelbuf, label(0));
|
||||
|
||||
os.append(labelbuf.view());
|
||||
os << nl;
|
||||
}
|
||||
|
||||
const auto lineNumber = os.lineNumber();
|
||||
|
||||
// Begin binary blob
|
||||
{
|
||||
const auto oldFmt = os.format(IOstreamOption::BINARY);
|
||||
|
||||
// count is unknown but irrelevant for serial
|
||||
os.beginRawWrite(0);
|
||||
|
||||
os.format(oldFmt);
|
||||
}
|
||||
|
||||
// Position of binary blob - after the '(' begin
|
||||
const auto blobBegin = os.tellp();
|
||||
|
||||
// block 0 gets a FoamFile header, without comments
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
// Suppress comment banner
|
||||
const bool old = IOobject::bannerEnabled(false);
|
||||
|
||||
ok = ok && io.writeHeader(os);
|
||||
|
||||
IOobject::bannerEnabled(old);
|
||||
}
|
||||
|
||||
if (writeOnProc)
|
||||
{
|
||||
ok = ok && io.writeData(os);
|
||||
}
|
||||
|
||||
// How many chars of binary data written?
|
||||
const int64_t blobCount(os.tellp() - blobBegin);
|
||||
|
||||
// Finalize the binary blob - closing ')'
|
||||
os.endRawWrite();
|
||||
os.lineNumber() = lineNumber;
|
||||
os << nl;
|
||||
|
||||
// Update the size information for the binary blob
|
||||
if (blobCount > 0)
|
||||
{
|
||||
labelbuf.rewind();
|
||||
zeropadded(labelbuf, label(blobCount));
|
||||
|
||||
os.overwrite(labelBegin, labelbuf.view());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Seek with begin-1 to also overwrite newline with space
|
||||
os.seek(int64_t(labelBegin)-1);
|
||||
|
||||
if constexpr (isDictFormat)
|
||||
{
|
||||
os << ' ' << label(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
os << nl << label(0) << nl;
|
||||
}
|
||||
}
|
||||
|
||||
if constexpr (isDictFormat)
|
||||
{
|
||||
os.endEntry();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// All content now exists
|
||||
// - get the offsets/sizes etc (without footer!)
|
||||
const List<int64_t> sizes
|
||||
(
|
||||
UPstream::allGatherValues<int64_t>(os.view().size(), comm_)
|
||||
);
|
||||
|
||||
constexpr bool withHeaderBlockStart = true;
|
||||
constexpr bool withHeaderBlockSizes = false; // not yet?
|
||||
|
||||
// Update the header with block.start, block.sizes information?
|
||||
if (withHeaderBlockStart && UPstream::master(comm_))
|
||||
{
|
||||
if (withHeaderBlockSizes)
|
||||
{
|
||||
// start+sizes
|
||||
rewriteBlockHeaderInfo(header, sizes);
|
||||
}
|
||||
else
|
||||
{
|
||||
// start only - pass in empty size list
|
||||
rewriteBlockHeaderInfo(header, UList<int>());
|
||||
}
|
||||
}
|
||||
|
||||
// Output starts after the header - only generated on master
|
||||
int64_t totalSize(header.view().size());
|
||||
Pstream::broadcast(totalSize, comm_);
|
||||
|
||||
for (label i = 0; i < blocki; ++i)
|
||||
{
|
||||
totalSize += sizes[i];
|
||||
}
|
||||
|
||||
// The file offset for my block
|
||||
const int64_t blockOffset(totalSize);
|
||||
|
||||
for (label i = blocki; i < nblock; ++i)
|
||||
{
|
||||
totalSize += sizes[i];
|
||||
}
|
||||
|
||||
// Add in footer size - was generated everywhere
|
||||
totalSize += int64_t(footer.view().size());
|
||||
|
||||
// The last block also gets the footer to write
|
||||
if (blocki == nblock-1)
|
||||
{
|
||||
os.extend_exact(footer.view().size());
|
||||
os.append(footer.view());
|
||||
}
|
||||
|
||||
|
||||
// Make directory, open file
|
||||
// ~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
const fileName fName
|
||||
(
|
||||
objectPath(io, decomposedBlockData::typeName)
|
||||
);
|
||||
|
||||
|
||||
// Using mkDir not Foam::mkDir
|
||||
mkDir(fName.path());
|
||||
|
||||
// Write file contents
|
||||
{
|
||||
UPstream::File file;
|
||||
|
||||
file.open_write(comm_, fName);
|
||||
|
||||
// header from master
|
||||
if (UPstream::master(comm_))
|
||||
{
|
||||
file.write_at(0, header.view());
|
||||
}
|
||||
|
||||
// data from all - footer is already in the last block
|
||||
file.write_at_all(blockOffset, os.view());
|
||||
|
||||
file.set_size(totalSize);
|
||||
file.close();
|
||||
}
|
||||
|
||||
return ok;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
bool Foam::fileOperations::collatedFileOperation::writeObject
|
||||
(
|
||||
const regIOobject& io,
|
||||
@ -468,62 +949,35 @@ bool Foam::fileOperations::collatedFileOperation::writeObject
|
||||
}
|
||||
else
|
||||
{
|
||||
// Re-check static maxThreadFileBufferSize variable to see
|
||||
// if needs to use threading
|
||||
const bool useThread = (mag(maxThreadFileBufferSize) > 1);
|
||||
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "collatedFileOperation::writeObject :"
|
||||
<< " For object : " << io.name()
|
||||
<< " starting collating output to " << pathName
|
||||
<< " useThread:" << useThread << endl;
|
||||
}
|
||||
|
||||
if (!useThread)
|
||||
{
|
||||
writer_.waitAll();
|
||||
}
|
||||
|
||||
// Note: currently still NON_ATOMIC (Dec-2022)
|
||||
threadedCollatedOFstream os
|
||||
if
|
||||
(
|
||||
writer_,
|
||||
pathName,
|
||||
streamOpt,
|
||||
useThread
|
||||
);
|
||||
|
||||
bool ok = os.good();
|
||||
|
||||
if (UPstream::master(comm_))
|
||||
collatedFileOperation::backend_ == backendType::MPIIO_BACKEND
|
||||
&& UPstream::File::supported()
|
||||
)
|
||||
{
|
||||
// Suppress comment banner
|
||||
const bool old = IOobject::bannerEnabled(false);
|
||||
|
||||
ok = ok && io.writeHeader(os);
|
||||
|
||||
IOobject::bannerEnabled(old);
|
||||
|
||||
// Additional header content
|
||||
dictionary dict;
|
||||
decomposedBlockData::writeExtraHeaderContent
|
||||
return writeObject_mpiio
|
||||
(
|
||||
dict,
|
||||
pathName,
|
||||
io,
|
||||
streamOpt,
|
||||
io
|
||||
writeOnProc
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
return writeObject_legacy
|
||||
(
|
||||
pathName,
|
||||
io,
|
||||
streamOpt,
|
||||
writeOnProc
|
||||
);
|
||||
os.setHeaderEntries(dict);
|
||||
}
|
||||
|
||||
ok = ok && io.writeData(os);
|
||||
// No end divider for collated output
|
||||
|
||||
return ok;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Foam::fileOperations::collatedFileOperation::flush() const
|
||||
{
|
||||
if (debug)
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017 OpenFOAM Foundation
|
||||
Copyright (C) 2019-2023 OpenCFD Ltd.
|
||||
Copyright (C) 2019-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -31,6 +31,9 @@ Description
|
||||
Version of masterUncollatedFileOperation that collates regIOobjects
|
||||
into a container in the processors/ subdirectory.
|
||||
|
||||
Can use MPI/IO for the backend (when backend_ == 1),
|
||||
which can be defined by the "collated.backend" optimisation switch.
|
||||
|
||||
Uses threading if maxThreadFileBufferSize != 0.
|
||||
> 0 : Can use mpi inside thread to collect data if buffer is not
|
||||
large enough. Does need full thread support inside MPI.
|
||||
@ -68,6 +71,16 @@ class collatedFileOperation
|
||||
:
|
||||
public masterUncollatedFileOperation
|
||||
{
|
||||
// Private Enumeration
|
||||
|
||||
//- Preferred backend for collated format
|
||||
enum backendType : int
|
||||
{
|
||||
LEGACY_BACKEND = 0,
|
||||
MPIIO_BACKEND = 1,
|
||||
};
|
||||
|
||||
|
||||
// Private Data
|
||||
|
||||
//- Communicator allocated/managed by us
|
||||
@ -79,6 +92,26 @@ class collatedFileOperation
|
||||
//- Any initialisation steps after constructing
|
||||
void init(bool verbose);
|
||||
|
||||
//- Writes a regIOobject (so header, contents and divider).
|
||||
// Returns success state.
|
||||
bool writeObject_legacy
|
||||
(
|
||||
const fileName& pathName,
|
||||
const regIOobject&,
|
||||
IOstreamOption streamOpt,
|
||||
const bool writeOnProc
|
||||
) const;
|
||||
|
||||
//- Writes a regIOobject (so header, contents and divider).
|
||||
// Returns success state.
|
||||
bool writeObject_mpiio
|
||||
(
|
||||
const fileName& pathName,
|
||||
const regIOobject&,
|
||||
IOstreamOption streamOpt,
|
||||
const bool writeOnProc
|
||||
) const;
|
||||
|
||||
|
||||
protected:
|
||||
|
||||
@ -101,7 +134,6 @@ protected:
|
||||
IOstreamOption streamOpt
|
||||
) const;
|
||||
|
||||
|
||||
public:
|
||||
|
||||
//- Runtime type information
|
||||
@ -110,6 +142,9 @@ public:
|
||||
|
||||
// Static Data
|
||||
|
||||
//- The type of backend to be used
|
||||
static int backend_;
|
||||
|
||||
//- Max size of thread buffer size. This is the overall size of
|
||||
// all files to be written. Starts blocking if not enough size.
|
||||
// Read as float to enable easy specification of large sizes.
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017-2018 OpenFOAM Foundation
|
||||
Copyright (C) 2020-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2020-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -41,14 +41,17 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream
|
||||
const bool useThread
|
||||
)
|
||||
:
|
||||
OStringStream(streamOpt),
|
||||
OCharStream(streamOpt),
|
||||
writer_(writer),
|
||||
pathName_(pathName),
|
||||
atomic_(atomic),
|
||||
compression_(streamOpt.compression()),
|
||||
useThread_(useThread),
|
||||
headerEntries_()
|
||||
{}
|
||||
{
|
||||
// Start with a slightly larger buffer
|
||||
OCharStream::reserve(4*1024);
|
||||
}
|
||||
|
||||
|
||||
Foam::threadedCollatedOFstream::threadedCollatedOFstream
|
||||
@ -74,11 +77,22 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream
|
||||
|
||||
Foam::threadedCollatedOFstream::~threadedCollatedOFstream()
|
||||
{
|
||||
commit();
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
void Foam::threadedCollatedOFstream::commit()
|
||||
{
|
||||
// Take ownership of serialized content, without copying or reallocation
|
||||
DynamicList<char> charData(OCharStream::release());
|
||||
|
||||
writer_.write
|
||||
(
|
||||
decomposedBlockData::typeName,
|
||||
pathName_,
|
||||
str(),
|
||||
std::move(charData),
|
||||
IOstreamOption(IOstreamOption::BINARY, version(), compression_),
|
||||
atomic_,
|
||||
IOstreamOption::NO_APPEND,
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2017-2018 OpenFOAM Foundation
|
||||
Copyright (C) 2021-2022 OpenCFD Ltd.
|
||||
Copyright (C) 2021-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -39,7 +39,7 @@ SourceFiles
|
||||
#define Foam_threadedCollatedOFstream_H
|
||||
|
||||
#include "dictionary.H"
|
||||
#include "StringStream.H"
|
||||
#include "SpanStream.H"
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
@ -55,7 +55,7 @@ class OFstreamCollator;
|
||||
|
||||
class threadedCollatedOFstream
|
||||
:
|
||||
public OStringStream
|
||||
public OCharStream
|
||||
{
|
||||
// Private Data
|
||||
|
||||
@ -78,6 +78,11 @@ class threadedCollatedOFstream
|
||||
dictionary headerEntries_;
|
||||
|
||||
|
||||
// Private Member Functions
|
||||
|
||||
//- Commit buffered information
|
||||
void commit();
|
||||
|
||||
public:
|
||||
|
||||
// Constructors
|
||||
@ -102,12 +107,14 @@ public:
|
||||
);
|
||||
|
||||
|
||||
//- Destructor
|
||||
//- Destructor - commits buffered information to file
|
||||
~threadedCollatedOFstream();
|
||||
|
||||
|
||||
// Member Functions
|
||||
|
||||
// -> using OCharStream::rewind
|
||||
|
||||
//- Define the header entries for the data block(s)
|
||||
void setHeaderEntries(const dictionary& dict);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user