ENH: additional exchangeSizes, PstreamBuffers methods (#2371)

- allows restriction of exchange sizes to specified send/recv procs

Based on code supplied by T.Aoyagi(RIST), A.Azami(RIST)
This commit is contained in:
Mark Olesen
2022-02-18 15:36:24 +01:00
committed by Andrew Heather
parent b0ef650a12
commit a9cdaa1bae
4 changed files with 220 additions and 18 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016-2021 OpenCFD Ltd.
Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -39,8 +39,8 @@ SourceFiles
\*---------------------------------------------------------------------------*/
#ifndef Pstream_H
#define Pstream_H
#ifndef Foam_Pstream_H
#define Foam_Pstream_H
#include "UPstream.H"
#include "DynamicList.H"
@ -340,17 +340,17 @@ public:
// Exchange
//- Helper: exchange contiguous data. Sends sendData, receives into
// recvData. If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
//- Helper: exchange sizes of sendData for specified
//- set of send/receive processes.
template<class Container>
static void exchangeSizes
(
const UList<Container>& sendData,
const labelUList& recvSizes,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
const labelUList& sendProcs,
const labelUList& recvProcs,
const Container& sendData,
labelList& sizes,
const label tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendData. sendData is the data per
@ -364,6 +364,20 @@ public:
const label comm = UPstream::worldComm
);
//- Helper: exchange contiguous data. Sends sendData, receives into
// recvData. If block=true will wait for all transfers to finish.
template<class Container, class T>
static void exchange
(
const UList<Container>& sendData,
const labelUList& recvSizes,
List<Container>& recvData,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
);
//- Exchange contiguous data. Sends sendData, receives into
// recvData. Determines sizes to receive.
// If block=true will wait for all transfers to finish.

View File

@ -57,6 +57,42 @@ void Foam::PstreamBuffers::finalExchange
}
void Foam::PstreamBuffers::finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool block
)
{
// Could also check that it is not called twice
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
Pstream::exchangeSizes
(
sendProcs,
recvProcs,
sendBuf_,
recvSizes,
tag_,
comm_
);
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvSizes,
recvBuf_,
tag_,
comm_,
block
);
}
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers
@ -144,4 +180,40 @@ void Foam::PstreamBuffers::finishedSends
}
void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool block
)
{
labelList recvSizes;
finalExchange(sendProcs, recvProcs, recvSizes, block);
}
void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool block
)
{
finalExchange(sendProcs, recvProcs, recvSizes, block);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
FatalErrorInFunction
<< "Obtaining sizes not supported in "
<< UPstream::commsTypeNames[commsType_] << endl
<< " since transfers already in progress. Use non-blocking instead."
<< exit(FatalError);
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
}
// ************************************************************************* //

View File

@ -126,6 +126,16 @@ class PstreamBuffers
// This will start receives in non-blocking mode.
void finalExchange(labelList& recvSizes, const bool block);
//- Mark all sends as having been done.
// Only exchange sizes between sendProcs/recvProcs
void finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool block
);
public:
@ -199,6 +209,28 @@ public:
// Same as above but also returns sizes (bytes) received.
// \note currently only valid for non-blocking.
void finishedSends(labelList& recvSizes, const bool block = true);
//- Mark all sends as having been done, with limited
//- send/recv processor connections.
// \note currently only valid for non-blocking.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool block = true
);
//- Mark all sends as having been done, with limited
//- send/recv processor connections.
// Same as above but also returns sizes (bytes) received.
// \note currently only valid for non-blocking.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool block = true
);
};

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016-2021 OpenCFD Ltd.
Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -209,7 +209,7 @@ void Foam::Pstream::exchange
<< Foam::abort(FatalError);
}
recvBufs.setSize(sendBufs.size());
recvBufs.resize_nocopy(sendBufs.size());
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
@ -220,7 +220,7 @@ void Foam::Pstream::exchange
if (proci != Pstream::myProcNo(comm) && nRecv > 0)
{
recvBufs[proci].setSize(nRecv);
recvBufs[proci].resize_nocopy(nRecv);
}
}
@ -344,6 +344,90 @@ void Foam::Pstream::exchange
}
template<class Container>
void Foam::Pstream::exchangeSizes
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const Container& sendBufs,
labelList& recvSizes,
const label tag,
const label comm
)
{
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of container " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
labelList sendSizes(sendProcs.size());
forAll(sendProcs, i)
{
sendSizes[i] = sendBufs[sendProcs[i]].size();
}
recvSizes.resize_nocopy(sendBufs.size());
recvSizes = 0; // Ensure non-received entries are properly zeroed
const label startOfRequests = UPstream::nRequests();
for (const label proci : recvProcs)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&recvSizes[proci]),
sizeof(label),
tag,
comm
);
}
forAll(sendProcs, i)
{
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
sendProcs[i],
reinterpret_cast<char*>(&sendSizes[i]),
sizeof(label),
tag,
comm
);
}
UPstream::waitRequests(startOfRequests);
}
/// FUTURE?
///
/// template<class Container>
/// void Foam::Pstream::exchangeSizes
/// (
/// const labelUList& sendRecvProcs,
/// const Container& sendBufs,
/// labelList& recvSizes,
/// const label tag,
/// const label comm
/// )
/// {
/// exchangeSizes<Container>
/// (
/// sendRecvProcs,
/// sendRecvProcs,
/// sendBufs,
/// tag,
/// comm
/// );
/// }
template<class Container>
void Foam::Pstream::exchangeSizes
(
@ -366,8 +450,8 @@ void Foam::Pstream::exchangeSizes
{
sendSizes[proci] = sendBufs[proci].size();
}
recvSizes.setSize(sendSizes.size());
allToAll(sendSizes, recvSizes, comm);
recvSizes.resize_nocopy(sendSizes.size());
UPstream::allToAll(sendSizes, recvSizes, comm);
}