ENH: extend mpiAllGather to include integer and float types

- was previously limited to 'char' whereas gatherv/scatterv
  already supported various integer and float types

STYLE: rebundle allToAll declarations with macros

ENH: provide a version of allToAllConsensus returning the Map

- simplifies use and avoids ambiguities in the send/recv parameters

- the Map version will now also transmit zero-value data if present
  in the Map. Unlike the List version, zero values are not necessary to
  signal connectivity with a Map.

COMP: forwarding template parameters for NBX routines

ENH: consolidate PstreamBuffers size exchange options

- had a variety of nearly identical backends for all-to-all,
  gather/scatter. Now combined internally with a dispatch enumeration
  which provides better control over which size exchange algorithm
  is used.

DEFEATURE: remove experimental full-NBX PstreamBuffers variant

- no advantages seen compared to the hybrid NBX/PEX approach.
  Removal reduces some code cruft.

DEFEATURE: remove experimental "double non-blocking" NBX version

- the idea was to avoid blocking receives for very large data transfers,
  but that is usually better accomplished with a hybrid NBX/PEX approach
  like PstreamBuffers allows
This commit is contained in:
Mark Olesen
2023-11-17 09:08:21 +01:00
parent 3fd1b74b26
commit 8818201196
9 changed files with 514 additions and 495 deletions

View File

@ -1,7 +1,7 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2306 |
| \\ / O peration | Version: v2312 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
@ -174,12 +174,13 @@ OptimisationSwitches
nbx.tuning 0;
// Additional PstreamBuffers tuning parameters (experimental)
// -1 : PEX with all-to-all for buffer sizes and point-to-point
// for contents (legacy approach)
// 0 : hybrid PEX with NBX for buffer sizes and point-to-point
// for contents (proposed new approach)
// 1 : full NBX for buffer sizes and contents (very experimental)
pbufs.tuning -1;
// 0 : (legacy PEX)
// * all-to-all for buffer sizes [legacy approach]
// * point-to-point for contents
// 1 : (hybrid PEX)
// * NBX for buffer sizes [new approach]
// * point-to-point for contents
pbufs.tuning 0;
// =====

View File

@ -599,7 +599,7 @@ public:
// Non-blocking exchange
//- Exchange \em contiguous data using non-blocking consensus
//- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData, receives into recvData.
//
// Each entry of the recvBufs list is cleared before receipt.
@ -614,10 +614,10 @@ public:
List<Container>& recvBufs,
const int tag,
const label comm,
const bool wait = true //!< Wait for requests to complete
const bool wait = true //!< (ignored)
);
//- Exchange \em contiguous data using non-blocking consensus
//- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData, receives into recvData.
//
// Each \em entry of the recvBufs map is cleared before receipt,
@ -636,7 +636,23 @@ public:
Map<Container>& recvBufs,
const int tag,
const label comm,
const bool wait = true //!< Wait for requests to complete
const bool wait = true //!< (ignored)
);
//- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData and returns the receive information.
//
// For \b non-parallel : copy own rank (if it exists and non-empty)
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
template<class Container, class Type>
static Map<Container> exchangeConsensus
(
const Map<Container>& sendBufs,
const int tag,
const label comm,
const bool wait = true //!< (ignored)
);
};

View File

