ENH: implement NBX with recovery of sizes and non-blocking receives

commit 2569405383
parent fb2bf77e8e
Author: Mark Olesen
Date:   2023-02-16 20:06:00 +01:00

4 changed files with 207 additions and 74 deletions

--- File 1 of 4 ---

@@ -593,7 +593,8 @@ public:
const UList<Container>& sendBufs,
List<Container>& recvBufs,
const int tag,
const label comm
const label comm,
const bool wait = true //!< Wait for requests to complete
);
//- Exchange \em contiguous data using non-blocking consensus
@@ -614,7 +615,8 @@
const Map<Container>& sendBufs,
Map<Container>& recvBufs,
const int tag,
const label comm
const label comm,
const bool wait = true //!< Wait for requests to complete
);
};
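A hedged usage sketch for the new trailing wait parameter: with wait=false the call returns once the receives are posted, so the caller can overlap local work before completing the requests. The buffer contents, the communicator comm, and the overlapped work below are illustrative, not part of this commit:

// Record the request stack position before the exchange
const label startOfRequests = UPstream::nRequests();

List<labelList> sendBufs(UPstream::nProcs(comm));
List<labelList> recvBufs;
// ... fill sendBufs[proci] with the data destined for each rank ...

// wait=false: receives are posted but not yet completed on return
Pstream::exchangeConsensus<labelList, label>
(
    sendBufs, recvBufs, UPstream::msgType(), comm, false
);

// ... unrelated local work can overlap the communication here ...

// Complete the outstanding receives before touching recvBufs
UPstream::waitRequests(startOfRequests);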

--- File 2 of 4 ---

@@ -39,60 +39,86 @@ Note
#include "contiguous.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
namespace Foam
{
namespace PstreamDetail
{
//- Exchange \em contiguous data using non-blocking consensus exchange
//- with optional tracking of the receive sizes.
//
// No internal guards or resizing - data containers are all properly
// sized before calling.
//
// \param[in] sendBufs The send buffers list (size: numProcs)
// \param[out] recvBufs The recv buffers list (size: numProcs)
// \param[out] recvSizes The recv sizes (size: 0 or numProcs).
// This parameter can be an empty list, in which case the receive sizes
// are not returned.
// \param tag The message tag
// \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType Blocking or (default) non-blocking receives
template<class Container, class Type>
void Foam::Pstream::exchangeConsensus
void exchangeConsensus
(
const UList<Container>& sendBufs,
List<Container>& recvBufs,
UList<Container>& recvBufs,
labelUList& recvSizes,
const int tag,
const label comm
const label comm,
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
if (sendBufs.size() != numProc)
// #ifdef FULLDEBUG
if (sendBufs.size() > numProc)
{
FatalErrorInFunction
<< "Size of list " << sendBufs.size()
<< " does not equal the number of processors " << numProc
<< "Send buffers:" << sendBufs.size() << " > numProcs:" << numProc
<< Foam::abort(FatalError);
}
if (recvBufs.size() < numProc)
{
FatalErrorInFunction
<< "Recv buffers:" << recvBufs.size() << " < numProcs:" << numProc
<< Foam::abort(FatalError);
}
// #endif
// Initial: resize and clear everything
recvBufs.resize_nocopy(sendBufs.size());
// Initial: clear all receive information
for (auto& buf : recvBufs)
{
buf.clear();
}
recvSizes = Zero;
if (!UPstream::parRun() || numProc < 2)
{
// Do myself
recvBufs[myProci] = sendBufs[myProci];
if (myProci < recvSizes.size())
{
recvSizes[myProci] = recvBufs[myProci].size();
}
return;
}
// This largely follows PstreamDetail::allToAllConsensus
// but with more MPI wrapping used here.
DynamicList<UPstream::Request> requests(sendBufs.size());
// Algorithm NBX: Nonblocking consensus with List containers
//// profilingPstream::beginTiming();
DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// If there are synchronisation problems,
// an initial barrier can help, but it should not be necessary
// when unique message tags are used.
//// UPstream::barrier(comm);
// Start nonblocking synchronous send to process dest
// Start nonblocking synchronous send to processor dest
for (label proci = 0; proci < numProc; ++proci)
{
const auto& sendData = sendBufs[proci];
@@ -104,15 +130,19 @@ void Foam::Pstream::exchangeConsensus
else if (proci == myProci)
{
// Do myself
recvBufs[proci] = sendBufs[proci];
recvBufs[proci] = sendData;
if (proci < recvSizes.size())
{
recvSizes[proci] = sendData.size();
}
}
else
{
// Has data to send
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write
(
requests.emplace_back(),
sendRequests.emplace_back(),
proci,
sendData.cdata_bytes(),
sendData.size_bytes(),
@@ -126,7 +156,7 @@ void Foam::Pstream::exchangeConsensus
// Probe and receive
UPstream::Request barrierReq;
UPstream::Request barrierRequest;
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
@@ -145,14 +175,20 @@ void Foam::Pstream::exchangeConsensus
// - receive into dest buffer location
const label proci = probed.first;
const label nRecv = (probed.second / sizeof(Type));
const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs[proci];
recvData.resize_nocopy(nRecv);
recvData.resize_nocopy(count);
if (proci < recvSizes.size())
{
recvSizes[proci] = count;
}
// Any non-blocking MPI recv requests are tracked on the internal stack
UIPstream::read
(
UPstream::commsTypes::scheduled,
recvCommType,
proci,
recvData.data_bytes(),
recvData.size_bytes(),
@@ -165,7 +201,7 @@ void Foam::Pstream::exchangeConsensus
{
// Test barrier for completion
// - all received, or nothing to receive
if (UPstream::finishedRequest(barrierReq))
if (UPstream::finishedRequest(barrierRequest))
{
done = true;
}
@@ -173,29 +209,48 @@ void Foam::Pstream::exchangeConsensus
else
{
// Check if all sends have arrived
if (UPstream::finishedRequests(requests))
if (UPstream::finishedRequests(sendRequests))
{
UPstream::barrier(comm, &barrierReq);
UPstream::barrier(comm, &barrierRequest);
barrier_active = true;
}
}
}
//// profilingPstream::addAllToAllTime();
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
}
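// Example (illustrative sketch, not part of this commit): recovering
// the receive sizes through this detail-level overload. The caller
// pre-sizes all containers, as the guards above require:
//
//     List<labelList> recvBufs(UPstream::nProcs(comm));
//     labelList recvSizes(UPstream::nProcs(comm), Zero);
//
//     PstreamDetail::exchangeConsensus<labelList, label>
//     (
//         sendBufs, recvBufs, recvSizes, UPstream::msgType(), comm
//     );
//
//     // recvSizes[proci] now holds the element count received from
//     // proci. Pass an empty labelUList to skip this bookkeeping, or
//     // recvCommType = UPstream::commsTypes::scheduled to recover the
//     // previous blocking-receive behaviour (no wait needed then).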
//- Exchange \em contiguous data using non-blocking consensus exchange.
//
// No internal guards - the keys of the sending Map must lie within
// the range 0-numProcs.
//
// \param[in] sendBufs The send buffers map (addr: 0-numProcs)
// \param[out] recvBufs The recv buffers map
// \param tag The message tag
// \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType Blocking or (default) non-blocking receives
template<class Container, class Type>
void Foam::Pstream::exchangeConsensus
void exchangeConsensus
(
const Map<Container>& sendBufs,
Map<Container>& recvBufs,
const int tag,
const label comm
const label comm,
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
@@ -229,16 +284,7 @@ void Foam::Pstream::exchangeConsensus
// Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
DynamicList<UPstream::Request> requests(sendBufs.size());
//// profilingPstream::beginTiming();
// If there are synchronisation problems,
// an initial barrier can help, but it should not be necessary
// when unique message tags are used.
//// UPstream::barrier(comm);
DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// Start nonblocking synchronous send to processor dest
forAllConstIters(sendBufs, iter)
@@ -246,6 +292,15 @@ void Foam::Pstream::exchangeConsensus
const label proci = iter.key();
const auto& sendData = iter.val();
#ifdef FULLDEBUG
if (proci >= numProc)
{
FatalErrorInFunction
<< "Send buffer:" << proci << " >= numProcs:" << numProc
<< Foam::abort(FatalError);
}
#endif
if (sendData.empty())
{
// Do not send/recv empty data
@@ -257,11 +312,11 @@ void Foam::Pstream::exchangeConsensus
}
else
{
// Has data to send
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write
(
requests.emplace_back(),
sendRequests.emplace_back(),
proci,
sendData.cdata_bytes(),
sendData.size_bytes(),
@@ -275,7 +330,7 @@ void Foam::Pstream::exchangeConsensus
// Probe and receive
UPstream::Request barrierReq;
UPstream::Request barrierRequest;
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
@@ -294,14 +349,15 @@ void Foam::Pstream::exchangeConsensus
// - receive into dest buffer location
const label proci = probed.first;
const label nRecv = (probed.second / sizeof(Type));
const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs(proci);
recvData.resize_nocopy(nRecv);
recvData.resize_nocopy(count);
// Any non-blocking MPI recv requests are tracked on the internal stack
UIPstream::read
(
UPstream::commsTypes::scheduled,
recvCommType,
proci,
recvData.data_bytes(),
recvData.size_bytes(),
@@ -313,7 +369,7 @@ void Foam::Pstream::exchangeConsensus
if (barrier_active)
{
// Test barrier for completion
if (UPstream::finishedRequest(barrierReq))
if (UPstream::finishedRequest(barrierRequest))
{
done = true;
}
@@ -321,15 +377,83 @@ void Foam::Pstream::exchangeConsensus
else
{
// Check if all sends have arrived
if (UPstream::finishedRequests(requests))
if (UPstream::finishedRequests(sendRequests))
{
UPstream::barrier(comm, &barrierReq);
UPstream::barrier(comm, &barrierRequest);
barrier_active = true;
}
}
}
//// profilingPstream::addAllToAllTime();
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
}
} // namespace PstreamDetail
} // namespace Foam
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Container, class Type>
void Foam::Pstream::exchangeConsensus
(
const UList<Container>& sendBufs,
List<Container>& recvBufs,
const int tag,
const label comm,
const bool wait
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Send buffers size:" << sendBufs.size()
<< " != numProcs:" << UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
// Resize receive buffers. Individual clearing is done internally
recvBufs.resize_nocopy(sendBufs.size());
labelList dummyRecvSizes;
PstreamDetail::exchangeConsensus
(
sendBufs,
recvBufs,
dummyRecvSizes,
tag,
comm,
wait
);
}
template<class Container, class Type>
void Foam::Pstream::exchangeConsensus
(
const Map<Container>& sendBufs,
Map<Container>& recvBufs,
const int tag,
const label comm,
const bool wait
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
PstreamDetail::exchangeConsensus
(
sendBufs,
recvBufs,
tag,
comm,
wait
);
}
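The Map overload only addresses the ranks present as keys, which suits sparse neighbourhoods. A minimal sketch of such a call; the rank numbers and payload are illustrative:

Map<labelList> sendBufs;
if (UPstream::myProcNo(comm) == 0)
{
    // Rank 0 sends three labels to rank 1 and nothing elsewhere
    sendBufs.insert(1, labelList({10, 20, 30}));
}

Map<labelList> recvBufs;
Pstream::exchangeConsensus<labelList, label>
(
    sendBufs, recvBufs, UPstream::msgType(), comm
);

// On rank 1: recvBufs holds key 0 with the three labels.
// Ranks that received nothing end up with an empty map.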

--- File 3 of 4 ---

@@ -51,6 +51,9 @@ void Foam::UPstream::allToAll \
Pstream_CommonRoutines(int32_t, MPI_INT32_T);
Pstream_CommonRoutines(int64_t, MPI_INT64_T);
// Future?
// Pstream_CommonRoutines(uint32_t, MPI_UINT32_T);
// Pstream_CommonRoutines(uint64_t, MPI_UINT64_T);
#undef Pstream_CommonRoutines
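These instantiations back the NBX all-to-all for 32/64-bit integer data. A hedged sketch of a direct call, assuming the flat-list allToAllConsensus signature shown in the next file, with pre-sized lists:

// One small value per rank; zero entries are not transmitted
labelList sendData(UPstream::nProcs(comm), Zero);
labelList recvData(UPstream::nProcs(comm), Zero);
sendData[1] = 42;   // e.g. the number of items destined for rank 1

PstreamDetail::allToAllConsensus
(
    sendData, recvData, UPstream::msgType(), comm
);
// recvData[proci] holds the value that proci addressed to this rank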

--- File 4 of 4 ---

@@ -580,7 +580,7 @@ void Foam::PstreamDetail::allToAllConsensus
// This is because we are dealing with a flat list of entries to
// send and not a sparse Map etc.
DynamicList<MPI_Request> requests(sendData.size());
DynamicList<MPI_Request> sendRequests(sendData.size());
profilingPstream::beginTiming();
@@ -614,7 +614,7 @@ void Foam::PstreamDetail::allToAllConsensus
proci,
tag,
PstreamGlobals::MPICommunicators_[comm],
&requests.emplace_back()
&sendRequests.emplace_back()
);
}
}
@@ -622,7 +622,7 @@ void Foam::PstreamDetail::allToAllConsensus
// Probe and receive
MPI_Request barrierReq;
MPI_Request barrierRequest;
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
@@ -654,6 +654,7 @@ void Foam::PstreamDetail::allToAllConsensus
<< exit(FatalError);
}
// Regular receive (the data are small)
MPI_Recv
(
&recvData[proci],
@@ -670,7 +671,7 @@ void Foam::PstreamDetail::allToAllConsensus
{
// Test barrier for completion
// - all received, or nothing to receive
MPI_Test(&barrierReq, &flag, MPI_STATUS_IGNORE);
MPI_Test(&barrierRequest, &flag, MPI_STATUS_IGNORE);
if (flag)
{
@@ -682,8 +683,9 @@ void Foam::PstreamDetail::allToAllConsensus
// Check if all sends have arrived
MPI_Testall
(
requests.size(), requests.data(),
&flag, MPI_STATUSES_IGNORE
sendRequests.size(),
sendRequests.data(),
&flag, MPI_STATUSES_IGNORE
);
if (flag)
@@ -691,7 +693,7 @@ void Foam::PstreamDetail::allToAllConsensus
MPI_Ibarrier
(
PstreamGlobals::MPICommunicators_[comm],
&barrierReq
&barrierRequest
);
barrier_active = true;
}
@@ -746,7 +748,7 @@ void Foam::PstreamDetail::allToAllConsensus
// Algorithm NBX: Nonblocking consensus
// Implementation as above, but sending map data.
DynamicList<MPI_Request> requests(sendBufs.size());
DynamicList<MPI_Request> sendRequests(sendBufs.size());
profilingPstream::beginTiming();
@@ -786,7 +788,7 @@ void Foam::PstreamDetail::allToAllConsensus
proci,
tag,
PstreamGlobals::MPICommunicators_[comm],
&requests.emplace_back()
&sendRequests.emplace_back()
);
}
}
@@ -794,7 +796,7 @@ void Foam::PstreamDetail::allToAllConsensus
// Probe and receive
MPI_Request barrierReq;
MPI_Request barrierRequest;
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
@@ -829,9 +831,10 @@ void Foam::PstreamDetail::allToAllConsensus
auto& recvData = recvBufs(proci);
// Regular receive [the data are small]
MPI_Recv
(
&recvData,
&recvData,
count, // count=1 (see above)
datatype,
proci,
@@ -845,7 +848,7 @@ void Foam::PstreamDetail::allToAllConsensus
{
// Test barrier for completion
// - all received, or nothing to receive
MPI_Test(&barrierReq, &flag, MPI_STATUS_IGNORE);
MPI_Test(&barrierRequest, &flag, MPI_STATUS_IGNORE);
if (flag)
{
@@ -857,8 +860,9 @@ void Foam::PstreamDetail::allToAllConsensus
// Check if all sends have arrived
MPI_Testall
(
requests.size(), requests.data(),
&flag, MPI_STATUSES_IGNORE
sendRequests.size(),
sendRequests.data(),
&flag, MPI_STATUSES_IGNORE
);
if (flag)
@@ -866,7 +870,7 @@ void Foam::PstreamDetail::allToAllConsensus
MPI_Ibarrier
(
PstreamGlobals::MPICommunicators_[comm],
&barrierReq
&barrierRequest
);
barrier_active = true;
}
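For reference, the NBX pattern implemented above (Hoefler, Siebert, Lumsdaine: "Scalable communication protocols for dynamic sparse data exchange", 2010) reduces to the following standalone sketch. It is illustrative only; plain MPI and std::vector stand in for the OpenFOAM wrappers and containers used in the actual code:

#include <mpi.h>
#include <vector>

// Dynamic sparse data exchange: every rank sends byte buffers to selected
// peers without any rank knowing in advance who will send to it.
void nbxExchange
(
    const std::vector<std::vector<char>>& sendBufs,  // indexed by peer rank
    std::vector<std::vector<char>>& recvBufs,
    int tag,
    MPI_Comm comm
)
{
    int numProc = 0, myProc = 0;
    MPI_Comm_size(comm, &numProc);
    MPI_Comm_rank(comm, &myProc);
    recvBufs.assign(numProc, std::vector<char>());

    // Synchronous-mode sends: each completes only once its receive started
    std::vector<MPI_Request> sendRequests;
    for (int proci = 0; proci < numProc; ++proci)
    {
        if (proci == myProc || sendBufs[proci].empty()) continue;
        sendRequests.emplace_back();
        MPI_Issend
        (
            sendBufs[proci].data(), int(sendBufs[proci].size()), MPI_BYTE,
            proci, tag, comm, &sendRequests.back()
        );
    }
    recvBufs[myProc] = sendBufs[myProc];   // "send" to myself directly

    MPI_Request barrierRequest = MPI_REQUEST_NULL;
    for (bool barrier_active = false, done = false; !done; /*nil*/)
    {
        // Probe for any incoming message and receive it in-place
        int flag = 0;
        MPI_Status status;
        MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
        if (flag)
        {
            int count = 0;
            MPI_Get_count(&status, MPI_BYTE, &count);
            auto& recvData = recvBufs[status.MPI_SOURCE];
            recvData.resize(count);
            MPI_Recv
            (
                recvData.data(), count, MPI_BYTE,
                status.MPI_SOURCE, tag, comm, MPI_STATUS_IGNORE
            );
        }

        if (barrier_active)
        {
            // All ranks reached the barrier: no messages remain in flight
            MPI_Test(&barrierRequest, &flag, MPI_STATUS_IGNORE);
            if (flag) done = true;
        }
        else
        {
            // All local sends matched? Then join the non-blocking barrier
            MPI_Testall
            (
                int(sendRequests.size()), sendRequests.data(),
                &flag, MPI_STATUSES_IGNORE
            );
            if (flag)
            {
                MPI_Ibarrier(comm, &barrierRequest);
                barrier_active = true;
            }
        }
    }
}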