From 7a41a9c9c3dd78045d758601b5a10a541aabc5ef Mon Sep 17 00:00:00 2001
From: Henry Weller
Date: Fri, 27 Oct 2017 17:13:43 +0100
Subject: [PATCH] collatedFileOperation: preferentially collect all data in
 the simulation thread so the write thread does not have to do any parallel
 communication

This avoids the bugs in the threading support in OpenMPI.

Patch contributed by Mattijs Janssens.

Resolves bug-report https://bugs.openfoam.org/view.php?id=2669
---
 .../decomposedBlockData/decomposedBlockData.C | 352 +++++++++++++-----
 .../decomposedBlockData/decomposedBlockData.H |  32 ++
 src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C |   9 +-
 src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H |  55 ++-
 .../collatedFileOperation/OFstreamCollator.C  | 283 ++++++++++----
 .../collatedFileOperation/OFstreamCollator.H  |  46 ++-
 src/Pstream/dummy/UPstream.C                  |  32 +-
 src/Pstream/mpi/UPstream.C                    | 206 +++++++++-
 8 files changed, 838 insertions(+), 177 deletions(-)
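The key idea is that all MPI traffic now happens on the simulation thread wherever possible; the write thread only needs MPI when the data cannot be collected up front, and that path now requires full MPI thread support. A minimal standalone sketch (illustrative, not part of the patch) of the thread-support check the patched Pstream layer performs at startup:

    // Standalone sketch of the MPI_THREAD_MULTIPLE check performed in the
    // patched UPstream::init (src/Pstream/mpi/UPstream.C below): only if
    // MPI grants the full level may a second thread safely make MPI calls.
    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
        int provided = 0;
        MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

        // Equivalent of setParRun(nProcs, provided == MPI_THREAD_MULTIPLE)
        const bool haveThreads = (provided == MPI_THREAD_MULTIPLE);
        if (!haveThreads)
        {
            std::printf("No full MPI thread support (level %d)\n", provided);
        }

        MPI_Finalize();
        return 0;
    }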
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
index 21ad121dfb..95145381de 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
@@ -31,8 +31,9 @@ License
 #include "StringStream.H"
 #include "dictionary.H"
 #include "objectRegistry.H"
-#include "foamVersion.H"
-#include <sys/time.h>
+#include "SubList.H"
+#include "labelPair.H"
+#include "masterUncollatedFileOperation.H"

 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

@@ -648,12 +649,114 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
 }


+void Foam::decomposedBlockData::gather
+(
+    const label comm,
+    const label data,
+    labelList& datas
+)
+{
+    const label nProcs = UPstream::nProcs(comm);
+    datas.setSize(nProcs);
+
+    char* data0Ptr = reinterpret_cast<char*>(datas.begin());
+
+    labelList recvOffsets;
+    labelList recvSizes;
+    if (UPstream::master())
+    {
+        recvOffsets.setSize(nProcs);
+        forAll(recvOffsets, proci)
+        {
+            recvOffsets[proci] =
+                reinterpret_cast<char*>(&datas[proci])
+              - data0Ptr;
+        }
+        recvSizes.setSize(nProcs, sizeof(label));
+    }
+
+    UPstream::gather
+    (
+        reinterpret_cast<const char*>(&data),
+        sizeof(label),
+        data0Ptr,
+        recvSizes,
+        recvOffsets,
+        comm
+    );
+}
+
+
+void Foam::decomposedBlockData::gatherSlaveData
+(
+    const label comm,
+    const UList<char>& data,
+    const labelUList& recvSizes,
+
+    const label startProc,
+    const label nProcs,
+
+    List<int>& sliceOffsets,
+    List<char>& recvData
+)
+{
+    // Calculate master data
+    List<int> sliceSizes;
+    if (UPstream::master(comm))
+    {
+        const label numProcs = UPstream::nProcs(comm);
+
+        sliceSizes.setSize(numProcs, 0);
+        sliceOffsets.setSize(numProcs+1, 0);
+
+        int totalSize = 0;
+        label proci = startProc;
+        for (label i = 0; i < nProcs; i++)
+        {
+            sliceSizes[proci] = int(recvSizes[proci]);
+            sliceOffsets[proci] = totalSize;
+            totalSize += sliceSizes[proci];
+            proci++;
+        }
+        sliceOffsets[proci] = totalSize;
+        recvData.setSize(totalSize);
+    }
+
+    int nSend = 0;
+    if
+    (
+        !UPstream::master(comm)
+     && (UPstream::myProcNo(comm) >= startProc)
+     && (UPstream::myProcNo(comm) < startProc+nProcs)
+    )
+    {
+        nSend = data.byteSize();
+    }
+
+    UPstream::gather
+    (
+        data.begin(),
+        nSend,
+
+        recvData.begin(),
+        sliceSizes,
+        sliceOffsets,
+        comm
+    );
+}
+
+
 bool Foam::decomposedBlockData::writeBlocks
 (
     const label comm,
     autoPtr<OSstream>& osPtr,
     List<std::streamoff>& start,
     const UList<char>& data,
+
+    const labelUList& recvSizes,
+
+    const bool haveSlaveData,
+    const List<char>& slaveData,
+
     const UPstream::commsTypes commsType,
     const bool syncReturnState
 )
@@ -663,20 +766,56 @@ bool Foam::decomposedBlockData::writeBlocks
     if (debug)
     {
         Pout<< "decomposedBlockData::writeBlocks:"
            << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
            << " data:" << data.size()
+           << " haveSlaveData:" << haveSlaveData
+           << " (master only) slaveData:" << slaveData.size()
            << " commsType:" << Pstream::commsTypeNames[commsType]
            << endl;
     }

+    const label nProcs = UPstream::nProcs(comm);
+
     bool ok = true;

-    labelList recvSizes(Pstream::nProcs(comm));
-    recvSizes[Pstream::myProcNo(comm)] = data.byteSize();
-    Pstream::gatherList(recvSizes, Pstream::msgType(), comm);
+    if (haveSlaveData)
+    {
+        // Already have gathered the slave data. The communicator is only
+        // used to check who is the master.

-    if (commsType == UPstream::commsTypes::scheduled)
+        if (UPstream::master(comm))
+        {
+            OSstream& os = osPtr();
+
+            start.setSize(nProcs);
+
+            // Write master data
+            {
+                os << nl << "// Processor" << UPstream::masterNo() << nl;
+                start[UPstream::masterNo()] = os.stdStream().tellp();
+                os << data;
+            }
+
+            // Write slaves
+
+            label slaveOffset = 0;
+
+            for (label proci = 1; proci < nProcs; proci++)
+            {
+                os << nl << nl << "// Processor" << proci << nl;
+                start[proci] = os.stdStream().tellp();
+
+                os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
+
+                slaveOffset += recvSizes[proci];
+            }
+
+            ok = os.good();
+        }
+    }
+    else if (commsType == UPstream::commsTypes::scheduled)
     {
         if (UPstream::master(comm))
         {
-            start.setSize(UPstream::nProcs(comm));
+            start.setSize(nProcs);

             OSstream& os = osPtr();
@@ -688,7 +827,7 @@ bool Foam::decomposedBlockData::writeBlocks
         }
         // Write slaves
         List<char> elems;
-        for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
+        for (label proci = 1; proci < nProcs; proci++)
         {
             elems.setSize(recvSizes[proci]);
             IPstream::read
@@ -723,101 +862,115 @@ bool Foam::decomposedBlockData::writeBlocks
     }
     else
     {
-        if (debug)
-        {
-            struct timeval tv;
-            gettimeofday(&tv, nullptr);
-            Pout<< "Starting sending at:"
-                << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                << Foam::endl;
-        }
-
-
-        label startOfRequests = Pstream::nRequests();
-
-        if (!UPstream::master(comm))
-        {
-            UOPstream::write
-            (
-                UPstream::commsTypes::nonBlocking,
-                UPstream::masterNo(),
-                data.begin(),
-                data.byteSize(),
-                Pstream::msgType(),
-                comm
-            );
-            Pstream::waitRequests(startOfRequests);
-        }
-        else
-        {
-            List<List<char>> recvBufs(Pstream::nProcs(comm));
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
-            {
-                recvBufs[proci].setSize(recvSizes[proci]);
-                UIPstream::read
-                (
-                    UPstream::commsTypes::nonBlocking,
-                    proci,
-                    recvBufs[proci].begin(),
-                    recvSizes[proci],
-                    Pstream::msgType(),
-                    comm
-                );
-            }
-
-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting master-only writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
-            }
-
-            start.setSize(UPstream::nProcs(comm));
+        // Write master data
+        if (UPstream::master(comm))
+        {
+            start.setSize(nProcs);

             OSstream& os = osPtr();

-            // Write master data
-            {
-                os << nl << "// Processor" << UPstream::masterNo() << nl;
-                start[UPstream::masterNo()] = os.stdStream().tellp();
-                os << data;
-            }
+            os << nl << "// Processor" << UPstream::masterNo() << nl;
+            start[UPstream::masterNo()] = os.stdStream().tellp();
+            os << data;
+        }

-            if (debug)
-            {
-                struct timeval tv;
-                gettimeofday(&tv, nullptr);
-                Pout<< "Starting slave writing at:"
-                    << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-                    << Foam::endl;
-            }

-            // Write slaves
-            for (label proci = 1; proci < UPstream::nProcs(comm); proci++)
-            {
-                os << nl << nl << "// Processor" << proci << nl;
-                start[proci] = os.stdStream().tellp();
+        // Find out how many processors can be received into
+        // maxMasterFileBufferSize

-                if (Pstream::finishedRequest(startOfRequests+proci-1))
+        // Starting slave processor and number of processors
+        labelPair startAndSize(1, nProcs-1);
+
+        while (startAndSize[1] > 0)
+        {
+            labelPair masterData(startAndSize);
+            if (UPstream::master(comm))
+            {
+                label totalSize = 0;
+                label proci = masterData[0];
+                while
+                (
+                    proci < nProcs
+                 && (
+                        totalSize+recvSizes[proci]
+                      < fileOperations::masterUncollatedFileOperation::
+                            maxMasterFileBufferSize
+                    )
+                )
                 {
-                    os << recvBufs[proci];
+                    totalSize += recvSizes[proci];
+                    proci++;
                 }
+
+                masterData[1] = proci-masterData[0];
+            }
+
+
+            // Scatter masterData
+            UPstream::scatter
+            (
+                reinterpret_cast<const char*>(masterData.cdata()),
+                List<int>(nProcs, sizeof(masterData)),
+                List<int>(nProcs, 0),
+                reinterpret_cast<char*>(startAndSize.data()),
+                sizeof(startAndSize),
+                comm
+            );
+
+            if (startAndSize[1] == 0)
+            {
+                break;
+            }
+
+
+            // Gather data from (a slice of) the slaves
+            List<int> sliceOffsets;
+            List<char> recvData;
+            gatherSlaveData
+            (
+                comm,
+                data,
+                recvSizes,
+
+                startAndSize[0],    // startProc
+                startAndSize[1],    // nProcs
+
+                sliceOffsets,
+                recvData
+            );
+
+            if (UPstream::master(comm))
+            {
+                OSstream& os = osPtr();
+
+                // Write slaves
+                for
+                (
+                    label proci = startAndSize[0];
+                    proci < startAndSize[0]+startAndSize[1];
+                    proci++
+                )
+                {
+                    os << nl << nl << "// Processor" << proci << nl;
+                    start[proci] = os.stdStream().tellp();
+
+                    os  <<
+                        SubList<char>
+                        (
+                            recvData,
+                            sliceOffsets[proci+1]-sliceOffsets[proci],
+                            sliceOffsets[proci]
+                        );
+                }
             }

-            Pstream::resetRequests(startOfRequests);
-
-            ok = os.good();
+            startAndSize[0] += startAndSize[1];
+        }
+
+        if (UPstream::master(comm))
+        {
+            ok = osPtr().good();
         }
-    }
-
-    if (debug)
-    {
-        struct timeval tv;
-        gettimeofday(&tv, nullptr);
-        Pout<< "Finished master-only writing at:"
-            << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s"
-            << Foam::endl;
     }

     if (syncReturnState)
@@ -936,8 +1089,23 @@ bool Foam::decomposedBlockData::writeObject
         osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp));
         IOobject::writeHeader(osPtr());
     }
+
+    labelList recvSizes;
+    gather(comm_, this->byteSize(), recvSizes);
+
     List<std::streamoff> start;
-    return writeBlocks(comm_, osPtr, start, *this, commsType_);
+    List<char> slaveData;   // dummy: no pre-gathered slave data
+    return writeBlocks
+    (
+        comm_,
+        osPtr,
+        start,
+        *this,
+        recvSizes,
+        false,      // don't have slave data
+        slaveData,
+        commsType_
+    );
 }
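The capped loop above is the heart of the change: slaves are written in slices whose combined byte size stays below the master's buffer limit. A standalone, runnable sketch of that slicing policy (sizes and cap are invented demo values; the real code uses recvSizes and maxMasterFileBufferSize):

    // Sketch of the slicing policy in the new writeBlocks loop: walk the
    // per-processor sizes and take as many processors as fit under the cap.
    #include <cstdio>
    #include <vector>

    int main()
    {
        const std::vector<int> recvSizes{40, 30, 30, 50, 20};  // bytes/proc
        const long cap = 100;                                  // buffer cap

        int startProc = 1;              // slaves start at processor 1
        while (startProc < int(recvSizes.size()))
        {
            long total = 0;
            int proci = startProc;
            while
            (
                proci < int(recvSizes.size())
             && total + recvSizes[proci] < cap
            )
            {
                total += recvSizes[proci];
                proci++;
            }
            if (proci == startProc)
            {
                proci++;    // demo guard: take an oversized block anyway
            }

            // In the patch this slice is gathered with gatherSlaveData and
            // written out before moving on to the next slice.
            std::printf("gather+write procs [%d,%d)\n", startProc, proci);

            startProc = proci;
        }
        return 0;
    }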
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
index dc4e3d70f2..1def38f9eb 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H
@@ -169,6 +169,32 @@ public:
             const UPstream::commsTypes commsType
         );

+        //- Helper: gather a single label. Note: uses the native UPstream
+        //  gather. datas is sized to the number of processors but has
+        //  undefined contents on the slaves
+        static void gather
+        (
+            const label comm,
+            const label data,
+            labelList& datas
+        );
+
+        //- Helper: gather data from a (subset of) slaves. Returns
+        //  recvData    : received data
+        //  recvOffsets : offsets into recvData; recvOffsets is nProcs+1
+        static void gatherSlaveData
+        (
+            const label comm,
+            const UList<char>& data,
+            const labelUList& recvSizes,
+
+            const label startProc,
+            const label nProcs,
+
+            List<int>& recvOffsets,
+            List<char>& recvData
+        );
+
         //- Write *this. Ostream only valid on master. Returns starts of
         //  processor blocks
         static bool writeBlocks
@@ -177,6 +203,12 @@ public:
            autoPtr<OSstream>& osPtr,
            List<std::streamoff>& start,
            const UList<char>&,
+
+           const labelUList& recvSizes,
+
+           const bool haveSlaveData,        // does master have slaveData?
+           const List<char>& slaveData,     // optional slave data (on master)
+
            const UPstream::commsTypes,
            const bool syncReturnState = true
        );
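A hypothetical caller of the two new helpers, to show how they compose (a sketch that only builds against OpenFOAM; 'buf' and 'collectExample' are invented names, not part of the patch):

    // Hypothetical use of the new helpers: per-processor sizes first,
    // then all slave bytes in one flat buffer on the master.
    #include "decomposedBlockData.H"

    using namespace Foam;

    void collectExample(const List<char>& buf)
    {
        // Per-processor byte counts (valid on the master only)
        labelList recvSizes;
        decomposedBlockData::gather(UPstream::worldComm, buf.size(), recvSizes);

        // All slave bytes (processors 1..N-1) laid out back-to-back
        List<int> sliceOffsets;
        List<char> slaveData;
        decomposedBlockData::gatherSlaveData
        (
            UPstream::worldComm,
            buf,
            recvSizes,
            1,                          // startProc
            UPstream::nProcs() - 1,     // nProcs in the slice
            sliceOffsets,
            slaveData
        );
        // slave i's bytes: slaveData[sliceOffsets[i] .. sliceOffsets[i+1])
    }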
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
index 4c72c28cc2..383fe03319 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C
@@ -2,7 +2,7 @@
 =========                 |
 \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
  \\    /   O peration     |
-  \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+  \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
   \\/     M anipulation   | Copyright (C) 2015-2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
@@ -50,11 +50,13 @@ Foam::UPstream::commsTypeNames

// * * * * * * * * * * * * * Private Member Functions  * * * * * * * * * * * //

-void Foam::UPstream::setParRun(const label nProcs)
+void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
 {
     if (nProcs == 0)
     {
         parRun_ = false;
+        haveThreads_ = haveThreads;
+
         freeCommunicator(UPstream::worldComm);
         label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
         if (comm != UPstream::worldComm)
@@ -71,6 +73,7 @@
     else
     {
         parRun_ = true;
+        haveThreads_ = haveThreads;

         // Redo worldComm communicator (this has been created at static
         // initialisation time)
@@ -354,6 +357,8 @@ Foam::UList<Foam::UPstream::commsStruct>::operator[](const label procID) const

bool Foam::UPstream::parRun_(false);

+bool Foam::UPstream::haveThreads_(false);
+
Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;

Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
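The new haveThreads_ flag lets callers guard any code that would make MPI calls from a second thread. A hypothetical guard in the same style (the function name is invented; the query mirrors the check OFstreamCollator::write performs below):

    // Hypothetical OpenFOAM-style guard before launching a thread that
    // will itself make MPI calls.
    #include "UPstream.H"
    #include "error.H"

    void ensureThreadedMPI()
    {
        if (!Foam::UPstream::haveThreads())
        {
            FatalErrorInFunction
                << "MPI was not initialised with full thread support"
                << Foam::exit(Foam::FatalError);
        }
    }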
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index 4993ed9d12..ea72f84912 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -185,6 +185,9 @@ private:
        //- By default this is not a parallel run
        static bool parRun_;

+       //- Have support for threads?
+       static bool haveThreads_;
+
        //- Standard transfer message type
        static int msgType_;

@@ -211,6 +214,9 @@ private:

    // Private Member Functions

+       //- Set data for parallel running
+       static void setParRun(const label nProcs, const bool haveThreads);
+
        //- Calculate linear communication schedule
        static List<commsStruct> calcLinearComm(const label nProcs);

@@ -411,9 +417,11 @@ public:
            return parRun_;
        }

-       //- Set data for parallel running. Special case nProcs=0 to switch
-       //- off parallel running
-       static void setParRun(const label nProcs);
+       //- Have support for threads
+       static bool haveThreads()
+       {
+           return haveThreads_;
+       }

        //- Number of processes in parallel run
        static label nProcs(const label communicator = 0)
@@ -517,6 +525,47 @@ public:
            labelUList& recvData,
            const label communicator = 0
        );
+
+       //- Exchange data with all processors (in the communicator).
+       //  sendSizes, sendOffsets give (per processor) the slice of
+       //  sendData to send; similarly recvSizes, recvOffsets give the
+       //  slice of recvData to receive
+       static void allToAll
+       (
+           const char* sendData,
+           const UList<int>& sendSizes,
+           const UList<int>& sendOffsets,
+
+           char* recvData,
+           const UList<int>& recvSizes,
+           const UList<int>& recvOffsets,
+
+           const label communicator = 0
+       );
+
+       //- Receive data from all processors on the master
+       static void gather
+       (
+           const char* sendData,
+           int sendSize,
+
+           char* recvData,
+           const UList<int>& recvSizes,
+           const UList<int>& recvOffsets,
+           const label communicator = 0
+       );
+
+       //- Send data to all processors from the root of the communicator
+       static void scatter
+       (
+           const char* sendData,
+           const UList<int>& sendSizes,
+           const UList<int>& sendOffsets,
+
+           char* recvData,
+           int recvSize,
+           const label communicator = 0
+       );
};
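A hypothetical use of the new byte-level scatter, mirroring how writeBlocks broadcasts the startAndSize pair: with every send offset set to zero, each rank receives the same bytes (the function name is invented; the overlapping-offset trick is exactly what the patch itself relies on):

    // Hypothetical broadcast of a labelPair via the new UPstream::scatter.
    #include "labelPair.H"
    #include "UPstream.H"

    using namespace Foam;

    labelPair broadcastPair(const labelPair& masterValue, const label comm)
    {
        labelPair result(masterValue);

        UPstream::scatter
        (
            reinterpret_cast<const char*>(masterValue.cdata()),
            List<int>(UPstream::nProcs(comm), sizeof(labelPair)),
            List<int>(UPstream::nProcs(comm), 0),   // same slice for all
            reinterpret_cast<char*>(result.data()),
            sizeof(labelPair),
            comm
        );

        return result;
    }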
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
index b5c8ab4942..597d1c5140 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C
@@ -25,11 +25,7 @@ License

 #include "OFstreamCollator.H"
 #include "OFstream.H"
-#include "OSspecific.H"
-#include "IOstreams.H"
-#include "Pstream.H"
 #include "decomposedBlockData.H"
-#include "PstreamReduceOps.H"

 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

@@ -46,7 +42,10 @@ bool Foam::OFstreamCollator::writeFile
     const label comm,
     const word& typeName,
     const fileName& fName,
-    const string& s,
+    const string& masterData,
+    const labelUList& recvSizes,
+    const bool haveSlaveData,       // does master have slaveData?
+    const UList<char>& slaveData,   // on master: slave data
     IOstream::streamFormat fmt,
     IOstream::versionNumber ver,
     IOstream::compressionType cmp,
@@ -55,7 +54,7 @@ bool Foam::OFstreamCollator::writeFile
 {
     if (debug)
     {
-        Pout<< "OFstreamCollator : Writing " << s.size()
+        Pout<< "OFstreamCollator : Writing " << masterData.size()
            << " bytes to " << fName
            << " using comm " << comm << endl;
     }
@@ -90,11 +89,17 @@ bool Foam::OFstreamCollator::writeFile
        );
    }

-    UList<char> slice(const_cast<char*>(s.data()), label(s.size()));
-
-    // Assuming threaded writing hides any slowness so we might
-    // as well use scheduled communication to send the data to
-    // the master processor in order.
+    UList<char> slice
+    (
+        const_cast<char*>(masterData.data()),
+        label(masterData.size())
+    );
+
+    // Assuming threaded writing hides any slowness, we could use
+    // scheduled communication to send the data to the master processor
+    // in order. However, this can be unstable with some MPI
+    // implementations, so the default is non-blocking.

     List<std::streamoff> start;
     decomposedBlockData::writeBlocks
@@ -103,7 +108,10 @@ bool Foam::OFstreamCollator::writeFile
        osPtr,
        start,
        slice,
-        UPstream::commsTypes::scheduled,
+        recvSizes,
+        haveSlaveData,
+        slaveData,
+        UPstream::commsTypes::nonBlocking,  //scheduled,
        false   // do not reduce return state
    );
@@ -115,8 +123,18 @@ bool Foam::OFstreamCollator::writeFile

    if (debug)
    {
-        Pout<< "OFstreamCollator : Finished writing " << s.size()
-            << " bytes to " << fName
+        Pout<< "OFstreamCollator : Finished writing " << masterData.size()
+            << " bytes";
+        if (UPstream::master(comm))
+        {
+            off_t sum = 0;
+            forAll(recvSizes, i)
+            {
+                sum += recvSizes[i];
+            }
+            Pout<< " (overall " << sum << ")";
+        }
+        Pout<< " to " << fName
            << " using comm " << comm << endl;
    }
@@ -133,14 +151,11 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
    {
        writeData* ptr = nullptr;

-        //pthread_mutex_lock(&handler.mutex_);
        lockMutex(handler.mutex_);
-
        if (handler.objects_.size())
        {
            ptr = handler.objects_.pop();
        }
-        //pthread_mutex_unlock(&handler.mutex_);
        unlockMutex(handler.mutex_);

        if (!ptr)
@@ -151,10 +166,14 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
        {
            bool ok = writeFile
            (
-                handler.comm_,
+                ptr->comm_,
                ptr->typeName_,
                ptr->pathName_,
                ptr->data_,
+                ptr->sizes_,
+                ptr->haveSlaveData_,
+                ptr->slaveData_,
+
                ptr->format_,
                ptr->version_,
                ptr->compression_,
@@ -177,22 +196,54 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg)
        Pout<< "OFstreamCollator : Exiting write thread " << endl;
    }

-    //pthread_mutex_lock(&handler.mutex_);
    lockMutex(handler.mutex_);
    handler.threadRunning_ = false;
-    //pthread_mutex_unlock(&handler.mutex_);
    unlockMutex(handler.mutex_);

    return nullptr;
}


+void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
+{
+    while (true)
+    {
+        // Count files to be written
+        off_t totalSize = 0;
+
+        lockMutex(mutex_);
+        forAllConstIter(FIFOStack<writeData*>, objects_, iter)
+        {
+            totalSize += iter()->size();
+        }
+        unlockMutex(mutex_);
+
+        if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_)
+        {
+            break;
+        }
+
+        if (debug)
+        {
+            lockMutex(mutex_);
+            Pout<< "OFstreamCollator : Waiting for buffer space."
+                << " Currently in use:" << totalSize
+                << " limit:" << maxBufferSize_
+                << " files:" << objects_.size()
+                << endl;
+            unlockMutex(mutex_);
+        }
+
+        sleep(5);
+    }
+}
+
+
// * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //

Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize)
:
    maxBufferSize_(maxBufferSize),
-    //mutex_(PTHREAD_MUTEX_INITIALIZER),
    mutex_
    (
        maxBufferSize_ > 0
@@ -228,7 +279,6 @@ Foam::OFstreamCollator::~OFstreamCollator()
            Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
        }

-        //pthread_join(thread_, nullptr);
        joinThread(thread_);
    }
    if (thread_ != -1)
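The collator is a single-consumer work queue: write() pushes jobs, and one on-demand thread pops and writes them, exiting when the queue drains. A minimal standalone sketch of the same pattern using the standard library (OpenFOAM itself uses its pthread wrappers createThread/lockMutex; all names here are invented):

    // Producer/consumer sketch of the OFstreamCollator threading model.
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>

    struct WriteJob { std::string path, data; };

    class Writer
    {
        std::mutex mutex_;
        std::queue<WriteJob*> jobs_;
        bool running_ = false;

        static void writeAll(Writer* w)
        {
            for (;;)
            {
                WriteJob* job = nullptr;
                {
                    std::lock_guard<std::mutex> lock(w->mutex_);
                    if (w->jobs_.empty())
                    {
                        w->running_ = false;
                        return;             // queue drained: thread exits
                    }
                    job = w->jobs_.front();
                    w->jobs_.pop();
                }
                // ... write job->data to job->path ...
                delete job;
            }
        }

    public:
        void push(WriteJob* job)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(job);
            if (!running_)                  // start consumer on demand
            {
                running_ = true;
                std::thread(writeAll, this).detach();
            }
        }
    };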
Note: do NOT use thread + // communicator + labelList recvSizes; + decomposedBlockData::gather(Pstream::worldComm, data.size(), recvSizes); + off_t totalSize = 0; + label maxLocalSize = 0; { - while (true) + for (label proci = 0; proci < recvSizes.size(); proci++) { - // Count files to be written - off_t totalSize = 0; - //pthread_mutex_lock(&mutex_); - lockMutex(mutex_); - forAllConstIter(FIFOStack, objects_, iter) - { - totalSize += iter()->data_.size(); - } - //pthread_mutex_unlock(&mutex_); - unlockMutex(mutex_); - - if - ( - totalSize == 0 - || (totalSize+off_t(data.size()) < maxBufferSize_) - ) - { - break; - } - - if (debug) - { - Pout<< "OFstreamCollator : Waiting for buffer space." - << " Currently in use:" << totalSize - << " limit:" << maxBufferSize_ - << endl; - } - - sleep(5); + totalSize += recvSizes[proci]; + maxLocalSize = max(maxLocalSize, recvSizes[proci]); } + Pstream::scatter(totalSize, Pstream::msgType(), Pstream::worldComm); + Pstream::scatter(maxLocalSize, Pstream::msgType(), Pstream::worldComm); + } + + if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) + { + if (debug) + { + Pout<< "OFstreamCollator : non-thread gather and write of " << fName + << " using worldComm" << endl; + } + // Direct collating and writing (so master blocks until all written!) + const List dummySlaveData; + return writeFile + ( + UPstream::worldComm, + typeName, + fName, + data, + recvSizes, + false, // no slave data provided yet + dummySlaveData, + fmt, + ver, + cmp, + append + ); + } + else if (totalSize <= maxBufferSize_) + { + // Total size can be stored locally so receive all data now and only + // do the writing in the thread if (debug) { - Pout<< "OFstreamCollator : relaying write of " << fName - << " to thread " << endl; + Pout<< "OFstreamCollator : non-thread gather; thread write of " + << fName << endl; } - //pthread_mutex_lock(&mutex_); - lockMutex(mutex_); - objects_.push + + if (Pstream::master()) + { + waitForBufferSpace(totalSize); + } + + // Allocate local buffer for all collated data + autoPtr fileAndDataPtr ( - new writeData(typeName, fName, data, fmt, ver, cmp, append) + new writeData + ( + comm_, // Note: comm not actually used anymore + typeName, + fName, + data, + recvSizes, + true, // have slave data (collected below) + fmt, + ver, + cmp, + append + ) + ); + writeData& fileAndData = fileAndDataPtr(); + + // Gather the slave data and insert into fileAndData + UList slice(const_cast(data.data()), label(data.size())); + List slaveOffsets; + decomposedBlockData::gatherSlaveData + ( + Pstream::worldComm, // Note: using simulation thread + slice, + recvSizes, + + 1, // startProc, + Pstream::nProcs()-1, // n procs + + slaveOffsets, + fileAndData.slaveData_ + ); + + // Append to thread buffer + lockMutex(mutex_); + objects_.push(fileAndDataPtr.ptr()); + unlockMutex(mutex_); + + // Start thread if not running + lockMutex(mutex_); + if (!threadRunning_) + { + createThread(thread_, writeAll, this); + if (debug) + { + Pout<< "OFstreamCollator : Started write thread " + << thread_ << endl; + } + threadRunning_ = true; + } + unlockMutex(mutex_); + + return true; + } + else + { + if (debug) + { + Pout<< "OFstreamCollator : thread gather and write of " << fName + << " in thread " << thread_ + << " using communicator " << comm_ << endl; + } + + if (!UPstream::haveThreads) + { + FatalErrorInFunction + << "mpi does not seem to have thread support." 
diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
index 17904fc386..bf4eeb5713 100644
--- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
+++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H
@@ -27,6 +27,22 @@ Class
Description
    Threaded file writer.

+    Collects all data from all processors and writes it as a single
+    'decomposedBlockData' file. The operation is determined by the
+    buffer size (the maxThreadFileBufferSize setting):
+    - the local size of the data is larger than the buffer: receive and
+      write processor by processor (i.e. 'scheduled'). Does not use a
+      thread; no file size limit.
+    - the total size of the data is larger than the buffer (but the
+      local size is not): the thread does all the collecting and writing
+      of the processors. No file size limit.
+    - the total size of the data is less than the buffer:
+      collecting is done locally; the thread only does the writing
+      (since the data has already been collected)
+
SourceFiles
    OFstreamCollator.C
@@ -36,7 +52,7 @@ SourceFiles
#define OFstreamCollator_H

#include "IOstream.H"
-#include "List.H"
+#include "labelList.H"
#include "FIFOStack.H"

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

@@ -56,9 +72,15 @@ class OFstreamCollator
    {
    public:

+        const label comm_;
        const word typeName_;
        const fileName pathName_;
        const string data_;
+        const labelList sizes_;
+
+        const bool haveSlaveData_;
+        List<char> slaveData_;
+
        const IOstream::streamFormat format_;
        const IOstream::versionNumber version_;
        const IOstream::compressionType compression_;
@@ -66,23 +88,36 @@ class OFstreamCollator

        writeData
        (
+            const label comm,
            const word& typeName,
            const fileName& pathName,
            const string& data,
+            const labelList& sizes,
+            const bool haveSlaveData,
            IOstream::streamFormat format,
            IOstream::versionNumber version,
            IOstream::compressionType compression,
            const bool append
        )
        :
+            comm_(comm),
            typeName_(typeName),
            pathName_(pathName),
            data_(data),
+            sizes_(sizes),
+            haveSlaveData_(haveSlaveData),
+            slaveData_(0),
            format_(format),
            version_(version),
            compression_(compression),
            append_(append)
        {}
+
+        //- (Approximate) size of the master + any optional slave data
+        off_t size() const
+        {
+            return data_.size() + slaveData_.size();
+        }
    };
@@ -112,7 +147,10 @@ class OFstreamCollator
            const label comm,
            const word& typeName,
            const fileName& fName,
-            const string& data,
+            const string& masterData,
+            const labelUList& recvSizes,
+            const bool haveSlaveData,       // does (master) have slave data?
+            const UList<char>& slaveData,   // (on master) all slave data
            IOstream::streamFormat fmt,
            IOstream::versionNumber ver,
            IOstream::compressionType cmp,
@@ -122,6 +160,10 @@ class OFstreamCollator
        //- Write all files in stack
        static void* writeAll(void *threadarg);

+        //- Wait for the total size of objects_ (master + optional slave
+        //  data) to become at least wantedSize below maxBufferSize_
+        void waitForBufferSpace(const off_t wantedSize) const;
+

public:
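The buffer limit that selects between the three regimes above is the maxThreadFileBufferSize optimisation switch, settable in etc/controlDict or a case/user controlDict; the value shown here is illustrative only:

    OptimisationSwitches
    {
        // Maximum buffer size (bytes) for collated writing (assumed,
        // illustrative value). With 0 the write thread is not used and
        // all collating and writing happens on the simulation thread.
        maxThreadFileBufferSize 2e9;
    }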
diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C
index 832c6afec4..797f390179 100644
--- a/src/Pstream/dummy/UPstream.C
+++ b/src/Pstream/dummy/UPstream.C
@@ -2,7 +2,7 @@
 =========                 |
 \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
  \\    /   O peration     |
-  \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+  \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
   \\/     M anipulation   | Copyright (C) 2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
@@ -104,6 +104,36 @@ void Foam::UPstream::allToAll
}


+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, sendSize);
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    memmove(recvData, sendData, recvSize);
+}
+
+
void Foam::UPstream::allocatePstreamCommunicator
(
    const label,
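The MPI implementations of the new primitives follow. As a calling-convention reference: with MPI_BYTE as the receive type, both the sizes and the offsets are in bytes, which is exactly how the wrappers use them. A standalone sketch (illustrative, not part of the patch):

    // Standalone MPI_Gatherv sketch: each rank contributes rank+1 bytes;
    // the root lays the blocks out back-to-back using byte offsets.
    #include <mpi.h>
    #include <vector>

    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);

        int rank, np;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &np);

        std::vector<char> sendData(rank + 1, char('A' + rank));

        std::vector<char> recvData;
        std::vector<int> recvSizes(np), recvOffsets(np);
        if (rank == 0)
        {
            int ofs = 0;
            for (int i = 0; i < np; i++)
            {
                recvSizes[i] = i + 1;   // bytes from rank i
                recvOffsets[i] = ofs;   // byte offset of rank i's block
                ofs += recvSizes[i];
            }
            recvData.resize(ofs);
        }

        MPI_Gatherv
        (
            sendData.data(), int(sendData.size()), MPI_BYTE,
            recvData.data(), recvSizes.data(), recvOffsets.data(), MPI_BYTE,
            0, MPI_COMM_WORLD
        );

        MPI_Finalize();
        return 0;
    }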
diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C
index 004bbc74a4..321ae90c4a 100644
--- a/src/Pstream/mpi/UPstream.C
+++ b/src/Pstream/mpi/UPstream.C
@@ -2,7 +2,7 @@
 =========                 |
 \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
  \\    /   O peration     |
-  \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+  \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
   \\/     M anipulation   | Copyright (C) 2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
@@ -157,16 +157,7 @@ bool Foam::UPstream::init(int& argc, char**& argv)

    // Initialise parallel structure
-    setParRun(numprocs);
-
-    if (Pstream::master() && provided_thread_support != MPI_THREAD_MULTIPLE)
-    {
-        WarningInFunction
-            << "mpi does not seem to have thread support."
-            << " There might be issues with e.g. threaded IO"
-            << endl;
-    }
-
+    setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);

#ifndef SGIMPI
    {
@@ -448,6 +439,199 @@ void Foam::UPstream::allToAll
}


+void Foam::UPstream::allToAll
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        sendSizes.size() != np
+     || sendOffsets.size() != np
+     || recvSizes.size() != np
+     || recvOffsets.size() != np
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSizes " << sendSizes.size()
+            << ", sendOffsets " << sendOffsets.size()
+            << ", recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        if (recvSizes[0] != sendSizes[0])
+        {
+            FatalErrorInFunction
+                << "Bytes to send " << sendSizes[0]
+                << " does not equal bytes to receive " << recvSizes[0]
+                << Foam::abort(FatalError);
+        }
+        memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
+    }
+    else
+    {
+        if
+        (
+            MPI_Alltoallv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                PstreamGlobals::MPICommunicators_[communicator]
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Alltoallv failed for sendSizes " << sendSizes
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::gather
+(
+    const char* sendData,
+    int sendSize,
+
+    char* recvData,
+    const UList<int>& recvSizes,
+    const UList<int>& recvOffsets,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (recvSizes.size() != np || recvOffsets.size() < np)
+    )
+    {
+        // Note: allow recvOffsets to be e.g. 1 larger than np so we
+        // can easily loop over the result
+
+        FatalErrorInFunction
+            << "Size of recvSizes " << recvSizes.size()
+            << " or recvOffsets " << recvOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, sendSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Gatherv
+            (
+                sendData,
+                sendSize,
+                MPI_BYTE,
+                recvData,
+                recvSizes.begin(),
+                recvOffsets.begin(),
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Gatherv failed for sendSize " << sendSize
+                << " recvSizes " << recvSizes
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
+void Foam::UPstream::scatter
+(
+    const char* sendData,
+    const UList<int>& sendSizes,
+    const UList<int>& sendOffsets,
+
+    char* recvData,
+    int recvSize,
+    const label communicator
+)
+{
+    label np = nProcs(communicator);
+
+    if
+    (
+        UPstream::master(communicator)
+     && (sendSizes.size() != np || sendOffsets.size() != np)
+    )
+    {
+        FatalErrorInFunction
+            << "Size of sendSizes " << sendSizes.size()
+            << " or sendOffsets " << sendOffsets.size()
+            << " is not equal to the number of processors in the domain "
+            << np
+            << Foam::abort(FatalError);
+    }
+
+    if (!UPstream::parRun())
+    {
+        memmove(recvData, sendData, recvSize);
+    }
+    else
+    {
+        if
+        (
+            MPI_Scatterv
+            (
+                sendData,
+                sendSizes.begin(),
+                sendOffsets.begin(),
+                MPI_BYTE,
+                recvData,
+                recvSize,
+                MPI_BYTE,
+                0,
+                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
+            )
+        )
+        {
+            FatalErrorInFunction
+                << "MPI_Scatterv failed for sendSizes " << sendSizes
+                << " sendOffsets " << sendOffsets
+                << " communicator " << communicator
+                << Foam::abort(FatalError);
+        }
+    }
+}
+
+
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,