ENH: avoid clearStorage for persistent PstreamBuffers

- added persistent buffer for zoneDistribute
- additional handling for clearing buffers
This commit is contained in:
Mark Olesen
2023-02-13 12:01:51 +01:00
parent a16f09b10c
commit 83740ad408
9 changed files with 131 additions and 74 deletions

View File

@ -34,7 +34,6 @@ License
void Foam::PstreamBuffers::finalExchange
(
const bool wait,
const bool needSizes,
labelList& recvSizes
)
{
@ -44,36 +43,7 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
if
(
wait
&& UPstream::parRun()
&& UPstream::nProcsNonblockingExchange > 1
&& UPstream::nProcsNonblockingExchange <= nProcs()
)
{
Pstream::exchangeConsensus<DynamicList<char>, char>
(
sendBuffers_,
recvBuffers_,
(tag_ + 314159), // some unique tag?
comm_
);
// Copy back out
if (needSizes)
{
recvSizes.resize_nocopy(recvBuffers_.size());
forAll(recvBuffers_, proci)
{
recvSizes[proci] = recvBuffers_[proci].size();
}
}
return;
}
// all-to-all
// Dense storage uses all-to-all
Pstream::exchangeSizes(sendBuffers_, recvSizes, comm_);
Pstream::exchange<DynamicList<char>, char>
@ -94,7 +64,6 @@ void Foam::PstreamBuffers::finalExchange
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait,
const bool needSizes, // unused
labelList& recvSizes
)
{
@ -139,6 +108,37 @@ void Foam::PstreamBuffers::finalExchangeGatherScatter
// but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
if (isGather)
{
// gather mode (all-to-one)
if (UPstream::master(comm_))
{
// Master: has no sends
clearSends();
}
else
{
// Non-master: only sends to master [0]
for (label proci=1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
}
}
else
{
// scatter mode (one-to-all)
if (!UPstream::master(comm_))
{
// Non-master: has no sends
clearSends();
}
}
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
if (isGather)
@ -216,7 +216,7 @@ Foam::PstreamBuffers::PstreamBuffers
Foam::PstreamBuffers::~PstreamBuffers()
{
// Check that all data has been consumed.
forAll(recvPositions_, proci)
forAll(recvBuffers_, proci)
{
const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
@ -260,22 +260,39 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::clear()
void Foam::PstreamBuffers::clearSends()
{
for (DynamicList<char>& buf : sendBuffers_)
{
buf.clear();
}
}
void Foam::PstreamBuffers::clearRecvs()
{
for (DynamicList<char>& buf : recvBuffers_)
{
buf.clear();
}
recvPositions_ = Zero;
}
void Foam::PstreamBuffers::clear()
{
clearSends();
clearRecvs();
finishedSendsCalled_ = false;
}
void Foam::PstreamBuffers::clearSend(const label proci)
{
sendBuffers_[proci].clear();
}
void Foam::PstreamBuffers::clearRecv(const label proci)
{
recvBuffers_[proci].clear();
@ -318,7 +335,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
{
if (finishedSendsCalled_)
{
forAll(recvPositions_, proci)
forAll(recvBuffers_, proci)
{
if (recvPositions_[proci] < recvBuffers_[proci].size())
{
@ -373,7 +390,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
if (finishedSendsCalled_)
{
forAll(recvPositions_, proci)
forAll(recvBuffers_, proci)
{
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
@ -401,11 +418,11 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount(const label proci) const
if (finishedSendsCalled_)
{
forAll(recvPositions_, i)
forAll(recvBuffers_, idx)
{
if (i != proci)
const label len(recvBuffers_[idx].size() - recvPositions_[idx]);
if (idx != proci)
{
const label len(recvBuffers_[i].size() - recvPositions_[i]);
maxLen = max(maxLen, len);
}
}
@ -447,7 +464,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
{
return UList<char>
(
const_cast<char*>(recvBuffers_[proci].cdata()) + pos,
const_cast<char*>(recvBuffers_[proci].cbegin(pos)),
(len - pos)
);
}
@ -475,7 +492,7 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
void Foam::PstreamBuffers::finishedSends(const bool wait)
{
labelList recvSizes;
finalExchange(wait, false, recvSizes);
finalExchange(wait, recvSizes);
}
@ -485,7 +502,7 @@ void Foam::PstreamBuffers::finishedSends
const bool wait
)
{
finalExchange(wait, true, recvSizes);
finalExchange(wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -509,7 +526,7 @@ void Foam::PstreamBuffers::finishedSends
)
{
labelList recvSizes;
finalExchange(sendProcs, recvProcs, wait, false, recvSizes);
finalExchange(sendProcs, recvProcs, wait, recvSizes);
}
@ -521,7 +538,7 @@ void Foam::PstreamBuffers::finishedSends
const bool wait
)
{
finalExchange(sendProcs, recvProcs, wait, true, recvSizes);
finalExchange(sendProcs, recvProcs, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -605,6 +622,27 @@ bool Foam::PstreamBuffers::finishedSends
}
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
labelList& recvSizes,
const bool wait
)
{
finishedSends(neighProcs, neighProcs, recvSizes, wait);
}
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
const bool wait
)
{
finishedSends(neighProcs, neighProcs, wait);
}
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
labelList recvSizes;

View File

@ -160,7 +160,6 @@ class PstreamBuffers
void finalExchange
(
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
@ -172,7 +171,6 @@ class PstreamBuffers
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
labelList& recvSizes
);
@ -354,15 +352,24 @@ public:
// Edit
//- Clear individual buffers and reset states.
//- Clear all send/recv buffers and reset states.
// Does not remove the buffer storage.
void clear();
//- Clear all send buffers (does not remove buffer storage)
void clearSends();
//- Clear all recv buffer and positions (does not remove buffer storage)
void clearRecvs();
//- Clear an individual send buffer (eg, data not required)
void clearSend(const label proci);
//- Clear an individual receive buffer (eg, data not required)
// Does not remove the buffer storage.
void clearRecv(const label proci);
//- Clear individual buffer storage and reset states.
//- Clear storage for all send/recv buffers and reset states.
void clearStorage();
//- Change allowClearRecv, return previous value
@ -458,10 +465,7 @@ public:
(
const labelUList& neighProcs,
const bool wait = true
)
{
finishedSends(neighProcs, neighProcs, wait);
}
);
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
@ -478,10 +482,7 @@ public:
const labelUList& neighProcs,
labelList& recvSizes,
const bool wait = true
)
{
finishedSends(neighProcs, neighProcs, recvSizes, wait);
}
);
// Gather/scatter modes

View File

@ -903,6 +903,9 @@ void Foam::globalPoints::calculateSharedPoints
)
);
// Don't clear storage on persistent buffer
pBufs.allowClearRecv(false);
// Do one exchange iteration to get neighbour points.
{
pBufs.clear();

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2020 DLR
Copyright (C) 2020-2022 OpenCFD Ltd.
Copyright (C) 2020-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -43,8 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
stencil_(zoneCPCStencil::New(mesh)),
globalNumbering_(stencil_.globalNumbering()),
send_(UPstream::nProcs())
{}
send_(UPstream::nProcs()),
pBufs_(UPstream::commsTypes::nonBlocking)
{
// Don't clear storage on persistent buffer
pBufs_.allowClearRecv(false);
}
// * * * * * * * * * * * * * * * * Selectors * * * * * * * * * * * * * * //
@ -108,27 +112,27 @@ void Foam::zoneDistribute::setUpCommforZone
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
pBufs_.clear();
for (const int proci : UPstream::allProcs())
{
if (proci != UPstream::myProcNo() && !needed[proci].empty())
{
// Serialize as List
UOPstream toProc(proci, pBufs);
UOPstream toProc(proci, pBufs_);
toProc << needed[proci].sortedToc();
}
}
pBufs.finishedSends(sendConnections_, sendProcs_, recvProcs_);
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
for (const int proci : pBufs.allProcs())
for (const int proci : pBufs_.allProcs())
{
send_[proci].clear();
if (proci != UPstream::myProcNo() && pBufs.recvDataCount(proci))
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs);
UIPstream fromProc(proci, pBufs_);
fromProc >> send_[proci];
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2020 DLR
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -102,6 +102,9 @@ class zoneDistribute
//- Parallel [cache]: recv data from these ranks
DynamicList<label> recvProcs_;
//- Persistent set of exchange buffers
PstreamBuffers pBufs_;
// Private Member Functions

View File

@ -173,7 +173,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
pBufs_.clear();
for (const int proci : UPstream::allProcs())
{
@ -191,18 +191,18 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
);
}
UOPstream toProc(proci, pBufs);
UOPstream toProc(proci, pBufs_);
toProc << sendValues;
}
}
pBufs.finishedSends(sendConnections_, sendProcs_, recvProcs_);
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
for (const int proci : pBufs.allProcs())
for (const int proci : pBufs_.allProcs())
{
if (proci != UPstream::myProcNo() && pBufs.recvDataCount(proci))
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs);
UIPstream fromProc(proci, pBufs_);
Map<Type> tmpValues(fromProc);
neiValues += tmpValues;

View File

@ -226,7 +226,9 @@ reducedKoopmanOperator()
// Tests revealed that the distribution of "Q" does not affect
// the final outcome of TSQR decomposition up to sign
// Don't clear storage on persistent buffer
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
pBufs.allowClearRecv(false);
const label myProcNo = Pstream::myProcNo();
const label procNoInSubset = myProcNo % nAgglomerationProcs_;

View File

@ -59,7 +59,10 @@ Foam::FaceCellWaveBase::FaceCellWaveBase
changedCells_(mesh_.nCells()),
nUnvisitedFaces_(mesh_.nFaces()),
nUnvisitedCells_(mesh_.nCells())
{}
{
// Don't clear storage on persistent buffer
pBufs_.allowClearRecv(false);
}
// ************************************************************************* //

View File

@ -57,7 +57,10 @@ Foam::PointEdgeWaveBase::PointEdgeWaveBase
changedEdges_(mesh_.nEdges()),
nUnvisitedPoints_(mesh_.nPoints()),
nUnvisitedEdges_(mesh_.nEdges())
{}
{
// Don't clear storage on persistent buffer
pBufs_.allowClearRecv(false);
}
// ************************************************************************* //