collatedFileOperation: preferentially collect all data in the simulation thread

so that the write thread does not have to do any parallel communication. This
avoids the bugs in OpenMPI's threading support.

Patch contributed by Mattijs Janssens
Resolves bug-report https://bugs.openfoam.org/view.php?id=2669
Henry Weller, 2017-10-28 09:24:47 +01:00
commit 16416c79ec (parent 28fdc3c4e8)
6 changed files with 611 additions and 162 deletions
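For illustration only (not part of the commit): a minimal sketch of the collate-then-write flow described above, using the decomposedBlockData helpers added below. The function name collateOnMaster and the localData argument are hypothetical; localData stands for the local processor's serialised output, as in OFstreamCollator::write.

#include "decomposedBlockData.H"
#include "Pstream.H"

using namespace Foam;

// Sketch: gather everything on the master in the simulation thread so that
// the write thread needs no further parallel communication.
void collateOnMaster
(
    const string& localData,   // hypothetical: this processor's output
    labelList& recvSizes,      // per-processor byte counts (master only)
    List<int>& slaveOffsets,   // offsets into slaveData (master only)
    List<char>& slaveData      // collated slave bytes (master only)
)
{
    // Per-processor sizes, gathered on the simulation-thread communicator
    decomposedBlockData::gather
    (
        Pstream::worldComm,
        localData.size(),
        recvSizes
    );

    // Collect the slave data itself on the master
    UList<char> slice
    (
        const_cast<char*>(localData.data()),
        label(localData.size())
    );
    decomposedBlockData::gatherSlaveData
    (
        Pstream::worldComm,
        slice,
        recvSizes,
        1,                       // startProc
        Pstream::nProcs()-1,     // nProcs
        slaveOffsets,
        slaveData
    );
}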

View File

@@ -31,8 +31,10 @@ License
#include "IFstream.H"
#include "IStringStream.H"
#include "dictionary.H"
#include <sys/time.h>
#include "objectRegistry.H"
#include "SubList.H"
#include "labelPair.H"
#include "masterUncollatedFileOperation.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@@ -586,12 +588,114 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
}
void Foam::decomposedBlockData::gather
(
const label comm,
const label data,
labelList& datas
)
{
const label nProcs = UPstream::nProcs(comm);
datas.setSize(nProcs);
char* data0Ptr = reinterpret_cast<char*>(datas.begin());
labelList recvOffsets;
labelList recvSizes;
if (UPstream::master())
{
recvOffsets.setSize(nProcs);
forAll(recvOffsets, proci)
{
recvOffsets[proci] =
reinterpret_cast<char*>(&datas[proci])
- data0Ptr;
}
recvSizes.setSize(nProcs, sizeof(label));
}
UPstream::gather
(
reinterpret_cast<const char*>(&data),
sizeof(label),
data0Ptr,
recvSizes,
recvOffsets,
comm
);
}
void Foam::decomposedBlockData::gatherSlaveData
(
const label comm,
const UList<char>& data,
const labelUList& recvSizes,
const label startProc,
const label nProcs,
List<int>& sliceOffsets,
List<char>& recvData
)
{
// Calculate master data
List<int> sliceSizes;
if (UPstream::master(comm))
{
const label numProcs = UPstream::nProcs(comm);
sliceSizes.setSize(numProcs, 0);
sliceOffsets.setSize(numProcs+1, 0);
int totalSize = 0;
label proci = startProc;
for (label i = 0; i < nProcs; i++)
{
sliceSizes[proci] = int(recvSizes[proci]);
sliceOffsets[proci] = totalSize;
totalSize += sliceSizes[proci];
proci++;
}
sliceOffsets[proci] = totalSize;
recvData.setSize(totalSize);
}
int nSend = 0;
if
(
!UPstream::master(comm)
&& (UPstream::myProcNo(comm) >= startProc)
&& (UPstream::myProcNo(comm) < startProc+nProcs)
)
{
nSend = data.byteSize();
}
UPstream::gather
(
data.begin(),
nSend,
recvData.begin(),
sliceSizes,
sliceOffsets,
comm
);
}
bool Foam::decomposedBlockData::writeBlocks
(
const label comm,
autoPtr<OSstream>& osPtr,
List<std::streamoff>& start,
const UList<char>& data,
const labelUList& recvSizes,
const bool haveSlaveData,
const List<char>& slaveData,
const UPstream::commsTypes commsType,
const bool syncReturnState
)
@@ -601,20 +705,56 @@ bool Foam::decomposedBlockData::writeBlocks
Pout<< "decomposedBlockData::writeBlocks:"
<< " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
<< " data:" << data.size()
<< " haveSlaveData:" << haveSlaveData
<< " (master only) slaveData:" << slaveData.size()
<< " commsType:" << Pstream::commsTypeNames[commsType] << endl;
}
const label nProcs = UPstream::nProcs(comm);
bool ok = true;
labelList recvSizes(Pstream::nProcs(comm));
recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
Pstream::gatherList(recvSizes, Pstream::msgType(), comm);
if (haveSlaveData)
{
// Already have gathered the slave data. The communicator is only used to
// check who is the master
if (commsType == UPstream::commsTypes::scheduled)
if (UPstream::master(comm))
{
OSstream& os = osPtr();
start.setSize(nProcs);
// Write master data
{
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
// Write slaves
label slaveOffset = 0;
for (label proci = 1; proci < nProcs; proci++)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
slaveOffset += recvSizes[proci];
}
ok = os.good();
}
}
else if (commsType == UPstream::commsTypes::scheduled)
{
if (UPstream::master(comm))
{
start.setSize(UPstream::nProcs(comm));
start.setSize(nProcs);
OSstream& os = osPtr();
@@ -626,7 +766,7 @@ bool Foam::decomposedBlockData::writeBlocks
}
// Write slaves
List<char> elems;
for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
for (label proci = 1; proci < nProcs; proci++)
{
elems.setSize(recvSizes[proci]);
IPstream::read
@@ -661,101 +801,115 @@ bool Foam::decomposedBlockData::writeBlocks
}
else
{
if (debug)
// Write master data
if (UPstream::master(comm))
{
struct timeval tv;
gettimeofday(&tv, nullptr);
Pout<< "Starting sending at:"
<< 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
<< Foam::endl;
}
label startOfRequests = Pstream::nRequests();
if (!UPstream::master(comm))
{
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
UPstream::masterNo(),
data.begin(),
data.byteSize(),
Pstream::msgType(),
comm
);
Pstream::waitRequests(startOfRequests);
}
else
{
List<List<char>> recvBufs(Pstream::nProcs(comm));
for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
{
recvBufs[proci].setSize(recvSizes[proci]);
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
recvBufs[proci].begin(),
recvSizes[proci],
Pstream::msgType(),
comm
);
}
if (debug)
{
struct timeval tv;
gettimeofday(&tv, nullptr);
Pout<< "Starting master-only writing at:"
<< 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
<< Foam::endl;
}
start.setSize(UPstream::nProcs(comm));
start.setSize(nProcs);
OSstream& os = osPtr();
// Write master data
{
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
os << nl << "// Processor" << UPstream::masterNo() << nl;
start[UPstream::masterNo()] = os.stdStream().tellp();
os << data;
}
if (debug)
{
struct timeval tv;
gettimeofday(&tv, nullptr);
Pout<< "Starting slave writing at:"
<< 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
<< Foam::endl;
}
// Write slaves
for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
// Find out how many processors can be received into
// maxMasterFileBufferSize
if (Pstream::finishedRequest(startOfRequests+proci-1))
// Starting slave processor and number of processors
labelPair startAndSize(1, nProcs-1);
while (startAndSize[1] > 0)
{
labelPair masterData(startAndSize);
if (UPstream::master(comm))
{
label totalSize = 0;
label proci = masterData[0];
while
(
proci < nProcs
&& (
totalSize+recvSizes[proci]
< fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize
)
)
{
os << recvBufs[proci];
totalSize += recvSizes[proci];
proci++;
}
masterData[1] = proci-masterData[0];
}
// Scatter masterData
UPstream::scatter
(
reinterpret_cast<const char*>(masterData.cdata()),
List<int>(nProcs, sizeof(masterData)),
List<int>(nProcs, 0),
reinterpret_cast<char*>(startAndSize.data()),
sizeof(startAndSize),
comm
);
if (startAndSize[1] == 0)
{
break;
}
// Gather data from (a slice of) the slaves
List<int> sliceOffsets;
List<char> recvData;
gatherSlaveData
(
comm,
data,
recvSizes,
startAndSize[0], // startProc,
startAndSize[1], // nProcs,
sliceOffsets,
recvData
);
if (UPstream::master(comm))
{
OSstream& os = osPtr();
// Write slaves
for
(
label proci = startAndSize[0];
proci < startAndSize[0]+startAndSize[1];
proci++
)
{
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os <<
SubList<char>
(
recvData,
sliceOffsets[proci+1]-sliceOffsets[proci],
sliceOffsets[proci]
);
}
}
Pstream::resetRequests(startOfRequests);
ok = os.good();
startAndSize[0] += startAndSize[1];
}
if (UPstream::master(comm))
{
ok = osPtr().good();
}
}
if (debug)
{
struct timeval tv;
gettimeofday(&tv, nullptr);
Pout<< "Finished master-only writing at:"
<< 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
<< Foam::endl;
}
if (syncReturnState)
@@ -868,8 +1022,23 @@ bool Foam::decomposedBlockData::writeObject
osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
IOobject::writeHeader(osPtr());
}
labelList recvSizes;
gather(comm_, this->byteSize(), recvSizes);
List<std::streamoff> start;
return writeBlocks(comm_, osPtr, start, *this, commsType_);
List<char> slaveData; // dummy already received slave data
return writeBlocks
(
comm_,
osPtr,
start,
*this,
recvSizes,
false, // don't have slave data
slaveData,
commsType_
);
}

View File

@@ -169,6 +169,32 @@ public:
const UPstream::commsTypes commsType
);
//- Helper: gather single label. Note: using native Pstream.
// datas is sized to the number of processors; its contents are
// undefined on the slaves
static void gather
(
const label comm,
const label data,
labelList& datas
);
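For illustration only (not part of the commit), a typical call gathers each processor's local byte count on the master, mirroring the use in OFstreamCollator::write; localBytes is a hypothetical per-processor size:

// Sketch: one label per processor ends up in recvSizes on the master;
// contents are undefined on the slaves.
const label localBytes = 1024;
labelList recvSizes;
decomposedBlockData::gather(Pstream::worldComm, localBytes, recvSizes);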
//- Helper: gather data from a (subset of) slaves. Returns
// recvData : the received data
// recvOffsets : offsets into recvData; sized nProcs+1
static void gatherSlaveData
(
const label comm,
const UList<char>& data,
const labelUList& recvSizes,
const label startProc,
const label nProcs,
List<int>& recvOffsets,
List<char>& recvData
);
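For illustration only: since recvOffsets has nProcs+1 entries, processor proci's bytes on the master are the slice [recvOffsets[proci], recvOffsets[proci+1]) of recvData. A sketch of extracting one chunk with SubList (as writeBlocks does), assuming proci, recvOffsets and recvData come from a preceding gatherSlaveData call:

// Sketch: address processor proci's chunk in the gathered buffer
SubList<char> prociChunk
(
    recvData,
    recvOffsets[proci+1] - recvOffsets[proci],  // size in bytes
    recvOffsets[proci]                          // start offset
);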
//- Write *this. Ostream only valid on master. Returns starts of
// processor blocks
static bool writeBlocks
@@ -177,6 +203,12 @@ public:
autoPtr<OSstream>& osPtr,
List<std::streamoff>& start,
const UList<char>&,
const labelUList& recvSizes,
const bool haveSlaveData, // does master have slaveData
const List<char>& slaveData, // optional slave data (on master)
const UPstream::commsTypes,
const bool syncReturnState = true
);

View File

@@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@@ -55,11 +55,13 @@ const Foam::NamedEnum<Foam::UPstream::commsTypes, 3>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun(const label nProcs)
void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
{
if (nProcs == 0)
{
parRun_ = false;
haveThreads_ = haveThreads;
freeCommunicator(UPstream::worldComm);
label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
if (comm != UPstream::worldComm)
@@ -76,6 +78,7 @@ void Foam::UPstream::setParRun(const label nProcs)
else
{
parRun_ = true;
haveThreads_ = haveThreads;
// Redo worldComm communicator (this has been created at static
// initialisation time)
@@ -401,6 +404,8 @@ Foam::label Foam::UPstream::procNo
bool Foam::UPstream::parRun_(false);
bool Foam::UPstream::haveThreads_(false);
Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);

View File

@@ -184,6 +184,9 @@ private:
//- By default this is not a parallel run
static bool parRun_;
//- Have support for threads?
static bool haveThreads_;
//- Standard transfer message type
static int msgType_;
@@ -211,7 +214,7 @@ private:
// Private Member Functions
//- Set data for parallel running
static void setParRun(const label nProcs);
static void setParRun(const label nProcs, const bool haveThreads);
//- Calculate linear communication schedule
static List<commsStruct> calcLinearComm(const label nProcs);
@@ -397,6 +400,12 @@ public:
return parRun_;
}
//- Have support for threads
static bool haveThreads()
{
return haveThreads_;
}
//- Number of processes in parallel run
static label nProcs(const label communicator = 0)
{
@@ -499,6 +508,47 @@ public:
labelUList& recvData,
const label communicator = 0
);
//- Exchange data with all processors (in the communicator)
// sendSizes, sendOffsets give (per processor) the slice of
// sendData to send; similarly recvSizes, recvOffsets give the
// slice of recvData to receive
static void allToAll
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator = 0
);
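For illustration only (an editorial sketch, not part of the commit): since this overload is byte-based, sizes and offsets are in bytes. Exchanging one label with every processor in communicator comm (comm assumed to be a valid communicator) could look like:

// Sketch: every processor sends its own rank to every other processor
const label nProcs = UPstream::nProcs(comm);

labelList sendLabels(nProcs, UPstream::myProcNo(comm));
labelList recvLabels(nProcs);

List<int> sizes(nProcs, int(sizeof(label)));
List<int> offsets(nProcs);
forAll(offsets, proci)
{
    offsets[proci] = int(proci*sizeof(label));
}

UPstream::allToAll
(
    reinterpret_cast<const char*>(sendLabels.begin()),
    sizes,
    offsets,
    reinterpret_cast<char*>(recvLabels.begin()),
    sizes,
    offsets,
    comm
);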
//- Receive data from all processors on the master
static void gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator = 0
);
//- Send data to all processors from the root of the communicator
static void scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator = 0
);
};
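For illustration only: a sketch of the byte-wise scatter declared above, broadcasting a fixed-size labelPair from the master to every processor (the pattern writeBlocks uses for startAndSize); comm is assumed to be a valid communicator.

// Sketch: all send offsets are zero, so each slave receives the same bytes
labelPair masterData(1, 0);   // only meaningful on the master
labelPair startAndSize;

UPstream::scatter
(
    reinterpret_cast<const char*>(masterData.cdata()),
    List<int>(UPstream::nProcs(comm), int(sizeof(masterData))),
    List<int>(UPstream::nProcs(comm), 0),
    reinterpret_cast<char*>(startAndSize.data()),
    sizeof(startAndSize),
    comm
);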

View File

@@ -25,11 +25,7 @@ License
#include "OFstreamCollator.H"
#include "OFstream.H"
#include "OSspecific.H"
#include "IOstreams.H"
#include "Pstream.H"
#include "decomposedBlockData.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@@ -46,7 +42,10 @@ bool Foam::OFstreamCollator::writeFile
const label comm,
const word& typeName,
const fileName& fName,
const string& s,
const string& masterData,
const labelUList& recvSizes,
const bool haveSlaveData, // does master have slaveData
const UList<char>& slaveData, // on master: slave data
IOstream::streamFormat fmt,
IOstream::versionNumber ver,
IOstream::compressionType cmp,
@@ -55,7 +54,7 @@ bool Foam::OFstreamCollator::writeFile
{
if (debug)
{
Pout<< "OFstreamCollator : Writing " << s.size()
Pout<< "OFstreamCollator : Writing " << masterData.size()
<< " bytes to " << fName
<< " using comm " << comm << endl;
}
@@ -90,11 +89,17 @@ bool Foam::OFstreamCollator::writeFile
);
}
UList<char> slice(const_cast<char*>(s.data()), label(s.size()));
// Assuming threaded writing hides any slowness so we might
// as well use scheduled communication to send the data to
// the master processor in order.
UList<char> slice
(
const_cast<char*>(masterData.data()),
label(masterData.size())
);
// Assuming threaded writing hides any slowness, we could
// use scheduled communication to send the data to the
// master processor in order. However, this can be unstable
// with some MPI implementations, so the default is non-blocking.
List<std::streamoff> start;
decomposedBlockData::writeBlocks
@@ -103,7 +108,10 @@ bool Foam::OFstreamCollator::writeFile
osPtr,
start,
slice,
UPstream::commsTypes::scheduled,
recvSizes,
haveSlaveData,
slaveData,
UPstream::commsTypes::nonBlocking, //scheduled,
false // do not reduce return state
);
@@ -115,8 +123,18 @@ bool Foam::OFstreamCollator::writeFile
if (debug)
{
Pout<< "OFstreamCollator : Finished writing " << s.size()
<< " bytes to " << fName
Pout<< "OFstreamCollator : Finished writing " << masterData.size()
<< " bytes";
if (UPstream::master(comm))
{
off_t sum = 0;
forAll(recvSizes, i)
{
sum += recvSizes[i];
}
Pout<< " (overall " << sum << ")";
}
Pout<< " to " << fName
<< " using comm " << comm << endl;
}
@@ -133,14 +151,11 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
{
writeData* ptr = nullptr;
//pthread_mutex_lock(&handler.mutex_);
lockMutex(handler.mutex_);
if (handler.objects_.size())
{
ptr = handler.objects_.pop();
}
//pthread_mutex_unlock(&handler.mutex_);
unlockMutex(handler.mutex_);
if (!ptr)
@@ -151,10 +166,14 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
{
bool ok = writeFile
(
handler.comm_,
ptr->comm_,
ptr->typeName_,
ptr->pathName_,
ptr->data_,
ptr->sizes_,
ptr->haveSlaveData_,
ptr->slaveData_,
ptr->format_,
ptr->version_,
ptr->compression_,
@@ -177,22 +196,54 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
Pout<< "OFstreamCollator : Exiting write thread " << endl;
}
//pthread_mutex_lock(&handler.mutex_);
lockMutex(handler.mutex_);
handler.threadRunning_ = false;
//pthread_mutex_unlock(&handler.mutex_);
unlockMutex(handler.mutex_);
return nullptr;
}
void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
{
while (true)
{
// Count files to be written
off_t totalSize = 0;
lockMutex(mutex_);
forAllConstIter(FIFOStack<writeData*>, objects_, iter)
{
totalSize += iter()->size();
}
unlockMutex(mutex_);
if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_)
{
break;
}
if (debug)
{
lockMutex(mutex_);
Pout<< "OFstreamCollator : Waiting for buffer space."
<< " Currently in use:" << totalSize
<< " limit:" << maxBufferSize_
<< " files:" << objects_.size()
<< endl;
unlockMutex(mutex_);
}
sleep(5);
}
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
:
maxBufferSize_(maxBufferSize),
//mutex_(PTHREAD_MUTEX_INITIALIZER),
mutex_
(
maxBufferSize_ > 0
@@ -228,7 +279,6 @@ Foam::OFstreamCollator::~OFstreamCollator()
Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
}
//pthread_join(thread_, nullptr);
joinThread(thread_);
}
if (thread_ != -1)
@@ -259,56 +309,163 @@ bool Foam::OFstreamCollator::write
const bool append
)
{
if (maxBufferSize_ > 0)
// Determine (on master) sizes to receive. Note: do NOT use thread
// communicator
labelList recvSizes;
decomposedBlockData::gather(Pstream::worldComm, data.size(), recvSizes);
off_t totalSize = 0;
label maxLocalSize = 0;
{
while (true)
for (label proci = 0; proci < recvSizes.size(); proci++)
{
// Count files to be written
off_t totalSize = 0;
//pthread_mutex_lock(&mutex_);
lockMutex(mutex_);
forAllConstIter(FIFOStack<writeData*>, objects_, iter)
{
totalSize += iter()->data_.size();
}
//pthread_mutex_unlock(&mutex_);
unlockMutex(mutex_);
if
(
totalSize == 0
|| (totalSize+off_t(data.size()) < maxBufferSize_)
)
{
break;
}
if (debug)
{
Pout<< "OFstreamCollator : Waiting for buffer space."
<< " Currently in use:" << totalSize
<< " limit:" << maxBufferSize_
<< endl;
}
sleep(5);
totalSize += recvSizes[proci];
maxLocalSize = max(maxLocalSize, recvSizes[proci]);
}
Pstream::scatter(totalSize, Pstream::msgType(), Pstream::worldComm);
Pstream::scatter(maxLocalSize, Pstream::msgType(), Pstream::worldComm);
}
if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
{
if (debug)
{
Pout<< "OFstreamCollator : non-thread gather and write of " << fName
<< " using worldComm" << endl;
}
// Direct collating and writing (so master blocks until all written!)
const List<char> dummySlaveData;
return writeFile
(
UPstream::worldComm,
typeName,
fName,
data,
recvSizes,
false, // no slave data provided yet
dummySlaveData,
fmt,
ver,
cmp,
append
);
}
else if (totalSize <= maxBufferSize_)
{
// Total size can be stored locally so receive all data now and only
// do the writing in the thread
if (debug)
{
Pout<< "OFstreamCollator : relaying write of " << fName
<< " to thread " << endl;
Pout<< "OFstreamCollator : non-thread gather; thread write of "
<< fName << endl;
}
//pthread_mutex_lock(&mutex_);
lockMutex(mutex_);
objects_.push
if (Pstream::master())
{
waitForBufferSpace(totalSize);
}
// Allocate local buffer for all collated data
autoPtr<writeData> fileAndDataPtr
(
new writeData(typeName, fName, data, fmt, ver, cmp, append)
new writeData
(
comm_, // Note: comm not actually used anymore
typeName,
fName,
data,
recvSizes,
true, // have slave data (collected below)
fmt,
ver,
cmp,
append
)
);
writeData& fileAndData = fileAndDataPtr();
// Gather the slave data and insert into fileAndData
UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
List<int> slaveOffsets;
decomposedBlockData::gatherSlaveData
(
Pstream::worldComm, // Note: using simulation thread
slice,
recvSizes,
1, // startProc,
Pstream::nProcs()-1, // n procs
slaveOffsets,
fileAndData.slaveData_
);
// Append to thread buffer
lockMutex(mutex_);
objects_.push(fileAndDataPtr.ptr());
unlockMutex(mutex_);
// Start thread if not running
lockMutex(mutex_);
if (!threadRunning_)
{
createThread(thread_, writeAll, this);
if (debug)
{
Pout<< "OFstreamCollator : Started write thread "
<< thread_ << endl;
}
threadRunning_ = true;
}
unlockMutex(mutex_);
return true;
}
else
{
if (debug)
{
Pout<< "OFstreamCollator : thread gather and write of " << fName
<< " in thread " << thread_
<< " using communicator " << comm_ << endl;
}
if (!UPstream::haveThreads())
{
FatalErrorInFunction
<< "mpi does not seem to have thread support."
<< "Please increase the buffer size 'maxThreadFileBufferSize'"
<< " to at least " << totalSize
<< " to be able to do the collating before threading."
<< exit(FatalError);
}
if (Pstream::master())
{
waitForBufferSpace(data.size());
}
lockMutex(mutex_);
// Push all the file info onto the buffer. Note that no slave data is
// provided, so the thread will do the communication itself
objects_.push
(
new writeData
(
comm_,
typeName,
fName,
data,
recvSizes,
false, // Have no slave data; collect in thread
fmt,
ver,
cmp,
append
)
);
//pthread_mutex_unlock(&mutex_);
unlockMutex(mutex_);
//pthread_mutex_lock(&mutex_);
lockMutex(mutex_);
if (!threadRunning_)
{
@@ -319,16 +476,10 @@ bool Foam::OFstreamCollator::write
}
threadRunning_ = true;
}
//pthread_mutex_unlock(&mutex_);
unlockMutex(mutex_);
return true;
}
else
{
// Immediate writing
return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append);
}
}

View File

@@ -27,6 +27,22 @@ Class
Description
Threaded file writer.
Collects all data from all processors and writes it as a single
'decomposedBlockData' file. The operation is determined by the
buffer size (the maxThreadFileBufferSize setting):
- the local size of the data is larger than the buffer: receive and write
processor by processor (i.e. 'scheduled'). Does not use a thread; no
file-size limit.
- the total size of the data is larger than the buffer (but the local size
is not): the thread does all the collecting and writing of the
processors. No file-size limit.
- the total size of the data is less than the buffer: collecting is done
locally and the thread only does the writing (since the data has
already been collected).
The three regimes are sketched below.
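For illustration only, the three regimes can be sketched as a small standalone selector; the enum and the function name selectWriteMode are hypothetical, and maxBufferSize corresponds to maxThreadFileBufferSize:

#include <sys/types.h>   // off_t

// Sketch of the decision logic (not the actual implementation)
enum class writeMode
{
    writeDirect,            // gather and write in the simulation thread
    gatherThenThreadWrite,  // gather now, write thread only writes
    threadGatherAndWrite    // write thread gathers and writes (needs MPI threads)
};

writeMode selectWriteMode
(
    const off_t maxBufferSize,   // maxThreadFileBufferSize
    const off_t totalSize,       // sum of all processors' data
    const off_t maxLocalSize     // largest single processor's data
)
{
    if (maxBufferSize == 0 || maxLocalSize > maxBufferSize)
    {
        return writeMode::writeDirect;
    }
    else if (totalSize <= maxBufferSize)
    {
        return writeMode::gatherThenThreadWrite;
    }
    else
    {
        return writeMode::threadGatherAndWrite;
    }
}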
SourceFiles
OFstreamCollator.C
@@ -36,7 +52,7 @@ SourceFiles
#define OFstreamCollator_H
#include "IOstream.H"
#include "List.H"
#include "labelList.H"
#include "FIFOStack.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@@ -56,9 +72,15 @@ class OFstreamCollator
{
public:
const label comm_;
const word typeName_;
const fileName pathName_;
const string data_;
const labelList sizes_;
const bool haveSlaveData_;
List<char> slaveData_;
const IOstream::streamFormat format_;
const IOstream::versionNumber version_;
const IOstream::compressionType compression_;
@@ -66,23 +88,36 @@ class OFstreamCollator
writeData
(
const label comm,
const word& typeName,
const fileName& pathName,
const string& data,
const labelList& sizes,
const bool haveSlaveData,
IOstream::streamFormat format,
IOstream::versionNumber version,
IOstream::compressionType compression,
const bool append
)
:
comm_(comm),
typeName_(typeName),
pathName_(pathName),
data_(data),
sizes_(sizes),
haveSlaveData_(haveSlaveData),
slaveData_(0),
format_(format),
version_(version),
compression_(compression),
append_(append)
{}
//- (approximate) size of master + any optional slave data
off_t size() const
{
return data_.size() + slaveData_.size();
}
};
@@ -112,7 +147,10 @@ class OFstreamCollator
const label comm,
const word& typeName,
const fileName& fName,
const string& data,
const string& masterData,
const labelUList& recvSizes,
const bool haveSlaveData, // (does master) have slave data
const UList<char>& slaveData, // (on master) all slave data
IOstream::streamFormat fmt,
IOstream::versionNumber ver,
IOstream::compressionType cmp,
@@ -122,6 +160,10 @@ class OFstreamCollator
//- Write all files in stack
static void* writeAll(void *threadarg);
//- Wait until the total size of objects_ (master data plus any
// optional slave data) is at least wantedSize below the overall
// maxBufferSize
public: