ENH: misc Pstream adjustments

- additional startup guard for inter-node/local-node queries (UPstream)

- impose a linear communication tree for inter-node/local-node
  communicators. These previously defaulted to a basic tree, but it is
  more consistent to have flat addressing for these types of connections.

- demand-driven UPstream::interNode_offsets() for walking
  inter-node ranges instead of creating them manually in various places.

- (style): List<int> instead of labelList for internal commsStruct
  since the communication structures are tied to MPI sizes
  and not to the OpenFOAM label sizes

- reduce the number of intermediate buffer allocations within
  gatherList, scatterList.
This commit is contained in:
Mark Olesen
2025-02-14 15:49:05 +01:00
parent 4720b61313
commit eb4345ed44
8 changed files with 479 additions and 285 deletions

View File

@ -59,13 +59,13 @@ int main(int argc, char *argv[])
label nProcs = UPstream::nProcs(UPstream::worldComm); label nProcs = UPstream::nProcs(UPstream::worldComm);
List<int> interNodeProcs_fake; DynamicList<int> fake_interNode_offsets;
if (UPstream::parRun()) if (UPstream::parRun())
{ {
if (args.found("numProcs")) if (args.found("numProcs"))
{ {
InfoErr<< "ignoring -np option in parallel" << nl; InfoErr<< "ignoring -numProcs option in parallel" << nl;
} }
if (args.found("cores")) if (args.found("cores"))
{ {
@ -78,25 +78,40 @@ int main(int argc, char *argv[])
nProcs = args.getOrDefault<label>("numProcs", 16); nProcs = args.getOrDefault<label>("numProcs", 16);
label nCores = args.getOrDefault<label>("cores", 4); label nCores = args.getOrDefault<label>("cores", 4);
auto& interNode_offsets = fake_interNode_offsets;
if (nCores > 1 && nCores < nProcs) if (nCores > 1 && nCores < nProcs)
{ {
const label numNodes // Build the inter-node offsets
= (nProcs/nCores) + ((nProcs % nCores) ? 1 : 0); interNode_offsets.reserve((nProcs/nCores) + 4);
interNode_offsets.push_back(0);
interNodeProcs_fake.resize(numNodes); for
(
for (label nodei = 0; nodei < numNodes; ++nodei) int count = interNode_offsets.back() + nCores;
count < nProcs;
count += nCores
)
{ {
interNodeProcs_fake[nodei] = nodei * nCores; interNode_offsets.push_back(count);
} }
interNode_offsets.push_back(nProcs);
}
else
{
// Some fallback
interNode_offsets.reserve(2);
interNode_offsets.push_back(0);
interNode_offsets.push_back(nProcs);
} }
} }
const List<int>& interNodeProcs = const List<int>& interNodeOffsets =
( (
UPstream::parRun() UPstream::parRun()
? UPstream::procID(UPstream::commInterNode()) ? UPstream::interNode_offsets()
: interNodeProcs_fake : fake_interNode_offsets
); );
@ -111,79 +126,31 @@ int main(int argc, char *argv[])
// Prefer left-to-right layout for large graphs // Prefer left-to-right layout for large graphs
os << indent << "rankdir=LR" << nl; os << indent << "rankdir=LR" << nl;
int pos = 0; const label numNodes = interNodeOffsets.size()-1;
// First level are the inter-node connections // First level are the inter-node connections
const label parent = 0;
for (const auto proci : interNodeProcs)
{ {
if (parent == proci) continue; os << indent << 0 << " -- " << token::LBRACE;
if (pos) for (label nodei = 1; nodei < numNodes; ++nodei)
{ {
os << " "; os << ' ' << interNodeOffsets[nodei];
}
else
{
os << indent;
}
os << parent << " -- " << proci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
} }
if (pos) os << token::SPACE << token::RBRACE
{ << " // inter-node: " << flatOutput(interNodeOffsets)
pos = 0; << nl;
os << nl;
} }
// Next level are within the nodes // Next level are the local-node connections
for (label nodei = 0; nodei < interNodeProcs.size(); ++nodei) for (label nodei = 0; nodei < numNodes; ++nodei)
{ {
pos = 0; const auto firstProc = interNodeOffsets[nodei];
const auto lastProc = interNodeOffsets[nodei+1];
label firstProc = interNodeProcs[nodei]; os << indent << firstProc << " -- " << token::DQUOTE
const label lastProc = << (firstProc+1) << ".." << (lastProc-1)
( << token::DQUOTE << nl;
(nodei+1 < interNodeProcs.size())
? interNodeProcs[nodei+1]
: nProcs
);
os << indent << "// inter-node " << nodei
<< " [" << firstProc
<< ".." << lastProc-1 << "]" << nl;
for (label proci = firstProc; proci < lastProc; ++proci)
{
if (firstProc == proci) continue;
if (pos)
{
os << " ";
}
else
{
os << indent;
}
os << firstProc << " -- " << proci;
if (++pos >= 4) // Max 4 items per line
{
pos = 0;
os << nl;
}
}
if (pos)
{
pos = 0;
os << nl;
}
} }
os.endBlock(); os.endBlock();

View File

@ -60,7 +60,7 @@ void Foam::Pstream::combineGather
const auto& myComm = comms[UPstream::myProcNo(comm)]; const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
for (const label belowID : myComm.below()) for (const auto belowID : myComm.below())
{ {
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
@ -172,7 +172,7 @@ void Foam::Pstream::listCombineGather
const auto& myComm = comms[UPstream::myProcNo(comm)]; const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
for (const label belowID : myComm.below()) for (const auto belowID : myComm.below())
{ {
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
@ -288,7 +288,7 @@ void Foam::Pstream::mapCombineGather
const auto& myComm = comms[UPstream::myProcNo(comm)]; const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
for (const label belowID : myComm.below()) for (const auto belowID : myComm.below())
{ {
// Map/HashTable: non-contiguous // Map/HashTable: non-contiguous

View File

@ -44,18 +44,18 @@ void Foam::Pstream::gather
T& value, T& value,
const BinaryOp& bop, const BinaryOp& bop,
const int tag, const int tag,
const label comm const label communicator
) )
{ {
if (UPstream::is_parallel(comm)) if (UPstream::is_parallel(communicator))
{ {
// Communication order // Communication order
const auto& comms = UPstream::whichCommunication(comm); const auto& comms = UPstream::whichCommunication(communicator);
// if (comms.empty()) return; // extra safety? // if (comms.empty()) return; // extra safety?
const auto& myComm = comms[UPstream::myProcNo(comm)]; const auto& myComm = comms[UPstream::myProcNo(communicator)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
for (const label belowID : myComm.below()) for (const auto belowID : myComm.below())
{ {
T received; T received;
@ -68,12 +68,12 @@ void Foam::Pstream::gather
reinterpret_cast<char*>(&received), reinterpret_cast<char*>(&received),
sizeof(T), sizeof(T),
tag, tag,
comm communicator
); );
} }
else else
{ {
IPstream::recv(received, belowID, tag, comm); IPstream::recv(received, belowID, tag, communicator);
} }
value = bop(value, received); value = bop(value, received);
@ -91,12 +91,12 @@ void Foam::Pstream::gather
reinterpret_cast<const char*>(&value), reinterpret_cast<const char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm communicator
); );
} }
else else
{ {
OPstream::send(value, myComm.above(), tag, comm); OPstream::send(value, myComm.above(), tag, communicator);
} }
} }
} }

View File

@ -32,13 +32,13 @@ Description
values[UPstream::myProcNo(comm)]. values[UPstream::myProcNo(comm)].
Note: after gather every processor only knows its own data and that of the Note: after gather every processor only knows its own data and that of the
processors below it. Only the 'master' of the communication schedule holds processors below it. Only the 'master' of the communication schedule holds
a fully filled List. Use scatter to distribute the data. a fully filled List. Use broadcast to distribute the data.
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "contiguous.H"
#include "IPstream.H" #include "IPstream.H"
#include "OPstream.H" #include "OPstream.H"
#include "contiguous.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
@ -48,13 +48,13 @@ void Foam::Pstream::gatherList
const UPstream::commsStructList& comms, const UPstream::commsStructList& comms,
UList<T>& values, UList<T>& values,
const int tag, const int tag,
const label comm const label communicator
) )
{ {
if (!comms.empty() && UPstream::is_parallel(comm)) if (!comms.empty() && UPstream::is_parallel(communicator))
{ {
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(communicator);
const label numProc = UPstream::nProcs(comm); const label numProc = UPstream::nProcs(communicator);
if (values.size() < numProc) if (values.size() < numProc)
{ {
@ -67,29 +67,71 @@ void Foam::Pstream::gatherList
// My communication order // My communication order
const auto& myComm = comms[myProci]; const auto& myComm = comms[myProci];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below()) // Local buffer for send/recv of contiguous
[[maybe_unused]] DynamicList<T> buffer;
// Presize buffer
if constexpr (is_contiguous_v<T>)
{ {
const labelList& belowLeaves = comms[belowID].allBelow(); label maxCount = 0;
for (const auto belowID : myComm.below())
{
auto count = comms[belowID].allBelow().size();
maxCount = Foam::max(maxCount, count);
}
if (myComm.above() >= 0)
{
auto count = myComm.allBelow().size();
maxCount = Foam::max(maxCount, count);
}
buffer.reserve(maxCount + 1);
}
// Receive from my downstairs neighbours
for (const auto belowID : myComm.below())
{
const auto& leaves = comms[belowID].allBelow();
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
List<T> received(belowLeaves.size() + 1); if (leaves.empty())
{
// Receive directly into destination
UIPstream::read
(
UPstream::commsTypes::scheduled,
belowID,
values[belowID],
tag,
communicator
);
}
else
{
// Receive via intermediate buffer
buffer.resize_nocopy(leaves.size() + 1);
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
received, buffer,
tag, tag,
comm communicator
); );
values[belowID] = received[0]; label recvIdx(0);
values[belowID] = buffer[recvIdx++];
forAll(belowLeaves, leafI) for (const auto leafID : leaves)
{ {
values[belowLeaves[leafI]] = received[leafI + 1]; values[leafID] = buffer[recvIdx++];
}
} }
} }
else else
@ -100,7 +142,7 @@ void Foam::Pstream::gatherList
belowID, belowID,
0, // bufsize 0, // bufsize
tag, tag,
comm communicator
); );
fromBelow >> values[belowID]; fromBelow >> values[belowID];
@ -112,7 +154,7 @@ void Foam::Pstream::gatherList
} }
// Receive from all other processors below belowID // Receive from all other processors below belowID
for (const label leafID : belowLeaves) for (const auto leafID : leaves)
{ {
fromBelow >> values[leafID]; fromBelow >> values[leafID];
@ -131,7 +173,7 @@ void Foam::Pstream::gatherList
// - all belowLeaves next // - all belowLeaves next
if (myComm.above() >= 0) if (myComm.above() >= 0)
{ {
const labelList& belowLeaves = myComm.allBelow(); const auto& leaves = myComm.allBelow();
if (debug & 2) if (debug & 2)
{ {
@ -142,23 +184,41 @@ void Foam::Pstream::gatherList
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
List<T> sending(belowLeaves.size() + 1); if (leaves.empty())
sending[0] = values[myProci];
forAll(belowLeaves, leafI)
{ {
sending[leafI + 1] = values[belowLeaves[leafI]]; // Send directly
UOPstream::write
(
UPstream::commsTypes::scheduled,
myComm.above(),
values[myProci],
tag,
communicator
);
}
else
{
// Send via intermediate buffer
buffer.resize_nocopy(leaves.size() + 1);
label sendIdx(0);
buffer[sendIdx++] = values[myProci];
for (const auto leafID : leaves)
{
buffer[sendIdx++] = values[leafID];
} }
UOPstream::write UOPstream::write
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
sending, buffer,
tag, tag,
comm communicator
); );
} }
}
else else
{ {
OPstream toAbove OPstream toAbove
@ -167,11 +227,11 @@ void Foam::Pstream::gatherList
myComm.above(), myComm.above(),
0, // bufsize 0, // bufsize
tag, tag,
comm communicator
); );
toAbove << values[myProci]; toAbove << values[myProci];
for (const label leafID : belowLeaves) for (const auto leafID : leaves)
{ {
if (debug & 2) if (debug & 2)
{ {
@ -193,17 +253,17 @@ void Foam::Pstream::scatterList
const UPstream::commsStructList& comms, const UPstream::commsStructList& comms,
UList<T>& values, UList<T>& values,
const int tag, const int tag,
const label comm const label communicator
) )
{ {
// Apart from the additional size check, the only difference // Apart from the additional size check, the only difference
// between scatterList() and using broadcast(List<T>&) or a regular // between scatterList() and using broadcast(List<T>&) or a regular
// scatter(List<T>&) is that processor-local data is skipped. // scatter(List<T>&) is that processor-local data is skipped.
if (!comms.empty() && UPstream::is_parallel(comm)) if (!comms.empty() && UPstream::is_parallel(communicator))
{ {
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(communicator);
const label numProc = UPstream::nProcs(comm); const label numProc = UPstream::nProcs(communicator);
if (values.size() < numProc) if (values.size() < numProc)
{ {
@ -216,27 +276,53 @@ void Foam::Pstream::scatterList
// My communication order // My communication order
const auto& myComm = comms[myProci]; const auto& myComm = comms[myProci];
// Local buffer for send/recv of contiguous
[[maybe_unused]] DynamicList<T> buffer;
// Presize buffer
if constexpr (is_contiguous_v<T>)
{
label maxCount = 0;
if (myComm.above() >= 0)
{
auto count = myComm.allNotBelow().size();
maxCount = Foam::max(maxCount, count);
}
for (const auto belowID : myComm.below())
{
auto count = comms[belowID].allNotBelow().size();
maxCount = Foam::max(maxCount, count);
}
buffer.reserve(maxCount);
}
// Receive from up // Receive from up
if (myComm.above() >= 0) if (myComm.above() >= 0)
{ {
const labelList& notBelowLeaves = myComm.allNotBelow(); const auto& leaves = myComm.allNotBelow();
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
List<T> received(notBelowLeaves.size()); buffer.resize_nocopy(leaves.size());
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
received, buffer,
tag, tag,
comm communicator
); );
forAll(notBelowLeaves, leafI) label recvIdx(0);
for (const auto leafID : leaves)
{ {
values[notBelowLeaves[leafI]] = received[leafI]; values[leafID] = buffer[recvIdx++];
} }
} }
else else
@ -247,10 +333,10 @@ void Foam::Pstream::scatterList
myComm.above(), myComm.above(),
0, // bufsize 0, // bufsize
tag, tag,
comm communicator
); );
for (const label leafID : notBelowLeaves) for (const auto leafID : leaves)
{ {
fromAbove >> values[leafID]; fromAbove >> values[leafID];
@ -267,25 +353,26 @@ void Foam::Pstream::scatterList
// Send to my downstairs neighbours // Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
const label belowID = myComm.below()[belowI]; const auto belowID = myComm.below()[belowI];
const labelList& notBelowLeaves = comms[belowID].allNotBelow(); const auto& leaves = comms[belowID].allNotBelow();
if constexpr (is_contiguous_v<T>) if constexpr (is_contiguous_v<T>)
{ {
List<T> sending(notBelowLeaves.size()); buffer.resize_nocopy(leaves.size());
forAll(notBelowLeaves, leafI) label sendIdx(0);
for (const auto leafID : leaves)
{ {
sending[leafI] = values[notBelowLeaves[leafI]]; buffer[sendIdx++] = values[leafID];
} }
UOPstream::write UOPstream::write
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
sending, buffer,
tag, tag,
comm communicator
); );
} }
else else
@ -296,11 +383,11 @@ void Foam::Pstream::scatterList
belowID, belowID,
0, // bufsize 0, // bufsize
tag, tag,
comm communicator
); );
// Send data destined for all other processors below belowID // Send data destined for all other processors below belowID
for (const label leafID : notBelowLeaves) for (const auto leafID : leaves)
{ {
toBelow << values[leafID]; toBelow << values[leafID];

View File

@ -640,7 +640,7 @@ Foam::UPstream::treeCommunication(const label communicator)
} }
void Foam::UPstream::printCommTree(const label communicator) void Foam::UPstream::printCommTree(int communicator)
{ {
const auto& comms = UPstream::whichCommunication(communicator); const auto& comms = UPstream::whichCommunication(communicator);
@ -663,14 +663,60 @@ bool Foam::UPstream::usingNodeComms(const label communicator)
( (
parRun_ && (constWorldComm_ == communicator) parRun_ && (constWorldComm_ == communicator)
&& (nodeCommsControl_ > 0) && (nodeCommsControl_ > 0)
// More than one node and above defined threshold // More than one node and above defined threshold
&& (numNodes_ > 1) && (numNodes_ >= nodeCommsMin_) && (numNodes_ > 1) && (numNodes_ >= nodeCommsMin_)
// Some processes do share nodes // Some processes do share nodes
&& (numNodes_ < procIDs_[constWorldComm_].size()) && (numNodes_ < procIDs_[constWorldComm_].size())
// Extra paranoid (guard against calling during startup)
&& (commInterNode_ > constWorldComm_)
&& (commLocalNode_ > constWorldComm_)
); );
} }
const Foam::List<int>& Foam::UPstream::interNode_offsets()
{
    // Demand-driven storage: allocated by the first call that passes the
    // startup guard, then handed back unchanged by every later call
    static std::unique_ptr<List<int>> cached;

    if (cached)
    {
        return *cached;
    }

    // Startup guard: the inter-node communicator index must lie beyond
    // the const-world communicator and address an existing procIDs_ entry.
    // Otherwise return the null list and leave the cache unset, so that
    // a later call can try again.
    const bool hasInterNode =
    (
        (commInterNode_ > constWorldComm_)
     && (commInterNode_ < procIDs_.size())
    );

    if (!hasInterNode)
    {
        return List<int>::null();
    }

    cached = std::make_unique<List<int>>();
    List<int>& result = *cached;

    // The ranks stored for the inter-node communicator already act as the
    // begin offsets; only the closing end offset has to be appended
    const auto& nodeRanks = procIDs_[commInterNode_];
    const auto numRanks = nodeRanks.size();

    if (numRanks > 0)
    {
        result.resize(numRanks+1);

        auto dest = result.begin();
        for (const auto rank : nodeRanks)
        {
            *dest = rank;
            ++dest;
        }

        // Final entry: total number of ranks in the const-world communicator
        *dest = UPstream::nProcs(constWorldComm_);
    }

    return *cached;
}
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
bool Foam::UPstream::parRun_(false); bool Foam::UPstream::parRun_(false);

View File

@ -108,18 +108,18 @@ public:
// Private Data // Private Data
//- The procID of the processor \em directly above //- The procID of the processor \em directly above
label above_; int above_;
//- The procIDs of processors \em directly below //- The procIDs of processors \em directly below
labelList below_; List<int> below_;
//- The procIDs of all processors below myProcNo, //- The procIDs of all processors below myProcNo,
//- not just directly below //- not just directly below
labelList allBelow_; List<int> allBelow_;
//- The procIDs of all processors not below myProcNo //- The procIDs of all processors not below myProcNo
// (inverse of allBelow_ without myProcNo) //- (inverse of allBelow_ without myProcNo)
labelList allNotBelow_; List<int> allNotBelow_;
public: public:
@ -132,20 +132,20 @@ public:
//- Move construct from components //- Move construct from components
commsStruct commsStruct
( (
const label above, const int above,
labelList&& below, List<int>&& below,
labelList&& allBelow, List<int>&& allBelow,
labelList&& allNotBelow List<int>&& allNotBelow
); );
//- Copy construct from below, allBelow components //- Copy construct from below, allBelow components
commsStruct commsStruct
( (
const label numProcs, const int numProcs,
const label myProcID, const int myProcID,
const label above, const int above,
const labelUList& below, const UList<int>& below,
const labelUList& allBelow const UList<int>& allBelow
); );
@ -153,26 +153,26 @@ public:
// Access // Access
//- The number of processors addressed by the structure
label nProcs() const noexcept;
//- The procID of the processor \em directly above //- The procID of the processor \em directly above
label above() const noexcept { return above_; } int above() const noexcept { return above_; }
//- The procIDs of the processors \em directly below //- The procIDs of the processors \em directly below
const labelList& below() const noexcept { return below_; } const List<int>& below() const noexcept { return below_; }
//- The procIDs of all processors below //- The procIDs of \em all processors below
//- (so not just directly below) //- (so not just directly below)
const labelList& allBelow() const noexcept { return allBelow_; } const List<int>& allBelow() const noexcept { return allBelow_; }
//- The procIDs of all processors not below myProcNo. //- The procIDs of all processors not below myProcNo.
//- The inverse set of allBelow without myProcNo. //- The inverse set of allBelow without myProcNo.
const labelList& allNotBelow() const noexcept const List<int>& allNotBelow() const noexcept
{ {
return allNotBelow_; return allNotBelow_;
} }
//- The number of processors addressed by the structure
int nProcs() const noexcept;
// Edit // Edit
@ -183,9 +183,9 @@ public:
//- possibly with communicator-specific adjustments //- possibly with communicator-specific adjustments
void reset void reset
( (
const label procID, const int myProci,
const label numProcs, const int numProcs,
const label comm = -1 const int communicator
); );
@ -203,7 +203,7 @@ public:
// Private Data // Private Data
//- The communicator index //- The communicator index
label comm_; int comm_;
//- The communication tree //- The communication tree
List<commsStruct> tree_; List<commsStruct> tree_;
@ -216,7 +216,7 @@ public:
commsStructList() noexcept : comm_(-1) {} commsStructList() noexcept : comm_(-1) {}
//- Construct empty with given communicator //- Construct empty with given communicator
commsStructList(label comm) noexcept : comm_(comm) {} explicit commsStructList(int comm) noexcept : comm_(comm) {}
// Static Functions // Static Functions
@ -230,8 +230,8 @@ public:
//- True if communicator is non-negative (ie, was assigned) //- True if communicator is non-negative (ie, was assigned)
bool good() const noexcept { return (comm_ >= 0); } bool good() const noexcept { return (comm_ >= 0); }
//- The communicator label //- The communicator internal index
label comm() const noexcept { return comm_; } int comm() const noexcept { return comm_; }
//- Clear the list //- Clear the list
void clear() { return tree_.clear(); } void clear() { return tree_.clear(); }
@ -242,20 +242,23 @@ public:
//- The number of entries //- The number of entries
label size() const noexcept { return tree_.size(); } label size() const noexcept { return tree_.size(); }
//- Reset communicator index and clear demand-driven entries //- Reset communicator index, fill tree with empty entries
void init(const label comm); void init(int communicator);
//- Reset communicator index, clear tree entries
void reset(int communicator);
//- Get existing or create (demand-driven) entry //- Get existing or create (demand-driven) entry
const UPstream::commsStruct& get(const label proci) const; const UPstream::commsStruct& get(int proci) const;
//- Get existing or create (demand-driven) entry //- Get existing or create (demand-driven) entry
const UPstream::commsStruct& operator[](const label proci) const const UPstream::commsStruct& operator[](int proci) const
{ {
return get(proci); return get(proci);
} }
//- Print un-directed graph in graphviz dot format //- Print un-directed graph in graphviz dot format
void printGraph(Ostream& os, label proci = 0) const; void printGraph(Ostream& os, int proci = 0) const;
}; };
@ -1074,6 +1077,10 @@ public:
return rangeType(1, static_cast<int>(nProcs(communicator)-1)); return rangeType(1, static_cast<int>(nProcs(communicator)-1));
} }
//- Processor offsets corresponding to the inter-node communicator
static const List<int>& interNode_offsets();
//- Communication schedule for linear all-to-master (proc 0) //- Communication schedule for linear all-to-master (proc 0)
static const commsStructList& linearCommunication static const commsStructList& linearCommunication
( (
@ -1105,7 +1112,7 @@ public:
( (
np <= 1 np <= 1
? commsStructList::null() ? commsStructList::null()
: (np <= 2 || np < nProcsSimpleSum) : (np <= 2 || np < UPstream::nProcsSimpleSum)
? linearCommunication(communicator) ? linearCommunication(communicator)
: treeCommunication(communicator) : treeCommunication(communicator)
); );

View File

@ -28,6 +28,9 @@ License
#include "UPstream.H" #include "UPstream.H"
#include <algorithm>
#include <numeric>
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam namespace Foam
@ -38,9 +41,9 @@ static void printGraph_impl
( (
Ostream& os, Ostream& os,
const UPstream::commsStructList& comms, const UPstream::commsStructList& comms,
const label proci, const int proci,
label depth, int depth,
const label maxDepth = 1024 const int maxDepth = 1024
) )
{ {
if (proci >= comms.size()) if (proci >= comms.size())
@ -59,41 +62,80 @@ static void printGraph_impl
// Prefer left-to-right layout for large graphs // Prefer left-to-right layout for large graphs
os << indent << "rankdir=LR" << nl; os << indent << "rankdir=LR" << nl;
}
// Output the immediate neighbours below
if (below.empty()) if (below.empty())
{
if (proci == 0)
{ {
// A graph with a single-node (eg, self-comm) // A graph with a single-node (eg, self-comm)
os << indent << proci << nl; os << indent << proci << nl;
} }
} }
else
int pos = 0;
for (const auto nbrProci : below)
{ {
if (pos) os << indent << proci << " -- " << token::BEGIN_BLOCK;
// Accumulate into ranges whenever possible
IntRange<int> range;
// Print accumulated range and reset
auto emit_range = [&]()
{ {
os << " "; if (!range.empty())
{
os << ' ';
if (range.min() < range.max())
{
os << '"' << range.min() << ".." << range.max() << '"';
} }
else else
{ {
os << indent; os << range.min();
} }
os << proci << " -- " << nbrProci; range.reset();
}
};
if (++pos >= 4) // Max 4 items per line for (const auto nbrProci : below)
{ {
pos = 0; const bool terminal = comms[nbrProci].below().empty();
os << nl;
}
}
if (pos) if
(
terminal
&& (!range.empty() && (range.max()+1 == nbrProci))
)
{ {
os << nl; // Accumulate
++range;
continue;
} }
// Limit the maximum depth // Emit accumulated range
emit_range();
if (terminal)
{
range.reset(nbrProci, 1);
}
else
{
os << token::SPACE << nbrProci;
}
}
// Emit accumulated range
emit_range();
os << token::SPACE << token::END_BLOCK << nl;
}
// Recurse into below neighbours, but limit the maximum depth
++depth; ++depth;
if (depth >= maxDepth && (proci != 0)) if (depth >= maxDepth && (proci != 0))
{ {
@ -109,7 +151,6 @@ static void printGraph_impl
if (proci == 0) if (proci == 0)
{ {
os.endBlock(); os.endBlock();
os << "// end graph" << nl; os << "// end graph" << nl;
} }
} }
@ -150,46 +191,46 @@ static void printGraph_impl
namespace Foam namespace Foam
{ {
static label simpleTree static int simpleTree
( (
const label procID, const int myProci,
const label numProcs, const int numProcs,
DynamicList<label>& below, DynamicList<int>& below,
DynamicList<label>& allBelow DynamicList<int>& allBelow
) )
{ {
label above(-1); int above(-1);
for (label mod = 2, step = 1; step < numProcs; step = mod) for (int mod = 2, step = 1; step < numProcs; step = mod)
{ {
mod = step * 2; mod = step * 2;
if (procID % mod) if (myProci % mod)
{ {
// The rank above // The rank above
above = procID - (procID % mod); above = myProci - (myProci % mod);
break; break;
} }
else else
{ {
for for
( (
label j = procID + step; int i = myProci + step;
j < numProcs && j < procID + mod; i < numProcs && i < myProci + mod;
j += step i += step
) )
{ {
below.push_back(j); below.push_back(i);
} }
for for
( (
label j = procID + step; int i = myProci + step;
j < numProcs && j < procID + mod; i < numProcs && i < myProci + mod;
j++ ++i
) )
{ {
allBelow.push_back(j); allBelow.push_back(i);
} }
} }
} }
@ -204,10 +245,10 @@ static label simpleTree
Foam::UPstream::commsStruct::commsStruct Foam::UPstream::commsStruct::commsStruct
( (
const label above, const int above,
labelList&& below, List<int>&& below,
labelList&& allBelow, List<int>&& allBelow,
labelList&& allNotBelow List<int>&& allNotBelow
) )
: :
above_(above), above_(above),
@ -219,11 +260,11 @@ Foam::UPstream::commsStruct::commsStruct
Foam::UPstream::commsStruct::commsStruct Foam::UPstream::commsStruct::commsStruct
( (
const label numProcs, const int numProcs,
const label myProcID, const int myProcID,
const label above, const int above,
const labelUList& below, const UList<int>& below,
const labelUList& allBelow const UList<int>& allBelow
) )
: :
above_(above), above_(above),
@ -237,14 +278,14 @@ Foam::UPstream::commsStruct::commsStruct
isNotBelow[myProcID] = false; isNotBelow[myProcID] = false;
// Exclude allBelow // Exclude allBelow
for (const label proci : allBelow) for (const auto proci : allBelow)
{ {
isNotBelow[proci] = false; isNotBelow[proci] = false;
} }
// Compacting to obtain allNotBelow_ // Compacting to obtain allNotBelow_
label nNotBelow = 0; int nNotBelow = 0;
forAll(isNotBelow, proci) for (int proci = 0; proci < numProcs; ++proci)
{ {
if (isNotBelow[proci]) if (isNotBelow[proci])
{ {
@ -266,7 +307,7 @@ Foam::UPstream::commsStruct::commsStruct
void Foam::UPstream::commsStructList::printGraph void Foam::UPstream::commsStructList::printGraph
( (
Ostream& os, Ostream& os,
const label proci const int proci
) const ) const
{ {
// Print graph - starting at depth 0 // Print graph - starting at depth 0
@ -282,9 +323,9 @@ void Foam::UPstream::commsStructList::printGraph
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UPstream::commsStruct::nProcs() const noexcept int Foam::UPstream::commsStruct::nProcs() const noexcept
{ {
return (1 + allBelow_.size() + allNotBelow_.size()); return (1 + int(allBelow_.size() + allNotBelow_.size()));
} }
@ -299,46 +340,65 @@ void Foam::UPstream::commsStruct::reset()
void Foam::UPstream::commsStruct::reset void Foam::UPstream::commsStruct::reset
( (
const label procID, const int myProci,
const label numProcs, const int numProcs,
[[maybe_unused]] const label comm const int communicator
) )
{ {
reset(); reset();
if (numProcs <= 2 || numProcs < UPstream::nProcsSimpleSum) // Linear (flat) communication pattern
if
(
// Trivially small domains
(numProcs <= 2 || numProcs < UPstream::nProcsSimpleSum)
// local-node: assume that the local communication is low-latency
|| (
UPstream::commLocalNode() == communicator
&& UPstream::commLocalNode() > UPstream::commConstWorld()
)
// inter-node: presumably relatively few nodes and/or
// higher latency with larger messages being sent
|| (
UPstream::commInterNode() == communicator
&& UPstream::commInterNode() > UPstream::commConstWorld()
)
)
{ {
// Linear communication pattern // Linear communication pattern
label above(-1); int above(-1);
labelList below; List<int> below;
if (procID == 0) if (myProci == 0)
{ {
below = identity(numProcs-1, 1); below.resize(numProcs-1);
std::iota(below.begin(), below.end(), 1);
} }
else else
{ {
above = 0; above = 0;
} }
*this = UPstream::commsStruct(numProcs, procID, above, below, below); *this = UPstream::commsStruct(numProcs, myProci, above, below, below);
return; return;
} }
// Simple tree communication pattern
DynamicList<label> below;
DynamicList<label> allBelow;
label above = simpleTree DynamicList<int> below;
DynamicList<int> allBelow;
// Simple tree communication pattern
int above = simpleTree
( (
procID, myProci,
numProcs, numProcs,
below, below,
allBelow allBelow
); );
*this = UPstream::commsStruct(numProcs, procID, above, below, allBelow); *this = UPstream::commsStruct(numProcs, myProci, above, below, allBelow);
} }
@ -360,19 +420,36 @@ Foam::UPstream::commsStructList::null()
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
void Foam::UPstream::commsStructList::init(const label comm) void Foam::UPstream::commsStructList::init(int communicator)
{ {
comm_ = comm; comm_ = communicator;
tree_.clear();
if (comm_ >= 0)
{
tree_.resize(UPstream::nProcs(comm_));
}
}
void Foam::UPstream::commsStructList::reset(int communicator)
{
comm_ = communicator;
tree_.clear(); tree_.clear();
tree_.resize(UPstream::nProcs(comm));
} }
const Foam::UPstream::commsStruct& const Foam::UPstream::commsStruct&
Foam::UPstream::commsStructList::get(const label proci) const Foam::UPstream::commsStructList::get(int proci) const
{ {
const auto numProcs = UPstream::nProcs(comm_);
// Only if reset(comm) instead of init(comm) was used
if (tree_.size() < numProcs)
{
const_cast<List<commsStruct>&>(tree_).resize(numProcs);
}
const UPstream::commsStruct& entry = tree_[proci]; const UPstream::commsStruct& entry = tree_[proci];
const auto numProcs = tree_.size();
if (entry.nProcs() != numProcs) if (entry.nProcs() != numProcs)
{ {
@ -391,10 +468,8 @@ bool Foam::UPstream::commsStruct::operator==(const commsStruct& comm) const
{ {
return return
( (
(above_ == comm.above()) (above() == comm.above())
&& (below_ == comm.below()) && (below() == comm.below())
// && (allBelow_ == comm.allBelow())
// && (allNotBelow_ == comm.allNotBelow())
); );
} }
@ -409,10 +484,10 @@ bool Foam::UPstream::commsStruct::operator!=(const commsStruct& comm) const
Foam::Ostream& Foam::operator<<(Ostream& os, const UPstream::commsStruct& comm) Foam::Ostream& Foam::operator<<(Ostream& os, const UPstream::commsStruct& comm)
{ {
os << comm.above() << nl << token::SPACE << token::SPACE; os << comm.above() << nl;
comm.below().writeList(os) << nl << token::SPACE << token::SPACE; os << " "; comm.below().writeList(os) << nl;
comm.allBelow().writeList(os) << nl << token::SPACE << token::SPACE; os << " "; comm.allBelow().writeList(os) << nl;
comm.allNotBelow().writeList(os); os << " "; comm.allNotBelow().writeList(os);
os.check(FUNCTION_NAME); os.check(FUNCTION_NAME);
return os; return os;

View File

@ -2110,21 +2110,33 @@ void Foam::argList::parse
Info<< " (" << UPstream::nProcs() << " ranks, " Info<< " (" << UPstream::nProcs() << " ranks, "
<< UPstream::numNodes() << " nodes)" << nl; << UPstream::numNodes() << " nodes)" << nl;
Info<< " floatTransfer : " if (UPstream::floatTransfer)
<< Switch::name(UPstream::floatTransfer) << nl {
<< " maxCommsSize : " Info<< " floatTransfer : enabled" << nl;
<< UPstream::maxCommsSize << nl }
<< " nProcsSimpleSum : " if (UPstream::maxCommsSize)
<< UPstream::nProcsSimpleSum << nl {
<< " nonBlockingExchange: " Info<< " maxCommsSize : "
<< UPstream::maxCommsSize << nl;
}
if (UPstream::nProcsSimpleSum > 2)
{
Info<< " nProcsSimpleSum : "
<< UPstream::nProcsSimpleSum << nl;
}
{
const auto& commsType =
UPstream::commsTypeNames[UPstream::defaultCommsType];
Info<< " nonBlockingExchange: "
<< UPstream::nProcsNonblockingExchange << UPstream::nProcsNonblockingExchange
<< " (tuning: " << UPstream::tuning_NBX_ << ')' << nl << " (tuning: " << UPstream::tuning_NBX_ << ')' << nl
<< " exchange algorithm : " << " exchange algorithm : "
<< PstreamBuffers::algorithm << nl << PstreamBuffers::algorithm << nl
<< " commsType : " << " commsType : " << commsType << nl
<< UPstream::commsTypeNames[UPstream::defaultCommsType] << nl
<< " polling iterations : " << " polling iterations : "
<< UPstream::nPollProcInterfaces << nl; << UPstream::nPollProcInterfaces << nl;
}
if (UPstream::allWorlds().size() > 1) if (UPstream::allWorlds().size() > 1)
{ {