diff --git a/applications/solvers/multiphase/compressibleInterFoam/twoPhaseMixtureThermo/twoPhaseMixtureThermo.C b/applications/solvers/multiphase/compressibleInterFoam/twoPhaseMixtureThermo/twoPhaseMixtureThermo.C index 192dee724..d2a3ca2c2 100644 --- a/applications/solvers/multiphase/compressibleInterFoam/twoPhaseMixtureThermo/twoPhaseMixtureThermo.C +++ b/applications/solvers/multiphase/compressibleInterFoam/twoPhaseMixtureThermo/twoPhaseMixtureThermo.C @@ -26,7 +26,7 @@ License #include "twoPhaseMixtureThermo.H" #include "gradientEnergyFvPatchScalarField.H" #include "mixedEnergyFvPatchScalarField.H" - +#include "collatedFileOperation.H" // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -50,6 +50,12 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo thermo1_(nullptr), thermo2_(nullptr) { + // Note: we're writing files to be read in immediately afterwards. + // Avoid any thread-writing problems. + float bufSz = + fileOperations::collatedFileOperation::maxThreadFileBufferSize; + fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0; + { volScalarField T1 ( @@ -80,6 +86,10 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo T2.write(); } + fileOperations::collatedFileOperation::maxThreadFileBufferSize = + bufSz; + + thermo1_ = rhoThermo::New(U.mesh(), phase1Name()); thermo2_ = rhoThermo::New(U.mesh(), phase2Name()); diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index 121b4b8da..486290c8d 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -859,7 +859,7 @@ bool Foam::decomposedBlockData::writeBlocks label startProc = 1; label nSendProcs = nProcs-1; - while (nSendProcs > 0) + while (nSendProcs > 0 && startProc < nProcs) { nSendProcs = calcNumProcs ( @@ -873,7 +873,7 @@ bool Foam::decomposedBlockData::writeBlocks startProc ); - if (startProc == nProcs || nSendProcs == 0) + if (nSendProcs == 0) { break; } diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C index 07a19f368..2d6f39ebd 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.C @@ -265,7 +265,11 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const } } - if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_) + if + ( + totalSize == 0 + || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_) + ) { break; } @@ -354,7 +358,8 @@ bool Foam::OFstreamCollator::write IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp, - const bool append + const bool append, + const bool useThread ) { // Determine (on master) sizes to receive. Note: do NOT use thread @@ -374,7 +379,7 @@ bool Foam::OFstreamCollator::write Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_); } - if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) + if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) { if (debug) { @@ -589,4 +594,20 @@ bool Foam::OFstreamCollator::write } +void Foam::OFstreamCollator::waitAll() +{ + // Wait for all buffer space to be available i.e. wait for all jobs + // to finish + if (Pstream::master(localComm_)) + { + if (debug) + { + Pout<< "OFstreamCollator : waiting for thread to have consumed all" + << endl; + } + waitForBufferSpace(-1); + } +} + + // ************************************************************************* // diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H index 92ef9068b..594634199 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/OFstreamCollator.H @@ -206,8 +206,12 @@ public: IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, - const bool append + const bool append, + const bool useThread = true ); + + //- Wait for all thread actions to have finished + void waitAll(); }; diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/collatedFileOperation.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/collatedFileOperation.C index c7c6830ac..614e846af 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/collatedFileOperation.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/collatedFileOperation.C @@ -556,14 +556,32 @@ bool Foam::fileOperations::collatedFileOperation::writeObject } else { + // Re-check static maxThreadFileBufferSize variable to see + // if needs to use threading + bool useThread = (maxThreadFileBufferSize > 0); + if (debug) { Pout<< "collatedFileOperation::writeObject :" << " For object : " << io.name() - << " starting collating output to " << pathName << endl; + << " starting collating output to " << pathName + << " useThread:" << useThread << endl; } - threadedCollatedOFstream os(writer_, pathName, fmt, ver, cmp); + if (!useThread) + { + writer_.waitAll(); + } + + threadedCollatedOFstream os + ( + writer_, + pathName, + fmt, + ver, + cmp, + useThread + ); // If any of these fail, return (leave error handling to Ostream // class) diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C index df919680f..c35ba0c1f 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2017 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -35,13 +35,15 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream const fileName& pathName, streamFormat format, versionNumber version, - compressionType compression + compressionType compression, + const bool useThread ) : OStringStream(format, version), writer_(writer), pathName_(pathName), - compression_(compression) + compression_(compression), + useThread_(useThread) {} @@ -57,7 +59,8 @@ Foam::threadedCollatedOFstream::~threadedCollatedOFstream() IOstream::BINARY, version(), compression_, - false // append + false, // append + useThread_ ); } diff --git a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H index e296e1132..f9b02eed6 100644 --- a/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H +++ b/src/OpenFOAM/global/fileOperations/collatedFileOperation/threadedCollatedOFstream.H @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2017 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -60,6 +60,8 @@ class threadedCollatedOFstream const IOstream::compressionType compression_; + const bool useThread_; + public: @@ -72,7 +74,8 @@ public: const fileName& pathname, streamFormat format=ASCII, versionNumber version=currentVersion, - compressionType compression=UNCOMPRESSED + compressionType compression=UNCOMPRESSED, + const bool useThread = true );