Merge branch 'update-argList-mpi-startup' into 'develop'

reduce argList communication at startup

See merge request Development/openfoam!678
Andrew Heather
2024-04-23 09:20:19 +00:00
16 changed files with 591 additions and 220 deletions
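The core of this change replaces the master's per-rank scheduled sends of the serialised argument list with a single broadcast. The stand-alone MPI sketch below is not part of the commit and uses illustrative names only; it simply shows the communication pattern the new code favours (one collective instead of nProcs-1 point-to-point messages):

// Minimal sketch: distribute a serialised argument string from rank 0
// to all ranks with a broadcast rather than a loop of scheduled sends.
// 'argString' is an illustrative stand-in for the argList payload.
#include <mpi.h>
#include <string>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Serialised argument list; only meaningful on the master (rank 0)
    std::string argString = (rank == 0 ? "args options flags" : "");

    // Broadcast the length first, then the payload
    int len = static_cast<int>(argString.size());
    MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);

    argString.resize(len);
    if (len > 0)
    {
        MPI_Bcast(&argString[0], len, MPI_CHAR, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}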

View File

@ -41,14 +41,7 @@ Description
#include "PstreamReduceOps.H"
#include "SHA1.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "openfoam_mpi.H"
using namespace Foam;

View File

@ -41,14 +41,7 @@ Description
#include "StringStream.H"
#include "Random.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "openfoam_mpi.H"
using namespace Foam;

View File

@ -40,14 +40,7 @@ Description
#include "Pstream.H"
#include <iostream>
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "openfoam_mpi.H"
using namespace Foam;

View File

@ -34,14 +34,7 @@ Description
#include "globalMeshData.H"
#include "OFstream.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "openfoam_mpi.H"
using namespace Foam;

View File

@ -56,6 +56,9 @@ namespace Foam
//- Implementation details for UPstream/Pstream/MPI etc.
namespace PstreamDetail {}
//- Interface handling for UPstream/Pstream/MPI etc.
namespace PstreamUtils {}
/*---------------------------------------------------------------------------*\
Class UPstream Declaration
\*---------------------------------------------------------------------------*/
@ -88,6 +91,9 @@ public:
// Public Classes
//- Wrapper for MPI_Comm
class Communicator; // Forward Declaration
//- Wrapper for MPI_Request
class Request; // Forward Declaration
@ -1218,12 +1224,119 @@ public:
};
/*---------------------------------------------------------------------------*\
Class UPstream::Communicator Declaration
\*---------------------------------------------------------------------------*/
//- An opaque wrapper for MPI_Comm with a vendor-independent
//- representation without any \c <mpi.h> header.
// The MPI standard states that MPI_Comm is always an opaque object.
// Generally it is either an integer (eg, mpich) or a pointer (eg, openmpi).
class UPstream::Communicator
{
public:
// Public Types
//- Storage for MPI_Comm (as integer or pointer)
typedef std::intptr_t value_type;
private:
// Private Data
//- The MPI_Comm (as wrapped value)
value_type value_;
public:
// Generated Methods
//- Copy construct
Communicator(const Communicator&) noexcept = default;
//- Move construct
Communicator(Communicator&&) noexcept = default;
//- Copy assignment
Communicator& operator=(const Communicator&) noexcept = default;
//- Move assignment
Communicator& operator=(Communicator&&) noexcept = default;
// Member Operators
//- Test for equality
bool operator==(const Communicator& rhs) const noexcept
{
return (value_ == rhs.value_);
}
//- Test for inequality
bool operator!=(const Communicator& rhs) const noexcept
{
return (value_ != rhs.value_);
}
// Constructors
//- Default construct as MPI_COMM_NULL
Communicator() noexcept;
//- Construct from MPI_Comm (as pointer type)
explicit Communicator(const void* p) noexcept
:
value_(reinterpret_cast<value_type>(p))
{}
//- Construct from MPI_Comm (as integer type)
explicit Communicator(value_type val) noexcept
:
value_(val)
{}
// Factory Methods
//- Transcribe internally indexed communicator to wrapped value.
// Example,
// \code
// PstreamUtils::Cast::to_mpi
// (
// UPstream::Communicator::lookup(UPstream::commWorld())
// )
// \endcode
static Communicator lookup(const label comm);
// Member Functions
//- Return raw value
value_type value() const noexcept { return value_; }
//- Return as pointer value
const void* pointer() const noexcept
{
return reinterpret_cast<const void*>(value_);
}
//- True if not equal to MPI_COMM_NULL
bool good() const noexcept;
//- Reset to default constructed value (MPI_COMM_NULL)
void reset() noexcept;
};
/*---------------------------------------------------------------------------*\
Class UPstream::Request Declaration
\*---------------------------------------------------------------------------*/
//- An opaque wrapper for MPI_Request with a vendor-independent
//- representation independent of any \c <mpi.h> header
//- representation without any \c <mpi.h> header.
// The MPI standard states that MPI_Request is always an opaque object.
// Generally it is either an integer (eg, mpich) or a pointer (eg, openmpi).
class UPstream::Request
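The wrapper classes above (Communicator, and the analogous Request) let MPI-free code hold communicator and request handles by value. A hedged sketch, not taken from the commit (CommCache and its members are hypothetical), of caching a communicator without ever including <mpi.h>, using only the members declared above:

#include "UPstream.H"

using namespace Foam;

// Hypothetical helper: cache a wrapped communicator in MPI-free code
struct CommCache
{
    UPstream::Communicator comm_;   // plain value, no <mpi.h> required

    CommCache()
    :
        comm_(UPstream::Communicator::lookup(UPstream::commWorld()))
    {}

    // True unless the wrapped value is MPI_COMM_NULL
    bool valid() const noexcept { return comm_.good(); }

    // Back to the default-constructed (MPI_COMM_NULL) state
    void clear() noexcept { comm_.reset(); }
};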

View File

@ -271,8 +271,8 @@ static bool printRootsSubscription
if (index == -1)
{
sortedProcs.append(host);
sortedRoots.append(i);
sortedProcs.push_back(host);
sortedRoots.push_back(i);
}
else if (roots[sortedRoots[index]] != root)
{
@ -360,9 +360,9 @@ void Foam::argList::addArgument
const string& usage
)
{
validArgs.append(argName);
validArgs.push_back(argName);
// The first program argument starts at 1 - obtain index after the append
// The first program argument starts at 1 - obtain index after push_back()
const label index = validArgs.size();
@ -470,7 +470,7 @@ void Foam::argList::addNote(const string& note)
{
if (!note.empty())
{
notes.append(note);
notes.push_back(note);
}
}
@ -1096,8 +1096,8 @@ Foam::argList::argList
if (strcmp(optName, "lib") == 0)
{
// The '-lib' option:
// Append name(s) to libs for later opening
libs().append(this->getList<fileName>(argi));
// Add name(s) to libs for later opening
libs().push_back(this->getList<fileName>(argi));
}
else if (strcmp(optName, "debug-switch") == 0)
{
@ -1262,7 +1262,7 @@ void Foam::argList::parse
const string timeString = clock::clockTime();
// Print the banner once only for parallel runs
if (Pstream::master() && bannerEnabled())
if (UPstream::master() && bannerEnabled())
{
IOobject::writeBanner(Info, true)
<< "Build : ";
@ -1353,16 +1353,16 @@ void Foam::argList::parse
// Collect machine/pid, and check that the build is identical
if (runControl_.parRun())
{
if (Pstream::master())
if (UPstream::master())
{
hostMachine.resize(Pstream::nProcs()-1);
hostProcs.resize(Pstream::nProcs()-1);
hostMachine.resize(UPstream::nProcs()-1);
hostProcs.resize(UPstream::nProcs()-1);
string procBuild;
label procPid;
int proci = 0;
for (const int subproci : Pstream::subProcs())
for (const int subproci : UPstream::subProcs())
{
IPstream fromSubproc(Pstream::commsTypes::scheduled, subproci);
IPstream fromSubproc(UPstream::commsTypes::scheduled, subproci);
fromSubproc >> procBuild >> hostMachine[proci] >> procPid;
@ -1384,8 +1384,8 @@ void Foam::argList::parse
{
OPstream toMaster
(
Pstream::commsTypes::scheduled,
Pstream::masterNo()
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster << foamVersion::build << Foam::hostName() << Foam::pid();
}
@ -1395,14 +1395,34 @@ void Foam::argList::parse
// Case is a single processor run unless it is running parallel
int nProcs = 1;
// Roots if running distributed
// Roots if running distributed. Only sized on the master
fileNameList roots;
enum distributedCodes
{
NON_DISTRIBUTED = 0,
DISTRIBUTED = 1,
DISTRIBUTED_SINGLE_ROOT = 2,
DISTRIBUTED_MULTIPLE_ROOTS = 3
};
// Track which type of distributed roots etc are being used
label distributedType
(
runControl_.distributed()
? distributedCodes::DISTRIBUTED
: distributedCodes::NON_DISTRIBUTED
);
// Some cases where knowing the writeFormat can be useful...
// label writeFormat(-1);
// If this actually is a parallel run
if (runControl_.parRun())
{
// For the master
if (Pstream::master())
if (UPstream::master())
{
// Establish rootPath_/globalCase_/case_ for master
setCasePaths();
@ -1414,7 +1434,7 @@ void Foam::argList::parse
{
bool adjustOpt = false;
if (isDir(source))
if (Foam::isDir(source))
{
source /= "decomposeParDict";
adjustOpt = true;
@ -1441,6 +1461,7 @@ void Foam::argList::parse
{
source = "-roots";
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
if (roots.empty())
{
@ -1457,6 +1478,7 @@ void Foam::argList::parse
{
source = "-hostRoots";
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
ITstream is(this->lookup("hostRoots"));
@ -1471,7 +1493,7 @@ void Foam::argList::parse
}
// Match machine names to roots
roots.resize(Pstream::nProcs()-1, fileName::null);
roots.resize(UPstream::nProcs()-1, fileName::null);
for (const auto& hostRoot : hostRoots)
{
labelList matched
@ -1511,7 +1533,7 @@ void Foam::argList::parse
dictNProcs = roots.size()+1;
}
}
else if (checkProcessorDirectories_ && Pstream::nProcs() > 1)
else if (checkProcessorDirectories_ && UPstream::nProcs() > 1)
{
// Check values from decomposeParDict
@ -1526,7 +1548,7 @@ void Foam::argList::parse
// the masterUncollated/collated handler. Note that we
// also have to protect the actual dictionary parsing since
// it might trigger file access (e.g. #include, #codeStream)
const bool oldParRun = Pstream::parRun(false);
const bool oldParRun = UPstream::parRun(false);
// Note: non-parallel running might update
// fileOperation::nProcs() so store & restore below
const label nOldProcs = fileHandler().nProcs();
@ -1550,6 +1572,8 @@ void Foam::argList::parse
{
nDomainsReadOpt = IOobjectOption::MUST_READ;
runControl_.distributed(true);
distributedType = distributedCodes::DISTRIBUTED;
decompDict.readEntry("roots", roots);
if (roots.empty())
@ -1587,10 +1611,10 @@ void Foam::argList::parse
}
}
Pstream::parRun(oldParRun); // Restore parallel state
UPstream::parRun(oldParRun); // Restore parallel state
const_cast<fileOperation&>(fileHandler()).nProcs(nOldProcs);
if (Pstream::nProcs() == 1)
if (UPstream::nProcs() == 1)
{
Warning
<< "Running parallel on single processor. This only"
@ -1599,22 +1623,55 @@ void Foam::argList::parse
}
}
// Convenience:
// when a single root is specified, use it for all processes
if (roots.size() == 1)
{
const fileName rootName(roots[0]);
roots.resize(Pstream::nProcs()-1, rootName);
// Adjust dictNProcs for command-line '-roots' option
if (dictNProcs <= 0)
// Distributed roots
if (!roots.empty())
{
for (fileName& dir : roots)
{
dictNProcs = roots.size()+1;
dir.expand();
}
// Identical root specified everywhere?
// - use optimized single-root variant
if (roots.size() > 1 && roots.uniform())
{
roots.resize(1);
}
if (roots.size() == 1)
{
// Single root specified, use it for all processes
distributedType =
distributedCodes::DISTRIBUTED_SINGLE_ROOT;
// Adjust dictNProcs for command-line '-roots' option
if (dictNProcs <= 0)
{
dictNProcs = UPstream::nProcs();
}
}
else if (roots.size() > 1)
{
distributedType =
distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS;
if (roots.size() != UPstream::nProcs()-1)
{
FatalError
<< "Number of roots " << roots.size()
<< " != number of sub-ranks "
<< UPstream::nProcs()-1
<< exit(FatalError);
}
}
}
//
// Check number of processors.
//
// nProcs => number of actual procs
// dictNProcs => number of procs specified in decompositionDict
// nProcDirs => number of processor directories
@ -1626,73 +1683,33 @@ void Foam::argList::parse
if
(
checkProcessorDirectories_
&& Pstream::nProcs() > 1
&& dictNProcs > Pstream::nProcs()
&& UPstream::nProcs() > 1
)
{
FatalError
<< this->relativePath(source)
<< " specifies " << dictNProcs
<< " processors but job was started with "
<< Pstream::nProcs() << " processors."
<< exit(FatalError);
}
// Distributed data
if (roots.size())
{
if (roots.size() != Pstream::nProcs()-1)
if (dictNProcs > UPstream::nProcs())
{
FatalError
<< "number of entries in roots "
<< roots.size()
<< " is not equal to the number of sub-processes "
<< Pstream::nProcs()-1
<< this->relativePath(source)
<< " specifies " << dictNProcs
<< " processors but job was started with "
<< UPstream::nProcs() << " ranks."
<< exit(FatalError);
}
for (fileName& dir : roots)
{
dir.expand();
}
// Distribute the master's argument list (with new root)
const bool hadCaseOpt = options_.contains("case");
for (const int subproci : Pstream::subProcs())
{
options_.set("case", roots[subproci-1]/globalCase_);
OPstream toProc(Pstream::commsTypes::scheduled, subproci);
toProc
<< args_ << options_
<< runControl_.distributed()
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
options_.erase("case");
// Restore [-case dir]
if (hadCaseOpt)
{
options_.set("case", rootPath_/globalCase_);
}
}
else
{
// Possibly going to fewer processors.
// Check if all procDirs are there.
// NOTE: Only works when not using distributed roots!
if
(
checkProcessorDirectories_
&& Pstream::nProcs() > 1
// Can only rely on directory scanning *without* distributed roots!
roots.empty()
&& dictNProcs >= 1
&& dictNProcs < Pstream::nProcs()
&& dictNProcs < UPstream::nProcs()
)
{
label nProcDirs = 0;
{
const bool oldParRun(UPstream::parRun(false));
const bool oldParRun = UPstream::parRun(false);
// Don't cache processor directories (probably not
// needed since master-only
const int oldCacheLevel(fileOperation::cacheLevel(0));
@ -1711,60 +1728,110 @@ void Foam::argList::parse
UPstream::parRun(oldParRun);
}
if (nProcDirs < UPstream::nProcs())
{
FatalError
<< "number of processor directories = "
<< nProcDirs
<< " is not equal to the number of processors = "
<< "Number of processor directories = " << nProcDirs
<< " is not equal to the number of ranks = "
<< UPstream::nProcs()
<< exit(FatalError);
}
}
}
// Distribute the master's argument list (unaltered)
for (const int proci : UPstream::subProcs())
{
OPstream toProc(UPstream::commsTypes::scheduled, proci);
toProc
<< args_ << options_
<< runControl_.distributed()
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
// Broadcast the master's argument list (unaltered)
{
OPBstream toProcs(UPstream::worldComm);
toProcs
<< args_ << options_
<< distributedType
<< label(runControl_.dryRun())
<< label(runControl_.verbose());
}
}
else
{
// Collect the master's argument list
bool isDistributed;
// Receive the broadcasted master's argument list
label numDryRun, numVerbose;
IPstream fromMaster
(
Pstream::commsTypes::scheduled,
Pstream::masterNo()
);
IPBstream fromMaster(UPstream::worldComm);
fromMaster
>> args_ >> options_
>> isDistributed
>> distributedType
>> numDryRun >> numVerbose;
runControl_.distributed(isDistributed);
runControl_.distributed(distributedType);
runControl_.dryRun(numDryRun);
runControl_.verbose(numVerbose);
}
// Establish rootPath_/globalCase_/case_ for sub-process
// Final handling of distributed roots (if any)
if
(
distributedType == distributedCodes::DISTRIBUTED_SINGLE_ROOT
)
{
// The same root for all sub-ranks
// - use broadcast to transmit value
fileName newCasePath;
if (UPstream::master())
{
newCasePath = roots[0]/globalCase_;
OPBstream::send(newCasePath); // worldComm
}
else
{
IPBstream::recv(newCasePath); // worldComm
options_.set("case", newCasePath);
}
}
else if
(
distributedType == distributedCodes::DISTRIBUTED_MULTIPLE_ROOTS
)
{
// Different roots for each sub-rank
// - use point-to-point communication to transmit values
fileName newCasePath;
if (UPstream::master())
{
for (const int subproci : UPstream::subProcs())
{
newCasePath = roots[subproci-1]/globalCase_;
OPstream::send(newCasePath, subproci); // worldComm
}
}
else
{
IPstream::recv(newCasePath, UPstream::masterNo()); // worldComm
options_.set("case", newCasePath);
}
}
// Establish rootPath_/globalCase_/case_ for sub-process
if (!UPstream::master())
{
setCasePaths();
}
nProcs = Pstream::nProcs();
if (Pstream::nProcs() > 1)
nProcs = UPstream::nProcs();
if (UPstream::nProcs() > 1)
{
case_ = globalCase_/("processor" + Foam::name(Pstream::myProcNo()));
case_ =
(
globalCase_
/ ("processor" + Foam::name(UPstream::myProcNo()))
);
}
else
{
@ -1778,6 +1845,7 @@ void Foam::argList::parse
case_ = globalCase_; // Redundant, but extra safety?
}
// If needed, adjust fileHandler for distributed roots
if (runControl_.distributed() && fileOperation::fileHandlerPtr_)
{
@ -1834,7 +1902,7 @@ void Foam::argList::parse
List<fileNameList> rankToDirs(UPstream::nProcs());
if (UPstream::master())
{
const bool oldParRun = Pstream::parRun(false);
const bool oldParRun = UPstream::parRun(false);
// Note: non-parallel running might update
// fileOperation::nProcs() so store & restore below
const label nOldProcs = fileHandler().nProcs();
@ -2025,7 +2093,7 @@ void Foam::argList::parse
sigQuit::set(bannerEnabled());
sigSegv::set(bannerEnabled());
if (Pstream::master() && bannerEnabled())
if (UPstream::master() && bannerEnabled())
{
Info<< "fileModificationChecking : "
<< "Monitoring run-time modified files using "
@ -2274,7 +2342,7 @@ bool Foam::argList::check(bool checkArgs, bool checkOpts) const
{
bool ok = true;
if (Pstream::master())
if (UPstream::master())
{
const label nargs = args_.size()-1;
if (checkArgs && nargs != validArgs.size())
@ -2332,7 +2400,7 @@ bool Foam::argList::checkRootCase() const
const fileName pathDir(fileHandler().filePath(path(), false));
if (checkProcessorDirectories_ && pathDir.empty() && Pstream::master())
if (checkProcessorDirectories_ && pathDir.empty() && UPstream::master())
{
// Allow non-existent processor directories on sub-processes,
// to be created later (e.g. redistributePar)

View File

@ -0,0 +1,99 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Header for low-level interfaces between MPI and OpenFOAM.
The detail interfaces are subject to change.
\*---------------------------------------------------------------------------*/
#ifndef Foam_UPstreamMPI_H
#define Foam_UPstreamMPI_H
#include "UPstream.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
namespace PstreamUtils
{
// Casting helpers
struct Cast
{
// Cast UPstream::Communicator to MPI_Comm (pointer)
template<typename Type = MPI_Comm>
static typename std::enable_if<std::is_pointer<Type>::value, Type>::type
to_mpi(const UPstream::Communicator& arg) noexcept
{
return reinterpret_cast<Type>(arg.value());
}
// Cast UPstream::Communicator to MPI_Comm (integer)
template<typename Type = MPI_Comm>
static typename std::enable_if<std::is_integral<Type>::value, Type>::type
to_mpi(const UPstream::Communicator& arg) noexcept
{
return static_cast<Type>(arg.value());
}
// Cast UPstream::Request to MPI_Request (pointer)
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_pointer<Type>::value, Type>::type
to_mpi(const UPstream::Request& arg) noexcept
{
return reinterpret_cast<Type>(arg.value());
}
// Cast UPstream::Request to MPI_Request (integer)
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_integral<Type>::value, Type>::type
to_mpi(const UPstream::Request& arg) noexcept
{
return static_cast<Type>(arg.value());
}
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace PstreamUtils
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //
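MPI-aware translation units include openfoam_mpi.H and recover the vendor handle through PstreamUtils::Cast::to_mpi(), which resolves to the pointer or integer overload depending on how the MPI implementation defines MPI_Comm. A hedged usage sketch (wrappedCommSize is a hypothetical helper, not part of the commit):

#include "openfoam_mpi.H"   // brings in UPstream.H and <mpi.h>

using namespace Foam;

// Query the rank count of an internally indexed communicator
int wrappedCommSize(const label communicator)
{
    UPstream::Communicator wrapped =
        UPstream::Communicator::lookup(communicator);

    if (!wrapped.good())
    {
        return 0;   // MPI_COMM_NULL: nothing to query
    }

    // to_mpi() picks the pointer or integer variant automatically
    MPI_Comm comm = PstreamUtils::Cast::to_mpi(wrapped);

    int nProcs = 0;
    MPI_Comm_size(comm, &nProcs);
    return nProcs;
}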

View File

@ -1,6 +1,7 @@
UPstream.C
UPstreamAllToAll.C
UPstreamBroadcast.C
UPstreamCommunicator.C
UPstreamGatherScatter.C
UPstreamReduce.C
UPstreamRequest.C

View File

@ -0,0 +1,59 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::Communicator::Communicator() noexcept
:
UPstream::Communicator(nullptr)
{}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
Foam::UPstream::Communicator
Foam::UPstream::Communicator::lookup(const label comm)
{
return UPstream::Communicator(nullptr);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Communicator::good() const noexcept
{
return false;
}
void Foam::UPstream::Communicator::reset() noexcept
{}
// ************************************************************************* //

View File

@ -47,6 +47,15 @@ void Foam::UPstream::Request::reset() noexcept
{}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
// Foam::UPstream::Request
// Foam::UPstream::Request::lookup(const label req)
// {
// return UPstream::Request(nullptr);
// }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UPstream::nRequests() noexcept { return 0; }

View File

@ -2,6 +2,7 @@ PstreamGlobals.C
UPstream.C
UPstreamAllToAll.C
UPstreamBroadcast.C
UPstreamCommunicator.C
UPstreamGatherScatter.C
UPstreamReduce.C
UPstreamRequest.C

View File

@ -40,16 +40,8 @@ SourceFiles
#define Foam_PstreamGlobals_H
#include "DynamicList.H"
#include "UPstream.H" // for UPstream::Request
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "UPstream.H" // For UPstream::Request
#include "openfoam_mpi.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2023 OpenCFD Ltd.
Copyright (C) 2016-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -548,7 +548,7 @@ void Foam::UPstream::allocateCommunicatorComponents
if (index != UPstream::commGlobal())
{
FatalErrorInFunction
<< "world communicator should always be index "
<< "base world communicator should always be index "
<< UPstream::commGlobal()
<< Foam::exit(FatalError);
}
@ -687,36 +687,30 @@ void Foam::UPstream::allocateCommunicatorComponents
void Foam::UPstream::freeCommunicatorComponents(const label index)
{
// Skip placeholders and pre-defined (not allocated) communicators
if (UPstream::debug)
{
Pout<< "freeCommunicatorComponents: " << index
<< " from " << PstreamGlobals::MPICommunicators_.size() << endl;
}
// Not touching the first two communicators (SELF, WORLD)
// or anything out-of-bounds.
// Only free communicators that we have specifically allocated ourselves
//
// No UPstream communicator indices when MPI is initialized outside
// of OpenFOAM - thus needs a bounds check too!
// Bounds checking needed since there are no UPstream communicator indices
// when MPI is initialized outside of OpenFOAM
if
(
index > 1
&& index < PstreamGlobals::MPICommunicators_.size()
(index >= 0 && index < PstreamGlobals::MPICommunicators_.size())
&& PstreamGlobals::pendingMPIFree_[index]
)
{
if
(
PstreamGlobals::pendingMPIFree_[index]
&& (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[index])
)
PstreamGlobals::pendingMPIFree_[index] = false;
// Free communicator. Sets communicator to MPI_COMM_NULL
if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[index])
{
// Free communicator. Sets communicator to MPI_COMM_NULL
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[index]);
}
PstreamGlobals::pendingMPIFree_[index] = false;
}
}

View File

@ -0,0 +1,73 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::Communicator::Communicator() noexcept
:
UPstream::Communicator(MPI_COMM_NULL)
{}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
Foam::UPstream::Communicator
Foam::UPstream::Communicator::lookup(const label comm)
{
if (comm < 0 || comm >= PstreamGlobals::MPICommunicators_.size())
{
WarningInFunction
<< "Illegal communicator " << comm << nl
<< "Should be within range [0,"
<< PstreamGlobals::MPICommunicators_.size()
<< ')' << endl;
return UPstream::Communicator(MPI_COMM_NULL);
}
return UPstream::Communicator(PstreamGlobals::MPICommunicators_[comm]);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Communicator::good() const noexcept
{
return MPI_COMM_NULL != PstreamUtils::Cast::to_mpi(*this);
}
void Foam::UPstream::Communicator::reset() noexcept
{
*this = UPstream::Communicator(MPI_COMM_NULL);
}
// ************************************************************************* //

View File

@ -42,7 +42,7 @@ Foam::UPstream::Request::Request() noexcept
bool Foam::UPstream::Request::good() const noexcept
{
return MPI_REQUEST_NULL != PstreamDetail::Request::get(*this);
return MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(*this);
}
@ -52,6 +52,26 @@ void Foam::UPstream::Request::reset() noexcept
}
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
// Foam::UPstream::Request
// Foam::UPstream::Request::lookup(const label req)
// {
// if (req < 0 || req >= PstreamGlobals::outstandingRequests_.size())
// {
// WarningInFunction
// << "Illegal request " << req << nl
// << "Should be within range [0,"
// << PstreamGlobals::outstandingRequests_.size()
// << ')' << endl;
//
// return UPstream::Communicator(MPI_REQUEST_NULL);
// }
//
// return UPstream::Request(PstreamGlobals::outstandingRequests_[req]);
// }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UPstream::nRequests() noexcept
@ -80,7 +100,7 @@ void Foam::UPstream::addRequest(UPstream::Request& req)
// Transcribe as a MPI_Request
PstreamGlobals::outstandingRequests_.push_back
(
PstreamDetail::Request::get(req)
PstreamUtils::Cast::to_mpi(req)
);
// Invalidate parameter
@ -121,7 +141,7 @@ void Foam::UPstream::cancelRequest(UPstream::Request& req)
}
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Active handle is mandatory
{
MPI_Cancel(&request);
@ -142,7 +162,7 @@ void Foam::UPstream::cancelRequests(UList<UPstream::Request>& requests)
for (auto& req : requests)
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Active handle is mandatory
{
MPI_Cancel(&request);
@ -203,7 +223,7 @@ void Foam::UPstream::freeRequest(UPstream::Request& req)
}
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Active handle is mandatory
{
// if (cancel)
@ -227,7 +247,7 @@ void Foam::UPstream::freeRequests(UList<UPstream::Request>& requests)
for (auto& req : requests)
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Active handle is mandatory
{
// if (cancel)
@ -329,7 +349,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
for (auto& req : requests)
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Apply some prefiltering
{
@ -526,7 +546,7 @@ bool Foam::UPstream::waitSomeRequests
for (auto& req : requests)
{
waitRequests[count] = PstreamDetail::Request::get(req);
waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
++count;
}
@ -617,7 +637,7 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
// for the return index.
for (auto& req : requests)
{
waitRequests[count] = PstreamDetail::Request::get(req);
waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
++count;
}
@ -675,13 +695,13 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
/// int count = 0;
/// MPI_Request waitRequests[2];
///
/// waitRequests[count] = PstreamDetail::Request::get(req0);
/// waitRequests[count] = PstreamUtils::Cast::to_mpi(req0);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// {
/// ++count;
/// }
///
/// waitRequests[count] = PstreamDetail::Request::get(req1);
/// waitRequests[count] = PstreamUtils::Cast::to_mpi(req1);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// {
/// ++count;
@ -765,7 +785,7 @@ void Foam::UPstream::waitRequest(UPstream::Request& req)
return;
}
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
// No-op for null request
if (MPI_REQUEST_NULL == request)
@ -831,7 +851,7 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req)
return true;
}
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
// Fast-path (no-op) for null request
if (MPI_REQUEST_NULL == request)
@ -924,7 +944,7 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
for (auto& req : requests)
{
MPI_Request request = PstreamDetail::Request::get(req);
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Apply some prefiltering
{

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2012-2016 OpenFOAM Foundation
Copyright (C) 2022-2023 OpenCFD Ltd.
Copyright (C) 2022-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -38,16 +38,7 @@ SourceFiles
#ifndef Foam_UPstreamWrapping_H
#define Foam_UPstreamWrapping_H
#include "UPstream.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX
#define MPICH_SKIP_MPICXX
#endif
#ifndef OMPI_SKIP_MPICXX
#define OMPI_SKIP_MPICXX
#endif
#include <mpi.h>
#include "openfoam_mpi.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -56,27 +47,6 @@ namespace Foam
namespace PstreamDetail
{
// Helper for casting to MPI_Request
struct Request
{
// To pointer
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_pointer<Type>::value, Type>::type
get(const UPstream::Request& req) noexcept
{
return reinterpret_cast<Type>(req.value());
}
// To integer
template<typename Type = MPI_Request>
static typename std::enable_if<std::is_integral<Type>::value, Type>::type
get(const UPstream::Request& req) noexcept
{
return static_cast<Type>(req.value());
}
};
// MPI_Bcast, using root=0
template<class Type>
void broadcast0