fileHandler: Added flush method

This method waits until all the threads have completed IO operations and
then clears any cached information about the files on disk. This
replaces the deactivation of threading by means of zeroing the buffer
size when writing and reading of a file happen in sequence. It also
allows paraFoam to update the list of available times.

Patch contributed by Mattijs Janssens
Resolves bug report https://bugs.openfoam.org/view.php?id=2962
This commit is contained in:
Will Bainbridge
2018-06-27 11:45:58 +01:00
committed by mattijs
parent 70bc72c2df
commit 2ae4bf73d9
12 changed files with 71 additions and 37 deletions

View File

@ -50,12 +50,6 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo
thermo1_(nullptr), thermo1_(nullptr),
thermo2_(nullptr) thermo2_(nullptr)
{ {
// Note: we're writing files to be read in immediately afterwards.
// Avoid any thread-writing problems.
float bufSz =
fileOperations::collatedFileOperation::maxThreadFileBufferSize;
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
{ {
volScalarField T1(IOobject::groupName("T", phase1Name()), T_); volScalarField T1(IOobject::groupName("T", phase1Name()), T_);
T1.write(); T1.write();
@ -66,9 +60,9 @@ Foam::twoPhaseMixtureThermo::twoPhaseMixtureThermo
T2.write(); T2.write();
} }
fileOperations::collatedFileOperation::maxThreadFileBufferSize = // Note: we're writing files to be read in immediately afterwards.
bufSz; // Avoid any thread-writing problems.
fileHandler().flush();
thermo1_ = rhoThermo::New(U.mesh(), phase1Name()); thermo1_ = rhoThermo::New(U.mesh(), phase1Name());
thermo2_ = rhoThermo::New(U.mesh(), phase2Name()); thermo2_ = rhoThermo::New(U.mesh(), phase2Name());

View File

@ -2,7 +2,7 @@
========= | ========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2017 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2018 OpenFOAM Foundation
\\/ M anipulation | Copyright (C) 2017-2018 OpenCFD Ltd. \\/ M anipulation | Copyright (C) 2017-2018 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
@ -51,7 +51,6 @@ Description
#include "faceZoneSet.H" #include "faceZoneSet.H"
#include "pointZoneSet.H" #include "pointZoneSet.H"
#include "timeSelector.H" #include "timeSelector.H"
#include "collatedFileOperation.H"
#include <stdio.h> #include <stdio.h>
@ -283,6 +282,9 @@ void removeZone
{ {
WarningInFunction << "Failed writing zone " << setName << endl; WarningInFunction << "Failed writing zone " << setName << endl;
} }
zones.write();
// Force flushing so we know it has finished writing
fileHandler().flush();
} }
} }
@ -541,6 +543,8 @@ bool doCommand
<< "Failed writing set " << "Failed writing set "
<< currentSet.objectPath() << endl; << currentSet.objectPath() << endl;
} }
// Make sure writing is finished
fileHandler().flush();
} }
} }
} }
@ -732,8 +736,6 @@ int main(int argc, char *argv[])
// Specific to topoSet/setSet: quite often we want to block upon writing // Specific to topoSet/setSet: quite often we want to block upon writing
// a set so we can immediately re-read it. So avoid use of threading // a set so we can immediately re-read it. So avoid use of threading
// for set writing. // for set writing.
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
timeSelector::addOptions(true, false); timeSelector::addOptions(true, false);
#include "addRegionOption.H" #include "addRegionOption.H"
argList::addBoolOption("noVTK", "Do not write VTK files"); argList::addBoolOption("noVTK", "Do not write VTK files");

View File

