twoPhaseMixtureThermo: Fix for collated file operation

twoPhaseMixtureThermo writes the temperatures during construction only
for them to be read again immediately after by construction of the
individual phases' thermo models. When running with collated file
handling this behaviour is not thread safe. This change deactivates
threading for the duration of this behaviour.

Patch contributed by Mattijs Janssens
This commit is contained in:
Will Bainbridge
2018-06-14 10:55:27 +01:00
parent 268f1f612e
commit a3177bd759
7 changed files with 74 additions and 15 deletions

View File

@ -26,7 +26,7 @@ License
#include "twoPhaseMixtureThermo.H" #include "twoPhaseMixtureThermo.H"
#include "gradientEnergyFvPatchScalarField.H" #include "gradientEnergyFvPatchScalarField.H"
#include "mixedEnergyFvPatchScalarField.H" #include "mixedEnergyFvPatchScalarField.H"
#include "collatedFileOperation.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
@ -50,6 +50,12 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo
thermo1_(nullptr), thermo1_(nullptr),
thermo2_(nullptr) thermo2_(nullptr)
{ {
// Note: we're writing files to be read in immediately afterwards.
// Avoid any thread-writing problems.
float bufSz =
fileOperations::collatedFileOperation::maxThreadFileBufferSize;
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
{ {
volScalarField T1 volScalarField T1
( (
@ -80,6 +86,10 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo
T2.write(); T2.write();
} }
fileOperations::collatedFileOperation::maxThreadFileBufferSize =
bufSz;
thermo1_ = rhoThermo::New(U.mesh(), phase1Name()); thermo1_ = rhoThermo::New(U.mesh(), phase1Name());
thermo2_ = rhoThermo::New(U.mesh(), phase2Name()); thermo2_ = rhoThermo::New(U.mesh(), phase2Name());

View File

@ -859,7 +859,7 @@ bool Foam::decomposedBlockData::writeBlocks
label startProc = 1; label startProc = 1;
label nSendProcs = nProcs-1; label nSendProcs = nProcs-1;
while (nSendProcs > 0) while (nSendProcs > 0 && startProc < nProcs)
{ {
nSendProcs = calcNumProcs nSendProcs = calcNumProcs
( (
@ -873,7 +873,7 @@ bool Foam::decomposedBlockData::writeBlocks
startProc startProc
); );
if (startProc == nProcs || nSendProcs == 0) if (nSendProcs == 0)
{ {
break; break;
} }

View File

@ -265,7 +265,11 @@ void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
} }
} }
if (totalSize == 0 || (totalSize+wantedSize) <= maxBufferSize_) if
(
totalSize == 0
|| (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
)
{ {
break; break;
} }
@ -354,7 +358,8 @@ bool Foam::OFstreamCollator::write
IOstream::streamFormat fmt, IOstream::streamFormat fmt,
IOstream::versionNumber ver, IOstream::versionNumber ver,
IOstream::compressionType cmp, IOstream::compressionType cmp,
const bool append const bool append,
const bool useThread
) )
{ {
// Determine (on master) sizes to receive. Note: do NOT use thread // Determine (on master) sizes to receive. Note: do NOT use thread
@ -374,7 +379,7 @@ bool Foam::OFstreamCollator::write
Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_); Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_);
} }
if (maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_) if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
{ {
if (debug) if (debug)
{ {
@ -589,4 +594,20 @@ bool Foam::OFstreamCollator::write
} }
void Foam::OFstreamCollator::waitAll()
{
// Wait for all buffer space to be available i.e. wait for all jobs
// to finish
if (Pstream::master(localComm_))
{
if (debug)
{
Pout<< "OFstreamCollator : waiting for thread to have consumed all"
<< endl;
}
waitForBufferSpace(-1);
}
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -206,8 +206,12 @@ public:
IOstream::streamFormat, IOstream::streamFormat,
IOstream::versionNumber, IOstream::versionNumber,
IOstream::compressionType, IOstream::compressionType,
const bool append const bool append,
const bool useThread = true
); );
//- Wait for all thread actions to have finished
void waitAll();
}; };

View File

@ -556,14 +556,32 @@ bool Foam::fileOperations::collatedFileOperation::writeObject
} }
else else
{ {
// Re-check static maxThreadFileBufferSize variable to see
// if needs to use threading
bool useThread = (maxThreadFileBufferSize > 0);
if (debug) if (debug)
{ {
Pout<< "collatedFileOperation::writeObject :" Pout<< "collatedFileOperation::writeObject :"
<< " For object : " << io.name() << " For object : " << io.name()
<< " starting collating output to " << pathName << endl; << " starting collating output to " << pathName
<< " useThread:" << useThread << endl;
} }
threadedCollatedOFstream os(writer_, pathName, fmt, ver, cmp); if (!useThread)
{
writer_.waitAll();
}
threadedCollatedOFstream os
(
writer_,
pathName,
fmt,
ver,
cmp,
useThread
);
// If any of these fail, return (leave error handling to Ostream // If any of these fail, return (leave error handling to Ostream
// class) // class)

View File

@ -2,7 +2,7 @@
========= | ========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2017 OpenFOAM Foundation \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
@ -35,13 +35,15 @@ Foam::threadedCollatedOFstream::threadedCollatedOFstream
const fileName& pathName, const fileName& pathName,
streamFormat format, streamFormat format,
versionNumber version, versionNumber version,
compressionType compression compressionType compression,
const bool useThread
) )
: :
OStringStream(format, version), OStringStream(format, version),
writer_(writer), writer_(writer),
pathName_(pathName), pathName_(pathName),
compression_(compression) compression_(compression),
useThread_(useThread)
{} {}
@ -57,7 +59,8 @@ Foam::threadedCollatedOFstream::~threadedCollatedOFstream()
IOstream::BINARY, IOstream::BINARY,
version(), version(),
compression_, compression_,
false // append false, // append
useThread_
); );
} }

View File

@ -2,7 +2,7 @@
========= | ========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2017 OpenFOAM Foundation \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
@ -60,6 +60,8 @@ class threadedCollatedOFstream
const IOstream::compressionType compression_; const IOstream::compressionType compression_;
const bool useThread_;
public: public:
@ -72,7 +74,8 @@ public:
const fileName& pathname, const fileName& pathname,
streamFormat format=ASCII, streamFormat format=ASCII,
versionNumber version=currentVersion, versionNumber version=currentVersion,
compressionType compression=UNCOMPRESSED compressionType compression=UNCOMPRESSED,
const bool useThread = true
); );