ENH: redistributePar: loose matching of processors directories

This commit is contained in:
mattijs
2023-10-19 11:43:10 +01:00
committed by Andrew Heather
parent 9ed0bb1e4f
commit 910713398e
5 changed files with 245 additions and 176 deletions

View File

@ -117,7 +117,7 @@ const int debug(::Foam::debug::debugSwitch("redistributePar", 0));
// Allocate a new file handler on valid processors only
// retaining the original IO ranks if possible
autoPtr<fileOperation>
getNewHandler(const boolUList& useProc, bool verbose = true)
getNewHandler(const boolUList& useProc, const bool verbose = true)
{
autoPtr<fileOperation> handler
(
@ -127,6 +127,7 @@ getNewHandler(const boolUList& useProc, bool verbose = true)
if (::debug && handler)
{
Pout<< "Allocated " << handler().info()
<< " myProcNo:" << UPstream::myProcNo(handler().comm())
<< " ptr:" << Foam::name(handler.get()) << endl;
}
@ -154,9 +155,6 @@ void createTimeDirs(const fileName& path)
instantList masterTimeDirs;
if (Pstream::master())
{
//const bool oldParRun = Pstream::parRun(false);
//timeDirs = Time::findTimes(path, "constant");
//Pstream::parRun(oldParRun); // Restore parallel state
masterTimeDirs = localTimeDirs;
}
Pstream::broadcast(masterTimeDirs);
@ -213,16 +211,10 @@ void copyUniform
// - write using writeDb + writeHandler
fileName readPath;
if (readHandler)
{
auto oldHandler = fileOperation::fileHandler(readHandler);
const label oldComm = UPstream::commWorld(fileHandler().comm());
//Pout<< "** copyUniform: switching to handler:" << fileHandler().type()
// << " with comm:" << fileHandler().comm()
// << " with procs:" << UPstream::procID(fileHandler().comm())
// << endl;
const label oldComm = UPstream::commWorld(readHandler().comm());
Time& readTime = const_cast<Time&>(readDb.time());
bool oldProcCase = readTime.processorCase();
@ -237,7 +229,7 @@ void copyUniform
}
// Detect uniform/ at original database + time
readPath = fileHandler().dirPath
readPath = readHandler().dirPath
(
false, // local directory
IOobject("uniform", readTimeName, readDb),
@ -245,16 +237,8 @@ void copyUniform
);
readHandler = fileOperation::fileHandler(oldHandler);
UPstream::commWorld(oldComm);
//Pout<< "** copyUniform:"
// << " switched back to handler:" << fileHandler().type()
// << " with comm:" << fileHandler().comm()
// << " with procs:" << UPstream::procID(fileHandler().comm())
// << endl;
if (decompose)
{
// Reset caseName on master
@ -263,7 +247,6 @@ void copyUniform
readTime.processorCase(oldProcCase);
}
}
Pstream::broadcast(readPath, UPstream::worldComm);
if (!readPath.empty())
@ -293,16 +276,15 @@ void copyUniform
// Switch to writeHandler
if (writeHandler)
{
//const label oldWorldComm = UPstream::worldComm;
auto oldHandler = fileOperation::fileHandler(writeHandler);
// Check: fileHandler.comm() is size 1 for uncollated
const label writeComm = fileHandler().comm();
//UPstream::worldComm = writeComm;
if (reconstruct)
{
const bool oldParRun = UPstream::parRun(false);
const label oldNumProcs(fileHandler().nProcs());
const fileName writePath
(
fileHandler().objectPath
@ -312,6 +294,7 @@ void copyUniform
)
);
fileHandler().cp(readPath, writePath);
const_cast<fileOperation&>(fileHandler()).nProcs(oldNumProcs);
UPstream::parRun(oldParRun);
}
else
@ -340,7 +323,6 @@ void copyUniform
);
}
writeHandler = fileOperation::fileHandler(oldHandler);
//UPstream::worldComm = oldWorldComm;
}
}
}
@ -686,6 +668,7 @@ autoPtr<mapDistributePolyMesh> redistributeAndWrite
//Pout<< "Before distribution:" << endl;
//printMeshData(mesh);
// Storage of fields
PtrList<volScalarField> volScalarFields;
@ -769,21 +752,9 @@ autoPtr<mapDistributePolyMesh> redistributeAndWrite
auto oldHandler = fileOperation::fileHandler(readHandler);
const label oldComm = UPstream::commWorld(fileHandler().comm());
//Pout<< "** redistributeAndWrite:"
// << " switching to handler:" << fileHandler().type()
// << " with comm:" << fileHandler().comm()
// << " with procs:" << UPstream::procID(fileHandler().comm())
// << endl;
objects = IOobjectList(mesh, runTime.timeName());
readHandler = fileOperation::fileHandler(oldHandler);
UPstream::commWorld(oldComm);
//Pout<< "** redistributeAndWrite:"
// << " switched back to handler:" << fileHandler().type()
// << " with comm:" << fileHandler().comm()
// << " with procs:" << UPstream::procID(fileHandler().comm())
// << endl;
}
if (decompose)
@ -1271,6 +1242,12 @@ int main(int argc, char *argv[])
true // Advanced option
);
//- Disable caching of times/processor dirs etc. Caching causes massive
// parallel problems when e.g. decomposing.
fileOperation::cacheLevel(0);
// Handle arguments
// ~~~~~~~~~~~~~~~~
// (replacement for setRootCase that does not abort)
@ -1287,16 +1264,17 @@ int main(int argc, char *argv[])
// - reconstruct - reads parallel, write on master only and to parent
// directory
//- Disable caching of times etc. Cause massive parallel problems when e.g
// decomposing.
fileOperation::cacheLevel(0);
// Detect if running data-distributed (processors cannot see all other
// processors' files)
const bool nfs = !fileHandler().distributed();
// Switch off parallel synchronisation of cached time directories
//fileHandler().distributed(true);
// Set up loose processorsXXX directory matching (for collated) so e.g.
// when checking for -latestTime we don't miss anything. Once we know
// the time, actual number of processors etc we switch back to strict
// matching.
fileOperation::nProcsFilter(0);
// Read handler on processors with a volMesh
refPtr<fileOperation> volMeshReadHandler;
@ -1304,11 +1282,12 @@ int main(int argc, char *argv[])
// Read handler on processors with an areaMesh
refPtr<fileOperation> areaMeshReadHandler;
// Handler for master-only operation
refPtr<fileOperation> masterOnlyHandler
(
getNewHandler(boolList(one{}, true))
);
// Handler for master-only operation (reading/writing from/to undecomposed)
refPtr<fileOperation> masterOnlyHandler;
if (UPstream::master(UPstream::worldComm))
{
masterOnlyHandler = fileOperation::NewUncollated();
}
// Need this line since we don't include "setRootCase.H"
@ -1362,6 +1341,25 @@ int main(int argc, char *argv[])
<< nl << endl;
}
if (decompose || reconstruct)
{
// UPstream::nProcs() is either the source or destination processor count
fileOperation::nProcsFilter(UPstream::nProcs());
InfoOrPout<< "Switching to exact matching for "
<< fileOperation::processorsBaseDir + Foam::name(UPstream::nProcs())
<< " processor directories"
<< nl << endl;
}
else
{
// Redistribute mode. Accept any processorsXXX naming since we don't
// know yet what the source/target number of processors is
fileOperation::nProcsFilter(0);
InfoOrPout<< "Switching to matching any "
<< fileOperation::processorsBaseDir << " directory" << nl << endl;
}
if ((decompose || reconstruct) && !overwrite)
{
overwrite = true;
@ -1400,6 +1398,7 @@ int main(int argc, char *argv[])
fileHandler().mkDir(args.globalPath());
}
// Check if we have processor directories. Ideally would like to
// use fileHandler().dirPath here but we don't have runTime yet and
// want to delay constructing runTime until we've synced all time
@ -1427,6 +1426,17 @@ int main(int argc, char *argv[])
}
// If master changed to decompose mode make sure all nodes know about it
Pstream::broadcast(decompose);
if (decompose)
{
// UPstream::nProcs() is either the source or destination processor count
fileOperation::nProcsFilter(UPstream::nProcs());
InfoOrPout<< "Switching to exact matching for "
<< fileOperation::processorsBaseDir + Foam::name(UPstream::nProcs())
<< " processor directories"
<< nl << endl;
}
// If running distributed we have the problem of new processors not finding
// a system/controlDict. However if we switch on the master-only reading
@ -1444,13 +1454,6 @@ int main(int argc, char *argv[])
// Construct time
// ~~~~~~~~~~~~~~
// Disable master-only file checking since
// - uncollated : interferes with running read-handler on less processors
// than nProcs (when redistributing from less to more)
// - (masterUn)collated : does not use timeStampMaster anyway
Info<< "Disabling uncollated master reading" << nl << endl;
IOobject::fileModificationChecking = IOobject::timeStamp;
// Replace #include "createTime.H" with our own version
// that has MUST_READ instead of READ_MODIFIED
@ -1616,7 +1619,7 @@ int main(int argc, char *argv[])
// - check for 'faces' file (polyMesh)
// - check for 'faceLabels' file (faMesh)
boolList volMeshOnProc;
boolList areaMeshOnProc;
boolList areaMeshOnProc(UPstream::nProcs(), false);
volMeshOnProc = haveMeshFile
(
@ -1759,6 +1762,7 @@ int main(int argc, char *argv[])
<< endl;
const bool oldParRun = Pstream::parRun(false);
const label oldNumProcs(fileHandler().nProcs());
// Volume
{
@ -1794,6 +1798,10 @@ int main(int argc, char *argv[])
areaMeshHaveUndecomposed = labelsIO.headerOk();
}
const_cast<fileOperation&>
(
fileHandler()
).nProcs(oldNumProcs);
Pstream::parRun(oldParRun); // Restore parallel state
}
@ -1990,11 +1998,16 @@ int main(int argc, char *argv[])
runTime.caseName() = baseRunTime.caseName();
const bool oldProcCase(runTime.processorCase(false));
const bool oldParRun(Pstream::parRun(false));
const label oldNumProcs(fileHandler().nProcs());
areaBaseMeshPtr().write();
// Now we've written all. Reset caseName on master
InfoOrPout<< "Restoring caseName" << endl;
const_cast<fileOperation&>
(
fileHandler()
).nProcs(oldNumProcs);
Pstream::parRun(oldParRun);
runTime.caseName() = proc0CaseName;
runTime.processorCase(oldProcCase);
@ -2421,26 +2434,19 @@ int main(int argc, char *argv[])
// slave : dummy mesh
// redistribute : all read mesh or dummy mesh
Time& readRunTime =
(
(decompose)
? baseRunTime
: runTime
);
// Time coming from processor0 (or undecomposed if no processor0)
scalar masterTime;
if (decompose)
{
// Use base time. This is to handle e.g. startTime = latestTime
// which will not do anything if there are no processor directories
masterTime = timeSelector::selectIfPresent
(
baseRunTime,
args
)[0].value();
}
else
{
masterTime = timeSelector::selectIfPresent
(
runTime,
args
)[0].value();
}
scalar masterTime = timeSelector::selectIfPresent
(
readRunTime,
args
)[0].value();
Pstream::broadcast(masterTime);
InfoOrPout
<< "Setting time to that of master or undecomposed case : "
@ -2448,6 +2454,9 @@ int main(int argc, char *argv[])
runTime.setTime(masterTime, 0);
baseRunTime.setTime(masterTime, 0);
// Save old time name (since it might be incremented)
const word oldTimeName(runTime.timeName());
@ -2490,7 +2499,8 @@ int main(int argc, char *argv[])
}
const bool oldParRun = Pstream::parRun(false);
volMeshMasterInstance = runTime.findInstance
const label oldNumProcs(fileHandler().nProcs());
volMeshMasterInstance = readRunTime.findInstance
(
volMeshSubDir,
"faces",
@ -2499,7 +2509,7 @@ int main(int argc, char *argv[])
if (doFiniteArea)
{
areaMeshMasterInstance = runTime.findInstance
areaMeshMasterInstance = readRunTime.findInstance
(
areaMeshSubDir,
"faceLabels",
@ -2515,7 +2525,7 @@ int main(int argc, char *argv[])
(
haveMeshFile
(
runTime,
readRunTime,
areaMeshMasterInstance/areaMeshSubDir,
"faceLabels",
false // verbose=false
@ -2529,6 +2539,7 @@ int main(int argc, char *argv[])
}
}
const_cast<fileOperation&>(fileHandler()).nProcs(oldNumProcs);
Pstream::parRun(oldParRun); // Restore parallel state
if (decompose)
@ -2564,25 +2575,59 @@ int main(int argc, char *argv[])
boolList volMeshOnProc;
boolList areaMeshOnProc;
volMeshOnProc = haveMeshFile
(
runTime,
volMeshMasterInstance/volMeshSubDir,
"faces"
);
if (decompose)
{
// Already determined above that master can read 'faces' file.
// This avoids doing all the casename setting/restoring again.
volMeshOnProc.setSize(UPstream::nProcs(), false);
volMeshOnProc[UPstream::masterNo()] = volMeshHaveUndecomposed;
}
else
{
// All processors check whether they can read the 'faces' file
volMeshOnProc = haveMeshFile
(
runTime,
volMeshMasterInstance/volMeshSubDir,
"faces"
);
}
// Create handler for reading
newHandler(volMeshOnProc, volMeshReadHandler);
// Now we've determined which processors are reading switch back
// to exact matching of 'processorsXXX' directory names.
// - this determines the processorsXXX fileNames
// - the XXX comes from the number of read processors
// - also adapt the masterOnlyReader (used in copyUniform)
fileOperation::nProcsFilter
(
findIndices(volMeshOnProc, true).size()
);
if (doFiniteArea)
{
areaMeshOnProc = haveMeshFile
(
runTime,
areaMeshMasterInstance/areaMeshSubDir,
"faceLabels"
);
if (decompose)
{
// Already determined above that master can read
// 'faceLabels' file.
areaMeshOnProc.setSize(UPstream::nProcs(), false);
areaMeshOnProc[UPstream::masterNo()] =
areaMeshHaveUndecomposed;
}
else
{
areaMeshOnProc = haveMeshFile
(
runTime,
areaMeshMasterInstance/areaMeshSubDir,
"faceLabels"
);
}
// Create handler for reading
if (areaMeshOnProc == volMeshOnProc)
@ -2599,6 +2644,7 @@ int main(int argc, char *argv[])
}
}
// Prior to loadOrCreateMesh, note which meshes already exist
// for the current file handler.
// - where mesh would be written if it didn't exist already.
@ -2723,18 +2769,6 @@ int main(int argc, char *argv[])
)
)
{
//areaMeshPtr = faMeshTools::loadOrCreateMesh
//(
// IOobject
// (
// regionName,
// areaMeshMasterInstance,
// runTime,
// IOobjectOption::MUST_READ
// ),
// mesh, // <- The referenced polyMesh (from above)
// decompose
//);
areaMeshPtr = faMeshTools::loadOrCreateMesh
(
IOobject
@ -2762,12 +2796,6 @@ int main(int argc, char *argv[])
const label nOldCells = mesh.nCells();
// const label nOldAreaFaces =
// (areaMeshPtr ? areaMeshPtr().nFaces() : 0);
//
//Pout<< "Loaded mesh : nCells:" << nOldCells
// << " nPatches:" << mesh.boundaryMesh().size() << endl;
//Pout<< "Loaded area mesh : nFaces:" << nOldAreaFaces << endl;
// Determine decomposition
// ~~~~~~~~~~~~~~~~~~~~~~~
@ -2788,7 +2816,6 @@ int main(int argc, char *argv[])
finalDecomp
);
if (dryrun)
{
continue;
@ -2850,6 +2877,7 @@ int main(int argc, char *argv[])
runTime.processorCase(oldProcCase);
}
// Load fields, do all distribution (mesh and fields)
// - but not lagrangian fields; these are done later
autoPtr<mapDistributePolyMesh> distMap = redistributeAndWrite
@ -2875,7 +2903,6 @@ int main(int argc, char *argv[])
mesh
);
// Redistribute any clouds
redistributeLagrangian
(
@ -2918,11 +2945,16 @@ int main(int argc, char *argv[])
runTime.caseName() = baseRunTime.caseName();
const bool oldProcCase(runTime.processorCase(false));
const bool oldParRun = UPstream::parRun(false);
const label oldNumProcs(fileHandler().nProcs());
areaProcMeshPtr->write();
// Now we've written all. Reset caseName on master
InfoOrPout<< "Restoring caseName" << endl;
const_cast<fileOperation&>
(
fileHandler()
).nProcs(oldNumProcs);
UPstream::parRun(oldParRun);
runTime.caseName() = proc0CaseName;
runTime.processorCase(oldProcCase);
@ -2981,11 +3013,19 @@ int main(int argc, char *argv[])
// Get reference to standard write handler
refPtr<fileOperation> defaultHandler;
defaultHandler.ref(const_cast<fileOperation&>(fileHandler()));
if (writeHandler)
{
defaultHandler.ref(writeHandler.ref());
}
else
{
defaultHandler.ref(const_cast<fileOperation&>(fileHandler()));
}
copyUniform
(
masterOnlyHandler, // read handler
volMeshReadHandler, // read handler
defaultHandler, //TBD: should be all IOranks
reconstruct, // reconstruct
@ -3001,11 +3041,18 @@ int main(int argc, char *argv[])
// Get reference to standard write handler
refPtr<fileOperation> defaultHandler;
defaultHandler.ref(const_cast<fileOperation&>(fileHandler()));
if (writeHandler)
{
defaultHandler.ref(writeHandler.ref());
}
else
{
defaultHandler.ref(const_cast<fileOperation&>(fileHandler()));
}
copyUniform
(
masterOnlyHandler, // read handler
volMeshReadHandler, // read handler
defaultHandler, //TBD: should be all IOranks
reconstruct, // reconstruct (=false)
@ -3013,11 +3060,7 @@ int main(int argc, char *argv[])
oldTimeName, // provided read time
(decompose ? baseRunTime.caseName() : proc0CaseName),
( // read location
decompose
? baseRunTime
: runTime
),
readRunTime,
runTime // writing location
);
}