@ -43,7 +43,6 @@ Description
#include "faceZoneSet.H" #include "faceZoneSet.H"
#include "pointZoneSet.H" #include "pointZoneSet.H"
#include "IOdictionary.H" #include "IOdictionary.H"
#include "collatedFileOperation.H"
using namespace Foam; using namespace Foam;
@ -91,6 +90,7 @@ void removeZone
{ {
WarningInFunction << "Failed writing zone " << setName << endl; WarningInFunction << "Failed writing zone " << setName << endl;
} }
fileHandler().flush();
} }
} }
@ -199,11 +199,6 @@ polyMesh::readUpdateState meshReadUpdate(polyMesh& mesh)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
// Specific to topoSet/setSet: quite often we want to block upon writing
// a set so we can immediately re-read it. So avoid use of threading
// for set writing.
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
timeSelector::addOptions(true, false); timeSelector::addOptions(true, false);
#include "addDictOption.H" #include "addDictOption.H"
#include "addRegionOption.H" #include "addRegionOption.H"
@ -307,6 +302,7 @@ int main(int argc, char *argv[])
<< "Failed writing set " << "Failed writing set "
<< currentSet().objectPath() << endl; << currentSet().objectPath() << endl;
} }
fileHandler().flush();
} }
break; break;
@ -347,6 +343,7 @@ int main(int argc, char *argv[])
<< "Failed writing set " << "Failed writing set "
<< currentSet().objectPath() << endl; << currentSet().objectPath() << endl;
} }
fileHandler().flush();
} }
break; break;
@ -359,6 +356,7 @@ int main(int argc, char *argv[])
<< "Failed writing set " << "Failed writing set "
<< currentSet().objectPath() << endl; << currentSet().objectPath() << endl;
} }
fileHandler().flush();
break; break;
case topoSetSource::INVERT: case topoSetSource::INVERT:
@ -370,6 +368,7 @@ int main(int argc, char *argv[])
<< "Failed writing set " << "Failed writing set "
<< currentSet().objectPath() << endl; << currentSet().objectPath() << endl;
} }
fileHandler().flush();
break; break;
case topoSetSource::REMOVE: case topoSetSource::REMOVE:

View File

@ -108,7 +108,6 @@ Usage
#include "pointFieldDecomposer.H" #include "pointFieldDecomposer.H"
#include "lagrangianFieldDecomposer.H" #include "lagrangianFieldDecomposer.H"
#include "decompositionModel.H" #include "decompositionModel.H"
#include "collatedFileOperation.H"
#include "faCFD.H" #include "faCFD.H"
#include "emptyFaPatch.H" #include "emptyFaPatch.H"
@ -523,12 +522,6 @@ int main(int argc, char *argv[])
// Decompose the mesh // Decompose the mesh
if (!decomposeFieldsOnly) if (!decomposeFieldsOnly)
{ {
// Disable buffering when writing mesh since we need to read
// it later on when decomposing the fields
float bufSz =
fileOperations::collatedFileOperation::maxThreadFileBufferSize;
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
mesh.decomposeMesh(); mesh.decomposeMesh();
mesh.writeDecomposition(decomposeSets); mesh.writeDecomposition(decomposeSets);
@ -587,8 +580,7 @@ int main(int argc, char *argv[])
<< " for use in manual decomposition." << endl; << " for use in manual decomposition." << endl;
} }
fileOperations::collatedFileOperation::maxThreadFileBufferSize = fileHandler().flush();
bufSz;
} }
@ -1486,8 +1478,14 @@ int main(int argc, char *argv[])
fieldDecomposer.decomposeFields(areaScalarFields); fieldDecomposer.decomposeFields(areaScalarFields);
fieldDecomposer.decomposeFields(areaVectorFields); fieldDecomposer.decomposeFields(areaVectorFields);
fieldDecomposer.decomposeFields(areaSphericalTensorFields); fieldDecomposer.decomposeFields
fieldDecomposer.decomposeFields(areaSymmTensorFields); (
areaSphericalTensorFields
);
fieldDecomposer.decomposeFields
(
areaSymmTensorFields
);
fieldDecomposer.decomposeFields(areaTensorFields); fieldDecomposer.decomposeFields(areaTensorFields);
fieldDecomposer.decomposeFields(edgeScalarFields); fieldDecomposer.decomposeFields(edgeScalarFields);

View File

