ENH: cleaner separation of global and local world naming

- UPstream::globalComm constant always refers to MPI_COMM_WORLD, but
  UPstream::worldComm may be MPI_COMM_WORLD (single world)
  or a dedicated local communicator (for multi-world).

- provide a Pstream-wrapped version of MPI_COMM_SELF,
  referenced as UPstream::selfComm

- UPstream::isUserComm(label)
  tests for additional user-defined communicators
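
A minimal usage sketch (illustrative only; not part of this commit's diffs,
and assuming the UPstream API introduced below):

    #include "argList.H"
    #include "IOstreams.H"
    #include "PstreamReduceOps.H"

    using namespace Foam;

    int main(int argc, char *argv[])
    {
        #include "setRootCase.H"

        // Rank within the (possibly local) world vs within all of MPI
        Pout<< "local rank: " << UPstream::myProcNo(UPstream::worldComm)
            << " global: " << UPstream::myProcNo(UPstream::globalComm) << nl;

        // selfComm wraps MPI_COMM_SELF, so this reduction leaves val unchanged
        label val = UPstream::myProcNo(UPstream::worldComm);
        reduce(val, sumOp<label>(), UPstream::msgType(), UPstream::selfComm);

        // The predefined communicators are not "user" communicators
        Pout<< "user-defined? "
            << UPstream::isUserComm(UPstream::worldComm) << nl;

        return 0;
    }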
Mark Olesen, 2022-11-29 11:37:29 +01:00
committed by Andrew Heather
parent 7fe8bdcf99 / commit ffeef76d8f
14 changed files with 575 additions and 229 deletions


@@ -0,0 +1,3 @@
Test-parallel-comm0.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-comm0


@@ -0,0 +1,161 @@
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-parallel-comm0
Description
Very basic checks on standard communicators
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "Time.H"
#include "IPstream.H"
#include "OPstream.H"
#include "Pair.H"
#include "Tuple2.H"
#include "IOstreams.H"
#include "PstreamReduceOps.H"
using namespace Foam;
void printInfo(const label comm)
{
Info<< "comm:" << comm
<< " nprocs:" << UPstream::nProcs(comm)
<< " all:" << UPstream::allProcs(comm)
<< " sub:" << UPstream::subProcs(comm) << nl;
if (UPstream::selfComm == comm)
{
Pout<< "self all:" << UPstream::allProcs(comm)
<< " sub:" << UPstream::subProcs(comm) << nl;
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
argList::noBanner();
argList::noCheckProcessorDirectories();
argList::addBoolOption("verbose", "Set debug level");
// Capture manually. We need values before proper startup
int nVerbose = 0;
for (int argi = 1; argi < argc; ++argi)
{
if (strcmp(argv[argi], "-verbose") == 0)
{
++nVerbose;
}
}
UPstream::debug = nVerbose;
#include "setRootCase.H"
Info<< nl
<< "nProcs = " << UPstream::nProcs()
<< " with " << UPstream::nComms() << " predefined comm(s)" << nl;
Info<< "worldComm : ";
printInfo(UPstream::worldComm);
Info<< "selfComm : ";
printInfo(UPstream::selfComm);
Info<< nl;
// Reductions (using MPI intrinsics)
{
label val = Pstream::myProcNo(UPstream::worldComm);
label worldVal = returnReduce
(
val,
sumOp<label>(),
Pstream::msgType(),
UPstream::worldComm
);
label selfVal = returnReduce
(
val,
sumOp<label>(),
Pstream::msgType(),
UPstream::selfComm
);
Pout<< "value " << val
<< " (world) reduced " << worldVal
<< " (self) reduced " << selfVal << nl;
}
// Reductions (not using MPI intrinsics)
{
Pair<label> val
(
Pstream::myProcNo(UPstream::worldComm),
Pstream::myProcNo(UPstream::worldComm)
);
Pair<label> worldVal = val;
Pstream::combineReduce
(
worldVal,
minFirstEqOp<label>(),
Pstream::msgType(),
UPstream::worldComm
);
Pair<label> selfVal = val;
Pstream::combineReduce
(
selfVal,
minFirstEqOp<label>(),
Pstream::msgType(),
UPstream::selfComm
);
Pout<< "value " << val
<< " (world) reduced " << worldVal
<< " (self) reduced " << selfVal << nl;
}
Pout<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //
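
Note on the option handling above: argv is scanned manually for "-verbose"
because UPstream::debug must be set before argList construction (before
"proper startup", as the comment says); the renamed Test-parallel-comm1
below repeats the same pattern.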


@@ -0,0 +1,3 @@
Test-parallel-comm1.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-comm1


@@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */


@@ -1,3 +0,0 @@
Test-parallel-communicators.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-communicators


@@ -5,7 +5,7 @@
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
-------------------------------------------------------------------------------
Copyright (C) 2019 OpenCFD Ltd.
Copyright (C) 2019-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -47,16 +47,88 @@ using namespace Foam;
bool startMPI()
{
int nprocs = 0, rank = 0;
enum whichComm : int { worldComm = 0, selfComm, nullComm };
int nprocs[3];
int rank[3];
int group_nprocs[3];
int group_rank[3];
MPI_Group mpiGroup;
MPI_Init(nullptr, nullptr);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs[worldComm]);
MPI_Comm_rank(MPI_COMM_WORLD, &rank[worldComm]);
if (nprocs && rank == 0)
const bool isMaster = (rank[worldComm] == 0);
const string prefix = '[' + Foam::name(rank[worldComm]) + "] ";
MPI_Comm_group(MPI_COMM_WORLD, &mpiGroup);
MPI_Group_size(mpiGroup, &group_nprocs[worldComm]);
MPI_Group_rank(mpiGroup, &group_rank[worldComm]);
if (isMaster && nprocs[worldComm])
{
std::cout<< nl << "Using MPI with " << nprocs << " procs" << nl << nl;
std::cout
<< nl << "Using MPI with " << nprocs[worldComm]
<< " procs, group:"
<< group_nprocs[worldComm] << nl
<< "World group: " << Foam::name(mpiGroup) << nl
<< nl;
}
MPI_Comm worldMpiComm;
MPI_Comm_dup(MPI_COMM_WORLD, &worldMpiComm);
MPI_Comm_group(MPI_COMM_WORLD, &mpiGroup);
if (isMaster && nprocs[worldComm])
{
std::cout
<< "dup comm group: " << Foam::name(mpiGroup) << nl;
}
MPI_Comm_free(&worldMpiComm);
// May be a bad idea
MPI_Group_free(&mpiGroup);
MPI_Comm_size(MPI_COMM_SELF, &nprocs[selfComm]);
MPI_Comm_rank(MPI_COMM_SELF, &rank[selfComm]);
MPI_Comm_group(MPI_COMM_SELF, &mpiGroup);
MPI_Group_size(mpiGroup, &group_nprocs[selfComm]);
MPI_Group_rank(mpiGroup, &group_rank[selfComm]);
if (isMaster && nprocs[worldComm])
{
std::cout
<< nl
<< "Self group: " << Foam::name(mpiGroup) << nl;
}
// Should be a bad idea
MPI_Group_free(&mpiGroup);
// if (nprocs && isMaster)
{
std::cout
<< prefix
<< "Self: " << rank[selfComm] << " from " << nprocs[selfComm]
<< " procs, group:"
<< group_nprocs[selfComm] << nl;
}
if (isMaster)
{
std::cout
<< "MPI_COMM_NULL: " << MPI_COMM_NULL << nl
<< "MPI_COMM_SELF: " << MPI_COMM_SELF << nl
<< "MPI_COMM_WORLD: " << MPI_COMM_WORLD << nl;
}
return true;
@@ -77,8 +149,8 @@ string message()
{
return
(
"rank " + name(Pstream::myProcNo())
+ " / " + name(Pstream::nProcs()) + "\n"
"rank " + Foam::name(Pstream::myProcNo())
+ " / " + Foam::name(Pstream::nProcs()) + "\n"
);
}
@@ -87,9 +159,21 @@ string message()
int main(int argc, char *argv[])
{
argList::noBanner();
argList::noCheckProcessorDirectories();
argList::addBoolOption("verbose", "Set debug level");
UPstream::debug = 1;
// Need to capture manually, since we need values before proper startup
int nVerbose = 0;
for (int argi = 1; argi < argc; ++argi)
{
if (strcmp(argv[argi], "-verbose") == 0)
{
++nVerbose;
}
}
UPstream::debug = nVerbose;
startMPI();


@@ -58,43 +58,78 @@ Foam::UPstream::commsTypeNames
void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
{
if (nProcs == 0)
{
parRun_ = false;
parRun_ = (nProcs > 0);
haveThreads_ = haveThreads;
freeCommunicator(UPstream::worldComm);
label comm = allocateCommunicator(-1, labelList(Foam::one{}, 0), false);
if (comm != UPstream::worldComm)
label comm = -1;
if (!parRun_)
{
// These are already correct from the static initialisation,
// but just in case of future changes
// Using (world, self) ordering
freeCommunicator(UPstream::selfComm);
freeCommunicator(UPstream::globalComm);
// 0: worldComm
comm = allocateCommunicator(-1, Foam::labelList(Foam::one{}, 0), false);
if (comm != UPstream::globalComm)
{
// Failed sanity check
FatalErrorInFunction
<< "problem : comm:" << comm
<< " UPstream::worldComm:" << UPstream::worldComm
<< " UPstream::globalComm:" << UPstream::globalComm
<< Foam::exit(FatalError);
}
Pout.prefix() = "";
Perr.prefix() = "";
// 1: selfComm
comm = allocateCommunicator(-2, Foam::labelList(Foam::one{}, 0), false);
if (comm != UPstream::selfComm)
{
// Failed sanity check
FatalErrorInFunction
<< "problem : comm:" << comm
<< " UPstream::selfComm:" << UPstream::selfComm
<< Foam::exit(FatalError);
}
Pout.prefix().clear();
Perr.prefix().clear();
}
else
{
parRun_ = true;
haveThreads_ = haveThreads;
// Redo communicators that were created during static initialisation
// but this time with Pstream components
// Redo worldComm communicator (this has been created at static
// initialisation time)
freeCommunicator(UPstream::worldComm);
label comm = allocateCommunicator(-1, identity(nProcs), true);
if (comm != UPstream::worldComm)
// Using (world, self) ordering
freeCommunicator(UPstream::selfComm);
freeCommunicator(UPstream::globalComm);
// 0: worldComm
comm = allocateCommunicator(-1, identity(nProcs), true);
if (comm != UPstream::globalComm)
{
// Failed sanity check
FatalErrorInFunction
<< "problem : comm:" << comm
<< " UPstream::worldComm:" << UPstream::worldComm
<< " UPstream::globalComm:" << UPstream::globalComm
<< Foam::exit(FatalError);
}
Pout.prefix() = '[' + name(myProcNo(comm)) + "] ";
Perr.prefix() = '[' + name(myProcNo(comm)) + "] ";
Pout.prefix() = '[' + Foam::name(myProcNo(comm)) + "] ";
Perr.prefix() = Pout.prefix();
// 1: selfComm
comm = allocateCommunicator(-2, Foam::labelList(Foam::one{}, 0), true);
if (comm != UPstream::selfComm)
{
// Failed sanity check
FatalErrorInFunction
<< "problem : comm:" << comm
<< " UPstream::selfComm:" << UPstream::selfComm
<< Foam::exit(FatalError);
}
}
if (debug)
@@ -110,23 +145,25 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
Foam::label Foam::UPstream::allocateCommunicator
(
const label parentIndex,
const labelList& subRanks,
const labelUList& subRanks,
const bool doPstream
)
{
label index;
if (!freeComms_.empty())
{
index = freeComms_.remove(); // LIFO pop
// LIFO pop
index = freeComms_.back();
freeComms_.pop_back();
}
else
{
// Extend storage
index = parentCommunicator_.size();
index = parentComm_.size();
myProcNo_.append(-1);
procIDs_.append(List<int>());
parentCommunicator_.append(-1);
parentComm_.append(-1);
linearCommunication_.append(List<commsStruct>());
treeCommunication_.append(List<commsStruct>());
}
@@ -142,8 +179,10 @@ Foam::label Foam::UPstream::allocateCommunicator
// Initialise; overwritten by allocatePstreamCommunicator
myProcNo_[index] = 0;
const label numSubRanks = subRanks.size();
// Convert from label to int
procIDs_[index].setSize(subRanks.size());
procIDs_[index].resize_nocopy(numSubRanks);
forAll(procIDs_[index], i)
{
procIDs_[index][i] = subRanks[i];
@@ -158,11 +197,11 @@ Foam::label Foam::UPstream::allocateCommunicator
<< Foam::abort(FatalError);
}
}
parentCommunicator_[index] = parentIndex;
parentComm_[index] = parentIndex;
// Size but do not fill structure - this is done on-the-fly
linearCommunication_[index] = List<commsStruct>(procIDs_[index].size());
treeCommunication_[index] = List<commsStruct>(procIDs_[index].size());
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
if (doPstream && parRun())
{
@@ -179,10 +218,16 @@ void Foam::UPstream::freeCommunicator
const bool doPstream
)
{
// Filter out any placeholders
if (communicator < 0)
{
return;
}
if (debug)
{
Pout<< "Communicators : Freeing communicator " << communicator << endl
<< " parent : " << parentCommunicator_[communicator] << endl
<< " parent : " << parentComm_[communicator] << endl
<< " myProcNo : " << myProcNo_[communicator] << endl
<< endl;
}
@@ -191,13 +236,15 @@ void Foam::UPstream::freeCommunicator
{
freePstreamCommunicator(communicator);
}
myProcNo_[communicator] = -1;
//procIDs_[communicator].clear();
parentCommunicator_[communicator] = -1;
parentComm_[communicator] = -1;
linearCommunication_[communicator].clear();
treeCommunication_[communicator].clear();
freeComms_.append(communicator); // LIFO push
// LIFO push
freeComms_.push_back(communicator);
}
@@ -213,48 +260,44 @@ void Foam::UPstream::freeCommunicators(const bool doPstream)
}
int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
int Foam::UPstream::baseProcNo(label comm, int procID)
{
int procID = myProcID;
label comm = myComm;
while (parent(comm) != -1)
while (parent(comm) >= 0)
{
const List<int>& parentRanks = UPstream::procID(comm);
const auto& parentRanks = UPstream::procID(comm);
procID = parentRanks[procID];
comm = UPstream::parent(comm);
comm = parent(comm);
}
return procID;
}
Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
Foam::label Foam::UPstream::procNo(const label comm, const int baseProcID)
{
const List<int>& parentRanks = procID(myComm);
label parentComm = parent(myComm);
const auto& parentRanks = procID(comm);
label parentComm = parent(comm);
if (parentComm == -1)
int procID = baseProcID;
if (parentComm >= 0)
{
return parentRanks.find(baseProcID);
}
else
{
const label parentRank = procNo(parentComm, baseProcID);
return parentRanks.find(parentRank);
procID = procNo(parentComm, baseProcID);
}
return parentRanks.find(procID);
}
Foam::label Foam::UPstream::procNo
(
const label myComm,
const label comm,
const label currentComm,
const int currentProcID
)
{
label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
return procNo(myComm, physProcID);
return procNo(comm, physProcID);
}
@@ -380,7 +423,7 @@ Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::parentComm_(10);
Foam::DynamicList<Foam::label> Foam::UPstream::freeComms_;
@@ -394,14 +437,25 @@ Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct>>
Foam::UPstream::treeCommunication_(10);
// Allocate a serial communicator. This gets overwritten in parallel mode
// (by UPstream::setParRun())
Foam::UPstream::communicator serialComm
(
-1,
Foam::labelList(Foam::one{}, 0),
false
);
Foam::label Foam::UPstream::worldComm(0);
Foam::label Foam::UPstream::warnComm(-1);
// Predefine worldComm, selfComm slots.
// These are overwritten in parallel mode (by UPstream::setParRun())
const Foam::label nPredefinedComm = []()
{
const Foam::labelList singleProc(Foam::one{}, 0);
// 0: worldComm
(void) Foam::UPstream::allocateCommunicator(-1, singleProc, false);
// 1: selfComm
(void) Foam::UPstream::allocateCommunicator(-2, singleProc, false);
return Foam::UPstream::nComms();
}();
bool Foam::UPstream::floatTransfer
@@ -466,10 +520,6 @@ namespace Foam
addcommsTypeToOpt addcommsTypeToOpt_("commsType");
}
Foam::label Foam::UPstream::worldComm(0);
Foam::label Foam::UPstream::warnComm(-1);
int Foam::UPstream::nPollProcInterfaces
(
Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)


@@ -107,9 +107,9 @@ public:
commsStruct
(
const label above,
const labelList& below,
const labelList& allBelow,
const labelList& allNotBelow
const labelUList& below,
const labelUList& allBelow,
const labelUList& allNotBelow
);
//- Construct from components; construct allNotBelow_
@@ -118,13 +118,16 @@ public:
const label nProcs,
const label myProcID,
const label above,
const labelList& below,
const labelList& allBelow
const labelUList& below,
const labelUList& allBelow
);
// Member Functions
//- Reset to default constructed state
void clear();
//- The procID of the processor directly above
label above() const noexcept
{
@@ -203,7 +206,7 @@ private:
//- Names of all worlds
static wordList allWorlds_;
//- Per processor the name of the world
//- Per processor the world index (into allWorlds_)
static labelList worldIDs_;
@@ -216,7 +219,7 @@ private:
static DynamicList<List<int>> procIDs_;
//- Parent communicator
static DynamicList<label> parentCommunicator_;
static DynamicList<label> parentComm_;
//- Free communicators
static DynamicList<label> freeComms_;
@@ -233,21 +236,6 @@ private:
//- Set data for parallel running
static void setParRun(const label nProcs, const bool haveThreads);
//- Calculate linear communication schedule
static List<commsStruct> calcLinearComm(const label nProcs);
//- Calculate tree communication schedule
static List<commsStruct> calcTreeComm(const label nProcs);
//- Helper function for tree communication schedule determination
// Collects all processorIDs below a processor
static void collectReceives
(
const label procID,
const List<DynamicList<label>>& receives,
DynamicList<label>& allReceives
);
//- Allocate a communicator with index
static void allocatePstreamCommunicator
(
@@ -255,11 +243,9 @@ private:
const label index
);
//- Free a communicator
static void freePstreamCommunicator
(
const label index
);
//- Free MPI components of communicator.
// Does not touch the first two communicators (SELF, WORLD)
static void freePstreamCommunicator(const label index);
public:
@@ -290,12 +276,31 @@ public:
//- MPI buffer-size (bytes)
static const int mpiBufferSize;
//- Default communicator (all processors)
// Standard Communicators
//- Default world communicator (all processors).
//- May differ from globalComm if local worlds are in use
static label worldComm;
//- Debugging: warn for use of any communicator differing from warnComm
static label warnComm;
//- Communicator for all processors, irrespective of any local worlds
static constexpr label globalComm = 0;
//- A communicator within the current rank only
static constexpr label selfComm = 1;
//- Number of currently defined communicators
static label nComms() noexcept { return parentComm_.size(); }
//- True if communicator appears to be user-allocated
static bool isUserComm(label communicator) noexcept
{
return (communicator > worldComm && communicator > selfComm);
}
// Constructors
@@ -312,11 +317,12 @@ public:
static label allocateCommunicator
(
const label parent,
const labelList& subRanks,
const labelUList& subRanks,
const bool doPstream = true
);
//- Free a previously allocated communicator
//- Free a previously allocated communicator.
// Ignores placeholder (negative) communicators.
static void freeCommunicator
(
const label communicator,
@@ -326,43 +332,52 @@ public:
//- Free all communicators
static void freeCommunicators(const bool doPstream);
//- Helper class for allocating/freeing communicators
//- Wrapper class for allocating/freeing communicators
class communicator
{
label comm_;
public:
//- Default construct (a placeholder communicator)
communicator() : comm_(-1) {}
//- Move construct
communicator(communicator&&) = default;
//- Move assignment
communicator& operator=(communicator&&) = default;
//- No copy construct
communicator(const communicator&) = delete;
//- No copy assignment
void operator=(const communicator&) = delete;
public:
//- Allocate a communicator from given parent
communicator
(
const label parent,
const labelList& subRanks,
const labelUList& subRanks,
const bool doPstream
)
:
comm_(allocateCommunicator(parent, subRanks, doPstream))
{}
//- Free allocated communicator and group
~communicator()
{
freeCommunicator(comm_);
}
operator label() const noexcept
{
return comm_;
}
operator label() const noexcept { return comm_; }
};
//- Return physical processor number (i.e. processor number in
//- worldComm) given communicator and procssor
static int baseProcNo(const label myComm, const int procID);
//- worldComm) given communicator and processor
static int baseProcNo(label comm, int procID);
//- Return processor number in communicator (given physical processor
//- number) (= reverse of baseProcNo)
@@ -372,7 +387,7 @@ public:
//- and communicator)
static label procNo
(
const label myComm,
const label comm,
const label currentComm,
const int currentProcID
);
@@ -414,13 +429,8 @@ public:
// A no-op and returns true if parRun() == false
static bool finishedRequest(const label i);
static int allocateTag(const char*);
static int allocateTag(const std::string&);
static void freeTag(const char*, const int tag);
static void freeTag(const std::string&, const int tag);
static int allocateTag(const char* const msg = nullptr);
static void freeTag(const int tag, const char* const msg = nullptr);
//- Set as parallel run on/off.
@@ -445,7 +455,8 @@ public:
return haveThreads_;
}
//- Number of processes in parallel run, and 1 for serial run
//- Number of ranks in parallel run (for given communicator)
//- is 1 for serial run
static label nProcs(const label communicator = worldComm)
{
return procIDs_[communicator].size();
@@ -469,13 +480,14 @@ public:
return myProcNo_[communicator];
}
//- The parent communicator
static label parent(const label communicator)
{
return parentCommunicator_(communicator);
return parentComm_(communicator);
}
//- Process ID of given process index
static List<int>& procID(label communicator)
//- Process IDs within a given communicator
static List<int>& procID(const label communicator)
{
return procIDs_[communicator];
}
@@ -489,7 +501,7 @@ public:
return allWorlds_;
}
//- worldID (index in allWorlds) of all processes
//- The indices into allWorlds for all processes
static const labelList& worldIDs() noexcept
{
return worldIDs_;
@@ -498,13 +510,13 @@ public:
//- My worldID
static label myWorldID()
{
return worldIDs_[myProcNo(0)];
return worldIDs_[myProcNo(globalComm)];
}
//- My world
static const word& myWorld()
{
return allWorlds()[myWorldID()];
return allWorlds_[worldIDs_[myProcNo(globalComm)]];
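
The reworked UPstream::communicator wrapper above is a move-only RAII
handle. A hypothetical usage sketch (the choice of sub-ranks is invented
for illustration):

    // Communicator over ranks 0 and 1 of worldComm; freed automatically
    // when 'subComm' goes out of scope (~communicator)
    {
        UPstream::communicator subComm
        (
            UPstream::worldComm,    // parent
            labelList({0, 1}),      // sub-ranks (hypothetical)
            true                    // also allocate MPI components
        );

        // Ranks outside the sub-communicator report myProcNo() == -1
        Pout<< "rank " << UPstream::myProcNo(subComm)
            << " in comm of size " << UPstream::nProcs(subComm) << nl;
    }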
}

View File

@@ -42,9 +42,9 @@ Foam::UPstream::commsStruct::commsStruct() noexcept
Foam::UPstream::commsStruct::commsStruct
(
const label above,
const labelList& below,
const labelList& allBelow,
const labelList& allNotBelow
const labelUList& below,
const labelUList& allBelow,
const labelUList& allNotBelow
)
:
above_(above),
@@ -59,8 +59,8 @@ Foam::UPstream::commsStruct::commsStruct
const label nProcs,
const label myProcID,
const label above,
const labelList& below,
const labelList& allBelow
const labelUList& below,
const labelUList& allBelow
)
:
above_(above),
@@ -90,6 +90,17 @@ Foam::UPstream::commsStruct::commsStruct
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
void Foam::UPstream::commsStruct::clear()
{
above_ = -1;
below_.clear();
allBelow_.clear();
allNotBelow_.clear();
}
// * * * * * * * * * * * * * * * Member Operators * * * * * * * * * * * * * //
bool Foam::UPstream::commsStruct::operator==(const commsStruct& comm) const


@@ -58,10 +58,14 @@ extern int nTags_;
//- Free'd message tags
extern DynamicList<int> freedTags_;
// Current communicators. First element will be MPI_COMM_WORLD
// Current communicators, which may be allocated or predefined
// (eg, MPI_COMM_SELF, MPI_COMM_WORLD)
extern DynamicList<MPI_Comm> MPICommunicators_;
// Groups associated with the current communicators.
extern DynamicList<MPI_Group> MPIGroups_;
//- Fatal if comm is outside the allocated range
void checkCommunicator(const label comm, const label toProcNo);


@@ -30,9 +30,9 @@ License
#include "PstreamReduceOps.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "int.H"
#include "SubList.H"
#include "UPstreamWrapping.H"
#include "int.H"
#include "collatedFileOperation.H"
#include <mpi.h>
@@ -293,41 +293,56 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
if (worldIndex != -1)
{
// During startup, so worldComm == globalComm
wordList worlds(numprocs);
worlds[Pstream::myProcNo()] = world;
Pstream::gatherList(worlds);
Pstream::broadcast(worlds);
worlds[Pstream::myProcNo(UPstream::globalComm)] = world;
Pstream::gatherList(worlds, UPstream::msgType(), UPstream::globalComm);
// Compact
if (Pstream::master())
if (Pstream::master(UPstream::globalComm))
{
DynamicList<word> allWorlds(numprocs);
for (const word& world : worlds)
{
allWorlds.appendUniq(world);
}
allWorlds_ = std::move(allWorlds);
DynamicList<word> worldNames(numprocs);
worldIDs_.resize_nocopy(numprocs);
worldIDs_.setSize(numprocs);
forAll(worlds, proci)
{
const word& world = worlds[proci];
worldIDs_[proci] = allWorlds_.find(world);
worldIDs_[proci] = worldNames.find(world);
if (worldIDs_[proci] == -1)
{
worldIDs_[proci] = worldNames.size();
worldNames.push_back(world);
}
}
Pstream::broadcasts(UPstream::worldComm, allWorlds_, worldIDs_);
allWorlds_.transfer(worldNames);
}
Pstream::broadcasts(UPstream::globalComm, allWorlds_, worldIDs_);
const label myWorldId =
worldIDs_[Pstream::myProcNo(UPstream::globalComm)];
DynamicList<label> subRanks;
forAll(worlds, proci)
forAll(worldIDs_, proci)
{
if (worlds[proci] == worlds[Pstream::myProcNo()])
if (worldIDs_[proci] == myWorldId)
{
subRanks.append(proci);
subRanks.push_back(proci);
}
}
// Allocate new communicator 1 with parent 0 (= mpi_world)
const label subComm = allocateCommunicator(0, subRanks, true);
// Allocate new communicator with globalComm as its parent
const label subComm =
UPstream::allocateCommunicator
(
UPstream::globalComm, // parent
subRanks,
true
);
// Override worldComm
UPstream::worldComm = subComm;
@@ -337,11 +352,11 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
if (debug)
{
// Check
int subNProcs, subRank;
int subNumProcs, subRank;
MPI_Comm_size
(
PstreamGlobals::MPICommunicators_[subComm],
&subNProcs
&subNumProcs
);
MPI_Comm_rank
(
@@ -351,19 +366,20 @@ bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
Pout<< "UPstream::init : in world:" << world
<< " using local communicator:" << subComm
<< " with procs:" << subNProcs
<< " and rank:" << subRank
<< " rank " << subRank
<< " of " << subNumProcs
<< endl;
}
// Override Pout prefix (move to setParRun?)
Pout.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
Perr.prefix() = '[' + world + '/' + name(myProcNo(subComm)) + "] ";
Perr.prefix() = Pout.prefix();
}
else
{
// All processors use world 0
worldIDs_.setSize(numprocs, 0);
worldIDs_.resize_nocopy(numprocs);
worldIDs_ = 0;
}
attachOurBuffers();
@@ -490,10 +506,10 @@ void Foam::UPstream::allocatePstreamCommunicator
if (index == PstreamGlobals::MPIGroups_.size())
{
// Extend storage with dummy values
MPI_Group newGroup = MPI_GROUP_NULL;
PstreamGlobals::MPIGroups_.append(newGroup);
MPI_Comm newComm = MPI_COMM_NULL;
PstreamGlobals::MPICommunicators_.append(newComm);
MPI_Group newGroup = MPI_GROUP_NULL;
PstreamGlobals::MPIGroups_.push_back(newGroup);
PstreamGlobals::MPICommunicators_.push_back(newComm);
}
else if (index > PstreamGlobals::MPIGroups_.size())
{
@@ -505,34 +521,57 @@ void Foam::UPstream::allocatePstreamCommunicator
if (parentIndex == -1)
{
// Allocate world communicator
// Global communicator. Same as world communicator for single-world
if (index != UPstream::worldComm)
if (index != UPstream::globalComm)
{
FatalErrorInFunction
<< "world communicator should always be index "
<< UPstream::worldComm << Foam::exit(FatalError);
<< UPstream::globalComm
<< Foam::exit(FatalError);
}
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_[index]);
// Set the number of processes to the actual number
// Set the number of ranks to the actual number
int numProcs;
MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
//procIDs_[index] = identity(numProcs);
procIDs_[index].setSize(numProcs);
procIDs_[index].resize_nocopy(numProcs);
forAll(procIDs_[index], i)
{
procIDs_[index][i] = i;
}
}
else if (parentIndex == -2)
{
// Self communicator
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_SELF;
MPI_Comm_group(MPI_COMM_SELF, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank(MPI_COMM_SELF, &myProcNo_[index]);
// Number of ranks is always 1 (self communicator)
#ifdef FULLDEBUG
int numProcs;
MPI_Comm_size(MPI_COMM_SELF, &numProcs);
if (numProcs != 1)
{
// Should never happen for MPI_COMM_SELF
FatalErrorInFunction
<< "MPI_COMM_SELF had " << numProcs << " != 1 ranks!\n"
<< Foam::abort(FatalError);
}
#endif
procIDs_[index].resize_nocopy(1);
procIDs_[index] = 0;
}
else
{
// Create new group
@@ -558,7 +597,7 @@ void Foam::UPstream::allocatePstreamCommunicator
(
PstreamGlobals::MPICommunicators_[parentIndex],
PstreamGlobals::MPIGroups_[index],
Pstream::msgType(),
UPstream::msgType(),
&PstreamGlobals::MPICommunicators_[index]
);
#endif
@@ -593,16 +632,26 @@ void Foam::UPstream::allocatePstreamCommunicator
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
if (communicator != 0)
// Skip placeholders and pre-defined (not allocated) communicators
if (UPstream::debug)
{
if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
Pout<< "freePstreamCommunicator: " << communicator
<< " from " << PstreamGlobals::MPICommunicators_.size() << endl;
}
// Not touching the first two communicators (SELF, WORLD)
if (communicator > 1)
{
if (MPI_COMM_NULL != PstreamGlobals::MPICommunicators_[communicator])
{
// Free communicator. Sets communicator to MPI_COMM_NULL
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
}
if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
if (MPI_GROUP_NULL != PstreamGlobals::MPIGroups_[communicator])
{
// Free greoup. Sets group to MPI_GROUP_NULL
// Free group. Sets group to MPI_GROUP_NULL
MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
}
}
@@ -715,7 +764,7 @@ void Foam::UPstream::waitRequest(const label i)
profilingPstream::addWaitTime();
// Push index onto free cache
PstreamGlobals::freedRequests_.append(i);
PstreamGlobals::freedRequests_.push_back(i);
if (debug)
{
@@ -766,69 +815,39 @@ bool Foam::UPstream::finishedRequest(const label i)
}
int Foam::UPstream::allocateTag(const char* s)
int Foam::UPstream::allocateTag(const char* const msg)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
tag = PstreamGlobals::freedTags_.back();
(void)PstreamGlobals::freedTags_.pop_back();
}
else
{
tag = PstreamGlobals::nTags_++;
tag = ++PstreamGlobals::nTags_;
}
if (debug)
{
Pout<< "UPstream::allocateTag "
<< s << " : tag:" << tag << endl;
Pout<< "UPstream::allocateTag";
if (msg) Pout<< ' ' << msg;
Pout<< " : tag:" << tag << endl;
}
return tag;
}
int Foam::UPstream::allocateTag(const std::string& s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
Pout<< "UPstream::allocateTag "
<< s.c_str() << " : tag:" << tag << endl;
}
return tag;
}
void Foam::UPstream::freeTag(const char* s, const int tag)
void Foam::UPstream::freeTag(const int tag, const char* const msg)
{
if (debug)
{
Pout<< "UPstream::freeTag "
<< s << " tag:" << tag << endl;
Pout<< "UPstream::freeTag ";
if (msg) Pout<< ' ' << msg;
Pout<< " : tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
void Foam::UPstream::freeTag(const std::string& s, const int tag)
{
if (debug)
{
Pout<< "UPstream::freeTag "
<< s.c_str() << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
PstreamGlobals::freedTags_.push_back(tag);
}
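
Hedged sketch of the revised tag API above: the message argument is now an
optional trailing const char*, and freeTag() takes the tag first
("myExchange" is an invented label):

    // Reserve a unique message tag, annotated for debug output
    const int tag = UPstream::allocateTag("myExchange");

    // ... point-to-point communication using 'tag' ...

    // Return the tag to the free list (annotation again optional)
    UPstream::freeTag(tag, "myExchange");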


@@ -161,8 +161,8 @@ Foam::label Foam::multiWorldConnections::createCommunicator(const edge& worlds)
}
}
// Allocate new communicator with parent 0 (= world)
comm = UPstream::allocateCommunicator(0, subRanks, true);
// Allocate new communicator with global world
comm = UPstream::allocateCommunicator(UPstream::globalComm, subRanks, true);
if (debug & 2)
{
@@ -234,10 +234,10 @@ void Foam::multiWorldConnections::createComms()
// Use MPI_COMM_WORLD
const label oldWorldComm(Pstream::worldComm);
const label oldWarnComm(Pstream::warnComm);
Pstream::worldComm = 0;
Pstream::warnComm = Pstream::worldComm;
const label oldWorldComm(UPstream::worldComm);
const label oldWarnComm(UPstream::warnComm);
UPstream::worldComm = UPstream::globalComm;
UPstream::warnComm = UPstream::worldComm;
if (Pstream::parRun())
{