ENH: provide UPstream intra-host and inter-host communicators

- simplifies communication structuring with intra-host communication.
  Can be used for IO only, or for specialised communication.

  Demand-driven construction. Gathers the SHA1 of host names when
  determining the connectivity. Internally uses an MPI_Gather of the
  digests and a MPI_Bcast of the unique host indices.

  NOTE:
    does not use MPI_Comm_splt or MPI_Comm_splt_type since these
    return MPI_COMM_NULL on non-participating process which does not
    easily fit into the OpenFOAM framework.

    Additionally, if using the caching version of
    UPstream::commInterHost() and UPstream::commIntraHost()
    the topology is determined simultaneously
    (ie, equivalent or potentially lower communication).
This commit is contained in:
Mark Olesen
2023-04-14 15:00:03 +02:00
parent 9d3427e0b4
commit 80bc1db5b7
5 changed files with 658 additions and 181 deletions

View File

@ -64,9 +64,12 @@ int main(int argc, char *argv[])
argList::noBanner();
argList::noCheckProcessorDirectories();
argList::addBoolOption("verbose", "Set debug level");
argList::addBoolOption("info", "information");
argList::addBoolOption("print-tree", "Report tree(s) as graph");
argList::addBoolOption("comm-split", "Test simple comm split");
argList::addBoolOption("host-comm", "Test DIY host-comm split");
argList::addBoolOption("mpi-host-comm", "Test DIY host-comm split");
argList::addBoolOption("host-comm", "Test Pstream host-comm");
argList::addBoolOption("host-broadcast", "Test host-base broadcasts");
// Capture manually. We need values before proper startup
int nVerbose = 0;
@ -90,76 +93,30 @@ int main(int argc, char *argv[])
<< " with " << UPstream::nComms() << " predefined comm(s)."
<< " proc:" << UPstream::myProcNo() << nl;
Info<< nl;
//- Process IDs within a given communicator
Info<< "procIDs: "
<< flatOutput(UPstream::procID(UPstream::commWorld())) << endl;
rankInfo(UPstream::commWorld());
Pout<< endl;
const int myProci = UPstream::myProcNo(UPstream::commWorld());
int localRanki = myProci;
labelList subRanks;
UPstream::communicator newComm;
if (UPstream::parRun() && optPrintTree)
{
Info<< "comms: " << UPstream::whichCommunication() << endl;
UPstream::printCommTree(UPstream::commWorld());
}
if (!args.found("comm-split") && !args.found("host-comm"))
if (args.found("info"))
{
#if 1
// With first ranks
subRanks = identity(UPstream::nProcs(UPstream::commWorld()) / 2);
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
);
Info<< nl;
//- Process IDs within a given communicator
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
<< flatOutput(UPstream::procID(UPstream::commWorld())) << endl;
rankInfo(newComm);
rankInfo(UPstream::commWorld());
Pout<< endl;
#endif
#if 1
// With every other rank
subRanks = identity(UPstream::nProcs(UPstream::commWorld()));
for (label& val : subRanks)
{
if (val % 2) val = -1;
}
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
}
if (Pstream::parRun() && args.found("comm-split"))
bool generalTest = true;
if (UPstream::parRun() && args.found("comm-split"))
{
generalTest = false;
int world_nprocs = 0;
int world_rank = -1;
MPI_Comm_size(MPI_COMM_WORLD, &world_nprocs);
@ -167,37 +124,37 @@ int main(int argc, char *argv[])
int host_nprocs = 0;
int host_rank = -1;
MPI_Comm hostComm;
MPI_Comm commIntraHost;
MPI_Comm_split_type
(
MPI_COMM_WORLD,
MPI_COMM_TYPE_SHARED, // OMPI_COMM_TYPE_NODE
0, MPI_INFO_NULL, &hostComm
0, MPI_INFO_NULL, &commIntraHost
);
MPI_Comm_size(hostComm, &host_nprocs);
MPI_Comm_rank(hostComm, &host_rank);
MPI_Comm_size(commIntraHost, &host_nprocs);
MPI_Comm_rank(commIntraHost, &host_rank);
int leader_nprocs = 0;
int leader_rank = -1;
MPI_Comm hostMasterComm;
MPI_Comm commInterHost;
if (false)
{
// Easy enough to use MPI_Comm_split, but slightly annoying
// that it returns MPI_COMM_NULL for unused ranks...
MPI_Comm hostMasterComm;
MPI_Comm commInterHost;
MPI_Comm_split
(
MPI_COMM_WORLD,
(host_rank == 0) ? 0 : MPI_UNDEFINED,
0, &hostMasterComm
0, &commInterHost
);
if (hostMasterComm != MPI_COMM_NULL)
if (commInterHost != MPI_COMM_NULL)
{
MPI_Comm_size(hostMasterComm, &leader_nprocs);
MPI_Comm_rank(hostMasterComm, &leader_rank);
MPI_Comm_size(commInterHost, &leader_nprocs);
MPI_Comm_rank(commInterHost, &leader_rank);
}
}
else
@ -242,35 +199,39 @@ int main(int argc, char *argv[])
MPI_COMM_WORLD,
active_group,
UPstream::msgType(),
&hostMasterComm
&commInterHost
);
// Groups not needed after this...
MPI_Group_free(&parent_group);
MPI_Group_free(&active_group);
MPI_Comm_size(hostMasterComm, &leader_nprocs);
MPI_Comm_rank(hostMasterComm, &leader_rank);
MPI_Comm_size(commInterHost, &leader_nprocs);
MPI_Comm_rank(commInterHost, &leader_rank);
}
Pout<< nl << "[MPI_Comm_split_type]" << nl
<< "Host comm with " << host_rank << " / " << host_nprocs
<< "Host rank " << host_rank << " / " << host_nprocs
<< " on " << hostName()
<< " master:" << (host_rank == 0)
<< " leader rank: " << leader_rank
<< " / " << leader_nprocs
<< " inter-rank: " << leader_rank << " / " << leader_nprocs
<< " host leader:" << (leader_rank == 0)
<< " sub-rank:" << (leader_rank > 0)
<< nl;
if (hostMasterComm != MPI_COMM_NULL)
if (commInterHost != MPI_COMM_NULL)
{
MPI_Comm_free(&hostMasterComm);
MPI_Comm_free(&commInterHost);
}
if (commIntraHost != MPI_COMM_NULL)
{
MPI_Comm_free(&commIntraHost);
}
MPI_Comm_free(&hostComm);
}
if (Pstream::parRun() && args.found("host-comm"))
if (UPstream::parRun() && args.found("mpi-host-comm"))
{
generalTest = false;
// Host communicator, based on the current world communicator
// Use hostname
// Lowest rank per hostname is the IO rank
@ -290,40 +251,32 @@ int main(int argc, char *argv[])
// - pro: fixed digest length enables direct MPI calls
// can avoid Pstream::gatherList() during setup...
SHA1Digest myHostDigest(SHA1(hostName()).digest());
List<SHA1Digest> digests;
if (UPstream::master(UPstream::commGlobal()))
{
digests.resize(numprocs);
}
UPstream::mpiGather
(
reinterpret_cast<const char*>(myHostDigest.cdata_bytes()),
SHA1Digest::max_size(), // Num send per proc
digests.data_bytes(), // Recv
SHA1Digest::max_size(), // Num recv per proc
UPstream::commGlobal()
);
{
const SHA1Digest myHostDigest(SHA1(hostName()).digest());
// MPI_Gather
// (
// myHostDigest.cdata_bytes(), // Send
// SHA1Digest::max_size(), // Num send per proc
// MPI_BYTE,
// digests.data_bytes(), // Recv
// SHA1Digest::max_size(), // Num recv per proc
// MPI_BYTE,
// 0, // root
// MPI_COMM_WORLD
// );
UPstream::mpiGather
(
reinterpret_cast<const char*>(myHostDigest.cdata_bytes()),
SHA1Digest::max_size(), // Num send per proc
digests.data_bytes(), // Recv
SHA1Digest::max_size(), // Num recv per proc
UPstream::commGlobal()
);
}
labelList hostIDs(numprocs);
DynamicList<label> subRanks(numprocs);
Info<< "digests: " << digests << nl;
labelList hostIDs_(numprocs);
// Compact
// Compact numbering
if (UPstream::master(UPstream::commGlobal()))
{
DynamicList<word> hostNames(numprocs);
@ -332,95 +285,301 @@ int main(int argc, char *argv[])
{
const word& host = hosts[proci];
hostIDs_[proci] = hostNames.find(host);
hostIDs[proci] = hostNames.find(host);
if (hostIDs_[proci] < 0)
if (hostIDs[proci] < 0)
{
// First appearance of host (encode as leader)
hostIDs_[proci] = -(hostNames.size() + 1);
hostIDs[proci] = -(hostNames.size() + 1);
hostNames.push_back(host);
}
}
}
hostIDs = -1;
DynamicList<SHA1Digest> hostDigest(numprocs);
DynamicList<SHA1Digest> uniqDigests(numprocs);
forAll(digests, proci)
{
const SHA1Digest& dig = digests[proci];
hostIDs_[proci] = hostDigest.find(dig);
hostIDs[proci] = uniqDigests.find(dig);
if (hostIDs_[proci] < 0)
if (hostIDs[proci] < 0)
{
// First appearance of host (encode as leader)
hostIDs_[proci] = -(hostDigest.size() + 1);
hostDigest.push_back(dig);
hostIDs[proci] = -(uniqDigests.size() + 1);
uniqDigests.push_back(dig);
}
}
}
Info<< "hosts = " << hosts << endl;
Info<< "hostIDs_ = " << hostIDs_ << endl;
Info<< "hostIDs = " << hostIDs << endl;
UPstream::broadcast
(
hostIDs_.data_bytes(),
hostIDs_.size_bytes(),
hostIDs.data_bytes(),
hostIDs.size_bytes(),
UPstream::commGlobal(),
UPstream::masterNo()
);
DynamicList<label> subRanks(numprocs);
// Ranks for world to inter-host communicator
// - very straightforward
// Ranks for world to hostMaster
forAll(hostIDs_, proci)
#if 0
subRanks.clear();
forAll(hostIDs, proci)
{
// Is host leader?
if (hostIDs_[proci] < 0)
if (hostIDs[proci] < 0)
{
subRanks.push_back(proci);
// Flip back to generic host id
hostIDs_[proci] = -(hostIDs_[proci] + 1);
hostIDs[proci] = -(hostIDs[proci] + 1);
}
}
// From world to hostMaster
const label hostMasterComm =
const label commInterHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
#endif
const label myWorldProci = UPstream::myProcNo(UPstream::commGlobal());
const label myHostId =
hostIDs_[Pstream::myProcNo(UPstream::commGlobal())];
label myHostId = hostIDs[myWorldProci];
if (myHostId < 0) myHostId = -(myHostId + 1); // Flip to generic id
// Ranks for within a host
subRanks.clear();
forAll(hostIDs_, proci)
forAll(hostIDs, proci)
{
if (hostIDs_[proci] == myHostId)
label id = hostIDs[proci];
if (id < 0) id = -(id + 1); // Flip to generic id
if (id == myHostId)
{
subRanks.push_back(proci);
}
}
// The intra-host ranks
const label hostComm =
const label commIntraHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
// Test what if we have intra-host comm and we want host-master
List<bool> isHostMaster(numprocs, false);
if (UPstream::master(commIntraHost))
{
isHostMaster[myWorldProci] = true;
}
UPstream::mpiAllGather
(
isHostMaster.data_bytes(),
sizeof(bool),
UPstream::commGlobal()
);
// Ranks for world to hostMaster
// - very straightforward
subRanks.clear();
forAll(isHostMaster, proci)
{
if (isHostMaster[proci])
{
subRanks.push_back(proci);
}
}
// From world to hostMaster
const label commInterHost =
UPstream::allocateCommunicator(UPstream::commGlobal(), subRanks);
Pout<< nl << "[manual split]" << nl
<< nl << "Host comm with "
<< UPstream::myProcNo(hostComm)
<< " / " << UPstream::nProcs(hostComm)
<< nl << "Host rank " << UPstream::myProcNo(commIntraHost)
<< " / " << UPstream::nProcs(commIntraHost)
<< " on " << hostName()
<< " master:" << UPstream::master(hostComm)
<< " leader rank: " << UPstream::myProcNo(hostMasterComm)
<< " / " << UPstream::nProcs(hostMasterComm)
<< " host leader:" << UPstream::master(hostMasterComm)
<< " sub-rank:" << UPstream::is_subrank(hostMasterComm)
<< ", inter-rank: " << UPstream::myProcNo(commInterHost)
<< " / " << UPstream::nProcs(commInterHost)
<< " host leader:" << UPstream::master(commInterHost)
<< " sub-rank:" << UPstream::is_subrank(commInterHost)
<< nl;
UPstream::freeCommunicator(hostMasterComm);
UPstream::freeCommunicator(hostComm);
UPstream::freeCommunicator(commInterHost);
UPstream::freeCommunicator(commIntraHost);
}
if (UPstream::parRun() && args.found("host-comm"))
{
generalTest = false;
Info<< nl << "[pstream host-comm]" << nl << endl;
const label commInterHost = UPstream::commInterHost();
const label commIntraHost = UPstream::commIntraHost();
Pout<< "Host rank " << UPstream::myProcNo(commIntraHost)
<< " / " << UPstream::nProcs(commIntraHost)
<< " on " << hostName()
<< ", inter-rank: " << UPstream::myProcNo(commInterHost)
<< " / " << UPstream::nProcs(commInterHost)
<< ", host leader:" << UPstream::master(commInterHost)
<< " sub-rank:" << UPstream::is_subrank(commInterHost)
<< endl;
{
Info<< "host-master: "
<< UPstream::whichCommunication(commInterHost) << endl;
UPstream::printCommTree(commInterHost);
UPstream::printCommTree(commIntraHost);
}
}
if (UPstream::parRun() && args.found("host-broadcast"))
{
generalTest = false;
Info<< nl << "[pstream host-broadcast]" << nl << endl;
const label commInterHost = UPstream::commInterHost();
const label commIntraHost = UPstream::commIntraHost();
Pout<< "world rank: " << UPstream::myProcNo(UPstream::commWorld())
<< " host-leader rank: "
<< UPstream::myProcNo(UPstream::commInterHost())
<< " intra-host rank: "
<< UPstream::myProcNo(UPstream::commIntraHost())
<< endl;
label value1(0), value2(0), value3(0);
label hostIndex = UPstream::myProcNo(commInterHost);
if (UPstream::master(commInterHost))
{
value1 = 100;
value2 = 200;
}
if (UPstream::master(commIntraHost))
{
value3 = 300;
}
Pstream::broadcast(value1, commInterHost);
Pstream::broadcast(value2, commIntraHost);
Pstream::broadcast(hostIndex, commIntraHost);
Pout<< "host: " << hostIndex
<< " broadcast 1: "
<< value1 << ' '
<< value2 << ' '
<< value3 << endl;
// re-broadcast
Pstream::broadcast(value1, commIntraHost);
Pout<< "host: " << hostIndex
<< " broadcast 2: "
<< value1 << endl;
label reduced1 = value1;
label reduced2 = value1;
reduce
(
reduced1,
sumOp<label>(),
UPstream::msgType(),
commIntraHost
);
reduce
(
reduced2,
sumOp<label>(),
UPstream::msgType(),
commInterHost
);
Pout<< "value1: (host) " << reduced1
<< " (leader) " << reduced2 << endl;
// Pout<< "ranks: " << UPstream::nProcs(commInterHost) << endl;
wordList strings;
if (UPstream::is_rank(commInterHost))
{
strings.resize(UPstream::nProcs(commInterHost));
strings[UPstream::myProcNo(commInterHost)] = name(pid());
}
// Some basic gather/scatter
Pstream::allGatherList(strings, UPstream::msgType(), commInterHost);
Pout<< "pids " << flatOutput(strings) << endl;
Foam::reverse(strings);
Pstream::broadcast(strings, commIntraHost);
Pout<< "PIDS " << flatOutput(strings) << endl;
}
if (UPstream::parRun() && generalTest)
{
#if 1
// With first ranks
labelList subRanks =
identity(UPstream::nProcs(UPstream::commWorld()) / 2);
UPstream::communicator newComm;
newComm.reset(UPstream::commWorld(), subRanks);
label localRanki = UPstream::myProcNo(newComm);
const int myProci = UPstream::myProcNo(UPstream::commWorld());
Pout.prefix() =
(
'[' + Foam::name(myProci) + " a:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
#if 1
// With every other rank
subRanks = identity(UPstream::nProcs(UPstream::commWorld()));
for (label& val : subRanks)
{
if (val % 2) val = -1;
}
newComm.reset(UPstream::commWorld(), subRanks);
localRanki = UPstream::myProcNo(newComm);
Pout.prefix() =
(
'[' + Foam::name(myProci) + " b:" + Foam::name(localRanki) + "] "
);
Info<< "procIDs: "
<< flatOutput(UPstream::procID(newComm)) << endl;
rankInfo(newComm);
Pout<< endl;
#endif
}
Info<< "\nEnd\n" << endl;