ENH: Pstream::exchange with Map<Container> data

- dynamic sparse data exchange using Map to hold data and sizes.

  Still uses the personalised exchange paradigm, but with a non-blocking
  consensus exchange to obtain the sizes and regular point-to-point
  messaging for the data exchange itself. This avoids an all-to-all
  while still retaining the point-to-point benefits of overlapping
  communication, data chunking, etc.
Commit 1ce7a62209 (parent b6af124b80)
Author: Mark Olesen
Date:   2023-02-10 21:03:43 +01:00

3 changed files with 348 additions and 41 deletions
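For orientation before the diffs: a minimal usage sketch of the new
Map-based API (not part of the commit; 'neighbourProci' and the
labelList payload are illustrative, mirroring the test file below):

    // Sparse, personalised exchange: only ranks present as keys in
    // sendBufs are contacted; recvBufs comes back keyed by source rank
    Map<labelList> sendBufs;
    Map<labelList> recvBufs;

    sendBufs(neighbourProci) = identity(100);   // illustrative payload

    // Sizes via non-blocking consensus, data via point-to-point
    Pstream::exchange<labelList, label>(sendBufs, recvBufs);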

File 1/3: test application

@@ -32,6 +32,8 @@ Description
\*---------------------------------------------------------------------------*/

#define Foam_PstreamExchange_debug_chunks

#include "List.H"
#include "argList.H"
#include "Time.H"
@@ -371,6 +373,7 @@ int main(int argc, char *argv[])
    }

    // Manually
    Info<< "perform list exchange" << endl;
    {
        labelListList sendBufs(UPstream::nProcs());
        labelListList recvBufs(UPstream::nProcs());
@@ -397,6 +400,34 @@ int main(int argc, char *argv[])
        );
    }
Info<< "perform Map exchange" << endl;
{
Map<labelList> sendBufs;
Map<labelList> recvBufs;
Map<label> recvSizes;
if (Pstream::master())
{
for (const int proci : Pstream::allProcs())
{
if (proci != Pstream::myProcNo())
{
sendBufs(proci) = identity(500);
}
}
}
Pstream::exchangeSizes(sendBufs, recvSizes);
Pstream::exchange<labelList, label>
(
sendBufs,
recvSizes,
recvBufs
);
}
Info<< "End\n" << endl; Info<< "End\n" << endl;
return 0; return 0;
} }

File 2/3: Pstream.H

@@ -496,6 +496,27 @@
            const label comm = UPstream::worldComm
        );
        //- Exchange the \b non-zero sizes of sendBufs entries (sparse map)
        //- with other ranks in the communicator
        //- using non-blocking consensus exchange.
        //
        //  Since the recvData map is always cleared before receipt and
        //  sizes of zero are never transmitted, a simple check
        //  of its keys is sufficient to determine connectivity.
        //
        //  For \b non-parallel : copy the size of the rank (if it exists
        //  and is non-empty) from sendBufs to recvSizes.
        //
        //  \note The message tag is adjusted internally to improve
        //  uniqueness
        template<class Container>
        static void exchangeSizes
        (
            const Map<Container>& sendBufs,
            Map<label>& recvSizes,
            const label tag = UPstream::msgType(),
            const label comm = UPstream::worldComm
        );
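As the note above says, zero sizes are never transmitted, so the keys of
recvSizes directly encode the connectivity. A minimal sketch (not part of
this commit; assumes sendBufs was populated as in the test file above):

    Map<label> recvSizes;
    Pstream::exchangeSizes(sendBufs, recvSizes);

    // The keys are exactly the ranks that will send to me;
    // the values are the element counts to expect (never zero)
    for (const label proci : recvSizes.sortedToc())
    {
        Pout<< "expecting " << recvSizes[proci]
            << " elements from rank " << proci << nl;
    }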
        //- Helper: exchange \em contiguous data.
        //- Sends sendBufs, receives into recvBufs using predetermined receive
        //- sizing.

@@ -511,6 +532,22 @@
            const bool wait = true  //!< Wait for requests to complete
        );
        //- Exchange \em contiguous data.
        //- Sends sendBufs, receives into recvBufs.
        //  Data provided and received as container.
        //
        //  No internal guards or resizing.
        template<class Container, class Type>
        static void exchange
        (
            const Map<Container>& sendBufs,
            const Map<label>& recvSizes,  //!< Num of recv elements (not bytes)
            Map<Container>& recvBufs,
            const int tag = UPstream::msgType(),
            const label comm = UPstream::worldComm,
            const bool wait = true  //!< Wait for requests to complete
        );

        //- Exchange \em contiguous data.
        //- Sends sendBufs, receives into recvBufs.
        //- Determines sizes to receive.
@@ -525,30 +562,23 @@
             const bool wait = true  //!< Wait for requests to complete
         );
 
-    // Non-blocking exchange
-
-        //- Exchange the \b non-zero sizes of sendBufs entries (sparse map)
-        //- with all ranks in the communicator
-        //- using non-blocking consensus exchange.
-        //
-        //  Since the recvData map always cleared before receipt and sizes
-        //  of zero are never transmitted, a simple check
-        //  of its keys is sufficient to determine connectivity.
-        //
-        //  For \b non-parallel : copy size of rank (if it exists and non-empty)
-        //  from sendBufs to recvSizes.
-        //
-        //  \note The message tag is adjusted internally to improve uniqueness
-        template<class Container>
-        static void exchangeSizes
-        (
-            const Map<Container>& sendBufs,
-            Map<label>& recvSizes,
-            const label tag = UPstream::msgType(),
-            const label comm = UPstream::worldComm
-        );
+        //- Exchange \em contiguous data.
+        //- Sends sendBufs, receives into recvBufs.
+        //- Determines sizes to receive.
+        //  If wait=true will wait for all transfers to finish.
+        template<class Container, class Type>
+        static void exchange
+        (
+            const Map<Container>& sendBufs,
+            Map<Container>& recvBufs,
+            const int tag = UPstream::msgType(),
+            const label comm = UPstream::worldComm,
+            const bool wait = true  //!< Wait for requests to complete
+        );
+
+    // Non-blocking exchange
 
         //- Exchange \em contiguous data using non-blocking consensus
         //- Sends sendData, receives into recvData.
         //

File 3/3: PstreamExchange.C

@@ -366,6 +366,89 @@ void exchangeContainer
}
//- Exchange \em contiguous data using point-to-point communication.
//- Sends sendBufs, receives into recvBufs.
//  Data provided and received as containers, all of which have been
//  properly sized before calling.
//
//  No internal guards or resizing.
template<class Container, class Type>
void exchangeContainer
(
    const Map<Container>& sendBufs,
    Map<Container>& recvBufs,
    const int tag,
    const label comm,
    const bool wait  //!< Wait for requests to complete
)
{
    const label startOfRequests = UPstream::nRequests();
    const label myProci = UPstream::myProcNo(comm);

    // Set up receives
    // ~~~~~~~~~~~~~~~

    forAllIters(recvBufs, iter)
    {
        const label proci = iter.key();
        auto& recvData = iter.val();

        if (proci != myProci && !recvData.empty())
        {
            UIPstream::read
            (
                UPstream::commsTypes::nonBlocking,
                proci,
                recvData.data_bytes(),
                recvData.size_bytes(),
                tag,
                comm
            );
        }
    }

    // Set up sends
    // ~~~~~~~~~~~~

    forAllConstIters(sendBufs, iter)
    {
        const label proci = iter.key();
        const auto& sendData = iter.val();

        if (proci != myProci && !sendData.empty())
        {
            if
            (
                !UOPstream::write
                (
                    UPstream::commsTypes::nonBlocking,
                    proci,
                    sendData.cdata_bytes(),
                    sendData.size_bytes(),
                    tag,
                    comm
                )
            )
            {
                FatalErrorInFunction
                    << "Cannot send outgoing message to:"
                    << proci << " nBytes:"
                    << label(sendData.size_bytes())
                    << Foam::abort(FatalError);
            }
        }
    }

    // Wait for all to finish
    // ~~~~~~~~~~~~~~~~~~~~~~

    if (wait)
    {
        UPstream::waitRequests(startOfRequests);
    }
}
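Since the helper issues its requests on OpenFOAM's global request stack, a
caller passing wait=false can overlap computation with the transfers and
complete them later. A sketch of that pattern (not part of the commit;
PstreamDetail is internal, so ordinary code would reach this via
Pstream::exchange, and both maps must already be properly sized):

    const label startOfRequests = UPstream::nRequests();

    PstreamDetail::exchangeContainer<labelList, label>
    (
        sendBufs, recvBufs, UPstream::msgType(), UPstream::worldComm,
        false   // do not block here
    );

    // ... computation overlapping the communication ...

    UPstream::waitRequests(startOfRequests);  // complete this exchange only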
} // namespace PstreamDetail
} // namespace Foam

@@ -489,6 +572,169 @@ void Foam::Pstream::exchange
}
template<class Container, class Type>
void Foam::Pstream::exchange
(
    const Map<Container>& sendBufs,
    const Map<label>& recvSizes,
    Map<Container>& recvBufs,
    const int tag,
    const label comm,
    const bool wait
)
{
    static_assert(is_contiguous<Type>::value, "Contiguous data only!");

    const int myProci = UPstream::myProcNo(comm);
    const label numProcs = UPstream::nProcs(comm);

    // Initial: clear out receive 'slots'
    // Preferable to clear out the map entries instead of the map itself
    // since this can potentially preserve allocated space
    // (eg, DynamicList entries) between calls

    forAllIters(recvBufs, iter)
    {
        iter.val().clear();
    }

    if (UPstream::parRun() && numProcs > 1)
    {
        // Presize all receive buffers
        forAllIters(recvSizes, iter)
        {
            const label proci = iter.key();
            const label count = iter.val();

            if (proci != myProci && count > 0)
            {
                recvBufs(proci).resize_nocopy(count);
            }
        }

        // Define the exchange sequences as a flattened list.
        // We add an additional step of ordering the send/recv list
        // by message size, which can help with transfer speeds.

        typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
        typedef std::pair<int, stdFoam::span<Type>> recvTuple;

        // Populate send sequences
        DynamicList<sendTuple> sends(sendBufs.size());
        forAllConstIters(sendBufs, iter)
        {
            const auto proci = iter.key();
            const auto& sendData = iter.val();

            if (proci != myProci && !sendData.empty())
            {
                sends.push_back
                (
                    sendTuple
                    (
                        proci,
                        { sendData.cdata(), std::size_t(sendData.size()) }
                    )
                );
            }
        }

        // Shorter messages first
        std::sort
        (
            sends.begin(),
            sends.end(),
            [=](const sendTuple& a, const sendTuple& b)
            {
                return (a.second.size() < b.second.size());
            }
        );

        // Populate recv sequences
        DynamicList<recvTuple> recvs(recvBufs.size());
        forAllIters(recvBufs, iter)
        {
            const auto proci = iter.key();
            auto& recvData = recvBufs[proci];

            if (proci != myProci && !recvData.empty())
            {
                recvs.push_back
                (
                    recvTuple
                    (
                        proci,
                        { recvData.data(), std::size_t(recvData.size()) }
                    )
                );
            }
        }

        // Shorter messages first
        std::sort
        (
            recvs.begin(),
            recvs.end(),
            [=](const recvTuple& a, const recvTuple& b)
            {
                return (a.second.size() < b.second.size());
            }
        );

        if (UPstream::maxCommsSize <= 0)
        {
            // Do the exchanging in a single go
            PstreamDetail::exchangeBuf<Type>
            (
                sends,
                recvs,
                tag,
                comm,
                wait
            );
        }
        else
        {
            // Exchange buffers in chunks
            PstreamDetail::exchangeChunkedBuf<Type>
            (
                sends,
                recvs,
                tag,
                comm,
                wait
            );
        }
    }

    // Do myself
    {
        const auto iter = sendBufs.find(myProci);

        bool needsCopy = iter.good();

        if (needsCopy)
        {
            const auto& sendData = iter.val();

            needsCopy = !sendData.empty();
            if (needsCopy)
            {
                // insert_or_assign
                recvBufs(myProci) = sendData;
            }
        }

        if (!needsCopy)
        {
            recvBufs.erase(myProci);
        }
    }
}
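The maxCommsSize branch above means chunked transfers are opt-in: with the
default value the whole buffer goes in a single pass. A sketch of forcing
the chunked path (assumes UPstream::maxCommsSize is the usual writable
optimisation switch, in bytes):

    // Hypothetical: cap individual transfers at 1 MiB so that the
    // exchange dispatches to exchangeChunkedBuf
    UPstream::maxCommsSize = 1024*1024;

    Pstream::exchange<labelList, label>(sendBufs, recvSizes, recvBufs);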
template<class Container>
void Foam::Pstream::exchangeSizes
(

@@ -643,29 +889,9 @@ void Foam::Pstream::exchange
     const bool wait
 )
 {
-    if
-    (
-        wait
-     && UPstream::parRun()
-     && UPstream::nProcsNonblockingExchange > 1
-     && UPstream::nProcsNonblockingExchange <= UPstream::nProcs(comm)
-    )
-    {
-        // Use algorithm NBX: Nonblocking Consensus Exchange
-        Pstream::exchangeConsensus<Container, Type>
-        (
-            sendBufs,
-            recvBufs,
-            (tag + 314159),  // some unique tag?
-            comm
-        );
-        return;
-    }
-
     // Algorithm PEX: Personalized Exchange
     // - Step 1: each process writes the data sizes to each peer and
-    //   redistributes the vector (eg, MPI_Alltoall)
+    //   redistributes the vector (eg, MPI_Alltoall or non-blocking consensus)
     // - Step 2: size receive buffers and setup receives for all
     //   non-zero sendcounts. Post all sends and wait.
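The rest of this function body is unchanged and elided by the hunk;
schematically it performs the two steps above (a sketch, not the verbatim
source):

    // Step 1: exchange sizes (all-to-all or consensus, per the comment)
    labelList recvSizes;
    Pstream::exchangeSizes(sendBufs, recvSizes, comm);

    // Step 2: size receive buffers and do the point-to-point transfers
    Pstream::exchange<Container, Type>
    (
        sendBufs, recvSizes, recvBufs, tag, comm, wait
    );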
@@ -676,4 +902,24 @@ void Foam::Pstream::exchange
}
template<class Container, class Type>
void Foam::Pstream::exchange
(
    const Map<Container>& sendBufs,
    Map<Container>& recvBufs,
    const int tag,
    const label comm,
    const bool wait
)
{
    // Algorithm PEX: Personalized Exchange
    // but using nonblocking consensus exchange for the sizes

    Map<label> recvSizes;
    exchangeSizes(sendBufs, recvSizes, tag, comm);

    exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
}
// ************************************************************************* //