ENH: separate nonBlocking send/receive mapDistribute (#2775)

This commit is contained in:
Mattijs Janssens
2023-05-09 09:00:00 +00:00
committed by Mark Olesen
parent b3fa59db92
commit 3d091a0d6a
2 changed files with 321 additions and 6 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2017 OpenFOAM Foundation
Copyright (C) 2015-2022 OpenCFD Ltd.
Copyright (C) 2015-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -87,7 +87,6 @@ namespace Foam
{
// Forward Declarations
class bitSet;
class dictionary;
class globalIndex;
@ -954,9 +953,71 @@ public:
void receive(PstreamBuffers& pBufs, List<T>& field) const;
//- Debug: print layout. Can only be used on maps with sorted
// storage (local data first, then non-local data)
void printLayout(Ostream& os) const;
// Low-level. Non-blocking. TBD: receive map is usually contiguous
// - receive into slices
//- Start all sends and receives (always non-blocking)
template<class T, class negateOp>
static void send
(
const labelListList& subMap,
const bool subHasFlip,
const labelListList& constructMap,
const bool constructHasFlip,
const UList<T>& field,
labelRange& requests,
PtrList<List<T>>& sendFields,
PtrList<List<T>>& recvFields,
const negateOp& negOp,
const int tag,
const label comm
);
//- Start all sends and receives (always non-blocking)
template<class T>
void send
(
const UList<T>& field,
labelRange& requests,
PtrList<List<T>>& sendFields,
PtrList<List<T>>& recvFields,
const int tag = UPstream::msgType()
) const;
//- Wait for requests to finish and consume
template<class T, class CombineOp, class negateOp>
static void receive
(
const label constructSize,
const labelListList& constructMap,
const bool constructHasFlip,
const UPtrList<List<T>>& sendFields,
const UPtrList<List<T>>& recvFields,
const labelRange& requests,
List<T>& field,
const CombineOp& cop,
const negateOp& negOp,
const int tag,
const label comm
);
//- Wait for requests to finish and consume
template<class T>
void receive
(
const labelRange& requests,
const UPtrList<List<T>>& sendFields,
const UPtrList<List<T>>& recvFields,
List<T>& field,
const int tag = UPstream::msgType()
) const;
// Other
//- Debug: print layout. Can only be used on maps with sorted
// storage (local data first, then non-local data)
void printLayout(Ostream& os) const;
// Member Operators

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2017 OpenFOAM Foundation
Copyright (C) 2015-2022 OpenCFD Ltd.
Copyright (C) 2015-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -160,6 +160,260 @@ Foam::List<T> Foam::mapDistributeBase::accessAndFlip
}
template<class T, class negateOp>
void Foam::mapDistributeBase::send
(
const labelListList& subMap,
const bool subHasFlip,
const labelListList& constructMap,
const bool constructHasFlip,
const UList<T>& field,
labelRange& requests,
PtrList<List<T>>& sendFields,
PtrList<List<T>>& recvFields,
const negateOp& negOp,
const int tag,
const label comm
)
{
if (!is_contiguous<T>::value)
{
FatalErrorInFunction
<< "Only contiguous is currently supported"
<< exit(FatalError);
}
const label myRank = UPstream::myProcNo(comm);
const label nProcs = UPstream::nProcs(comm);
requests.start() = UPstream::nRequests();
requests.size() = 0;
// Set up receives from neighbours
recvFields.clear();
recvFields.resize(nProcs);
for (const int domain : UPstream::allProcs(comm))
{
const labelList& map = constructMap[domain];
if (domain != myRank && map.size())
{
recvFields.set(domain, new List<T>(map.size()));
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
domain,
recvFields[domain].data_bytes(),
recvFields[domain].size_bytes(),
tag,
comm
);
}
}
// TDB: save where recv finish and send start
// const label endRecvRequests = UPstream::nRequests();
// Set up sends to neighbours
sendFields.clear();
sendFields.resize(nProcs);
for (const int domain : UPstream::allProcs(comm))
{
const labelList& map = subMap[domain];
if (domain != myRank && map.size())
{
sendFields.set(domain, new List<T>(map.size()));
List<T>& subField = sendFields[domain];
forAll(map, i)
{
subField[i] = accessAndFlip
(
field,
map[i],
subHasFlip,
negOp
);
}
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
domain,
subField.cdata_bytes(),
subField.size_bytes(),
tag,
comm
);
}
}
// Set up 'send' to myself
{
const labelList& map = subMap[myRank];
sendFields.set(myRank, new List<T>(map.size()));
List<T>& subField = sendFields[myRank];
forAll(map, i)
{
subField[i] = accessAndFlip
(
field,
map[i],
subHasFlip,
negOp
);
}
}
requests.size() = (UPstream::nRequests() - requests.start());
}
template<class T>
void Foam::mapDistributeBase::send
(
const UList<T>& field,
labelRange& requests,
PtrList<List<T>>& sendFields,
PtrList<List<T>>& recvFields,
const int tag
) const
{
send
(
subMap_,
subHasFlip_,
constructMap_,
constructHasFlip_,
field,
requests,
sendFields,
recvFields,
flipOp(),
tag,
comm_
);
}
template<class T, class CombineOp, class negateOp>
void Foam::mapDistributeBase::receive
(
const label constructSize,
const labelListList& constructMap,
const bool constructHasFlip,
const UPtrList<List<T>>& sendFields,
const UPtrList<List<T>>& recvFields,
const labelRange& requests,
List<T>& field,
const CombineOp& cop,
const negateOp& negOp,
const int tag,
const label comm
)
{
if (!is_contiguous<T>::value)
{
FatalErrorInFunction
<< "Only contiguous is currently supported"
<< exit(FatalError);
}
const label myRank = UPstream::myProcNo(comm);
// Combine bits. Note that can reuse field storage
field.resize(constructSize);
// Receive sub field from myself (sendFields[myRank])
if (sendFields.set(myRank))
{
const labelList& map = constructMap[myRank];
const List<T>& subField = sendFields[myRank];
flipAndCombine
(
map,
constructHasFlip,
subField,
cop,
negOp,
field
);
}
// Wait for all to finish
// TBD. sliced-range (ie, only wait for receives)
UPstream::waitRequests(requests.start(), requests.size());
// Collect neighbour fields
for (const int domain : UPstream::allProcs(comm))
{
const labelList& map = constructMap[domain];
if (domain != myRank && map.size())
{
if (!recvFields.set(domain))
{
FatalErrorInFunction
<< "Unallocated receive field from rank:" << domain
<< exit(FatalError);
}
const List<T>& subField = recvFields[domain];
checkReceivedSize(domain, map.size(), subField.size());
flipAndCombine
(
map,
constructHasFlip,
subField,
cop,
negOp,
field
);
}
}
}
template<class T>
void Foam::mapDistributeBase::receive
(
const labelRange& requests,
const UPtrList<List<T>>& sendFields,
const UPtrList<List<T>>& recvFields,
List<T>& field,
const int tag
) const
{
receive
(
constructSize_,
constructMap_,
constructHasFlip_,
sendFields,
recvFields,
requests,
field,
eqOp<T>(),
flipOp(),
tag,
comm_
);
}
template<class T, class NegateOp>
void Foam::mapDistributeBase::distribute
(