ENH: add Pstream chunked transfer detail

- to service both List and Map exchanges with limited message sizes
  (termed 'data chunking' here), add PstreamDetail functions for
  walking and dispatching the transfers. Like other Detail components,
  the API is subject to (possibly breaking) change at any time.

  The regular exchangeBuf detail has this type of signature:

  PstreamDetail::exchangeBuf
  (
      const UList<std::pair<int, stdFoam::span<const Type>>>& sends,
      const UList<std::pair<int, stdFoam::span<Type>>>& recvs,
      ...
  )

    Where each [rank, span] tuple pairs a processor rank with a span of data.

  The basic idea is to pre-process the send/receive buffers and
  marshal them into a flat list of [rank, span] tuples.

  The originating buffers can be any type of container (List or Map),
  which is marshalled into this flat sequence and then processed in a
  source-agnostic fashion, as illustrated below.

  If data chunking is required (when UPstream::maxCommsSize > 0),
  it is possible to make a cheap copy of the rank/address information
  and then walk different slices (views) of the data, as sketched below.
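  A sketch of that slicing (condensed from exchangeChunkedBuf in this
  commit), where nChunks and maxChunkSize have been derived from the
  send sizes and UPstream::maxCommsSize:

      // Cheap copy: only ranks and span addresses are duplicated
      List<sendTuple> sendChunks(sends);

      for (label iter = 0; iter < nChunks; ++iter)
      {
          // The begin/end of the current data window
          const auto beg = static_cast<std::size_t>(iter*maxChunkSize);
          const auto end = static_cast<std::size_t>((iter+1)*maxChunkSize);

          forAll(sendChunks, sloti)
          {
              const auto& baseline = sends[sloti].second;
              auto& payload = sendChunks[sloti].second;

              if (beg < baseline.size())
              {
                  payload =
                  (
                      (end < baseline.size())
                    ? baseline.subspan(beg, end - beg)
                    : baseline.subspan(beg)
                  );
              }
              else
              {
                  payload = baseline.first(0);  // zero-sized
              }
          }

          // ... re-window the receive tuples likewise, then exchange
          // this chunk with PstreamDetail::exchangeBuf
      }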

ENH: replace private static methods with PstreamDetail functions

- simpler to update locally.
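  The call-site effect is essentially a namespace change; a minimal
  before/after sketch (the new exchangeContainer takes its sizing from
  the pre-sized receive buffers, so the recvSizes argument disappears):

      // Before: private static member, explicit receive sizing
      exchangeContainer<Container, Type>
          (sendBufs, recvSizes, recvBufs, tag, comm, wait);

      // After: free function in Foam::PstreamDetail
      PstreamDetail::exchangeContainer<Container, Type>
          (sendBufs, recvBufs, tag, comm, wait);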
Mark Olesen
2023-02-10 20:49:44 +01:00
parent 83740ad408
commit b6af124b80
3 changed files with 397 additions and 321 deletions

Pstream.H

@@ -63,42 +63,6 @@ class Pstream
:
public UPstream
{
// Private Static Functions
//- Exchange \em contiguous data.
//- Sends sendBufs, receives into recvBufs.
// Data provided and received as container.
//
// No internal guards or resizing.
template<class Container, class T>
static void exchangeContainer
(
const UList<Container>& sendBufs,
const labelUList& recvSizes, //!< Num of recv elements (not bytes)
List<Container>& recvBufs,
const int tag,
const label comm,
const bool wait //!< Wait for requests to complete
);
//- Exchange \em contiguous data.
//- Sends sendBufs, receives into recvBufs.
// Data provided and received as pointers.
//
// No internal guards or resizing.
template<class T>
static void exchangeBuf
(
const labelUList& sendSizes, //!< Num of send elements (not bytes)
const UList<const char*>& sendBufs,
const labelUList& recvSizes, //!< Num of recv elements (not bytes)
List<char*>& recvBufs,
const int tag,
const label comm,
const bool wait //!< Wait for requests to complete
);
protected:
// Protected Data
@@ -504,62 +468,62 @@ public:
);
// Exchange
//- Helper: exchange sizes of sendData for specified
//- set of send/receive processes.
template<class Container>
static void exchangeSizes
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const Container& sendData,
labelList& sizes,
const label tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs for specified
//- set of send/receive processes.
template<class Container>
static void exchangeSizes
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const Container& sendBufs,
labelList& sizes,
const label tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendData.
//- The sendData is the data per processor (in the communicator).
// Returns sizes of sendData on the sending processor.
// \n
// For \b non-parallel : copy sizes from sendData directly.
template<class Container>
static void exchangeSizes
(
const Container& sendData,
labelList& sizes,
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs.
//- The sendBufs is the data per processor (in the communicator).
// Returns sizes of sendBufs on the sending processor.
// \n
// For \b non-parallel : copy sizes from sendBufs directly.
template<class Container>
static void exchangeSizes
(
const Container& sendBufs,
labelList& recvSizes,
const label comm = UPstream::worldComm
);
//- Helper: exchange \em contiguous data.
//- Sends sendBufs, receives into recvBufs using predetermined receive
//- sizing.
// If wait=true will wait for all transfers to finish.
template<class Container, class Type>
static void exchange
(
const UList<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true //!< Wait for requests to complete
);
//- Helper: exchange \em contiguous data.
//- Sends sendData, receives into recvData.
// If wait=true will wait for all transfers to finish.
template<class Container, class Type>
static void exchange
(
const UList<Container>& sendData,
const labelUList& recvSizes,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true //!< Wait for requests to complete
);
//- Exchange \em contiguous data.
//- Sends sendData, receives into recvData.
//- Determines sizes to receive.
// If wait=true will wait for all transfers to finish.
template<class Container, class Type>
static void exchange
(
const UList<Container>& sendData,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true //!< Wait for requests to complete
);
//- Exchange \em contiguous data.
//- Sends sendBufs, receives into recvBufs.
//- Determines sizes to receive.
// If wait=true will wait for all transfers to finish.
template<class Container, class Type>
static void exchange
(
const UList<Container>& sendBufs,
List<Container>& recvBufs,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool wait = true //!< Wait for requests to complete
);
// Non-blocking exchange
@@ -636,7 +600,6 @@ public:
#include "PstreamGather.C"
#include "PstreamCombineGather.C"
#include "PstreamGatherList.C"
#include "PstreamExchangeConsensus.C"
#include "PstreamExchange.C"
#endif

PstreamExchange.C

@@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016-2022 OpenCFD Ltd.
Copyright (C) 2016-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -24,43 +24,300 @@ License
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Exchange data.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "contiguous.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * * Details * * * * * * * * * * * * * * * * //
namespace Foam
{
namespace PstreamDetail
{
//- Setup sends and receives, each specified as [rank, span] tuple
// The serial list of tuples can be populated from other lists, from maps
// of data or subsets of lists/maps etc.
template<class Type>
void exchangeBuf
(
const UList<std::pair<int, stdFoam::span<const Type>>>& sends,
const UList<std::pair<int, stdFoam::span<Type>>>& recvs,
const int tag,
const label comm,
const bool wait
)
{
const label startOfRequests = UPstream::nRequests();
const int myProci = UPstream::myProcNo(comm);
// Set up receives
// ~~~~~~~~~~~~~~~
for (auto& slot : recvs)
{
// [rank, span]
const auto proci = slot.first;
auto& payload = slot.second;
if (proci != myProci && !payload.empty())
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
payload.data_bytes(),
payload.size_bytes(),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
for (const auto& slot : sends)
{
// [rank, span]
const auto proci = slot.first;
const auto& payload = slot.second;
if (proci != myProci && !payload.empty())
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
payload.cdata_bytes(),
payload.size_bytes(),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message to:"
<< proci << " nBytes:"
<< label(payload.size_bytes())
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
}
//- Chunked exchange of \em contiguous data.
//- Setup sends and receives, each specified as [rank, span] tuple.
// The serial list of tuples can be populated from other lists, from
// maps of data or subsets of lists/maps etc.
template<class Type>
void exchangeChunkedBuf
(
const UList<std::pair<int, stdFoam::span<const Type>>>& sends,
const UList<std::pair<int, stdFoam::span<Type>>>& recvs,
const int tag,
const label comm,
const bool wait
)
{
typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
typedef std::pair<int, stdFoam::span<Type>> recvTuple;
// Caller already checked for parRun and maxChunkSize > 0
{
// Determine the number of chunks to send. Note that we
// only have to look at the sending data since we are
// guaranteed that some processor's sending size is some other
// processor's receive size. Also we can ignore any local comms.
//
// We need to send chunks so the number of iterations:
// maxChunkSize iterations
// ------------ ----------
// 0 0
// 1..maxChunkSize 1
// maxChunkSize+1..2*maxChunkSize 2
// ...
const label maxChunkSize =
(
max
(
static_cast<label>(1),
static_cast<label>(UPstream::maxCommsSize/sizeof(Type))
)
);
const int myProci = UPstream::myProcNo(comm);
label nChunks(0);
{
// Get max send count (elements)
auto maxCount = static_cast<stdFoam::span<char>::size_type>(0);
for (const auto& slot : sends)
{
// [rank, span]
const auto proci = slot.first;
const auto count = slot.second.size();
if (proci != myProci)
{
maxCount = max(maxCount, count);
}
}
// Convert from send count (elements) to number of chunks.
// Can normally calculate with (count-1), but add some safety
if (maxCount)
{
nChunks = 1 + label(maxCount/maxChunkSize);
}
// MPI reduce (message tag is irrelevant)
reduce(nChunks, maxOp<label>(), UPstream::msgType(), comm);
}
// Dispatch the exchanges chunk-wise
List<sendTuple> sendChunks(sends);
List<recvTuple> recvChunks(recvs);
// Dispatch
for (label iter = 0; iter < nChunks; ++iter)
{
// The begin/end for the data window
const auto beg = static_cast<std::size_t>(iter*maxChunkSize);
const auto end = static_cast<std::size_t>((iter+1)*maxChunkSize);
forAll(sendChunks, sloti)
{
const auto& baseline = sends[sloti].second;
auto& payload = sendChunks[sloti].second;
// Window the data
if (beg < baseline.size())
{
payload =
(
(end < baseline.size())
? baseline.subspan(beg, end - beg)
: baseline.subspan(beg)
);
}
else
{
payload = baseline.first(0); // zero-sized
}
}
forAll(recvChunks, sloti)
{
const auto& baseline = recvs[sloti].second;
auto& payload = recvChunks[sloti].second;
// Window the data
if (beg < baseline.size())
{
payload =
(
(end < baseline.size())
? baseline.subspan(beg, end - beg)
: baseline.subspan(beg)
);
}
else
{
payload = baseline.first(0); // zero-sized
}
}
// Exchange data chunks
PstreamDetail::exchangeBuf<Type>
(
sendChunks,
recvChunks,
tag,
comm,
wait
);
// Debugging output - can report on master only...
#if 0 // ifdef Foam_PstreamExchange_debug_chunks
do
{
labelList sendStarts(sends.size());
labelList sendCounts(sends.size());
forAll(sendChunks, sloti)
{
const auto& baseline = sends[sloti].second;
const auto& payload = sendChunks[sloti].second;
sendStarts[sloti] = (payload.data() - baseline.data());
sendCounts[sloti] = (payload.size());
}
Info<< "iter " << iter
<< ": beg=" << flatOutput(sendStarts)
<< " len=" << flatOutput(sendCounts) << endl;
} while (false);
#endif
}
}
}
//- Exchange \em contiguous data using point-to-point communication.
//- Sends sendBufs, receives into recvBufs.
// Data provided and received as container all of which have been
// properly sized before calling
//
// No internal guards or resizing.
template<class Container, class Type>
void Foam::Pstream::exchangeContainer
void exchangeContainer
(
const UList<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
UList<Container>& recvBufs,
const int tag,
const label comm,
const bool wait
const bool wait //!< Wait for requests to complete
)
{
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
// Set up receives
// ~~~~~~~~~~~~~~~
forAll(recvSizes, proci)
forAll(recvBufs, proci)
{
if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
auto& recvData = recvBufs[proci];
if (proci != myProci && !recvData.empty())
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
recvBufs[proci].data_bytes(),
recvSizes[proci]*sizeof(Type),
recvData.data_bytes(),
recvData.size_bytes(),
tag,
comm
);
@@ -73,7 +330,9 @@ void Foam::Pstream::exchangeContainer
forAll(sendBufs, proci)
{
if (proci != Pstream::myProcNo(comm) && sendBufs[proci].size() > 0)
const auto& sendData = sendBufs[proci];
if (proci != myProci && !sendData.empty())
{
if
(
@@ -81,8 +340,8 @@ void Foam::Pstream::exchangeContainer
(
UPstream::commsTypes::nonBlocking,
proci,
sendBufs[proci].cdata_bytes(),
sendBufs[proci].size_bytes(),
sendData.cdata_bytes(),
sendData.size_bytes(),
tag,
comm
)
@@ -91,13 +350,12 @@ void Foam::Pstream::exchangeContainer
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendBufs[proci].size_bytes())
<< label(sendData.size_bytes())
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
@@ -108,79 +366,12 @@ void Foam::Pstream::exchangeContainer
}
template<class Type>
void Foam::Pstream::exchangeBuf
(
const labelUList& sendSizes,
const UList<const char*>& sendBufs,
const labelUList& recvSizes,
List<char*>& recvBufs,
const int tag,
const label comm,
const bool wait
)
{
const label startOfRequests = UPstream::nRequests();
} // namespace PstreamDetail
} // namespace Foam
// Set up receives
// ~~~~~~~~~~~~~~~
forAll(recvSizes, proci)
{
if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
recvBufs[proci],
recvSizes[proci]*sizeof(Type),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
forAll(sendBufs, proci)
{
if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
{
if
(
!UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
sendBufs[proci],
sendSizes[proci]*sizeof(Type),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendSizes[proci]*sizeof(Type))
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (wait)
{
UPstream::waitRequests(startOfRequests);
}
}
#include "PstreamExchangeConsensus.C"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Container, class Type>
void Foam::Pstream::exchange
@@ -194,44 +385,46 @@ void Foam::Pstream::exchange
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
// if (!is_contiguous<Type>::value)
// {
// FatalErrorInFunction
// << "Contiguous data only: "
// << sizeof(Type) << Foam::abort(FatalError);
// }
if (sendBufs.size() != UPstream::nProcs(comm))
const label myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
if (sendBufs.size() != numProcs)
{
FatalErrorInFunction
<< "Size of list " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< " does not equal the number of processors " << numProcs
<< Foam::abort(FatalError);
}
recvBufs.resize_nocopy(sendBufs.size());
recvBufs.resize_nocopy(numProcs);
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::parRun() && numProcs > 1)
{
// Presize all receive buffers
forAll(recvSizes, proci)
{
const label nRecv = recvSizes[proci];
const label count = recvSizes[proci];
if (proci != Pstream::myProcNo(comm) && nRecv > 0)
if (proci != myProci && count > 0)
{
recvBufs[proci].resize_nocopy(nRecv);
recvBufs[proci].resize_nocopy(count);
}
else
{
recvBufs[proci].clear();
}
}
typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
typedef std::pair<int, stdFoam::span<Type>> recvTuple;
if (UPstream::maxCommsSize <= 0)
{
// Do the exchanging in one go
exchangeContainer<Container, Type>
PstreamDetail::exchangeContainer<Container, Type>
(
sendBufs,
recvSizes,
recvBufs,
tag,
comm,
@@ -240,117 +433,59 @@ void Foam::Pstream::exchange
}
else
{
// Determine the number of chunks to send. Note that we
// only have to look at the sending data since we are
// guaranteed that some processor's sending size is some other
// processor's receive size. Also we can ignore any local comms.
// Dispatch using chunk-wise exchanges
// Populate send sequence
DynamicList<sendTuple> sends(sendBufs.size());
forAll(sendBufs, proci)
{
const auto& sendData = sendBufs[proci];
// We need to send chunks so the number of iterations:
// maxChunkSize iterations
// ------------ ----------
// 0 0
// 1..maxChunkSize 1
// maxChunkSize+1..2*maxChunkSize 2
// ...
if (proci != myProci && !sendData.empty())
{
sends.push_back
(
sendTuple
(
proci,
{ sendData.cdata(), std::size_t(sendData.size()) }
)
);
}
}
const label maxChunkSize
// Populate recv sequence
DynamicList<recvTuple> recvs(recvBufs.size());
forAll(recvBufs, proci)
{
auto& recvData = recvBufs[proci];
if (proci != myProci && !recvData.empty())
{
recvs.push_back
(
recvTuple
(
proci,
{ recvData.data(), std::size_t(recvData.size()) }
)
);
}
}
// Exchange buffers in chunks
PstreamDetail::exchangeChunkedBuf<Type>
(
max
(
static_cast<label>(1),
static_cast<label>(UPstream::maxCommsSize/sizeof(Type))
)
sends,
recvs,
tag,
comm,
wait
);
label nChunks(0);
{
// Get max send count (elements)
forAll(sendBufs, proci)
{
if (proci != Pstream::myProcNo(comm))
{
nChunks = max(nChunks, sendBufs[proci].size());
}
}
// Convert from send count (elements) to number of chunks.
// Can normally calculate with (count-1), but add some safety
if (nChunks)
{
nChunks = 1 + (nChunks/maxChunkSize);
}
reduce(nChunks, maxOp<label>(), tag, comm);
}
labelList nRecv(sendBufs.size());
labelList nSend(sendBufs.size());
labelList startRecv(sendBufs.size(), Zero);
labelList startSend(sendBufs.size(), Zero);
List<const char*> charPtrSend(sendBufs.size());
List<char*> charPtrRecv(sendBufs.size());
for (label iter = 0; iter < nChunks; ++iter)
{
forAll(sendBufs, proci)
{
nSend[proci] = min
(
maxChunkSize,
sendBufs[proci].size()-startSend[proci]
);
nRecv[proci] = min
(
maxChunkSize,
recvBufs[proci].size()-startRecv[proci]
);
charPtrSend[proci] =
(
nSend[proci] > 0
? reinterpret_cast<const char*>
(
&(sendBufs[proci][startSend[proci]])
)
: nullptr
);
charPtrRecv[proci] =
(
nRecv[proci] > 0
? reinterpret_cast<char*>
(
&(recvBufs[proci][startRecv[proci]])
)
: nullptr
);
}
// Info<< "iter " << iter
// << ": beg=" << flatOutput(startSend)
// << " len=" << flatOutput(nSend) << endl;
exchangeBuf<Type>
(
nSend,
charPtrSend,
nRecv,
charPtrRecv,
tag,
comm,
wait
);
forAll(nSend, proci)
{
startSend[proci] += nSend[proci];
startRecv[proci] += nRecv[proci];
}
}
}
}
// Do myself
recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)];
recvBufs[myProci] = sendBufs[myProci];
}
@@ -365,6 +500,8 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
//const label myProci = UPstream::myProcNo(comm);
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
@@ -415,18 +552,6 @@ void Foam::Pstream::exchangeSizes
}
// FUTURE?
// template<class Container>
// void Foam::Pstream::exchangeSizes
// (
// const labelUList& neighProcs,
// const Container& sendBufs,
// labelList& recvSizes,
// const label tag,
// const label comm
// );
// Sparse sending
template<class Container>
void Foam::Pstream::exchangeSizes

PstreamExchangeConsensus.C

@@ -51,12 +51,6 @@ void Foam::Pstream::exchangeConsensus
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
// if (!is_contiguous<Type>::value)
// {
// FatalErrorInFunction
// << "Contiguous data only." << sizeof(Type)
// << Foam::abort(FatalError);
// }
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
@@ -201,12 +195,6 @@ void Foam::Pstream::exchangeConsensus
)
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
// if (!is_contiguous<Type>::value)
// {
// FatalErrorInFunction
// << "Contiguous data only." << sizeof(Type)
// << Foam::abort(FatalError);
// }
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);