diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index 21ad121dfb..95145381de 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -31,8 +31,9 @@ License #include "StringStream.H" #include "dictionary.H" #include "objectRegistry.H" -#include "foamVersion.H" -#include +#include "SubList.H" +#include "labelPair.H" +#include "masterUncollatedFileOperation.H" // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -648,12 +649,114 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks } +void Foam::decomposedBlockData::gather +( + const label comm, + const label data, + labelList& datas +) +{ + const label nProcs = UPstream::nProcs(comm); + datas.setSize(nProcs); + + char* data0Ptr = reinterpret_cast(datas.begin()); + + labelList recvOffsets; + labelList recvSizes; + if (UPstream::master()) + { + recvOffsets.setSize(nProcs); + forAll(recvOffsets, proci) + { + recvOffsets[proci] = + reinterpret_cast(&datas[proci]) + - data0Ptr; + } + recvSizes.setSize(nProcs, sizeof(label)); + } + + UPstream::gather + ( + reinterpret_cast(&data), + sizeof(label), + data0Ptr, + recvSizes, + recvOffsets, + comm + ); +} + + +void Foam::decomposedBlockData::gatherSlaveData +( + const label comm, + const UList& data, + const labelUList& recvSizes, + + const label startProc, + const label nProcs, + + List& sliceOffsets, + List& recvData +) +{ + // Calculate master data + List sliceSizes; + if (UPstream::master(comm)) + { + const label numProcs = UPstream::nProcs(comm); + + sliceSizes.setSize(numProcs, 0); + sliceOffsets.setSize(numProcs+1, 0); + + int totalSize = 0; + label proci = startProc; + for (label i = 0; i < nProcs; i++) + { + sliceSizes[proci] = int(recvSizes[proci]); + sliceOffsets[proci] = totalSize; + totalSize += sliceSizes[proci]; + proci++; + } + sliceOffsets[proci] = totalSize; + recvData.setSize(totalSize); + } + + int nSend = 0; + if + ( + !UPstream::master(comm) + && (UPstream::myProcNo(comm) >= startProc) + && (UPstream::myProcNo(comm) < startProc+nProcs) + ) + { + nSend = data.byteSize(); + } + + UPstream::gather + ( + data.begin(), + nSend, + + recvData.begin(), + sliceSizes, + sliceOffsets, + comm + ); +} + + bool Foam::decomposedBlockData::writeBlocks ( const label comm, autoPtr& osPtr, List& start, const UList& data, + + const labelUList& recvSizes, + const bool haveSlaveData, + const List& slaveData, + const UPstream::commsTypes commsType, const bool syncReturnState ) @@ -663,20 +766,56 @@ bool Foam::decomposedBlockData::writeBlocks Pout<< "decomposedBlockData::writeBlocks:" << " stream:" << (osPtr.valid() ? osPtr().name() : "invalid") << " data:" << data.size() + << " haveSlaveData:" << haveSlaveData + << " (master only) slaveData:" << slaveData.size() << " commsType:" << Pstream::commsTypeNames[commsType] << endl; } + const label nProcs = UPstream::nProcs(comm); + + bool ok = true; - labelList recvSizes(Pstream::nProcs(comm)); - recvSizes[Pstream::myProcNo(comm)] = data.byteSize(); - Pstream::gatherList(recvSizes, Pstream::msgType(), comm); + if (haveSlaveData) + { + // Already have gathered the slave data. 
communicator only used to + // check who is the master - if (commsType == UPstream::commsTypes::scheduled) + if (UPstream::master(comm)) + { + OSstream& os = osPtr(); + + start.setSize(nProcs); + + // Write master data + { + os << nl << "// Processor" << UPstream::masterNo() << nl; + start[UPstream::masterNo()] = os.stdStream().tellp(); + os << data; + } + + // Write slaves + + label slaveOffset = 0; + + for (label proci = 1; proci < nProcs; proci++) + { + os << nl << nl << "// Processor" << proci << nl; + start[proci] = os.stdStream().tellp(); + + os << SubList(slaveData, recvSizes[proci], slaveOffset); + + slaveOffset += recvSizes[proci]; + } + + ok = os.good(); + } + } + else if (commsType == UPstream::commsTypes::scheduled) { if (UPstream::master(comm)) { - start.setSize(UPstream::nProcs(comm)); + start.setSize(nProcs); OSstream& os = osPtr(); @@ -688,7 +827,7 @@ bool Foam::decomposedBlockData::writeBlocks } // Write slaves List elems; - for (label proci = 1; proci < UPstream::nProcs(comm); proci++) + for (label proci = 1; proci < nProcs; proci++) { elems.setSize(recvSizes[proci]); IPstream::read @@ -723,101 +862,115 @@ bool Foam::decomposedBlockData::writeBlocks } else { - if (debug) + // Write master data + if (UPstream::master(comm)) { - struct timeval tv; - gettimeofday(&tv, nullptr); - Pout<< "Starting sending at:" - << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s" - << Foam::endl; - } - - - label startOfRequests = Pstream::nRequests(); - - if (!UPstream::master(comm)) - { - UOPstream::write - ( - UPstream::commsTypes::nonBlocking, - UPstream::masterNo(), - data.begin(), - data.byteSize(), - Pstream::msgType(), - comm - ); - Pstream::waitRequests(startOfRequests); - } - else - { - List> recvBufs(Pstream::nProcs(comm)); - for (label proci = 1; proci < UPstream::nProcs(comm); proci++) - { - recvBufs[proci].setSize(recvSizes[proci]); - UIPstream::read - ( - UPstream::commsTypes::nonBlocking, - proci, - recvBufs[proci].begin(), - recvSizes[proci], - Pstream::msgType(), - comm - ); - } - - if (debug) - { - struct timeval tv; - gettimeofday(&tv, nullptr); - Pout<< "Starting master-only writing at:" - << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s" - << Foam::endl; - } - - start.setSize(UPstream::nProcs(comm)); + start.setSize(nProcs); OSstream& os = osPtr(); - // Write master data - { - os << nl << "// Processor" << UPstream::masterNo() << nl; - start[UPstream::masterNo()] = os.stdStream().tellp(); - os << data; - } + os << nl << "// Processor" << UPstream::masterNo() << nl; + start[UPstream::masterNo()] = os.stdStream().tellp(); + os << data; + } - if (debug) - { - struct timeval tv; - gettimeofday(&tv, nullptr); - Pout<< "Starting slave writing at:" - << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s" - << Foam::endl; - } - // Write slaves - for (label proci = 1; proci < UPstream::nProcs(comm); proci++) - { - os << nl << nl << "// Processor" << proci << nl; - start[proci] = os.stdStream().tellp(); + // Find out how many processor can be received into + // maxMasterFileBufferSize - if (Pstream::finishedRequest(startOfRequests+proci-1)) + // Starting slave processor and number of processors + labelPair startAndSize(1, nProcs-1); + + while (startAndSize[1] > 0) + { + labelPair masterData(startAndSize); + if (UPstream::master(comm)) + { + label totalSize = 0; + label proci = masterData[0]; + while + ( + proci < nProcs + && ( + totalSize+recvSizes[proci] + < fileOperations::masterUncollatedFileOperation:: + maxMasterFileBufferSize + ) + ) { - os << recvBufs[proci]; + totalSize += recvSizes[proci]; + proci++; + } + + 
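+                // Descriptive comment (added for clarity): proci now points
+                // one past the last processor whose block still fits in the
+                // buffer. Example: with maxMasterFileBufferSize = 1e9 and
+                // four slave blocks of 3e8 bytes each, the first pass covers
+                // processors 1-3 (9e8 bytes) and a second pass processor 4.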
masterData[1] = proci-masterData[0]; + } + + + // Scatter masterData + UPstream::scatter + ( + reinterpret_cast(masterData.cdata()), + List(nProcs, sizeof(masterData)), + List(nProcs, 0), + reinterpret_cast(startAndSize.data()), + sizeof(startAndSize), + comm + ); + + if (startAndSize[1] == 0) + { + break; + } + + + // Gather data from (a slice of) the slaves + List sliceOffsets; + List recvData; + gatherSlaveData + ( + comm, + data, + recvSizes, + + startAndSize[0], // startProc, + startAndSize[1], // nProcs, + + sliceOffsets, + recvData + ); + + if (UPstream::master(comm)) + { + OSstream& os = osPtr(); + + // Write slaves + for + ( + label proci = startAndSize[0]; + proci < startAndSize[0]+startAndSize[1]; + proci++ + ) + { + os << nl << nl << "// Processor" << proci << nl; + start[proci] = os.stdStream().tellp(); + + os << + SubList + ( + recvData, + sliceOffsets[proci+1]-sliceOffsets[proci], + sliceOffsets[proci] + ); } } - Pstream::resetRequests(startOfRequests); - - ok = os.good(); + startAndSize[0] += startAndSize[1]; + } + + if (UPstream::master(comm)) + { + ok = osPtr().good(); } - } - if (debug) - { - struct timeval tv; - gettimeofday(&tv, nullptr); - Pout<< "Finished master-only writing at:" - << 1.0*tv.tv_sec+tv.tv_usec/1e6 << " s" - << Foam::endl; } if (syncReturnState) @@ -936,8 +1089,23 @@ bool Foam::decomposedBlockData::writeObject osPtr.reset(new OFstream(objectPath(), IOstream::BINARY, ver, cmp)); IOobject::writeHeader(osPtr()); } + + labelList recvSizes; + gather(comm_, this->byteSize(), recvSizes); + List start; - return writeBlocks(comm_, osPtr, start, *this, commsType_); + List slaveData; // dummy already received slave data + return writeBlocks + ( + comm_, + osPtr, + start, + *this, + recvSizes, + false, // don't have slave data + slaveData, + commsType_ + ); } diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H index dc4e3d70f2..1def38f9eb 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.H @@ -169,6 +169,32 @@ public: const UPstream::commsTypes commsType ); + //- Helper: gather single label. Note: using native Pstream. + // datas sized with num procs but undefined contents on + // slaves + static void gather + ( + const label comm, + const label data, + labelList& datas + ); + + //- Helper: gather data from (subset of) slaves. Returns + // recvData : received data + // recvOffsets : offset in data. recvOffsets is nProcs+1 + static void gatherSlaveData + ( + const label comm, + const UList& data, + const labelUList& recvSizes, + + const label startProc, + const label nProcs, + + List& recvOffsets, + List& recvData + ); + //- Write *this. Ostream only valid on master. 
Returns starts of // processor blocks static bool writeBlocks @@ -177,6 +203,12 @@ public: autoPtr& osPtr, List& start, const UList&, + + const labelUList& recvSizes, + + const bool haveSlaveData, // does master have slaveData + const List& slaveData, // optional slave data (on master) + const UPstream::commsTypes, const bool syncReturnState = true ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index 4c72c28cc2..383fe03319 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation \\/ M anipulation | Copyright (C) 2015-2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License @@ -50,11 +50,13 @@ Foam::UPstream::commsTypeNames // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -void Foam::UPstream::setParRun(const label nProcs) +void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads) { if (nProcs == 0) { parRun_ = false; + haveThreads_ = haveThreads; + freeCommunicator(UPstream::worldComm); label comm = allocateCommunicator(-1, labelList(1, label(0)), false); if (comm != UPstream::worldComm) @@ -71,6 +73,7 @@ void Foam::UPstream::setParRun(const label nProcs) else { parRun_ = true; + haveThreads_ = haveThreads; // Redo worldComm communicator (this has been created at static // initialisation time) @@ -354,6 +357,8 @@ Foam::UList::operator[](const label procID) const bool Foam::UPstream::parRun_(false); +bool Foam::UPstream::haveThreads_(false); + Foam::LIFOStack Foam::UPstream::freeComms_; Foam::DynamicList Foam::UPstream::myProcNo_(10); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 4993ed9d12..ea72f84912 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -185,6 +185,9 @@ private: //- By default this is not a parallel run static bool parRun_; + //- Have support for threads? + static bool haveThreads_; + //- Standard transfer message type static int msgType_; @@ -211,6 +214,9 @@ private: // Private Member Functions + //- Set data for parallel running + static void setParRun(const label nProcs, const bool haveThreads); + //- Calculate linear communication schedule static List calcLinearComm(const label nProcs); @@ -411,9 +417,11 @@ public: return parRun_; } - //- Set data for parallel running. 
Special case nProcs=0 to switch off - //- parallel running - static void setParRun(const label nProcs); + //- Have support for threads + static bool haveThreads() + { + return haveThreads_; + } //- Number of processes in parallel run static label nProcs(const label communicator = 0) @@ -517,6 +525,47 @@ public: labelUList& recvData, const label communicator = 0 ); + + //- Exchange data with all processors (in the communicator) + // sendSizes, sendOffsets give (per processor) the slice of + // sendData to send, similarly recvSizes, recvOffsets give the slice + // of recvData to receive + static void allToAll + ( + const char* sendData, + const UList& sendSizes, + const UList& sendOffsets, + + char* recvData, + const UList& recvSizes, + const UList& recvOffsets, + + const label communicator = 0 + ); + + //- Receive data from all processors on the master + static void gather + ( + const char* sendData, + int sendSize, + + char* recvData, + const UList& recvSizes, + const UList& recvOffsets, + const label communicator = 0 + ); + + //- Send data to all processors from the root of the communicator + static void scatter + ( + const char* sendData, + const UList& sendSizes, + const UList& sendOffsets, + + char* recvData, + int recvSize, + const label communicator = 0 + ); }; diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C index b5c8ab4942..597d1c5140 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C @@ -25,11 +25,7 @@ License #include "OFstreamCollator.H" #include "OFstream.H" -#include "OSspecific.H" -#include "IOstreams.H" -#include "Pstream.H" #include "decomposedBlockData.H" -#include "PstreamReduceOps.H" // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -46,7 +42,10 @@ bool Foam::OFstreamCollator::writeFile const label comm, const word& typeName, const fileName& fName, - const string& s, + const string& masterData, + const labelUList& recvSizes, + const bool haveSlaveData, // does master have slaveData + const UList& slaveData, // on master: slave data IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp, @@ -55,7 +54,7 @@ bool Foam::OFstreamCollator::writeFile { if (debug) { - Pout<< "OFstreamCollator : Writing " << s.size() + Pout<< "OFstreamCollator : Writing " << masterData.size() << " bytes to " << fName << " using comm " << comm << endl; } @@ -90,11 +89,17 @@ bool Foam::OFstreamCollator::writeFile ); } - UList slice(const_cast(s.data()), label(s.size())); - // Assuming threaded writing hides any slowness so we might - // as well use scheduled communication to send the data to - // the master processor in order. + UList slice + ( + const_cast(masterData.data()), + label(masterData.size()) + ); + + // Assuming threaded writing hides any slowness so we + // can use scheduled communication to send the data to + // the master processor in order. However can be unstable + // for some mpi so default is non-blocking. 
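+    // If slaveData was pre-gathered (threaded path with enough buffer space)
+    // writeBlocks below writes it out directly; otherwise it gathers the
+    // slave blocks itself, in slices bounded by maxMasterFileBufferSize.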
List start; decomposedBlockData::writeBlocks @@ -103,7 +108,10 @@ bool Foam::OFstreamCollator::writeFile osPtr, start, slice, - UPstream::commsTypes::scheduled, + recvSizes, + haveSlaveData, + slaveData, + UPstream::commsTypes::nonBlocking, //scheduled, false // do not reduce return state ); @@ -115,8 +123,18 @@ bool Foam::OFstreamCollator::writeFile if (debug) { - Pout<< "OFstreamCollator : Finished writing " << s.size() - << " bytes to " << fName + Pout<< "OFstreamCollator : Finished writing " << masterData.size() + << " bytes"; + if (UPstream::master(comm)) + { + off_t sum = 0; + forAll(recvSizes, i) + { + sum += recvSizes[i]; + } + Pout<< " (overall " << sum << ")"; + } + Pout<< " to " << fName << " using comm " << comm << endl; } @@ -133,14 +151,11 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) { writeData* ptr = nullptr; - //pthread_mutex_lock(&handler.mutex_); lockMutex(handler.mutex_); - if (handler.objects_.size()) { ptr = handler.objects_.pop(); } - //pthread_mutex_unlock(&handler.mutex_); unlockMutex(handler.mutex_); if (!ptr) @@ -151,10 +166,14 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) { bool ok = writeFile ( - handler.comm_, + ptr->comm_, ptr->typeName_, ptr->pathName_, ptr->data_, + ptr->sizes_, + ptr->haveSlaveData_, + ptr->slaveData_, + ptr->format_, ptr->version_, ptr->compression_, @@ -177,22 +196,54 @@ void* Foam::OFstreamCollator::writeAll(void *threadarg) Pout<< "OFstreamCollator : Exiting write thread " << endl; } - //pthread_mutex_lock(&handler.mutex_); lockMutex(handler.mutex_); handler.threadRunning_ = false; - //pthread_mutex_unlock(&handler.mutex_); unlockMutex(handler.mutex_); return nullptr; } +void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const +{ + while (true) + { + // Count files to be written + off_t totalSize = 0; + + lockMutex(mutex_); + forAllConstIter(FIFOStack, objects_, iter) + { + totalSize += iter()->size(); + } + unlockMutex(mutex_); + + if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_) + { + break; + } + + if (debug) + { + lockMutex(mutex_); + Pout<< "OFstreamCollator : Waiting for buffer space." + << " Currently in use:" << totalSize + << " limit:" << maxBufferSize_ + << " files:" << objects_.size() + << endl; + unlockMutex(mutex_); + } + + sleep(5); + } +} + + // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // Foam::OFstreamCollator::OFstreamCollator(const off_t maxBufferSize) : maxBufferSize_(maxBufferSize), - //mutex_(PTHREAD_MUTEX_INITIALIZER), mutex_ ( maxBufferSize_ > 0 @@ -228,7 +279,6 @@ Foam::OFstreamCollator::~OFstreamCollator() Pout<< "~OFstreamCollator : Waiting for write thread" << endl; } - //pthread_join(thread_, nullptr); joinThread(thread_); } if (thread_ != -1) @@ -259,56 +309,163 @@ bool Foam::OFstreamCollator::write const bool append ) { - if (maxBufferSize_ > 0) + // Determine (on master) sizes to receive. 
Note: do NOT use thread + // communicator + labelList recvSizes; + decomposedBlockData::gather(Pstream::worldComm, data.size(), recvSizes); + off_t totalSize = 0; + label maxLocalSize = 0; { - while (true) + for (label proci = 0; proci < recvSizes.size(); proci++) { - // Count files to be written - off_t totalSize = 0; - //pthread_mutex_lock(&mutex_); - lockMutex(mutex_); - forAllConstIter(FIFOStack, objects_, iter) - { - totalSize += iter()->data_.size(); - } - //pthread_mutex_unlock(&mutex_); - unlockMutex(mutex_); - - if - ( - totalSize == 0 - || (totalSize+off_t(data.size()) < maxBufferSize_) - ) - { - break; - } - - if (debug) - { - Pout<< "OFstreamCollator : Waiting for buffer space." - << " Currently in use:" << totalSize - << " limit:" << maxBufferSize_ - << endl; - } - - sleep(5); + totalSize += recvSizes[proci]; + maxLocalSize = max(maxLocalSize, recvSizes[proci]); } + Pstream::scatter(totalSize, Pstream::msgType(), Pstream::worldComm); + Pstream::scatter(maxLocalSize, Pstream::msgType(), Pstream::worldComm); + } + + if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) + { + if (debug) + { + Pout<< "OFstreamCollator : non-thread gather and write of " << fName + << " using worldComm" << endl; + } + // Direct collating and writing (so master blocks until all written!) + const List dummySlaveData; + return writeFile + ( + UPstream::worldComm, + typeName, + fName, + data, + recvSizes, + false, // no slave data provided yet + dummySlaveData, + fmt, + ver, + cmp, + append + ); + } + else if (totalSize <= maxBufferSize_) + { + // Total size can be stored locally so receive all data now and only + // do the writing in the thread if (debug) { - Pout<< "OFstreamCollator : relaying write of " << fName - << " to thread " << endl; + Pout<< "OFstreamCollator : non-thread gather; thread write of " + << fName << endl; } - //pthread_mutex_lock(&mutex_); - lockMutex(mutex_); - objects_.push + + if (Pstream::master()) + { + waitForBufferSpace(totalSize); + } + + // Allocate local buffer for all collated data + autoPtr fileAndDataPtr ( - new writeData(typeName, fName, data, fmt, ver, cmp, append) + new writeData + ( + comm_, // Note: comm not actually used anymore + typeName, + fName, + data, + recvSizes, + true, // have slave data (collected below) + fmt, + ver, + cmp, + append + ) + ); + writeData& fileAndData = fileAndDataPtr(); + + // Gather the slave data and insert into fileAndData + UList slice(const_cast(data.data()), label(data.size())); + List slaveOffsets; + decomposedBlockData::gatherSlaveData + ( + Pstream::worldComm, // Note: using simulation thread + slice, + recvSizes, + + 1, // startProc, + Pstream::nProcs()-1, // n procs + + slaveOffsets, + fileAndData.slaveData_ + ); + + // Append to thread buffer + lockMutex(mutex_); + objects_.push(fileAndDataPtr.ptr()); + unlockMutex(mutex_); + + // Start thread if not running + lockMutex(mutex_); + if (!threadRunning_) + { + createThread(thread_, writeAll, this); + if (debug) + { + Pout<< "OFstreamCollator : Started write thread " + << thread_ << endl; + } + threadRunning_ = true; + } + unlockMutex(mutex_); + + return true; + } + else + { + if (debug) + { + Pout<< "OFstreamCollator : thread gather and write of " << fName + << " in thread " << thread_ + << " using communicator " << comm_ << endl; + } + + if (!UPstream::haveThreads) + { + FatalErrorInFunction + << "mpi does not seem to have thread support." 
+ << "Please increase the buffer size 'maxThreadFileBufferSize'" + << " to at least " << totalSize + << " to be able to do the collating before threading." + << exit(FatalError); + } + + if (Pstream::master()) + { + waitForBufferSpace(data.size()); + } + + lockMutex(mutex_); + // Push all file info on buffer. Note that no slave data provided + // so it will trigger communication inside the thread + objects_.push + ( + new writeData + ( + comm_, + typeName, + fName, + data, + recvSizes, + false, // Have no slave data; collect in thread + fmt, + ver, + cmp, + append + ) ); - //pthread_mutex_unlock(&mutex_); unlockMutex(mutex_); - //pthread_mutex_lock(&mutex_); lockMutex(mutex_); if (!threadRunning_) { @@ -319,16 +476,10 @@ bool Foam::OFstreamCollator::write } threadRunning_ = true; } - //pthread_mutex_unlock(&mutex_); unlockMutex(mutex_); return true; } - else - { - // Immediate writing - return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append); - } } diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H index 17904fc386..bf4eeb5713 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H @@ -27,6 +27,22 @@ Class Description Threaded file writer. + Collects all data from all processors and writes as single + 'decomposedBlockData' file. The operation is determined by the + buffer size (maxThreadFileBufferSize setting): + - local size of data is larger than buffer: receive and write processor + by processor (i.e. 'scheduled'). Does not use a thread, no file size + limit. + - total size of data is larger than buffer (but local is not): + thread does all the collecting and writing of the processors. No file + size limit. 
+ - total size of data is less than buffer: + collecting is done locally; the thread only does the writing + (since the data has already been collected) + + +Operation determine + SourceFiles OFstreamCollator.C @@ -36,7 +52,7 @@ SourceFiles #define OFstreamCollator_H #include "IOstream.H" -#include "List.H" +#include "labelList.H" #include "FIFOStack.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -56,9 +72,15 @@ class OFstreamCollator { public: + const label comm_; const word typeName_; const fileName pathName_; const string data_; + const labelList sizes_; + + const bool haveSlaveData_; + List slaveData_; + const IOstream::streamFormat format_; const IOstream::versionNumber version_; const IOstream::compressionType compression_; @@ -66,23 +88,36 @@ class OFstreamCollator writeData ( + const label comm, const word& typeName, const fileName& pathName, const string& data, + const labelList& sizes, + const bool haveSlaveData, IOstream::streamFormat format, IOstream::versionNumber version, IOstream::compressionType compression, const bool append ) : + comm_(comm), typeName_(typeName), pathName_(pathName), data_(data), + sizes_(sizes), + haveSlaveData_(haveSlaveData), + slaveData_(0), format_(format), version_(version), compression_(compression), append_(append) {} + + //- (approximate) size of master + any optional slave data + off_t size() const + { + return data_.size() + slaveData_.size(); + } }; @@ -112,7 +147,10 @@ class OFstreamCollator const label comm, const word& typeName, const fileName& fName, - const string& data, + const string& masterData, + const labelUList& recvSizes, + const bool haveSlaveData, // (does master) have slave data + const UList& slaveData, // (on master) all slave data IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp, @@ -122,6 +160,10 @@ class OFstreamCollator //- Write all files in stack static void* writeAll(void *threadarg); + //- Wait for total size of objects_ (master + optional slave data) + // to be wantedSize less than overall maxBufferSize. + void waitForBufferSpace(const off_t wantedSize) const; + public: diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 832c6afec4..797f390179 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. 
------------------------------------------------------------------------------- License @@ -104,6 +104,36 @@ void Foam::UPstream::allToAll } +void Foam::UPstream::gather +( + const char* sendData, + int sendSize, + + char* recvData, + const UList& recvSizes, + const UList& recvOffsets, + const label communicator +) +{ + memmove(recvData, sendData, sendSize); +} + + +void Foam::UPstream::scatter +( + const char* sendData, + const UList& sendSizes, + const UList& sendOffsets, + + char* recvData, + int recvSize, + const label communicator +) +{ + memmove(recvData, sendData, recvSize); +} + + void Foam::UPstream::allocatePstreamCommunicator ( const label, diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index 004bbc74a4..321ae90c4a 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd. ------------------------------------------------------------------------------- License @@ -157,16 +157,7 @@ bool Foam::UPstream::init(int& argc, char**& argv) // Initialise parallel structure - setParRun(numprocs); - - if (Pstream::master() && provided_thread_support != MPI_THREAD_MULTIPLE) - { - WarningInFunction - << "mpi does not seem to have thread support." - << " There might be issues with e.g. threaded IO" - << endl; - } - + setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE); #ifndef SGIMPI { @@ -448,6 +439,199 @@ void Foam::UPstream::allToAll } +void Foam::UPstream::allToAll +( + const char* sendData, + const UList& sendSizes, + const UList& sendOffsets, + + char* recvData, + const UList& recvSizes, + const UList& recvOffsets, + + const label communicator +) +{ + label np = nProcs(communicator); + + if + ( + sendSizes.size() != np + || sendOffsets.size() != np + || recvSizes.size() != np + || recvOffsets.size() != np + ) + { + FatalErrorInFunction + << "Size of sendSize " << sendSizes.size() + << ", sendOffsets " << sendOffsets.size() + << ", recvSizes " << recvSizes.size() + << " or recvOffsets " << recvOffsets.size() + << " is not equal to the number of processors in the domain " + << np + << Foam::abort(FatalError); + } + + if (!UPstream::parRun()) + { + if (recvSizes[0] != sendSizes[0]) + { + FatalErrorInFunction + << "Bytes to send " << sendSizes[0] + << " does not equal bytes to receive " << recvSizes[0] + << Foam::abort(FatalError); + } + memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]); + } + else + { + if + ( + MPI_Alltoallv + ( + sendData, + sendSizes.begin(), + sendOffsets.begin(), + MPI_BYTE, + recvData, + recvSizes.begin(), + recvOffsets.begin(), + MPI_BYTE, + PstreamGlobals::MPICommunicators_[communicator] + ) + ) + { + FatalErrorInFunction + << "MPI_Alltoallv failed for sendSizes " << sendSizes + << " recvSizes " << recvSizes + << " communicator " << communicator + << Foam::abort(FatalError); + } + } +} + + +void Foam::UPstream::gather +( + const char* sendData, + int sendSize, + + char* recvData, + const UList& recvSizes, + const UList& recvOffsets, + const label communicator +) +{ + label np = nProcs(communicator); + + if + ( + UPstream::master(communicator) + && (recvSizes.size() != np || recvOffsets.size() < np) + ) + { + // Note: allow recvOffsets to be e.g. 
1 larger than np so we + // can easily loop over the result + + FatalErrorInFunction + << "Size of recvSizes " << recvSizes.size() + << " or recvOffsets " << recvOffsets.size() + << " is not equal to the number of processors in the domain " + << np + << Foam::abort(FatalError); + } + + if (!UPstream::parRun()) + { + memmove(recvData, sendData, sendSize); + } + else + { + if + ( + MPI_Gatherv + ( + sendData, + sendSize, + MPI_BYTE, + recvData, + recvSizes.begin(), + recvOffsets.begin(), + MPI_BYTE, + 0, + MPI_Comm(PstreamGlobals::MPICommunicators_[communicator]) + ) + ) + { + FatalErrorInFunction + << "MPI_Gatherv failed for sendSize " << sendSize + << " recvSizes " << recvSizes + << " communicator " << communicator + << Foam::abort(FatalError); + } + } +} + + +void Foam::UPstream::scatter +( + const char* sendData, + const UList& sendSizes, + const UList& sendOffsets, + + char* recvData, + int recvSize, + const label communicator +) +{ + label np = nProcs(communicator); + + if + ( + UPstream::master(communicator) + && (sendSizes.size() != np || sendOffsets.size() != np) + ) + { + FatalErrorInFunction + << "Size of sendSizes " << sendSizes.size() + << " or sendOffsets " << sendOffsets.size() + << " is not equal to the number of processors in the domain " + << np + << Foam::abort(FatalError); + } + + if (!UPstream::parRun()) + { + memmove(recvData, sendData, recvSize); + } + else + { + if + ( + MPI_Scatterv + ( + sendData, + sendSizes.begin(), + sendOffsets.begin(), + MPI_BYTE, + recvData, + recvSize, + MPI_BYTE, + 0, + MPI_Comm(PstreamGlobals::MPICommunicators_[communicator]) + ) + ) + { + FatalErrorInFunction + << "MPI_Scatterv failed for sendSizes " << sendSizes + << " sendOffsets " << sendOffsets + << " communicator " << communicator + << Foam::abort(FatalError); + } + } +} + + void Foam::UPstream::allocatePstreamCommunicator ( const label parentIndex,
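Note for reviewers (illustration only, not part of the patch): the new char-level UPstream::gather is meant to be driven in two steps, exactly as decomposedBlockData::gather / gatherSlaveData do above - first gather every rank's byte count onto the master, then gather the variable-length payloads into one contiguous master buffer using those counts as sizes and offsets. A minimal sketch, assuming the patched UPstream.H; the function and variable names (gatherToMaster, localData, allData) are made up for the example:

// gatherToMaster.C : reviewer sketch, not part of the patch.
// Two-step use of the char-level UPstream::gather added in this change.

#include "UPstream.H"
#include "List.H"
#include "SubList.H"

using namespace Foam;

void gatherToMaster(const label comm, const UList<char>& localData)
{
    const label nProcs = UPstream::nProcs(comm);
    const int mySize = int(localData.byteSize());

    // Step 1: gather one int per rank (cf. decomposedBlockData::gather,
    // which does the same with labels). Sizes/offsets only matter on master.
    List<int> sizes(nProcs, 0);
    List<int> sizeCounts(nProcs, int(sizeof(int)));
    List<int> sizeOffsets(nProcs, 0);
    if (UPstream::master(comm))
    {
        for (label proci = 0; proci < nProcs; proci++)
        {
            sizeOffsets[proci] = proci*int(sizeof(int));
        }
    }
    UPstream::gather
    (
        reinterpret_cast<const char*>(&mySize),
        int(sizeof(int)),
        reinterpret_cast<char*>(sizes.begin()),
        sizeCounts,
        sizeOffsets,
        comm
    );

    // Step 2: master lays the payloads out back to back
    List<int> offsets(nProcs+1, 0);
    List<char> allData;
    if (UPstream::master(comm))
    {
        for (label proci = 0; proci < nProcs; proci++)
        {
            offsets[proci+1] = offsets[proci] + sizes[proci];
        }
        allData.setSize(offsets[nProcs]);
    }
    UPstream::gather
    (
        localData.begin(),
        mySize,
        allData.begin(),
        sizes,
        SubList<int>(offsets, nProcs),  // first nProcs offsets
        comm
    );

    // On the master, rank i's bytes now sit at [offsets[i], offsets[i+1]);
    // on the other ranks allData stays empty.
}

The same pattern, restricted to a window of [startProc, startProc+nProcs) ranks and bounded by maxMasterFileBufferSize, is what writeBlocks uses above to keep the master's peak memory in check.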