@ -194,7 +194,8 @@ int Foam::vtkPVFoam::setTime(const std::vector<double>& requestTimes)
Time& runTime = dbPtr_(); Time& runTime = dbPtr_();
// Get times list // Get times list. Flush first to force refresh.
fileHandler().flush();
instantList Times = runTime.times(); instantList Times = runTime.times();
int nearestIndex = timeIndex_; int nearestIndex = timeIndex_;
@ -301,11 +302,6 @@ Foam::vtkPVFoam::vtkPVFoam
fileName FileName(vtkFileName); fileName FileName(vtkFileName);
// Make sure not to use the threaded version - it does not like
// being loaded as a shared library - static cleanup order is problematic.
// For now just disable the threaded writer.
fileOperations::collatedFileOperation::maxThreadFileBufferSize = 0;
// avoid argList and get rootPath/caseName directly from the file // avoid argList and get rootPath/caseName directly from the file
fileName fullCasePath(FileName.path()); fileName fullCasePath(FileName.path());
@ -729,6 +725,8 @@ std::vector<double> Foam::vtkPVFoam::findTimes(const bool skipZero) const
if (dbPtr_.valid()) if (dbPtr_.valid())
{ {
const Time& runTime = dbPtr_(); const Time& runTime = dbPtr_();
// Get times list. Flush first to force refresh.
fileHandler().flush();
instantList timeLst = runTime.times(); instantList timeLst = runTime.times();
// find the first time for which this mesh appears to exist // find the first time for which this mesh appears to exist

View File

@ -2,7 +2,7 @@
========= | ========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2018 OpenFOAM Foundation
\\/ M anipulation | Copyright (C) 2017 OpenCFD Ltd. \\/ M anipulation | Copyright (C) 2017 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
@ -225,6 +225,9 @@ void Foam::vtkPVFoam::updateInfoLagrangian
// List of lagrangian objects across all times // List of lagrangian objects across all times
HashSet<fileName> names; HashSet<fileName> names;
// Get times list. Flush first to force refresh.
fileHandler().flush();
for (const instant& t : dbPtr_().times()) for (const instant& t : dbPtr_().times())
{ {
names.insert names.insert
@ -697,6 +700,7 @@ void Foam::vtkPVFoam::updateInfoLagrangianFields
// to some of the clouds. // to some of the clouds.
HashTable<wordHashSet> fields; HashTable<wordHashSet> fields;
fileHandler().flush();
for (const instant& t : dbPtr_().times()) for (const instant& t : dbPtr_().times())
{ {
for (const auto& cloudName : cloudNames) for (const auto& cloudName : cloudNames)

View File

@ -611,6 +611,18 @@ bool Foam::fileOperations::collatedFileOperation::writeObject
} }
} }
void Foam::fileOperations::collatedFileOperation::flush() const
{
if (debug)
{
Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
<< endl;
}
masterUncollatedFileOperation::flush();
// Wait for thread to finish (note: also removes thread)
writer_.waitAll();
}
Foam::word Foam::fileOperations::collatedFileOperation::processorsDir Foam::word Foam::fileOperations::collatedFileOperation::processorsDir
( (

View File

@ -155,6 +155,9 @@ public:
// Other // Other
//- Forcibly wait until all output done. Flush any cached data
virtual void flush() const;
//- Actual name of processors dir //- Actual name of processors dir
virtual word processorsDir(const IOobject&) const; virtual word processorsDir(const IOobject&) const;

View File

@ -991,6 +991,17 @@ Foam::label Foam::fileOperation::nProcs
} }
void Foam::fileOperation::flush() const
{
if (debug)
{
Pout<< "fileOperation::flush : clearing processor directories cache"
<< endl;
}
procsDirs_.clear();
}
Foam::fileName Foam::fileOperation::processorsCasePath Foam::fileName Foam::fileOperation::processorsCasePath
( (
const IOobject& io, const IOobject& io,

View File

@ -514,6 +514,9 @@ public:
virtual void setTime(const Time&) const virtual void setTime(const Time&) const
{} {}
//- Forcibly wait until all output done. Flush any cached data
virtual void flush() const;
//- Generate path (like io.path) from root+casename with any //- Generate path (like io.path) from root+casename with any
// 'processorXXX' replaced by procDir (usually 'processsors') // 'processorXXX' replaced by procDir (usually 'processsors')
fileName processorsCasePath fileName processorsCasePath

View File

@ -2561,6 +2561,13 @@ Foam::fileOperations::masterUncollatedFileOperation::NewOFstream
} }
void Foam::fileOperations::masterUncollatedFileOperation::flush() const
{
fileOperation::flush();
times_.clear();
}
Foam::label Foam::fileOperations::masterUncollatedFileOperation::addWatch Foam::label Foam::fileOperations::masterUncollatedFileOperation::addWatch
( (
const fileName& fName const fileName& fName

View File

@ -769,6 +769,9 @@ public:
//- Callback for time change //- Callback for time change
virtual void setTime(const Time&) const; virtual void setTime(const Time&) const;
//- Forcibly wait until all output done. Flush any cached data
virtual void flush() const;
//- Return cached times //- Return cached times
const HashPtrTable<instantList>& times() const const HashPtrTable<instantList>& times() const
{ {