diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
index 7afbd2fb8..78bb27c06 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
@@ -31,8 +31,10 @@ License
 #include "IFstream.H"
 #include "IStringStream.H"
 #include "dictionary.H"
-#include <sys/time.h>
 #include "objectRegistry.H"
+#include "SubList.H"
+#include "labelPair.H"
+#include "masterUncollatedFileOperation.H"
 
 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
 
@@ -586,12 +588,114 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
 }
 
 
+void Foam::decomposedBlockData::gather
+(
+    const label comm,
+    const label data,
+    labelList& datas
+)
+{
+    const label nProcs = UPstream::nProcs(comm);
+    datas.setSize(nProcs);
+
+    char* data0Ptr = reinterpret_cast<char*>(datas.begin());
+
+    labelList recvOffsets;
+    labelList recvSizes;
+    if (UPstream::master())
+    {
+        recvOffsets.setSize(nProcs);
+        forAll(recvOffsets, proci)
+        {
+            recvOffsets[proci] =
+                reinterpret_cast<char*>(&datas[proci])
+              - data0Ptr;
+        }
+        recvSizes.setSize(nProcs, sizeof(label));
+    }
+
+    UPstream::gather
+    (
+        reinterpret_cast<const char*>(&data),
+        sizeof(label),
+        data0Ptr,
+        recvSizes,
+        recvOffsets,
+        comm
+    );
+}
+
+
+void Foam::decomposedBlockData::gatherSlaveData
+(
+    const label comm,
+    const UList<char>& data,
+    const labelUList& recvSizes,
+
+    const label startProc,
+    const label nProcs,
+
+    List<int>& sliceOffsets,
+    List<char>& recvData
+)
+{
+    // Calculate the slice sizes and offsets on the master
+    List<int> sliceSizes;
+    if (UPstream::master(comm))
+    {
+        const label numProcs = UPstream::nProcs(comm);
+
+        sliceSizes.setSize(numProcs, 0);
+        sliceOffsets.setSize(numProcs+1, 0);
+
+        int totalSize = 0;
+        label proci = startProc;
+        for (label i = 0; i < nProcs; i++)
+        {
+            sliceSizes[proci] = int(recvSizes[proci]);
+            sliceOffsets[proci] = totalSize;
+            totalSize += sliceSizes[proci];
+            proci++;
+        }
+        sliceOffsets[proci] = totalSize;
+        recvData.setSize(totalSize);
+    }
+
+    int nSend = 0;
+    if
+    (
+       !UPstream::master(comm)
+    && (UPstream::myProcNo(comm) >= startProc)
+    && (UPstream::myProcNo(comm) < startProc+nProcs)
+    )
+    {
+        nSend = data.byteSize();
+    }
+
+    UPstream::gather
+    (
+        data.begin(),
+        nSend,
+
+        recvData.begin(),
+        sliceSizes,
+        sliceOffsets,
+        comm
+    );
+}
+
+
 bool Foam::decomposedBlockData::writeBlocks
 (
     const label comm,
     autoPtr<OSstream>& osPtr,
     List<std::streamoff>& start,
     const UList<char>& data,
+
+    const labelUList& recvSizes,
+    const bool haveSlaveData,
+    const List<char>& slaveData,
+
     const UPstream::commsTypes commsType,
     const bool syncReturnState
 )
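The two helpers added above collect one label (respectively a byte slice) from every rank into contiguous master storage, passing byte-level sizes and offsets. The MPI side of UPstream::gather is not part of this diff; assuming it forwards to MPI_Gatherv (which its sizes/offsets signature suggests), gather() corresponds to this minimal standalone sketch:

    // Standalone analogue of decomposedBlockData::gather, assuming an
    // MPI_Gatherv backend (the Pstream/mpi implementation is not shown here)
    #include <mpi.h>
    #include <vector>

    std::vector<long> gatherOneLabel(MPI_Comm comm, long myValue)
    {
        int rank, nProcs;
        MPI_Comm_rank(comm, &rank);
        MPI_Comm_size(comm, &nProcs);

        std::vector<long> allValues;             // only filled on the master
        std::vector<int> recvSizes, recvOffsets; // in bytes, master only
        if (rank == 0)
        {
            allValues.resize(nProcs);
            recvSizes.assign(nProcs, int(sizeof(long)));
            recvOffsets.resize(nProcs);
            for (int proci = 0; proci < nProcs; proci++)
            {
                recvOffsets[proci] = proci*int(sizeof(long));
            }
        }

        // Every rank sends sizeof(long) bytes; the size/offset lists are
        // only dereferenced on the root
        MPI_Gatherv
        (
            &myValue, int(sizeof(long)), MPI_BYTE,
            allValues.data(), recvSizes.data(), recvOffsets.data(), MPI_BYTE,
            0, comm
        );
        return allValues;
    }

gatherSlaveData() uses the same primitive, but with per-rank byte counts taken from recvSizes and with ranks outside [startProc, startProc+nProcs) contributing zero bytes, so the master can pull in one slice of processors at a time.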
@@ -601,20 +705,56 @@ bool Foam::decomposedBlockData::writeBlocks
         Pout<< "decomposedBlockData::writeBlocks:"
             << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
             << " data:" << data.size()
+            << " haveSlaveData:" << haveSlaveData
+            << " (master only) slaveData:" << slaveData.size()
             << " commsType:" << Pstream::commsTypeNames[commsType] << endl;
     }
 
+    const label nProcs = UPstream::nProcs(comm);
+
+
     bool ok = true;
 
-    labelList recvSizes(Pstream::nProcs(comm));
-    recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
-    Pstream::gatherList(recvSizes, Pstream::msgType(), comm);
+    if (haveSlaveData)
+    {
+        // Already have gathered the slave data. The communicator is only
+        // used to check who is the master
 
-    if (commsType == UPstream::commsTypes::scheduled)
+        if (UPstream::master(comm))
+        {
+            OSstream& os = osPtr();
+
+            start.setSize(nProcs);
+
+            // Write master data
+            {
+                os << nl << "// Processor" << UPstream::masterNo() << nl;
+                start[UPstream::masterNo()] = os.stdStream().tellp();
+                os << data;
+            }
+
+            // Write slaves
+
+            label slaveOffset = 0;
+
+            for (label proci = 1; proci < nProcs; proci++)
+            {
+                os << nl << nl << "// Processor" << proci << nl;
+                start[proci] = os.stdStream().tellp();
+
+                os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
+
+                slaveOffset += recvSizes[proci];
+            }
+
+            ok = os.good();
+        }
+    }
+    else if (commsType == UPstream::commsTypes::scheduled)
     {
         if (UPstream::master(comm))
         {
-            start.setSize(UPstream::nProcs(comm));
+            start.setSize(nProcs);
 
             OSstream& os = osPtr();
 
@@ -626,7 +766,7 @@
         }
         // Write slaves
         List<char> elems;
-        for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+        for (label proci = 1; proci < nProcs; proci++)
         {
             elems.setSize(recvSizes[proci]);
             IPstream::read
@@ -661,101 +801,115 @@
     }
     else
     {
-        if (debug)
+        // Write master data
+        if (UPstream::master(comm))
         {
-            struct timeval tv;
-            gettimeofday(&tv, nullptr);
-            Pout<< "Starting sending at:"
-                << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                << Foam::endl;
-        }
-
-
-        label startOfRequests = Pstream::nRequests();
-
-        if (!UPstream::master(comm))
-        {
-            UOPstream::write
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::masterNo(),
-                data.begin(),
-                data.byteSize(),
-                Pstream::msgType(),
-                comm
-            );
-            Pstream::waitRequests(startOfRequests);
-        }
-        else
-        {
-            List<List<char>> recvBufs(Pstream::nProcs(comm));
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
-            {
-                recvBufs[proci].setSize(recvSizes[proci]);
-                UIPstream::read
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    recvBufs[proci].begin(),
-                    recvSizes[proci],
-                    Pstream::msgType(),
-                    comm
-                );
-            }
-
-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting master-only writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
-            }
-
-            start.setSize(UPstream::nProcs(comm));
+            start.setSize(nProcs);
 
             OSstream& os = osPtr();
 
-            // Write master data
-            {
-                os << nl << "// Processor" << UPstream::masterNo() << nl;
-                start[UPstream::masterNo()] = os.stdStream().tellp();
-                os << data;
-            }
+            os << nl << "// Processor" << UPstream::masterNo() << nl;
+            start[UPstream::masterNo()] = os.stdStream().tellp();
+            os << data;
+        }
 
-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting slave writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
-            }
 
-            // Write slaves
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
-            {
-                os << nl << nl << "// Processor" << proci << nl;
-                start[proci] = os.stdStream().tellp();
+        // Find out how many processors can be received into
+        // maxMasterFileBufferSize
 
-                if (Pstream::finishedRequest(startOfRequests+proci-1))
+        // Starting slave processor and number of processors
+        labelPair startAndSize(1, nProcs-1);
+
+        while (startAndSize[1] > 0)
+        {
+            labelPair masterData(startAndSize);
+            if (UPstream::master(comm))
+            {
+                label totalSize = 0;
+                label proci = masterData[0];
+                while
+                (
+                    proci < nProcs
+                 && (
+                        totalSize+recvSizes[proci]
+                      < fileOperations::masterUncollatedFileOperation::
+                            maxMasterFileBufferSize
+                    )
+                )
                 {
-                    os << recvBufs[proci];
+                    totalSize += recvSizes[proci];
+                    proci++;
+                }
+
+                masterData[1] = proci-masterData[0];
+            }
+
+
+            // Scatter the slice extent to all processors
+            UPstream::scatter
+            (
+                reinterpret_cast<const char*>(masterData.cdata()),
+                List<int>(nProcs, sizeof(masterData)),
+                List<int>(nProcs, 0),
+                reinterpret_cast<char*>(startAndSize.data()),
+                sizeof(startAndSize),
+                comm
+            );
+
+            if (startAndSize[1] == 0)
+            {
+                break;
+            }
+
+
+            // Gather data from (a slice of) the slaves
+            List<int> sliceOffsets;
+            List<char> recvData;
+            gatherSlaveData
+            (
+                comm,
+                data,
+                recvSizes,
+
+                startAndSize[0],    // startProc,
+                startAndSize[1],    // nProcs,
+
+                sliceOffsets,
+                recvData
+            );
+
+            if (UPstream::master(comm))
+            {
+                OSstream& os = osPtr();
+
+                // Write slaves
+                for
+                (
+                    label proci = startAndSize[0];
+                    proci < startAndSize[0]+startAndSize[1];
+                    proci++
+                )
+                {
+                    os << nl << nl << "// Processor" << proci << nl;
+                    start[proci] = os.stdStream().tellp();
+
+                    os <<
+                        SubList<char>
+                        (
+                            recvData,
+                            sliceOffsets[proci+1]-sliceOffsets[proci],
+                            sliceOffsets[proci]
+                        );
                 }
             }
 
-            Pstream::resetRequests(startOfRequests);
-
-            ok = os.good();
+            startAndSize[0] += startAndSize[1];
+        }
+
+        if (UPstream::master(comm))
+        {
+            ok = osPtr().good();
         }
-    }
-
-    if (debug)
-    {
-        struct timeval tv;
-        gettimeofday(&tv, nullptr);
-        Pout<< "Finished master-only writing at:"
-            << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-            << Foam::endl;
     }
 
     if (syncReturnState)
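The loop above bounds the master's memory use: each pass finds the longest run of consecutive slave ranks whose combined byte count still fits into maxMasterFileBufferSize, gathers exactly that slice, writes it, and advances. The slice computation, restated with std types (function and variable names are illustrative only):

    // Illustrative restatement of the slicing logic above: how many
    // consecutive ranks, starting at startProc, fit into the buffer limit
    #include <cstdint>
    #include <utility>
    #include <vector>

    std::pair<int, int> nextSlice
    (
        const std::vector<int64_t>& recvSizes,  // bytes per rank
        const int startProc,                    // first rank of this slice
        const int64_t maxBufferBytes            // maxMasterFileBufferSize
    )
    {
        int64_t totalSize = 0;
        int proci = startProc;
        while
        (
            proci < int(recvSizes.size())
         && totalSize + recvSizes[proci] < maxBufferBytes
        )
        {
            totalSize += recvSizes[proci];
            proci++;
        }
        return {startProc, proci - startProc};  // (start, count)
    }

Only the master knows the per-rank sizes, which is why the computed extent is scattered back to all ranks before the matching gatherSlaveData() call.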
@@ -868,8 +1022,23 @@ bool Foam::decomposedBlockData::writeObject
         osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
         IOobject::writeHeader(osPtr());
     }
+
+    labelList recvSizes;
+    gather(comm_, this->byteSize(), recvSizes);
+
     List<std::streamoff> start;
-    return writeBlocks(comm_, osPtr, start, *this, commsType_);
+    List<char> slaveData;   // dummy already received slave data
+    return writeBlocks
+    (
+        comm_,
+        osPtr,
+        start,
+        *this,
+        recvSizes,
+        false,              // don't have slave data
+        slaveData,
+        commsType_
+    );
 }
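For orientation: writeObject() produces a single collated file containing an ordinary OpenFOAM header (class decomposedBlockData) followed by one char block per processor, each preceded by a '// ProcessorN' marker whose stream offset is recorded in start[]. Schematically, with an abbreviated header and illustrative byte counts:

    FoamFile
    {
        version     2.0;
        format      binary;
        class       decomposedBlockData;
        object      U;
    }

    // Processor0
    61440(...61440 raw bytes: the processor-0 file contents...)

    // Processor1
    59392(...59392 raw bytes: the processor-1 file contents...)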
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
index dc4e3d70f..1def38f9e 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
@@ -169,6 +169,32 @@ public:
             const UPstream::commsTypes commsType
         );
 
+        //- Helper: gather a single label using the native Pstream.
+        //  datas is sized to the number of processors; contents are
+        //  undefined on the slaves
+        static void gather
+        (
+            const label comm,
+            const label data,
+            labelList& datas
+        );
+
+        //- Helper: gather data from a subset of the slaves. Returns:
+        //  recvData    : received data
+        //  recvOffsets : offset per processor into recvData; sized nProcs+1
+        static void gatherSlaveData
+        (
+            const label comm,
+            const UList<char>& data,
+            const labelUList& recvSizes,
+
+            const label startProc,
+            const label nProcs,
+
+            List<int>& recvOffsets,
+            List<char>& recvData
+        );
+
         //- Write *this. Ostream only valid on master. Returns starts of
         //  processor blocks
         static bool writeBlocks
@@ -177,6 +203,12 @@ public:
             autoPtr<OSstream>& osPtr,
             List<std::streamoff>& start,
             const UList<char>&,
+
+            const labelUList& recvSizes,
+
+            const bool haveSlaveData,       // does master have slaveData
+            const List<char>& slaveData,    // optional slave data (on master)
+
             const UPstream::commsTypes,
             const bool syncReturnState = true
         );
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
index 9dad08b9a..754473745 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
@@ -2,7 +2,7 @@
   =========                 |
   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
    \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
 License
@@ -55,11 +55,13 @@ const Foam::NamedEnum<Foam::UPstream::commsTypes, 3>
 
 // * * * * * * * * * * * * * Private Member Functions  * * * * * * * * * * * //
 
-void Foam::UPstream::setParRun(const label nProcs)
+void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
 {
     if (nProcs == 0)
     {
         parRun_ = false;
+        haveThreads_ = haveThreads;
+
         freeCommunicator(UPstream::worldComm);
         label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
         if (comm != UPstream::worldComm)
@@ -76,6 +78,7 @@ void Foam::UPstream::setParRun(const label nProcs)
     else
     {
         parRun_ = true;
+        haveThreads_ = haveThreads;
 
         // Redo worldComm communicator (this has been created at static
         // initialisation time)
@@ -401,6 +404,8 @@ Foam::label Foam::UPstream::procNo
 
 bool Foam::UPstream::parRun_(false);
 
+bool Foam::UPstream::haveThreads_(false);
+
 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
 
 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
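haveThreads_ records whether the MPI layer was initialised with thread support, which the collated writer checks before collating inside a background thread. The MPI-specific caller of setParRun() is not part of this diff; a sketch of how such a flag is typically derived, assuming an MPI backend:

    // Sketch only: how an MPI backend could derive the haveThreads flag fed
    // to setParRun(nProcs, haveThreads). The Pstream/mpi code is not shown.
    #include <mpi.h>

    bool initWithThreadSupport(int& argc, char**& argv, const bool needsThread)
    {
        int provided;
        MPI_Init_thread
        (
            &argc,
            &argv,
            needsThread ? MPI_THREAD_MULTIPLE : MPI_THREAD_SINGLE,
            &provided
        );

        // Writing from a background thread is only safe when the MPI
        // library guarantees concurrent calls from multiple threads
        return provided == MPI_THREAD_MULTIPLE;
    }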
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index fab2dd7ff..6e846aa3e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -184,6 +184,9 @@ private:
         //- By default this is not a parallel run
         static bool parRun_;
 
+        //- Have support for threads?
+        static bool haveThreads_;
+
         //- Standard transfer message type
         static int msgType_;
 
@@ -211,7 +214,7 @@ private:
     // Private Member Functions
 
         //- Set data for parallel running
-        static void setParRun(const label nProcs);
+        static void setParRun(const label nProcs, const bool haveThreads);
 
         //- Calculate linear communication schedule
         static List<commsStruct> calcLinearComm(const label nProcs);
@@ -397,6 +400,12 @@ public:
             return parRun_;
         }
 
+        //- Have support for threads
+        static bool haveThreads()
+        {
+            return haveThreads_;
+        }
+
         //- Number of processes in parallel run
         static label nProcs(const label communicator = 0)
         {
@@ -499,6 +508,47 @@ public:
             labelUList& recvData,
             const label communicator = 0
         );
+
+        //- Exchange data with all processors (in the communicator).
+        //  sendSizes, sendOffsets give (per processor) the slice of
+        //  sendData to send; recvSizes, recvOffsets give the slice of
+        //  recvData to receive into
+        static void allToAll
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+
+            const label communicator = 0
+        );
+
+        //- Receive data from all processors on the master
+        static void gather
+        (
+            const char* sendData,
+            int sendSize,
+
+            char* recvData,
+            const UList<int>& recvSizes,
+            const UList<int>& recvOffsets,
+            const label communicator = 0
+        );
+
+        //- Send data to all processors from the root of the communicator
+        static void scatter
+        (
+            const char* sendData,
+            const UList<int>& sendSizes,
+            const UList<int>& sendOffsets,
+
+            char* recvData,
+            int recvSize,
+            const label communicator = 0
+        );
 };
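A hypothetical caller of the gather declared above, collecting a fixed-size char buffer from every rank onto the master (variable sizes are what decomposedBlockData layers on top, by gathering the sizes first):

    // Hypothetical helper; everything except the UPstream API is made up.
    // Fixed-size contributions keep the offset arithmetic trivial.
    #include "UPstream.H"
    #include "List.H"

    using namespace Foam;

    void gatherBuffers(const UList<char>& myData, List<char>& allData)
    {
        const label nProcs = UPstream::nProcs();
        const int localBytes = int(myData.byteSize());

        // The size/offset lists are only dereferenced on the master
        List<int> recvSizes(nProcs, localBytes);
        List<int> recvOffsets(nProcs, 0);
        if (UPstream::master())
        {
            forAll(recvOffsets, proci)
            {
                recvOffsets[proci] = proci*localBytes;
            }
            allData.setSize(nProcs*localBytes);
        }

        UPstream::gather
        (
            myData.begin(),
            localBytes,
            allData.begin(),
            recvSizes,
            recvOffsets,
            UPstream::worldComm
        );
    }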
     List<std::streamoff> start;
     decomposedBlockData::writeBlocks
@@ -103,7 +108,10 @@ bool Foam::OFstreamCollator::writeFile
         osPtr,
         start,
         slice,
-        UPstream::commsTypes::scheduled,
+        recvSizes,
+        haveSlaveData,
+        slaveData,
+        UPstream::commsTypes::nonBlocking,  //scheduled,
         false       // do not reduce return state
     );
 
@@ -115,8 +123,18 @@
 
     if (debug)
     {
-        Pout<< "OFstreamCollator : Finished writing " << s.size()
-            << " bytes to " << fName
+        Pout<< "OFstreamCollator : Finished writing " << masterData.size()
+            << " bytes";
+        if (UPstream::master(comm))
+        {
+            off_t sum = 0;
+            forAll(recvSizes, i)
+            {
+                sum += recvSizes[i];
+            }
+            Pout<< " (overall " << sum << ")";
+        }
+        Pout<< " to " << fName
             << " using comm " << comm << endl;
     }
 
@@ -133,14 +151,11 @@
     {
         writeData* ptr = nullptr;
 
-        //pthread_mutex_lock(&handler.mutex_);
         lockMutex(handler.mutex_);
-
         if (handler.objects_.size())
         {
             ptr = handler.objects_.pop();
         }
-        //pthread_mutex_unlock(&handler.mutex_);
         unlockMutex(handler.mutex_);
 
         if (!ptr)
@@ -151,10 +166,14 @@
         {
             bool ok = writeFile
             (
-                handler.comm_,
+                ptr->comm_,
                 ptr->typeName_,
                 ptr->pathName_,
                 ptr->data_,
+                ptr->sizes_,
+                ptr->haveSlaveData_,
+                ptr->slaveData_,
+
                 ptr->format_,
                 ptr->version_,
                 ptr->compression_,
@@ -177,22 +196,54 @@
         Pout<< "OFstreamCollator : Exiting write thread " << endl;
     }
 
-    //pthread_mutex_lock(&handler.mutex_);
     lockMutex(handler.mutex_);
     handler.threadRunning_ = false;
-    //pthread_mutex_unlock(&handler.mutex_);
     unlockMutex(handler.mutex_);
 
     return nullptr;
 }
 
 
+void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
+{
+    while (true)
+    {
+        // Count the files still to be written
+        off_t totalSize = 0;
+
+        lockMutex(mutex_);
+        forAllConstIter(FIFOStack<writeData*>, objects_, iter)
+        {
+            totalSize += iter()->size();
+        }
+        unlockMutex(mutex_);
+
+        if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_)
+        {
+            break;
+        }
+
+        if (debug)
+        {
+            lockMutex(mutex_);
+            Pout<< "OFstreamCollator : Waiting for buffer space."
+                << " Currently in use:" << totalSize
+                << " limit:" << maxBufferSize_
+                << " files:" << objects_.size()
+                << endl;
+            unlockMutex(mutex_);
+        }
+
+        sleep(5);
+    }
+}
+
+
 // * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //
 
 Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
 :
     maxBufferSize_(maxBufferSize),
-    //mutex_(PTHREAD_MUTEX_INITIALIZER),
     mutex_
     (
         maxBufferSize_ > 0
@@ -228,7 +279,6 @@ Foam::OFstreamCollator::~OFstreamCollator()
             Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
         }
 
-        //pthread_join(thread_, nullptr);
         joinThread(thread_);
     }
     if (thread_ != -1)
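writeAll() above is the consumer half of a producer/consumer queue: write() pushes writeData items under mutex_ and the background thread pops and writes them until the stack is empty. The createThread/lockMutex/joinThread calls are thin wrappers (presumably over pthreads, via OpenFOAM's POSIX layer); in raw pthread terms the consumer loop has this shape:

    // Sketch of the consumer loop behind writeAll() in raw pthread terms;
    // the queue element type is illustrative (stands in for writeData*)
    #include <pthread.h>
    #include <queue>

    struct CollatorSketch
    {
        pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
        std::queue<int> objects;
        bool threadRunning = false;
    };

    void* writeAllSketch(void* arg)
    {
        CollatorSketch& handler = *static_cast<CollatorSketch*>(arg);
        while (true)
        {
            // Pop one pending write while holding the lock ...
            pthread_mutex_lock(&handler.mutex);
            const bool have = !handler.objects.empty();
            int item = 0;
            if (have)
            {
                item = handler.objects.front();
                handler.objects.pop();
            }
            pthread_mutex_unlock(&handler.mutex);

            if (!have)
            {
                break;          // queue drained: the thread exits
            }
            (void)item;         // ... and do the slow write outside the lock
        }

        pthread_mutex_lock(&handler.mutex);
        handler.threadRunning = false;  // lets the producer restart a thread
        pthread_mutex_unlock(&handler.mutex);
        return nullptr;
    }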
@@ -259,56 +309,163 @@ bool Foam::OFstreamCollator::write
     const bool append
 )
 {
-    if (maxBufferSize_ > 0)
+    // Determine (on the master) the sizes to receive. Note: do NOT use the
+    // thread communicator
+    labelList recvSizes;
+    decomposedBlockData::gather(Pstream::worldComm, data.size(), recvSizes);
+
+    off_t totalSize = 0;
+    label maxLocalSize = 0;
     {
-        while (true)
+        for (label proci = 0; proci < recvSizes.size(); proci++)
         {
-            // Count files to be written
-            off_t totalSize = 0;
-            //pthread_mutex_lock(&mutex_);
-            lockMutex(mutex_);
-            forAllConstIter(FIFOStack<writeData*>, objects_, iter)
-            {
-                totalSize += iter()->data_.size();
-            }
-            //pthread_mutex_unlock(&mutex_);
-            unlockMutex(mutex_);
-
-            if
-            (
-                totalSize == 0
-             || (totalSize+off_t(data.size()) < maxBufferSize_)
-            )
-            {
-                break;
-            }
-
-            if (debug)
-            {
-                Pout<< "OFstreamCollator : Waiting for buffer space."
-                    << " Currently in use:" << totalSize
-                    << " limit:" << maxBufferSize_
-                    << endl;
-            }
-
-            sleep(5);
+            totalSize += recvSizes[proci];
+            maxLocalSize = max(maxLocalSize, recvSizes[proci]);
         }
+        Pstream::scatter(totalSize, Pstream::msgType(), Pstream::worldComm);
+        Pstream::scatter(maxLocalSize, Pstream::msgType(), Pstream::worldComm);
+    }
+
+    if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
+    {
+        if (debug)
+        {
+            Pout<< "OFstreamCollator : non-thread gather and write of "
+                << fName << " using worldComm" << endl;
+        }
+        // Direct collating and writing (so the master blocks until all of
+        // the data has been written!)
+        const List<char> dummySlaveData;
+        return writeFile
+        (
+            UPstream::worldComm,
+            typeName,
+            fName,
+            data,
+            recvSizes,
+            false,          // no slave data provided yet
+            dummySlaveData,
+            fmt,
+            ver,
+            cmp,
+            append
+        );
+    }
+    else if (totalSize <= maxBufferSize_)
+    {
+        // The total size can be stored locally, so receive all data now
+        // and only do the writing in the thread
 
         if (debug)
         {
-            Pout<< "OFstreamCollator : relaying write of " << fName
-                << " to thread " << endl;
+            Pout<< "OFstreamCollator : non-thread gather; thread write of "
+                << fName << endl;
         }
-        //pthread_mutex_lock(&mutex_);
-        lockMutex(mutex_);
-        objects_.push
+
+        if (Pstream::master())
+        {
+            waitForBufferSpace(totalSize);
+        }
+
+        // Allocate a local buffer for all collated data
+        autoPtr<writeData> fileAndDataPtr
         (
-            new writeData(typeName, fName, data, fmt, ver, cmp, append)
+            new writeData
+            (
+                comm_,      // Note: comm not actually used anymore
+                typeName,
+                fName,
+                data,
+                recvSizes,
+                true,       // have slave data (collected just below)
+                fmt,
+                ver,
+                cmp,
+                append
+            )
+        );
+        writeData& fileAndData = fileAndDataPtr();
+
+        // Gather the slave data and insert it into fileAndData
+        UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
+        List<int> slaveOffsets;
+        decomposedBlockData::gatherSlaveData
+        (
+            Pstream::worldComm,     // Note: on the simulation thread
+            slice,
+            recvSizes,
+
+            1,                      // startProc,
+            Pstream::nProcs()-1,    // nProcs,
+
+            slaveOffsets,
+            fileAndData.slaveData_
+        );
+
+        // Append to the thread buffer
+        lockMutex(mutex_);
+        objects_.push(fileAndDataPtr.ptr());
+        unlockMutex(mutex_);
+
+        // Start the thread if it is not running
+        lockMutex(mutex_);
+        if (!threadRunning_)
+        {
+            createThread(thread_, writeAll, this);
+            if (debug)
+            {
+                Pout<< "OFstreamCollator : Started write thread "
+                    << thread_ << endl;
+            }
+            threadRunning_ = true;
+        }
+        unlockMutex(mutex_);
+
+        return true;
+    }
+    else
+    {
+        if (debug)
+        {
+            Pout<< "OFstreamCollator : thread gather and write of " << fName
+                << " in thread " << thread_
+                << " using communicator " << comm_ << endl;
+        }
+
+        if (!UPstream::haveThreads())
+        {
+            FatalErrorInFunction
+                << "MPI does not seem to have thread support. "
+                << "Please increase the buffer size 'maxThreadFileBufferSize'"
+                << " to at least " << totalSize
+                << " to be able to do the collating before threading."
+                << exit(FatalError);
+        }
+
+        if (Pstream::master())
+        {
+            waitForBufferSpace(data.size());
+        }
+
+        lockMutex(mutex_);
+        // Push all the file info onto the buffer. Note: no slave data is
+        // provided, so the collating is triggered inside the thread
+        objects_.push
+        (
+            new writeData
+            (
+                comm_,
+                typeName,
+                fName,
+                data,
+                recvSizes,
+                false,      // no slave data; collect in the thread
+                fmt,
+                ver,
+                cmp,
+                append
+            )
         );
-        //pthread_mutex_unlock(&mutex_);
         unlockMutex(mutex_);
 
-        //pthread_mutex_lock(&mutex_);
         lockMutex(mutex_);
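The branches above implement the three regimes described in OFstreamCollator.H. A compact restatement of the dispatch (illustrative only; maxBufferSize_ carries the maxThreadFileBufferSize setting):

    // Compact restatement of the write() dispatch above
    #include <sys/types.h>

    enum class WriteMode
    {
        blockingWrite,  // gather+write on the simulation thread; no limit
        threadedWrite,  // gather now, write in the thread (data buffered)
        threadedAll     // gather and write in the thread (needs MPI threads)
    };

    WriteMode selectMode
    (
        const off_t maxBufferSize,  // 0 disables the write thread
        const off_t maxLocalSize,   // largest single-rank contribution
        const off_t totalSize       // sum over all ranks
    )
    {
        if (maxBufferSize == 0 || maxLocalSize > maxBufferSize)
        {
            return WriteMode::blockingWrite;
        }
        else if (totalSize <= maxBufferSize)
        {
            return WriteMode::threadedWrite;
        }
        else
        {
            return WriteMode::threadedAll;
        }
    }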
+ << "Please increase the buffer size 'maxThreadFileBufferSize'" + << " to at least " << totalSize + << " to be able to do the collating before threading." + << exit(FatalError); + } + + if (Pstream::master()) + { + waitForBufferSpace(data.size()); + } + + lockMutex(mutex_); + // Push all file info on buffer. Note that no slave data provided + // so it will trigger communication inside the thread + objects_.push + ( + new writeData + ( + comm_, + typeName, + fName, + data, + recvSizes, + false, // Have no slave data; collect in thread + fmt, + ver, + cmp, + append + ) ); - //pthread_mutex_unlock(&mutex_); unlockMutex(mutex_); - //pthread_mutex_lock(&mutex_); lockMutex(mutex_); if (!threadRunning_) { @@ -319,16 +476,10 @@ bool Foam::OFstreamCollator::write } threadRunning_ = true; } - //pthread_mutex_unlock(&mutex_); unlockMutex(mutex_); return true; } - else - { - // Immediate writing - return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append); - } } diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H index 17904fc38..bf4eeb571 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H @@ -27,6 +27,22 @@ Class Description Threaded file writer. + Collects all data from all processors and writes as single + 'decomposedBlockData' file. The operation is determined by the + buffer size (maxThreadFileBufferSize setting): + - local size of data is larger than buffer: receive and write processor + by processor (i.e. 'scheduled'). Does not use a thread, no file size + limit. + - total size of data is larger than buffer (but local is not): + thread does all the collecting and writing of the processors. No file + size limit. 
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
index 17904fc38..bf4eeb571 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
@@ -27,6 +27,22 @@ Class
 Description
     Threaded file writer.
 
+    Collects all data from all processors and writes it as a single
+    'decomposedBlockData' file. The operation is determined by the
+    buffer size (the maxThreadFileBufferSize setting):
+    - local size of data is larger than the buffer: receive and write
+      processor by processor (i.e. 'scheduled'). Does not use a thread;
+      no file size limit.
+    - total size of data is larger than the buffer (but the local size
+      is not): the thread does all the collecting and writing of the
+      processors. No file size limit.
+    - total size of data is less than the buffer:
+      collecting is done locally; the thread only does the writing
+      (since the data has already been collected)
+
 SourceFiles
     OFstreamCollator.C
 
@@ -36,7 +52,7 @@ SourceFiles
 #define OFstreamCollator_H
 
 #include "IOstream.H"
-#include "List.H"
+#include "labelList.H"
 #include "FIFOStack.H"
 
 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
 
@@ -56,9 +72,15 @@ class OFstreamCollator
     {
     public:
 
+        const label comm_;
         const word typeName_;
         const fileName pathName_;
        const string data_;
+        const labelList sizes_;
+
+        const bool haveSlaveData_;
+        List<char> slaveData_;
+
         const IOstream::streamFormat format_;
         const IOstream::versionNumber version_;
         const IOstream::compressionType compression_;
@@ -66,23 +88,36 @@ class OFstreamCollator
 
         writeData
         (
+            const label comm,
             const word& typeName,
             const fileName& pathName,
             const string& data,
+            const labelList& sizes,
+            const bool haveSlaveData,
             IOstream::streamFormat format,
             IOstream::versionNumber version,
             IOstream::compressionType compression,
             const bool append
         )
         :
+            comm_(comm),
             typeName_(typeName),
            pathName_(pathName),
             data_(data),
+            sizes_(sizes),
+            haveSlaveData_(haveSlaveData),
+            slaveData_(0),
             format_(format),
             version_(version),
             compression_(compression),
             append_(append)
         {}
+
+        //- (Approximate) size of the master data + any optional slave data
+        off_t size() const
+        {
+            return data_.size() + slaveData_.size();
+        }
     };
 
 
@@ -112,7 +147,10 @@ class OFstreamCollator
             const label comm,
             const word& typeName,
             const fileName& fName,
-            const string& data,
+            const string& masterData,
+            const labelUList& recvSizes,
+            const bool haveSlaveData,       // (does master) have slave data
+            const UList<char>& slaveData,   // (on master) all slave data
             IOstream::streamFormat fmt,
             IOstream::versionNumber ver,
             IOstream::compressionType cmp,
@@ -122,6 +160,10 @@ class OFstreamCollator
         //- Write all files in stack
         static void* writeAll(void *threadarg);
 
+        //- Wait for the total size of objects_ (master + optional slave
+        //  data) to leave at least wantedSize room below maxBufferSize_
+        void waitForBufferSpace(const off_t wantedSize) const;
+
 public:
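The thresholds used above are runtime-tunable. A usage sketch of the corresponding OptimisationSwitches in the global controlDict (the two buffer names appear in this diff; the numeric values are illustrative, not recommendations):

    OptimisationSwitches
    {
        // Select the collated file handler (the consumer of OFstreamCollator)
        fileHandler collated;

        // Limit for data buffered by the write thread; 0 disables threading,
        // and data too large to buffer falls back to the blocking code path
        maxThreadFileBufferSize 1e9;

        // Limit for the master buffer used when gathering processor slices
        maxMasterFileBufferSize 1e9;
    }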