@ -36,7 +36,7 @@ License
int Foam::PstreamBuffers::algorithm
(
// Name may change in the future (JUN-2023)
Foam::debug::optimisationSwitch("pbufs.tuning", -1)
Foam::debug::optimisationSwitch("pbufs.tuning", 0)
);
registerOptSwitch
(
@ -46,20 +46,19 @@ registerOptSwitch
);
// Simple enumerations
// -------------------
static constexpr int algorithm_PEX_allToAll = -1; // Traditional PEX
//static constexpr int algorithm_PEX_hybrid = 0; // Possible new default?
static constexpr int algorithm_full_NBX = 1; // Very experimental
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
// Change the status of 'finished sends called'
// (records whether the send phase has been marked as completed)
inline void Foam::PstreamBuffers::setFinished(bool on) noexcept
{
    finishedSendsCalled_ = on;
}
inline void Foam::PstreamBuffers::initFinalExchange()
{
// Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
setFinished(true);
clearUnregistered();
}
@ -67,65 +66,149 @@ inline void Foam::PstreamBuffers::initFinalExchange()
void Foam::PstreamBuffers::finalExchange
(
enum modeOption mode,
const bool wait,
labelList& recvSizes
)
{
initFinalExchange();
// Pre-flight checks
switch (mode)
{
case modeOption::DEFAULT :
{
// Choose (ALL_TO_ALL | NBX_PEX) from static settings
mode =
(
(algorithm <= 0)
? modeOption::ALL_TO_ALL
: modeOption::NBX_PEX
);
break;
}
case modeOption::GATHER :
{
// gather mode (all-to-one) : master [0] <- everyone
// - only send to master [0]
// note: master [0] is also allowed to 'send' to itself
for (label proci = 1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
break;
}
case modeOption::SCATTER :
{
// scatter mode (one-to-all) : master [0] -> everyone
if (!UPstream::master(comm_))
{
// Non-master: has no sends
clearSends();
}
break;
}
default :
break;
}
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
if
(
wait
&& (algorithm >= algorithm_full_NBX)
&& (UPstream::maxCommsSize <= 0)
)
// PEX algorithm with different flavours of exchanging sizes
// PEX stage 1: exchange sizes
labelList sendSizes; // Not used by gather/scatter
switch (mode)
{
// NBX algorithm (nonblocking exchange)
// - when requested and waiting, no data chunking etc
case modeOption::GATHER :
{
// gather mode (all-to-one): master [0] <- everyone
// - presumed that MPI_Gather will be the most efficient
PstreamDetail::exchangeConsensus<DynamicList<char>, char>
(
sendBuffers_,
recvBuffers_,
recvSizes,
(tag_ + 271828), // some unique tag?
comm_,
wait
);
recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
return;
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
break;
}
case modeOption::SCATTER :
{
// scatter mode (one-to-all): master [0] -> everyone
// - presumed that MPI_Scatter will be the most efficient
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
}
const label myRecv
(
UPstream::listScatterValues(recvSizes, comm_)
);
recvSizes = Zero;
recvSizes[0] = myRecv;
break;
}
case modeOption::NBX_PEX :
{
// Assemble the send sizes (cf. Pstream::exchangeSizes)
sendSizes.resize_nocopy(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
// Exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
break;
}
case modeOption::DEFAULT :
case modeOption::ALL_TO_ALL :
{
// Assemble the send sizes (cf. Pstream::exchangeSizes)
sendSizes.resize_nocopy(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
// Exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
break;
}
}
// PEX algorithm with two different flavours of exchanging sizes
// Assemble the send sizes (cf. Pstream::exchangeSizes)
labelList sendSizes(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
if (algorithm == algorithm_PEX_allToAll)
{
// PEX stage 1: exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
}
else
{
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
@ -166,7 +249,7 @@ void Foam::PstreamBuffers::finalExchange
recvSizes[proci] = 1; // Connected
}
for (label proci=0; proci < nProcs_; ++proci)
for (label proci = 0; proci < nProcs_; ++proci)
{
if (!recvSizes[proci]) // Not connected
{
@ -200,93 +283,6 @@ void Foam::PstreamBuffers::finalExchange
}
void Foam::PstreamBuffers::finalGatherScatter
(
const bool isGather,
const bool wait,
labelList& recvSizes
)
{
initFinalExchange();
if (isGather)
{
// gather mode (all-to-one)
// Only send to master [0]. Master is also allowed to 'send' to itself
for (label proci=1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
}
else
{
// scatter mode (one-to-all)
if (!UPstream::master(comm_))
{
// Non-master: has no sends
clearSends();
}
}
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Use PEX algorithm
// - for a non-sparse gather/scatter, it is presumed that
// MPI_Gather/MPI_Scatter will be the most efficient way to
// communicate the sizes.
// PEX stage 1: exchange sizes (gather or scatter)
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
}
else
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
}
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
recvSizes = Zero;
recvSizes[0] = myRecv;
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
recvSizes,
recvBuffers_,
tag_,
comm_,
wait
);
}
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers
@ -382,7 +378,7 @@ void Foam::PstreamBuffers::clear()
{
clearSends();
clearRecvs();
finishedSendsCalled_ = false;
setFinished(false);
}
@ -431,13 +427,13 @@ void Foam::PstreamBuffers::clearStorage()
}
recvPositions_ = Zero;
finishedSendsCalled_ = false;
setFinished(false);
}
void Foam::PstreamBuffers::initRegisterSend()
{
if (!finishedSendsCalled_)
if (!finished())
{
for (label proci = 0; proci < nProcs_; ++proci)
{
@ -474,7 +470,7 @@ bool Foam::PstreamBuffers::hasSendData() const
bool Foam::PstreamBuffers::hasRecvData() const
{
if (finishedSendsCalled_)
if (finished())
{
forAll(recvBuffers_, proci)
{
@ -504,7 +500,7 @@ Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
{
if (finishedSendsCalled_)
if (finished())
{
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
@ -529,7 +525,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
{
labelList counts(nProcs_, Zero);
if (finishedSendsCalled_)
if (finished())
{
forAll(recvBuffers_, proci)
{
@ -560,7 +556,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
{
label maxLen = 0;
if (finishedSendsCalled_)
if (finished())
{
forAll(recvBuffers_, proci)
{
@ -599,7 +595,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount() const
const Foam::UList<char>
Foam::PstreamBuffers::peekRecvData(const label proci) const
{
if (finishedSendsCalled_)
if (finished())
{
const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
@ -625,18 +621,17 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
}
bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
{
bool old(allowClearRecv_);
allowClearRecv_ = on;
return old;
}
void Foam::PstreamBuffers::finishedSends(const bool wait)
{
labelList recvSizes;
finalExchange(wait, recvSizes);
finalExchange(modeOption::DEFAULT, wait, recvSizes);
}
// Mark the send phase as being finished.
// Buffer sizes are exchanged using NBX (non-blocking consensus),
// the received sizes are not returned to the caller.
void Foam::PstreamBuffers::finishedSendsNBX(const bool wait)
{
    labelList recvSizes;  // sizes are exchanged but discarded here
    finalExchange(modeOption::NBX_PEX, wait, recvSizes);
}
@ -649,7 +644,7 @@ void Foam::PstreamBuffers::finishedSends
// Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size());
finalExchange(wait, recvSizes);
finalExchange(modeOption::DEFAULT, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -717,8 +712,9 @@ bool Foam::PstreamBuffers::finishedSends
if (changed)
{
// Update send/recv topology
labelList recvSizes;
finishedSends(recvSizes, wait); // eg, using all-to-all
finishedSends(recvSizes, wait); // modeOption::DEFAULT (eg all-to-all)
// The send ranks
sendProcs.clear();
@ -754,14 +750,14 @@ bool Foam::PstreamBuffers::finishedSends
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
labelList recvSizes;
finalGatherScatter(true, wait, recvSizes);
finalExchange(modeOption::GATHER, wait, recvSizes);
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
labelList recvSizes;
finalGatherScatter(false, wait, recvSizes);
finalExchange(modeOption::SCATTER, wait, recvSizes);
}
@ -771,7 +767,7 @@ void Foam::PstreamBuffers::finishedGathers
const bool wait
)
{
finalGatherScatter(true, wait, recvSizes);
finalExchange(modeOption::GATHER, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -793,7 +789,7 @@ void Foam::PstreamBuffers::finishedScatters
const bool wait
)
{
finalGatherScatter(false, wait, recvSizes);
finalExchange(modeOption::SCATTER, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -809,4 +805,27 @@ void Foam::PstreamBuffers::finishedScatters
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Controls
// True if finishedSends() or finishedNeighbourSends() has been called
bool Foam::PstreamBuffers::finished() const noexcept
{
    return finishedSendsCalled_;
}
// Is clearStorage of individual receive buffers by external hooks
// allowed? (default: true)
bool Foam::PstreamBuffers::allowClearRecv() const noexcept
{
    return allowClearRecv_;
}
// Change the allowClearRecv setting, returning the previous value
bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
{
    const bool previous = allowClearRecv_;
    allowClearRecv_ = on;
    return previous;
}
// ************************************************************************* //

View File

@ -32,7 +32,7 @@ Description
Use UOPstream to stream data into buffers, call finishedSends() to
notify that data is in buffers and then use IUPstream to get data out
of received buffers. Works with both blocking and nonBlocking. Does
of received buffers. Works with both blocking and non-blocking. Does
not make much sense with scheduled since there you would not need these
explicit buffers.
@ -151,6 +151,19 @@ class bitSet;
class PstreamBuffers
{
// Private Data Types
//- Private enumeration for handling PEX stage 1 (sizing) modes
enum class modeOption : unsigned char
{
DEFAULT, //!< Use statically configured algorithm
GATHER, //!< Use all-to-one (gather) of sizes
SCATTER, //!< Use one-to-all (scatter) of sizes
ALL_TO_ALL, //!< Use allToAll to obtain sizes
NBX_PEX //!< Use consensus exchange (NBX) to obtain sizes
};
// Private Data
//- Track if sends are complete
@ -190,20 +203,24 @@ class PstreamBuffers
// Private Member Functions
//- Change status of finished sends called
inline void setFinished(bool on) noexcept;
//- Clear 'unregistered' send buffers, tag as being send-ready
inline void initFinalExchange();
//- Mark all sends as having been done.
// This will start receives (nonBlocking comms).
// This will start receives (non-blocking comms).
void finalExchange
(
enum modeOption mode,
const bool wait,
labelList& recvSizes
);
//- Mark sends as done.
// Only exchange sizes using the neighbour ranks
// (nonBlocking comms).
// (non-blocking comms).
void finalExchange
(
const labelUList& sendProcs,
@ -212,14 +229,6 @@ class PstreamBuffers
labelList& recvSizes
);
//- For all-to-one or one-to-all
void finalGatherScatter
(
const bool isGather,
const bool wait,
labelList& recvSizes
);
// Friendship Access
@ -343,17 +352,11 @@ public:
// Queries
//- True if finishedSends() or finishedNeighbourSends() has been called
bool finished() const noexcept
{
return finishedSendsCalled_;
}
bool finished() const noexcept;
//- Is clearStorage of individual receive buffer by external hooks
//- allowed? (default: true)
bool allowClearRecv() const noexcept
{
return allowClearRecv_;
}
bool allowClearRecv() const noexcept;
//- True if any (local) send buffers have data
bool hasSendData() const;
@ -436,74 +439,96 @@ public:
// Regular Functions
//- Mark sends as done
//- Mark the send phase as being finished.
//
// Non-blocking mode: populates receive buffers (all-to-all).
// \param wait wait for requests to complete (in nonBlocking mode)
// Non-blocking mode: populates receive buffers using all-to-all
// or NBX (depending on tuning parameters).
// \param wait wait for requests to complete (in non-blocking mode)
void finishedSends(const bool wait = true);
//- Mark sends as done.
//- Mark the send phase as being finished.
//
// Non-blocking mode: populates receive buffers using NBX.
// \param wait wait for requests to complete (in non-blocking mode)
void finishedSendsNBX(const bool wait = true);
//- Mark the send phase as being finished.
//- Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers (all-to-all).
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
// Non-blocking mode: populates receive buffers using all-to-all
// or NBX (depending on tuning parameters).
// \warning currently only valid for non-blocking comms.
void finishedSends
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- Mark the send phase as being finished.
//- Recovers the sizes (bytes) received.
//
// \warning currently only valid for nonBlocking comms.
void finishedSends(labelList& recvSizes, const bool wait = true);
// Non-blocking mode: populates receive buffers using NBX.
// \warning currently only valid for non-blocking comms.
void finishedSendsNBX
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
// Functions with restricted neighbours
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
//- Mark the send phase as being finished, with communication
//- being limited to a known subset of send/recv ranks.
//
// Non-blocking mode: populates receive buffers.
//
// \param neighProcs ranks used for sends/recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
// \warning currently only valid for non-blocking comms.
// \note Same as finishedSends with identical sendProcs/recvProcs
void finishedNeighbourSends
(
//! ranks used for sends/recvs
const labelUList& neighProcs,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
//- Mark the send phase as being finished, with communication
//- being limited to a known subset of send/recv ranks.
//- Recovers the sizes (bytes) received.
//
// Non-blocking mode: it will populate receive buffers.
//
// \param neighProcs ranks used for sends/recvs
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking mode.
// \warning currently only valid for non-blocking mode.
void finishedNeighbourSends
(
//! ranks used for sends/recvs
const labelUList& neighProcs,
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- A caching version that uses a limited send/recv connectivity.
//
// Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
// \warning currently only valid for non-blocking comms.
bool finishedSends
(
//! inter-rank connections (on/off) for sending ranks
bitSet& sendConnections,
//! ranks used for sends
DynamicList<label>& sendProcs,
//! ranks used for recvs
DynamicList<label>& recvProcs,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
@ -515,40 +540,46 @@ public:
// Non-blocking mode: populates receive buffers.
// Can use recvDataCount, maxRecvCount etc to recover sizes received.
//
// \param wait wait for requests to complete (in nonBlocking mode)
// \param wait wait for requests to complete (in non-blocking mode)
//
// \warning currently only valid for nonBlocking comms.
// \warning currently only valid for non-blocking comms.
void finishedGathers(const bool wait = true);
//- Mark all sends to master as done.
//- Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers (all-to-one).
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedGathers(labelList& recvSizes, const bool wait = true);
// \warning currently only valid for non-blocking comms.
void finishedGathers
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- Mark all sends to sub-procs as done.
//
// Non-blocking mode: populates receive buffers.
// Can use recvDataCount, maxRecvCount etc to recover sizes received.
//
// \param wait wait for requests to complete (in nonBlocking mode)
// \param wait wait for requests to complete (in non-blocking mode)
//
// \warning currently only valid for nonBlocking comms.
// \warning currently only valid for non-blocking comms.
void finishedScatters(const bool wait = true);
//- Mark all sends to sub-procs as done.
//- Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers (all-to-one).
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedScatters(labelList& recvSizes, const bool wait = true);
// \warning currently only valid for non-blocking comms.
void finishedScatters
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
};

View File

@ -46,21 +46,19 @@ namespace Foam
namespace PstreamDetail
{
//- Exchange \em contiguous data using non-blocking consensus exchange
//- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
//- with optional tracking of the receive sizes.
//
// No internal guards or resizing - data containers are all properly
// sized before calling.
//
// \param[in] sendBufs The send buffers list (size: numProcs)
// \param[out] recvBufs The recv buffers list (size: numProcs)
// \param[out] recvSizes The recv sizes (size: 0 or numProcs).
// \param[in] sendBufs The send buffers list (size: numProc)
// \param[out] recvBufs The recv buffers list (size: numProc)
// \param[out] recvSizes The recv sizes (size: 0 or numProc).
// This parameter can be an empty list, in which case the receive sizes
// are not returned.
// \param tag The message tag
// \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType If blocking or (default) non-blocking
template<class Container, class Type>
void exchangeConsensus
@ -69,20 +67,17 @@ void exchangeConsensus
UList<Container>& recvBufs,
labelUList& recvSizes,
const int tag,
const label comm,
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
const label comm
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
// Initial: clear all receive information
// Initial: clear all receive locations
for (auto& buf : recvBufs)
{
buf.clear();
@ -98,28 +93,37 @@ void exchangeConsensus
if (sendBufs.size() > numProc)
{
FatalErrorInFunction
<< "Send buffers:" << sendBufs.size() << " > numProcs:" << numProc
<< "Send buffers:" << sendBufs.size() << " > numProc:" << numProc
<< Foam::abort(FatalError);
}
if (recvBufs.size() < numProc)
{
FatalErrorInFunction
<< "Recv buffers:" << recvBufs.size() << " < numProcs:" << numProc
<< "Recv buffers:" << recvBufs.size() << " < numProc:" << numProc
<< Foam::abort(FatalError);
}
// #endif
if (!UPstream::is_parallel(comm))
// Fake send/recv for myself - parallel or non-parallel
{
// Do myself
recvBufs[myProci] = sendBufs[myProci];
if (myProci < recvSizes.size())
{
recvSizes[myProci] = recvBufs.size();
}
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return;
}
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere
if (initialBarrier)
@ -127,11 +131,12 @@ void exchangeConsensus
UPstream::barrier(comm);
}
// Algorithm NBX: Nonblocking consensus with List containers
DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// Start nonblocking synchronous send to processor dest
// Start nonblocking synchronous send to destination ranks
for (label proci = 0; proci < numProc; ++proci)
{
const auto& sendData = sendBufs[proci];
@ -140,19 +145,8 @@ void exchangeConsensus
{
// Do not send/recv empty data
}
else if (proci == myProci)
else if (proci != myProci)
{
// Do myself
recvBufs[proci] = sendData;
if (proci < recvSizes.size())
{
recvSizes[proci] = sendData.size();
}
}
else
{
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write
(
sendRequests.emplace_back(),
@ -167,7 +161,15 @@ void exchangeConsensus
}
// ------------------------------------------------------------------------
// Probe and receive
// ------------------------------------------------------------------------
//
// When receiving, resize() can be used instead of resize_nocopy() since
// the slots were already cleared initially.
// The resize() also works fine with FixedList, since it then
// corresponds to a no-op: the send and recv sizes will always be
// identical to its fixed size() / max_size()
UPstream::Request barrierRequest;
@ -191,17 +193,16 @@ void exchangeConsensus
const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs[proci];
recvData.resize_nocopy(count);
recvData.resize(count); // OK with resize() instead of _nocopy()
if (proci < recvSizes.size())
{
recvSizes[proci] = count;
}
// Any non-blocking MPI recv requests are tracked on internal stack
UIPstream::read
(
recvCommType,
UPstream::commsTypes::blocking,
proci,
recvData.data_bytes(),
recvData.size_bytes(),
@ -229,26 +230,18 @@ void exchangeConsensus
}
}
}
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
}
//- Exchange \em contiguous data using non-blocking consensus exchange.
//- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
//
// No internal guards - the sending Map corresponds to a segment of
// 0-numProcs.
// 0-numProc.
//
// \param[in] sendBufs The send buffers map (addr: 0-numProcs)
// \param[in] sendBufs The send buffers map (addr: 0-numProc)
// \param[out] recvBufs The recv buffers map
// \param tag The message tag
// \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType If blocking or (default) non-blocking
template<class Container, class Type>
void exchangeConsensus
@ -256,17 +249,17 @@ void exchangeConsensus
const Map<Container>& sendBufs,
Map<Container>& recvBufs,
const int tag,
const label comm,
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
const label comm
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
// TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
// Initial: clear out receive 'slots'
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::myProcNo(comm);
// Initial: clear all receive locations
// Preferable to clear out the map entries instead of the map itself
// since this can potentially preserve allocated space
// (eg DynamicList entries) between calls
@ -276,9 +269,13 @@ void exchangeConsensus
iter.val().clear();
}
if (!UPstream::is_parallel(comm))
if (!UPstream::is_rank(comm))
{
return; // Process not in communicator
}
// Fake send/recv for myself - parallel or non-parallel
{
// Do myself
const auto iter = sendBufs.find(myProci);
if (iter.good())
{
@ -290,43 +287,38 @@ void exchangeConsensus
recvBufs(iter.key()) = sendData;
}
}
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return;
}
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// TDB: initialBarrier ...
// Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// Start nonblocking synchronous send to process dest
// Start nonblocking synchronous send to destination ranks
forAllConstIters(sendBufs, iter)
{
const label proci = iter.key();
const auto& sendData = iter.val();
#ifdef FULLDEBUG
if (proci >= UPstream::nProcs(comm))
if (sendData.empty() || proci < 0 || proci >= numProc)
{
FatalErrorInFunction
<< "Send buffer:" << proci << " >= numProcs:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
// Do not send/recv empty data or invalid destinations
}
#endif
if (sendData.empty())
else if (proci != myProci)
{
// Do not send/recv empty data
}
else if (proci == myProci)
{
// Do myself: insert_or_assign
recvBufs(proci) = sendData;
}
else
{
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write
(
sendRequests.emplace_back(),
@ -341,7 +333,15 @@ void exchangeConsensus
}
// ------------------------------------------------------------------------
// Probe and receive
// ------------------------------------------------------------------------
//
// When receiving, resize() can be used instead of resize_nocopy() since
// the slots were already cleared initially.
// The resize() also works fine with FixedList, since it then
// corresponds to a no-op: the send and recv sizes will always be
// identical to its fixed size() / max_size()
UPstream::Request barrierRequest;
@ -365,12 +365,11 @@ void exchangeConsensus
const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs(proci);
recvData.resize_nocopy(count);
recvData.resize(count); // OK with resize() instead of _nocopy()
// Any non-blocking MPI recv requests are tracked on internal stack
UIPstream::read
(
recvCommType,
UPstream::commsTypes::blocking,
proci,
recvData.data_bytes(),
recvData.size_bytes(),
@ -397,12 +396,6 @@ void exchangeConsensus
}
}
}
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
}
} // namespace PstreamDetail
@ -418,7 +411,7 @@ void Foam::Pstream::exchangeConsensus
List<Container>& recvBufs,
const int tag,
const label comm,
const bool wait
const bool /* wait (ignored) */
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
@ -427,7 +420,7 @@ void Foam::Pstream::exchangeConsensus
{
FatalErrorInFunction
<< "Send buffers size:" << sendBufs.size()
<< " != numProcs:" << UPstream::nProcs(comm)
<< " != numProc:" << UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
@ -435,14 +428,13 @@ void Foam::Pstream::exchangeConsensus
recvBufs.resize_nocopy(sendBufs.size());
labelList dummyRecvSizes;
PstreamDetail::exchangeConsensus
PstreamDetail::exchangeConsensus<Container, Type>
(
sendBufs,
recvBufs,
dummyRecvSizes,
tag,
comm,
wait
comm
);
}
@ -454,20 +446,45 @@ void Foam::Pstream::exchangeConsensus
Map<Container>& recvBufs,
const int tag,
const label comm,
const bool wait
const bool /* wait (ignored) */
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
PstreamDetail::exchangeConsensus
PstreamDetail::exchangeConsensus<Container, Type>
(
sendBufs,
recvBufs,
tag,
comm,
wait
comm
);
}
template<class Container, class Type>
Foam::Map<Container>
Foam::Pstream::exchangeConsensus
(
    const Map<Container>& sendBufs,
    const int tag,
    const label comm,
    const bool /* wait (ignored) */
)
{
    // Consensus exchange (NBX) of Map data, returning the received Map.
    // Avoids send/recv parameter ambiguities for the caller.
    static_assert(is_contiguous<Type>::value, "Contiguous data only!");

    Map<Container> recvBufs;
    PstreamDetail::exchangeConsensus<Container, Type>
    (
        sendBufs,
        recvBufs,
        tag,
        comm
    );

    return recvBufs;
}
// ************************************************************************* //

View File

@ -44,6 +44,7 @@ SourceFiles
#include "labelList.H"
#include "DynamicList.H"
#include "HashTable.H"
#include "Map.H"
#include "Enum.H"
#include "ListOps.H"
@ -55,9 +56,6 @@ namespace Foam
//- Implementation details for UPstream/Pstream/MPI etc.
namespace PstreamDetail {}
// Forward Declarations
template<class T> class Map;
/*---------------------------------------------------------------------------*\
Class UPstream Declaration
\*---------------------------------------------------------------------------*/
@ -968,119 +966,76 @@ public:
//- Shutdown (finalize) MPI as required and exit program with errNo.
static void exit(int errNo = 1);
//- Exchange integer data with all processors (in the communicator).
// \c sendData[proci] is the value to send to proci.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
static void allToAll
(
const UList<int32_t>& sendData,
UList<int32_t>& recvData,
const label communicator = worldComm
);
//- Exchange integer data with all processors (in the communicator).
// \c sendData[proci] is the value to send to proci.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
static void allToAll
(
const UList<int64_t>& sendData,
UList<int64_t>& recvData,
const label communicator = worldComm
);
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
\
/*!\brief Exchange \c Native data with all ranks in communicator */ \
/*! \em non-parallel : simple copy of \p sendData to \p recvData */ \
static void allToAll \
( \
/*! [in] The value at [proci] is sent to proci */ \
const UList<Native>& sendData, \
/*! [out] The data received from the other ranks */ \
UList<Native>& recvData, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \em non-zero \c Native data between ranks [NBX] */ \
/*! \p recvData is always initially assigned zero and no non-zero */ \
/*! values are sent/received from other ranks. */ \
/*! \em non-parallel : simple copy of \p sendData to \p recvData */ \
/*! \note The message tag should be chosen to be a unique value */ \
/*! since the implementation uses probing with ANY_SOURCE !! */ \
/*! An initial barrier may help to avoid synchronisation problems */ \
/*! caused elsewhere (See "nbx.tuning" opt switch) */ \
static void allToAllConsensus \
( \
/*! [in] The \em non-zero value at [proci] is sent to proci */ \
const UList<Native>& sendData, \
/*! [out] The non-zero value received from each rank */ \
UList<Native>& recvData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \c Native data between ranks [NBX] */ \
/*! \p recvData map is always cleared initially so a simple check */ \
/*! of its keys is sufficient to determine connectivity. */ \
/*! \em non-parallel : copy own rank (if it exists) */ \
/*! See notes about message tags and "nbx.tuning" opt switch */ \
static void allToAllConsensus \
( \
/*! [in] The value at [proci] is sent to proci. */ \
const Map<Native>& sendData, \
/*! [out] The values received from given ranks. */ \
Map<Native>& recvData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \c Native data between ranks [NBX] */ \
/*! \returns any received data as a Map */ \
static Map<Native> allToAllConsensus \
( \
/*! [in] The value at [proci] is sent to proci. */ \
const Map<Native>& sendData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
) \
{ \
Map<Native> recvData; \
allToAllConsensus(sendData, recvData, tag, communicator); \
return recvData; \
}
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. The recvData list is always assigned zero before
// receipt and values of zero are never transmitted.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const UList<int32_t>& sendData,
UList<int32_t>& recvData,
const int tag,
const label communicator = worldComm
);
Pstream_CommonRoutines(int32_t);
Pstream_CommonRoutines(int64_t);
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. The recvData list is always assigned zero before
// receipt and values of zero are never transmitted.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const UList<int64_t>& sendData,
UList<int64_t>& recvData,
const int tag,
const label communicator = worldComm
);
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. Since the recvData map always cleared before
// receipt and values of zero are never transmitted, a simple check
// of its keys is sufficient to determine connectivity.
// \n
// For \b non-parallel : copy own rank (if it exists and non-zero)
// from sendData to recvData.
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const Map<int32_t>& sendData,
Map<int32_t>& recvData,
const int tag,
const label communicator = worldComm
);
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. Since the recvData map always cleared before
// receipt and values of zero are never transmitted, a simple check
// of its keys is sufficient to determine connectivity.
// \n
// For \b non-parallel : copy own rank (if it exists and non-zero)
// from sendData to recvData.
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const Map<int64_t>& sendData,
Map<int64_t>& recvData,
const int tag,
const label communicator = worldComm
);
#undef Pstream_CommonRoutines
// Low-level gather/scatter routines
@ -1121,13 +1076,7 @@ public:
/*! Number of send/recv data per rank. Globally consistent! */ \
int count, \
const label communicator = worldComm \
);
Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
); \
\
/*! \brief Receive variable length \c Native data from all ranks */ \
static void gather \

View File

@ -63,17 +63,9 @@ void Foam::UPstream::mpiAllGather \
int count, \
const label comm \
) \
{}
Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
{} \
\
\
void Foam::UPstream::gather \
( \
const Native* sendData, \

View File

@ -79,17 +79,8 @@ void Foam::UPstream::mpiAllGather \
allData, count, \
TaggedType, comm \
); \
}
Pstream_CommonRoutines(char, MPI_BYTE);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
} \
\
void Foam::UPstream::gather \
( \
const Native* sendData, \

View File

@ -507,7 +507,7 @@ void Foam::PstreamDetail::allToAllConsensus
if (!UPstream::is_rank(comm))
{
return;
return; // Process not in communicator
}
const label myProci = UPstream::myProcNo(comm);
@ -539,11 +539,15 @@ void Foam::PstreamDetail::allToAllConsensus
if (!UPstream::is_parallel(comm))
{
// deep copy
// Non-parallel : deep copy
recvData.deepCopy(sendData);
return;
}
// Fake send/recv for myself
{
recvData[myProci] = sendData[myProci];
}
// Implementation description
// --------------------------
@ -562,7 +566,10 @@ void Foam::PstreamDetail::allToAllConsensus
// This is because we are dealing with a flat list of entries to
// send and not a sparse Map etc.
DynamicList<MPI_Request> sendRequests(sendData.size());
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
profilingPstream::beginTiming();
@ -573,20 +580,16 @@ void Foam::PstreamDetail::allToAllConsensus
MPI_Barrier(PstreamGlobals::MPICommunicators_[comm]);
}
DynamicList<MPI_Request> sendRequests(sendData.size());
// Start nonblocking synchronous send to process dest
// Start nonblocking synchronous send to destination rank
for (label proci = 0; proci < numProc; ++proci)
{
if (sendData[proci] == zeroValue)
{
// Do not send/recv empty data
}
else if (proci == myProci)
{
// Do myself
recvData[proci] = sendData[proci];
}
else
else if (proci != myProci)
{
// Has data to send
@ -604,7 +607,9 @@ void Foam::PstreamDetail::allToAllConsensus
}
// ------------------------------------------------------------------------
// Probe and receive
// ------------------------------------------------------------------------
MPI_Request barrierRequest;
@ -721,22 +726,29 @@ void Foam::PstreamDetail::allToAllConsensus
}
// Initial: clear out everything
const Type zeroValue = pTraits<Type>::zero;
recvBufs.clear();
if (!UPstream::is_parallel(comm))
// Fake send/recv for myself - parallel or non-parallel
{
// Do myself
const auto iter = sendBufs.find(myProci);
if (iter.good() && (iter.val() != zeroValue))
if (iter.good())
{
// Do myself: insert_or_assign
recvBufs(iter.key()) = iter.val();
}
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return;
}
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// Algorithm NBX: Nonblocking consensus
// Implementation like above, but sending map data.
@ -752,7 +764,7 @@ void Foam::PstreamDetail::allToAllConsensus
}
// Start nonblocking synchronous send to process dest
// Start nonblocking synchronous send to destination ranks
// Same as forAllConstIters()
const auto endIter = sendBufs.cend();
@ -761,19 +773,8 @@ void Foam::PstreamDetail::allToAllConsensus
const label proci = iter.key();
const auto& sendData = iter.val();
if (sendData == zeroValue)
if (proci != myProci && proci >= 0 && proci < numProc)
{
// Do not send/recv empty/zero data
}
else if (proci == myProci)
{
// Do myself: insert_or_assign
recvBufs(proci) = sendData;
}
else
{
// Has data to send
MPI_Issend
(
&sendData,
@ -788,7 +789,9 @@ void Foam::PstreamDetail::allToAllConsensus
}
// ------------------------------------------------------------------------
// Probe and receive
// ------------------------------------------------------------------------
MPI_Request barrierRequest;