ENH: reduce argList communication at startup (#3142)

- replace point-to-point transmission of the argList args/options with
  a broadcast. This is sufficient for most cases (without distributed
  roots).

  For "normal" cases (non-distributed roots) this will replace the
  nProcs-1 messages with a single broadcast.

- for cases with distributed roots, distinguish between a single,
  identical root and different roots.  A single, identical root can
  subsequently also be handled with a broadcast, whereas different
  roots still require individual point-to-point communication.

  For cases with distributed roots, this adds the overhead of one
  additional broadcast.
This commit is contained in:
Mark Olesen
2024-04-16 13:22:22 +02:00
parent 582b613d5f
commit d75c60d8ae

View File

@ -271,8 +271,8 @@ static bool printRootsSubscription
if (index == -1)
{
sortedProcs.append(host);
sortedRoots.append(i);
sortedProcs.push_back(host);
sortedRoots.push_back(i);
}
else if (roots[sortedRoots[index]] != root)
{
@ -360,9 +360,9 @@ void Foam::argList::addArgument
const string& usage
)
{
validArgs.append(argName);
validArgs.push_back(argName);
// The first program argument starts at 1 - obtain index after the append
// The first program argument starts at 1 - obtain index after push_back()
const label index = validArgs.size();
@ -470,7 +470,7 @@ void Foam::argList::addNote(const string& note)
{
if (!note.empty())
{
notes.append(note);
notes.push_back(note);
}
}
@ -1096,8 +1096,8 @@ Foam::argList::argList
if (strcmp(optName, "lib") == 0)
{
// The '-lib' option:
// Append name(s) to libs for later opening
libs().append(this->getList<fileName>(argi));
// Add name(s) to libs for later opening
libs().push_back(this->getList<fileName>(argi));
}
else if (strcmp(optName, "debug-switch") == 0)
{
@ -1262,7 +1262,7 @@ void Foam::argList::parse
const string timeString = clock::clockTime();
// Print the banner once only for parallel runs
if (Pstream::master() && bannerEnabled())
if (UPstream::master() && bannerEnabled())
{
IOobject::writeBanner(Info, true)
<< "Build : ";
@ -1353,16 +1353,16 @@ void Foam::argList::parse
// Collect machine/pid, and check that the build is identical
if (runControl_.parRun())
{
if (Pstream::master())
if (UPstream::master())
{
hostMachine.resize(Pstream::nProcs()-1);
hostProcs.resize(Pstream::nProcs()-1);
hostMachine.resize(UPstream::nProcs()-1);
hostProcs.resize(UPstream::nProcs()-1);
string procBuild;
label procPid;
int proci = 0;
for (const int subproci : Pstream::subProcs())
for (const int subproci : UPstream::subProcs())
{
IPstream fromSubproc(Pstream::commsTypes::scheduled, subproci);
IPstream fromSubproc(UPstream::commsTypes::scheduled, subproci);
fromSubproc >> procBuild >> hostMachine[proci] >> procPid;
@ -1384,8 +1384,8 @@ void Foam::argList::parse
{
OPstream toMaster
(
Pstream::commsTypes::scheduled,
Pstream::masterNo()
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster << foamVersion::build << Foam::hostName() << Foam::pid();
}
@ -1395,14 +1395,34 @@ void Foam::argList::parse
// Case is a single processor run unless it is running parallel
int nProcs = 1;
// Roots if running distributed
// Roots if running distributed. Only sized on the master
fileNameList roots;
enum distributedCodes
{
NON_DISTRIBUTED = 0,
DISTRIBUTED = 1,
DISTRIBUTED_SINGLE_ROOT = 2,
DISTRIBUTED_MULTIPLE_ROOTS = 3
};
// Track which type of distributed roots etc are being used
label distributedType
(
runControl_.distributed()
? distributedCodes::DISTRIBUTED
: distributedCodes::NON_DISTRIBUTED
);
// Some cases where knowing the writeFormat can be useful...
// label writeFormat(-1);
// If this actually is a parallel run
if (runControl_.parRun())
{
// For the master
if (Pstream::master())
if (UPstream::master())
{
// Establish rootPath_/globalCase_/case_ for master
setCasePaths();
@ -1414,7 +1434,7 @@ void Foam::argList::parse
{
bool adjustOpt = false;
if (isDir(source))
if (Foam::isDir(source))
{
source /= "decomposeParDict";
adjustOpt = true;
@ -1441,6 +1461,7 @@ void Foam::argList::parse
{
source = "-roots";
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
if (roots.empty())
{
@ -1457,6 +1478,7 @@ void Foam::argList::parse
{
source = "-hostRoots";
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
ITstream is(this->lookup("hostRoots"));
@ -1471,7 +1493,7 @@ void Foam::argList::parse
}
// Match machine names to roots
roots.resize(Pstream::nProcs()-1, fileName::null);
roots.resize(UPstream::nProcs()-1, fileName::null);
for (const auto& hostRoot : hostRoots)
{
labelList matched
@ -1511,7 +1533,7 @@ void Foam::argList::parse
dictNProcs = roots.size()+1;
}
}
else if (checkProcessorDirectories_ && Pstream::nProcs() > 1)
else if (checkProcessorDirectories_ && UPstream::nProcs() > 1)
{
// Check values from decomposeParDict
@ -1526,7 +1548,7 @@ void Foam::argList::parse
// the masterUncollated/collated handler. Note that we
// also have to protect the actual dictionary parsing since
// it might trigger file access (e.g. #include, #codeStream)
const bool oldParRun = Pstream::parRun(false);
const bool oldParRun = UPstream::parRun(false);
// Note: non-parallel running might update
// fileOperation::nProcs() so store & restore below
const label nOldProcs = fileHandler().nProcs();
@ -1550,6 +1572,8 @@ void Foam::argList::parse
{
nDomainsReadOpt = IOobjectOption::MUST_READ;
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
decompDict.readEntry("roots", roots);
if (roots.empty())
@ -1587,10 +1611,10 @@ void Foam::argList::parse
}
}
Pstream::parRun(oldParRun); // Restore parallel state
UPstream::parRun(oldParRun); // Restore parallel state
const_cast<fileOperation&>(fileHandler()).nProcs(nOldProcs);
if (Pstream::nProcs() == 1)
if (UPstream::nProcs() == 1)
{
Warning
<< "Running parallel on single processor. This only"
@ -1599,22 +1623,55 @@ void Foam::argList::parse
}
}
// Convenience:
// when a single root is specified, use it for all processes
if (roots.size() == 1)
{
const fileName rootName(roots[0]);
roots.resize(Pstream::nProcs()-1, rootName);
// Adjust dictNProcs for command-line '-roots' option
if (dictNProcs <= 0)
// Distributed roots
if (!roots.empty())
{
for (fileName& dir : roots)
{
dictNProcs = roots.size()+1;
dir.expand();
}
// Identical root specified everywhere?
// - use optimized single-root variant
if (roots.size() > 1 && roots.uniform())
{
roots.resize(1);
}
if (roots.size() == 1)
{
// Single root specified, use it for all processes
distributedType =
distributedCodes::DISTRIBUTED_SINGLE_ROOT;
// Adjust dictNProcs for command-line '-roots' option
if (dictNProcs <= 0)
{
dictNProcs = UPstream::nProcs();
}
}
else if (roots.size() > 1)
{
distributedType =
distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS;
if (roots.size() != UPstream::nProcs()-1)
{
FatalError
<< "Number of roots " << roots.size()
<< " != number of sub-ranks "
<< UPstream::nProcs()-1
<< exit(FatalError);
}
}
}
//
// Check number of processors.
//
// nProcs => number of actual procs
// dictNProcs => number of procs specified in decompositionDict
// nProcDirs => number of processor directories
@ -1626,73 +1683,33 @@ void Foam::argList::parse
if
(
checkProcessorDirectories_
&& Pstream::nProcs() > 1
&& dictNProcs > Pstream::nProcs()
&& UPstream::nProcs() > 1
)
{
FatalError
<< this->relativePath(source)
<< " specifies " << dictNProcs
<< " processors but job was started with "
<< Pstream::nProcs() << " processors."
<< exit(FatalError);
}
// Distributed data
if (roots.size())
{
if (roots.size() != Pstream::nProcs()-1)
if (dictNProcs > UPstream::nProcs())
{
FatalError
<< "number of entries in roots "
<< roots.size()
<< " is not equal to the number of sub-processes "
<< Pstream::nProcs()-1
<< this->relativePath(source)
<< " specifies " << dictNProcs
<< " processors but job was started with "
<< UPstream::nProcs() << " ranks."
<< exit(FatalError);
}
for (fileName& dir : roots)
{
dir.expand();
}
// Distribute the master's argument list (with new root)
const bool hadCaseOpt = options_.contains("case");
for (const int subproci : Pstream::subProcs())
{
options_.set("case", roots[subproci-1]/globalCase_);
OPstream toProc(Pstream::commsTypes::scheduled, subproci);
toProc
<< args_ << options_
<< runControl_.distributed()
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
options_.erase("case");
// Restore [-case dir]
if (hadCaseOpt)
{
options_.set("case", rootPath_/globalCase_);
}
}
else
{
// Possibly going to fewer processors.
// Check if all procDirs are there.
// NOTE: Only works when not using distributed roots!
if
(
checkProcessorDirectories_
&& Pstream::nProcs() > 1
// Can only rely on directory scanning *without* distributed roots!
roots.empty()
&& dictNProcs >= 1
&& dictNProcs < Pstream::nProcs()
&& dictNProcs < UPstream::nProcs()
)
{
label nProcDirs = 0;
{
const bool oldParRun(UPstream::parRun(false));
const bool oldParRun = UPstream::parRun(false);
// Don't cache processor directories (probably not
// needed since master-only
const int oldCacheLevel(fileOperation::cacheLevel(0));
@ -1711,60 +1728,110 @@ void Foam::argList::parse
UPstream::parRun(oldParRun);
}
if (nProcDirs < UPstream::nProcs())
{
FatalError
<< "number of processor directories = "
<< nProcDirs
<< " is not equal to the number of processors = "
<< "Number of processor directories = " << nProcDirs
<< " is not equal to the number of ranks = "
<< UPstream::nProcs()
<< exit(FatalError);
}
}
}
// Distribute the master's argument list (unaltered)
for (const int proci : UPstream::subProcs())
{
OPstream toProc(UPstream::commsTypes::scheduled, proci);
toProc
<< args_ << options_
<< runControl_.distributed()
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
// Broadcast the master's argument list (unaltered)
{
OPBstream toProcs(UPstream::worldComm);
toProcs
<< args_ << options_
<< distributedType
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
}
else
{
// Collect the master's argument list
bool isDistributed;
// Receive the broadcasted master's argument list
label numDryRun, numVerbose;
IPstream fromMaster
(
Pstream::commsTypes::scheduled,
Pstream::masterNo()
);
IPBstream fromMaster(UPstream::worldComm);
fromMaster
>> args_ >> options_
>> isDistributed
>> distributedType
>> numDryRun >> numVerbose;
runControl_.distributed(isDistributed);
runControl_.distributed(distributedType);
runControl_.dryRun(numDryRun);
runControl_.verbose(numVerbose);
}
// Establish rootPath_/globalCase_/case_ for sub-process
// Final handling of distributed roots (if any)
if
(
distributedType == distributedCodes::DISTRIBUTED_SINGLE_ROOT
)
{
// The same root for all sub-ranks
// - use broadcast to transmit value
fileName newCasePath;
if (UPstream::master())
{
newCasePath = roots[0]/globalCase_;
OPBstream::send(newCasePath); // worldComm
}
else
{
IPBstream::recv(newCasePath); // worldComm
options_.set("case", newCasePath);
}
}
else if
(
distributedType == distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS
)
{
// Different roots for each sub-rank
// - use point-to-point communication to transmit values
fileName newCasePath;
if (UPstream::master())
{
for (const int subproci : UPstream::subProcs())
{
newCasePath = roots[subproci-1]/globalCase_;
OPstream::send(newCasePath, subproci); // worldComm
}
}
else
{
IPstream::recv(newCasePath, UPstream::masterNo()); // worldComm
options_.set("case", newCasePath);
}
}
// Establish rootPath_/globalCase_/case_ for sub-process
if (!UPstream::master())
{
setCasePaths();
}
nProcs = Pstream::nProcs();
if (Pstream::nProcs() > 1)
nProcs = UPstream::nProcs();
if (UPstream::nProcs() > 1)
{
case_ = globalCase_/("processor" + Foam::name(Pstream::myProcNo()));
case_ =
(
globalCase_
/ ("processor" + Foam::name(UPstream::myProcNo()))
);
}
else
{
@ -1778,6 +1845,7 @@ void Foam::argList::parse
case_ = globalCase_; // Redundant, but extra safety?
}
// If needed, adjust fileHandler for distributed roots
if (runControl_.distributed() && fileOperation::fileHandlerPtr_)
{
@ -1834,7 +1902,7 @@ void Foam::argList::parse
List<fileNameList> rankToDirs(UPstream::nProcs());
if (UPstream::master())
{
const bool oldParRun = Pstream::parRun(false);
const bool oldParRun = UPstream::parRun(false);
// Note: non-parallel running might update
// fileOperation::nProcs() so store & restore below
const label nOldProcs = fileHandler().nProcs();
@ -2025,7 +2093,7 @@ void Foam::argList::parse
sigQuit::set(bannerEnabled());
sigSegv::set(bannerEnabled());
if (Pstream::master() && bannerEnabled())
if (UPstream::master() && bannerEnabled())
{
Info<< "fileModificationChecking : "
<< "Monitoring run-time modified files using "
@ -2274,7 +2342,7 @@ bool Foam::argList::check(bool checkArgs, bool checkOpts) const
{
bool ok = true;
if (Pstream::master())
if (UPstream::master())
{
const label nargs = args_.size()-1;
if (checkArgs && nargs != validArgs.size())
@ -2332,7 +2400,7 @@ bool Foam::argList::checkRootCase() const
const fileName pathDir(fileHandler().filePath(path(), false));
if (checkProcessorDirectories_ && pathDir.empty() && Pstream::master())
if (checkProcessorDirectories_ && pathDir.empty() && UPstream::master())
{
// Allow non-existent processor directories on sub-processes,
// to be created later (e.g. redistributePar)