Merge branch 'fix-processorCyclic_reducedSending' into 'develop'

ENH: improve send/recv robustness in the presence of processorCyclic (#2814)

See merge request Development/openfoam!614
Andrew Heather
2023-06-22 12:19:14 +00:00
10 changed files with 291 additions and 80 deletions
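
All ten diffs below apply variations of one pattern. A processorCyclic setup can join the same pair of ranks through more than one patch, so a rank cannot silently skip an empty per-patch send: the neighbour has no way of knowing how many messages to expect, and the exchange desynchronises. The fix is to always stream the (possibly empty) payload into the PstreamBuffers, record per neighbour whether anything non-empty was written, and prune the all-empty destinations with clearSend() before completing the exchange; the receiver detects a pruned destination as a zero recvDataCount(). A minimal sketch of the send side, assuming the usual OpenFOAM Pstream headers; payloadPerPatch and nbrProcPerPatch are hypothetical stand-ins for the per-patch data:

    // Minimal sketch, not verbatim library code
    #include "PstreamBuffers.H"
    #include "labelList.H"
    #include "Map.H"

    using namespace Foam;

    void sendWithDeferredPruning
    (
        const UList<labelList>& payloadPerPatch,  // hypothetical
        const labelList& nbrProcPerPatch          // hypothetical
    )
    {
        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

        // Has non-zero data sent (per neighbour rank)
        Map<int> isActiveSend;

        forAll(payloadPerPatch, patchi)
        {
            const label nbrProci = nbrProcPerPatch[patchi];

            // Always write, even when empty, so both sides agree on
            // the number of messages per connection
            UOPstream toNbr(nbrProci, pBufs);
            toNbr << payloadPerPatch[patchi];

            // |= accumulates over all patches sharing this neighbour
            isActiveSend(nbrProci) |= int(!payloadPerPatch[patchi].empty());
        }

        // Eliminate unnecessary sends: destinations that received only
        // empty payloads are dropped entirely
        forAllConstIters(isActiveSend, iter)
        {
            if (!iter.val())
            {
                pBufs.clearSend(iter.key());
            }
        }

        pBufs.finishedSends();
    }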

View File

@@ -487,6 +487,9 @@ void Foam::globalPoints::sendPatchPoints
     const polyBoundaryMesh& patches = mesh_.boundaryMesh();
     const labelPairList& patchInfo = globalTransforms_.patchTransformSign();
 
+    // Reset send/recv information
+    pBufs.clear();
+
     // Information to send:
     // The patch face
@@ -499,6 +502,19 @@ void Foam::globalPoints::sendPatchPoints
     DynamicList<labelPairList> allInfo;
 
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // Has non-zero data sent
+    Map<int> isActiveSend(0);
+    if (UPstream::parRun())
+    {
+        isActiveSend.resize(2*min(patches.size(), pBufs.nProcs()));
+    }
+
     forAll(patches, patchi)
     {
         const polyPatch& pp = patches[patchi];
@@ -508,7 +524,7 @@ void Foam::globalPoints::sendPatchPoints
         if
         (
-            (Pstream::parRun() && isA<processorPolyPatch>(pp))
+            (UPstream::parRun() && isA<processorPolyPatch>(pp))
          && (mergeSeparated || patchInfo[patchi].first() == -1)
         )
         {
@@ -561,21 +577,32 @@ void Foam::globalPoints::sendPatchPoints
             }
 
-            if (!patchFaces.empty())
+            // Send to neighbour
             {
-                // Send to neighbour
+                UOPstream toNbr(nbrProci, pBufs);
+                toNbr << patchFaces << indexInFace << allInfo;
+
+                // Record if send is required (data are non-zero)
+                isActiveSend(nbrProci) |= int(!patchFaces.empty());
+
                 if (debug)
                 {
-                    Pout<< " Sending from " << pp.name() << " to proc:"
+                    Pout<< "Sending from " << pp.name() << " to proc:"
                         << nbrProci << " point information:"
                         << patchFaces.size() << endl;
                 }
-
-                UOPstream toNeighbour(nbrProci, pBufs);
-                toNeighbour << patchFaces << indexInFace << allInfo;
             }
         }
     }
+
+    // Eliminate unnecessary sends
+    forAllConstIters(isActiveSend, iter)
+    {
+        if (!iter.val())
+        {
+            pBufs.clearSend(iter.key());
+        }
+    }
 }
@@ -606,7 +633,7 @@ void Foam::globalPoints::receivePatchPoints
         if
         (
-            (Pstream::parRun() && isA<processorPolyPatch>(pp))
+            (UPstream::parRun() && isA<processorPolyPatch>(pp))
          && (mergeSeparated || patchInfo[patchi].first() == -1)
         )
         {
@@ -615,6 +642,7 @@ void Foam::globalPoints::receivePatchPoints
             if (!pBufs.recvDataCount(nbrProci))
             {
+                // Nothing to receive
                 continue;
             }
@@ -623,8 +651,8 @@
             List<labelPairList> nbrInfo;
             {
-                UIPstream fromNeighbour(nbrProci, pBufs);
-                fromNeighbour >> patchFaces >> indexInFace >> nbrInfo;
+                UIPstream fromNbr(nbrProci, pBufs);
+                fromNbr >> patchFaces >> indexInFace >> nbrInfo;
             }
 
             if (debug)
@@ -929,8 +957,6 @@ void Foam::globalPoints::calculateSharedPoints
     // Do one exchange iteration to get neighbour points.
     {
-        pBufs.clear();
-
         sendPatchPoints
         (
             mergeSeparated,
@@ -962,8 +988,6 @@ void Foam::globalPoints::calculateSharedPoints
         do
         {
-            pBufs.clear();
-
             sendPatchPoints
             (
                 mergeSeparated,

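The receive side of the contract, as used in receivePatchPoints above: a pruned or genuinely absent message shows up as a zero byte count, so the loop simply skips that neighbour; otherwise the stream is scoped so the read completes before the data are used. A condensed fragment (the enclosing loop over patches and the surrounding declarations are assumed):

    // Condensed from the receive loop above
    if (!pBufs.recvDataCount(nbrProci))
    {
        // Nothing to receive
        continue;
    }

    labelList patchFaces;
    labelList indexInFace;
    List<labelPairList> nbrInfo;
    {
        UIPstream fromNbr(nbrProci, pBufs);
        fromNbr >> patchFaces >> indexInFace >> nbrInfo;
    }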
View File

@@ -541,7 +541,7 @@ Foam::label Foam::polyBoundaryMesh::nNonProcessor() const
 {
     const polyPatchList& patches = *this;
 
-    label nonProc = 0;
+    label count = 0;
 
     for (const polyPatch& p : patches)
     {
@@ -550,10 +550,28 @@ Foam::label Foam::polyBoundaryMesh::nNonProcessor() const
             break;
         }
 
-        ++nonProc;
+        ++count;
     }
 
-    return nonProc;
+    return count;
 }
+
+
+Foam::label Foam::polyBoundaryMesh::nProcessorPatches() const
+{
+    const polyPatchList& patches = *this;
+
+    label count = 0;
+
+    for (const polyPatch& p : patches)
+    {
+        if (isA<processorPolyPatch>(p))
+        {
+            ++count;
+        }
+    }
+
+    return count;
+}

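The new nProcessorPatches() helper lets callers presize their scratch lists from purely local information, before any global topology data exist. A hypothetical caller, mirroring the syncTools changes below; mesh is an assumed polyMesh reference:

    // Capacity, not size: DynamicList(label) reserves storage
    const polyBoundaryMesh& patches = mesh.boundaryMesh();

    DynamicList<label> neighbProcs(patches.nProcessorPatches());
    DynamicList<bool> isActiveSend(neighbProcs.capacity());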
View File

@@ -175,6 +175,9 @@ public:
         //- The number of patches before the first processor patch.
         label nNonProcessor() const;
 
+        //- The number of processorPolyPatch patches
+        label nProcessorPatches() const;
+
         //- Return a list of patch names
         wordList names() const;

View File

@@ -127,10 +127,20 @@ void Foam::syncTools::syncPointMap
     if (UPstream::parRun())
     {
-        DynamicList<label> neighbProcs;
+        // Presize according to number of processor patches
+        // (global topology information may not yet be available...)
+        DynamicList<label> neighbProcs(patches.nProcessorPatches());
         PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
-        // Send
+        // Sample and send.
+        // Reduce communication by only sending non-zero data,
+        // but with multiply-connected processor/processor
+        // (eg, processorCyclic) also need to send zero information
+        // to keep things synchronised
+
+        // If data needs to be sent (index corresponding to neighbProcs)
+        DynamicList<bool> isActiveSend(neighbProcs.capacity());
+
         for (const polyPatch& pp : patches)
         {
             const auto* ppp = isA<processorPolyPatch>(pp);
@@ -140,10 +150,7 @@ void Foam::syncTools::syncPointMap
                 const auto& procPatch = *ppp;
                 const label nbrProci = procPatch.neighbProcNo();
 
-                neighbProcs.push_back(nbrProci);
-
                 // Get data per patchPoint in neighbouring point numbers.
                 const labelList& meshPts = procPatch.meshPoints();
                 const labelList& nbrPts = procPatch.neighbPoints();
@@ -161,7 +168,28 @@ void Foam::syncTools::syncPointMap
                     }
                 }
 
-                if (!patchInfo.empty())
+                const bool hasSendData = (!patchInfo.empty());
+
+                // Neighbour connectivity (push_uniq)
+                // - record if send is required (non-empty data)
                 {
+                    label nbrIndex = neighbProcs.find(nbrProci);
+                    if (nbrIndex < 0)
+                    {
+                        nbrIndex = neighbProcs.size();
+                        neighbProcs.push_back(nbrProci);
+                        isActiveSend.push_back(false);
+                    }
+                    if (hasSendData)
+                    {
+                        isActiveSend[nbrIndex] = true;
+                    }
+                }
+
+                // Send to neighbour
+                {
                     UOPstream toNbr(nbrProci, pBufs);
                     toNbr << patchInfo;
@@ -169,6 +197,15 @@
             }
         }
 
+        // Eliminate unnecessary sends
+        forAll(neighbProcs, nbrIndex)
+        {
+            if (!isActiveSend[nbrIndex])
+            {
+                pBufs.clearSend(neighbProcs[nbrIndex]);
+            }
+        }
+
         // Limit exchange to involved procs
         pBufs.finishedNeighbourSends(neighbProcs);
@@ -185,6 +222,7 @@
             if (!pBufs.recvDataCount(nbrProci))
             {
+                // Nothing to receive
                 continue;
             }
@@ -383,10 +421,20 @@ void Foam::syncTools::syncEdgeMap
     if (UPstream::parRun())
     {
-        DynamicList<label> neighbProcs;
+        // Presize according to number of processor patches
+        // (global topology information may not yet be available...)
+        DynamicList<label> neighbProcs(patches.nProcessorPatches());
         PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
-        // Send
+        // Sample and send.
+        // Reduce communication by only sending non-zero data,
+        // but with multiply-connected processor/processor
+        // (eg, processorCyclic) also need to send zero information
+        // to keep things synchronised
+
+        // If data needs to be sent (index corresponding to neighbProcs)
+        DynamicList<bool> isActiveSend(neighbProcs.capacity());
+
         for (const polyPatch& pp : patches)
         {
             const auto* ppp = isA<processorPolyPatch>(pp);
@@ -396,10 +444,7 @@ void Foam::syncTools::syncEdgeMap
                 const auto& procPatch = *ppp;
                 const label nbrProci = procPatch.neighbProcNo();
 
-                neighbProcs.push_back(nbrProci);
-
                 // Get data per patch edge in neighbouring edge.
                 const edgeList& edges = procPatch.edges();
                 const labelList& meshPts = procPatch.meshPoints();
                 const labelList& nbrPts = procPatch.neighbPoints();
@@ -419,7 +464,28 @@ void Foam::syncTools::syncEdgeMap
                     }
                 }
 
-                if (!patchInfo.empty())
+                const bool hasSendData = (!patchInfo.empty());
+
+                // Neighbour connectivity (push_uniq)
+                // and record if send is required (non-empty data)
                 {
+                    label nbrIndex = neighbProcs.find(nbrProci);
+                    if (nbrIndex < 0)
+                    {
+                        nbrIndex = neighbProcs.size();
+                        neighbProcs.push_back(nbrProci);
+                        isActiveSend.push_back(false);
+                    }
+                    if (hasSendData)
+                    {
+                        isActiveSend[nbrIndex] = true;
+                    }
+                }
+
+                // Send to neighbour
+                {
                     UOPstream toNbr(nbrProci, pBufs);
                     toNbr << patchInfo;
@@ -427,6 +493,15 @@
             }
         }
 
+        // Eliminate unnecessary sends
+        forAll(neighbProcs, nbrIndex)
+        {
+            if (!isActiveSend[nbrIndex])
+            {
+                pBufs.clearSend(neighbProcs[nbrIndex]);
+            }
+        }
+
         // Limit exchange to involved procs
         pBufs.finishedNeighbourSends(neighbProcs);
@@ -443,6 +518,7 @@
             if (!pBufs.recvDataCount(nbrProci))
             {
+                // Nothing to receive
                 continue;
             }
@@ -1138,7 +1214,8 @@ void Foam::syncTools::syncBoundaryFaceList
             const auto& procPatch = *ppp;
             const label nbrProci = procPatch.neighbProcNo();
 
-            neighbProcs.push_back(nbrProci);
+            // Neighbour connectivity
+            neighbProcs.push_uniq(nbrProci);
 
             const SubList<T> fld
             (

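syncPointMap and syncEdgeMap keep two DynamicLists in lockstep instead of using a Map: neighbProcs doubles as the argument to finishedNeighbourSends(), and isActiveSend is addressed by the index that find() returns. A condensed sketch of the send-side flow; samplePatch() is a hypothetical stand-in for the per-patch sampling:

    // Condensed sketch, not verbatim library code
    DynamicList<label> neighbProcs(patches.nProcessorPatches());
    DynamicList<bool> isActiveSend(neighbProcs.capacity());

    PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

    for (const polyPatch& pp : patches)
    {
        const auto* ppp = isA<processorPolyPatch>(pp);
        if (!ppp) continue;

        const label nbrProci = ppp->neighbProcNo();
        const labelList patchInfo(samplePatch(pp));  // hypothetical

        // Lockstep bookkeeping: one slot per unique neighbour
        label nbrIndex = neighbProcs.find(nbrProci);
        if (nbrIndex < 0)
        {
            nbrIndex = neighbProcs.size();
            neighbProcs.push_back(nbrProci);
            isActiveSend.push_back(false);
        }
        if (!patchInfo.empty())
        {
            isActiveSend[nbrIndex] = true;
        }

        // Always send, possibly empty, to stay synchronised
        UOPstream toNbr(nbrProci, pBufs);
        toNbr << patchInfo;
    }

    // Prune neighbours whose sends were all empty, then exchange
    forAll(neighbProcs, nbrIndex)
    {
        if (!isActiveSend[nbrIndex])
        {
            pBufs.clearSend(neighbProcs[nbrIndex]);
        }
    }
    pBufs.finishedNeighbourSends(neighbProcs);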
View File

@@ -309,7 +309,7 @@ Foam::label Foam::faBoundaryMesh::nNonProcessor() const
 {
     const faPatchList& patches = *this;
 
-    label nonProc = 0;
+    label count = 0;
 
     for (const faPatch& p : patches)
     {
@@ -318,10 +318,28 @@ Foam::label Foam::faBoundaryMesh::nNonProcessor() const
             break;
         }
 
-        ++nonProc;
+        ++count;
     }
 
-    return nonProc;
+    return count;
 }
+
+
+Foam::label Foam::faBoundaryMesh::nProcessorPatches() const
+{
+    const faPatchList& patches = *this;
+
+    label count = 0;
+
+    for (const faPatch& p : patches)
+    {
+        if (isA<processorFaPatch>(p))
+        {
+            ++count;
+        }
+    }
+
+    return count;
+}

View File

@@ -152,6 +152,9 @@ public:
         //- The number of patches before the first processor patch.
         label nNonProcessor() const;
 
+        //- The number of processorFaPatch patches
+        label nProcessorPatches() const;
+
         //- Return a list of patch names
         wordList names() const;

View File

@@ -172,24 +172,23 @@ void Foam::fvMeshSubset::doCoupledPatches
     label nUncoupled = 0;
 
-    if (syncPar && Pstream::parRun())
+    if (syncPar && UPstream::parRun())
     {
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
         // Send face usage across processor patches
-        for (const polyPatch& pp : oldPatches)
+        if (!nCellsUsingFace.empty())
         {
-            const auto* procPatch = isA<processorPolyPatch>(pp);
-            if (procPatch)
+            for (const polyPatch& pp : oldPatches)
             {
-                const label nbrProci = procPatch->neighbProcNo();
+                const auto* procPatch = isA<processorPolyPatch>(pp);
 
-                if (!nCellsUsingFace.empty())
+                if (procPatch)
                 {
-                    UOPstream toNeighbour(nbrProci, pBufs);
+                    const label nbrProci = procPatch->neighbProcNo();
 
-                    toNeighbour <<
+                    UOPstream toNbr(nbrProci, pBufs);
+                    toNbr <<
                         SubList<label>(nCellsUsingFace, pp.size(), pp.start());
                 }
             }
@@ -208,19 +207,18 @@
             if (!pBufs.recvDataCount(nbrProci))
             {
+                // Nothing to receive
                 continue;
             }
 
-            UIPstream fromNeighbour(nbrProci, pBufs);
-            const labelList nbrList(fromNeighbour);
-
-            // Combine with this side.
-
-            if (!nCellsUsingFace.empty())
+            labelList nbrCellsUsingFace;
             {
-                const labelList& nbrCellsUsingFace(nbrList);
+                UIPstream fromNbr(nbrProci, pBufs);
+                fromNbr >> nbrCellsUsingFace;
+            }
+
+            if (!nCellsUsingFace.empty() && !nbrCellsUsingFace.empty())
+            {
+                // Combine with this side.
 
                 forAll(pp, i)

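Two details in doCoupledPatches: the predicate (!nCellsUsingFace.empty()) is uniform on a rank, so hoisting it out of the patch loop means a rank either posts sends on all of its processor patches or on none, and the receive is read into a named list inside its own scope before combining, which now also guards against an empty neighbour payload. A condensed sketch of the receive half, with nbrProci and pBufs as in the diff:

    // Condensed sketch of the receive half
    labelList nbrCellsUsingFace;
    {
        // Stream destroyed at end of scope, once the read completes
        UIPstream fromNbr(nbrProci, pBufs);
        fromNbr >> nbrCellsUsingFace;
    }

    // Combine only when both sides actually carry data
    if (!nCellsUsingFace.empty() && !nbrCellsUsingFace.empty())
    {
        // ... combine with this side, as in the diff above
    }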
View File

@@ -532,6 +532,19 @@ void Foam::FaceCellWave<Type, TrackingData>::handleProcPatches()
     // Reset buffers
     pBufs_.clear();
 
+    // Information to send:
+    DynamicList<Type> sendFacesInfo;
+    DynamicList<label> sendFaces;
+
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent (index corresponding to neighbourProcs)
+    List<bool> isActiveSend(neighbourProcs.size(), false);
+
     for (const label patchi : procPatches)
     {
         const auto& procPatch =
@@ -539,13 +552,12 @@
         const label nbrProci = procPatch.neighbProcNo();
 
-        // Allocate buffers
-        label nSendFaces;
-        labelList sendFaces(procPatch.size());
-        List<Type> sendFacesInfo(procPatch.size());
+        // Resize buffers
+        sendFaces.resize_nocopy(procPatch.size());
+        sendFacesInfo.resize_nocopy(procPatch.size());
 
         // Determine which faces changed on current patch
-        nSendFaces = getChangedPatchFaces
+        const label nSendFaces = getChangedPatchFaces
         (
             procPatch,
             0,
@@ -554,6 +566,10 @@
             sendFacesInfo
         );
 
+        // Shrink
+        sendFaces.resize(nSendFaces);
+        sendFacesInfo.resize(nSendFaces);
+
         // Adapt wallInfo for leaving domain
         leaveDomain
         (
@@ -563,25 +579,43 @@
             sendFacesInfo
         );
 
-        if (nSendFaces)
+        // Record if send is required (non-empty data)
+        if (!sendFaces.empty())
         {
+            const label nbrIndex = neighbourProcs.find(nbrProci);
+            if (nbrIndex >= 0)  // Safety check (should be unnecessary)
+            {
+                isActiveSend[nbrIndex] = true;
+            }
+        }
+
+        // Send to neighbour
+        {
+            UOPstream toNbr(nbrProci, pBufs_);
+            toNbr << sendFaces << sendFacesInfo;
+
             if (debug & 2)
             {
                 Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
-                    << " send:" << nSendFaces << " to proc:" << nbrProci
+                    << " send:" << sendFaces.size() << " to proc:" << nbrProci
                     << endl;
             }
-
-            UOPstream os(nbrProci, pBufs_);
-            os
-                << SubList<label>(sendFaces, nSendFaces)
-                << SubList<Type>(sendFacesInfo, nSendFaces);
         }
     }
 
-    // Finished sends
+    // Eliminate unnecessary sends
+    forAll(neighbourProcs, nbrIndex)
+    {
+        if (!isActiveSend[nbrIndex])
+        {
+            pBufs_.clearSend(neighbourProcs[nbrIndex]);
+        }
+    }
+
+    // Limit exchange to involved procs
     pBufs_.finishedNeighbourSends(neighbourProcs);
 
     for (const label patchi : procPatches)
     {
         const auto& procPatch =
@@ -591,6 +625,7 @@
         if (!pBufs_.recvDataCount(nbrProci))
        {
+            // Nothing to receive
            continue;
        }
@@ -602,6 +637,8 @@
             is >> receiveFaces >> receiveFacesInfo;
         }
 
+        const label nReceiveFaces = receiveFaces.size();
+
         if (debug & 2)
         {
             Pout<< " Processor patch " << patchi << ' ' << procPatch.name()
@@ -615,7 +652,7 @@
             transform
             (
                 procPatch.forwardT(),
-                receiveFaces.size(),
+                nReceiveFaces,
                 receiveFacesInfo
             );
         }
@@ -624,7 +661,7 @@
         enterDomain
         (
             procPatch,
-            receiveFaces.size(),
+            nReceiveFaces,
             receiveFaces,
             receiveFacesInfo
         );
@@ -633,7 +670,7 @@
         mergeFaceInfo
         (
             procPatch,
-            receiveFaces.size(),
+            nReceiveFaces,
             receiveFaces,
             receiveFacesInfo
         );
@@ -656,12 +693,11 @@ void Foam::FaceCellWave<Type, TrackingData>::handleCyclicPatches()
         const auto& nbrPatch = cycPatch.neighbPatch();
 
         // Allocate buffers
-        label nReceiveFaces;
         labelList receiveFaces(patch.size());
         List<Type> receiveFacesInfo(patch.size());
 
         // Determine which faces changed
-        nReceiveFaces = getChangedPatchFaces
+        const label nReceiveFaces = getChangedPatchFaces
         (
             nbrPatch,
             0,

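FaceCellWave also hoists the per-patch scratch buffers out of the patch loop: resize_nocopy() grows them to the patch size without preserving contents, and a plain resize() afterwards shrinks them to what getChangedPatchFaces() actually filled, so whole lists are streamed instead of SubLists and steady-state iterations allocate nothing. A sketch of the reuse pattern; fillChanged() and the loop context are hypothetical stand-ins:

    // Sketch of scratch-buffer reuse with label payloads
    DynamicList<label> sendFaces;

    for (const label patchi : procPatches)  // context assumed
    {
        const label patchSize = 128;  // stand-in for procPatch.size()

        // Grow without copying old contents
        sendFaces.resize_nocopy(patchSize);

        // Hypothetical: fills a prefix, returns the used count
        const label nSendFaces = fillChanged(sendFaces);

        // Shrink to the filled prefix; capacity is retained, so the
        // next iteration reuses the same storage
        sendFaces.resize(nSendFaces);
    }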
View File

@@ -6,7 +6,7 @@
     \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2011-2017 OpenFOAM Foundation
-    Copyright (C) 2021-2022 OpenCFD Ltd.
+    Copyright (C) 2021-2023 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -320,6 +320,15 @@ void Foam::PointEdgeWave<Type, TrackingData>::handleProcPatches()
     DynamicList<label> thisPoints;
     DynamicList<label> nbrPoints;
 
+    // Reduce communication by only sending non-zero data,
+    // but with multiply-connected processor/processor
+    // (eg, processorCyclic) also need to send zero information
+    // to keep things synchronised
+
+    // If data needs to be sent (index corresponding to neighbourProcs)
+    List<bool> isActiveSend(neighbourProcs.size(), false);
+
     for (const label patchi : procPatches)
     {
         const auto& procPatch =
@@ -329,8 +338,10 @@
         patchInfo.clear();
         patchInfo.reserve(procPatch.nPoints());
 
         thisPoints.clear();
+        thisPoints.reserve(procPatch.nPoints());
 
         nbrPoints.clear();
+        nbrPoints.reserve(procPatch.nPoints());
@@ -350,22 +361,40 @@
         // Adapt for leaving domain
         leaveDomain(procPatch, thisPoints, patchInfo);
 
-        if (patchInfo.size())
+        // Record if send is required (non-empty data)
+        if (!patchInfo.empty())
         {
+            const label nbrIndex = neighbourProcs.find(nbrProci);
+            if (nbrIndex >= 0)  // Safety check (should be unnecessary)
+            {
+                isActiveSend[nbrIndex] = true;
+            }
+        }
+
+        // Send to neighbour
+        {
+            UOPstream toNbr(nbrProci, pBufs_);
+            toNbr << nbrPoints << patchInfo;
+
             //if (debug & 2)
             //{
             //    Pout<< "Processor patch " << patchi << ' ' << procPatch.name()
             //        << " send:" << patchInfo.size()
             //        << " to proc:" << nbrProci << endl;
             //}
-
-            UOPstream os(nbrProci, pBufs_);
-            os << nbrPoints << patchInfo;
         }
     }
 
+    // Eliminate unnecessary sends
+    forAll(neighbourProcs, nbrIndex)
+    {
+        if (!isActiveSend[nbrIndex])
+        {
+            pBufs_.clearSend(neighbourProcs[nbrIndex]);
+        }
+    }
+
-    // Finished sends
+    // Limit exchange to involved procs
     pBufs_.finishedNeighbourSends(neighbourProcs);
@@ -382,6 +411,7 @@
         if (!pBufs_.recvDataCount(nbrProci))
         {
+            // Nothing to receive
             continue;
         }

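PointEdgeWave applies the same active-send bookkeeping, but its neighbourProcs list is precomputed rather than grown on the fly, so marking a neighbour is a find() plus a guarded assignment; the guard is belt-and-braces, since every processor patch's neighbour should already be present. The per-patch scratch lists are reused with the clear()/reserve() idiom from the diff above. A short fragment combining both, with the surrounding loop assumed:

    // Fragment: scratch reuse plus guarded active-send marking
    patchInfo.clear();
    patchInfo.reserve(procPatch.nPoints());

    // ... fill patchInfo for this patch ...

    const label nbrIndex = neighbourProcs.find(nbrProci);
    if (nbrIndex >= 0)  // Safety check (should be unnecessary)
    {
        isActiveSend[nbrIndex] = true;
    }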
View File

@@ -497,7 +497,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
     if (Pstream::parRun())
     {
         DynamicList<label> neighProcs;
-        PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+        PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
 
         // Send
         for (const label patchi : procPatchLabels_)
@@ -506,8 +506,8 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 refCast<const processorPolyPatch>(patches[patchi]);
             const label nbrProci = procPatch.neighbProcNo();
 
-            neighProcs.append(nbrProci);
-            UOPstream toNbr(nbrProci, pBufs);
+            // Neighbour connectivity
+            neighProcs.push_uniq(nbrProci);
 
             const scalarField& pFlux = dVf.boundaryField()[patchi];
             const List<label>& surfCellFacesOnProcPatch =
@@ -519,6 +519,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 surfCellFacesOnProcPatch
             );
 
+            UOPstream toNbr(nbrProci, pBufs);
             toNbr << surfCellFacesOnProcPatch << dVfPatch;
         }
@@ -533,11 +534,14 @@
                 refCast<const processorPolyPatch>(patches[patchi]);
             const label nbrProci = procPatch.neighbProcNo();
 
-            UIPstream fromNeighb(nbrProci, pBufs);
             List<label> faceIDs;
             List<scalar> nbrdVfs;
-            fromNeighb >> faceIDs >> nbrdVfs;
+
+            {
+                UIPstream fromNbr(nbrProci, pBufs);
+                fromNbr >> faceIDs >> nbrdVfs;
+            }
 
             if (returnSyncedFaces)
             {
                 List<label> syncedFaceI(faceIDs);
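
The isoAdvection change is the same recipe in miniature: push_uniq() records each neighbour once even when several patches share a processor pair, the UOPstream is constructed only once the payload is assembled, and the read is scoped before the data are used. A condensed sketch of the reworked send loop; nbrProcFor() and buildPayload() are hypothetical stand-ins:

    // Condensed sketch of the send side
    DynamicList<label> neighProcs;
    PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

    for (const label patchi : procPatchLabels_)
    {
        const label nbrProci = nbrProcFor(patchi);  // hypothetical

        // Record each neighbour once, even with shared patch pairs
        neighProcs.push_uniq(nbrProci);

        // Open the stream only when the payload is ready
        UOPstream toNbr(nbrProci, pBufs);
        toNbr << buildPayload(patchi);  // hypothetical
    }

    pBufs.finishedNeighbourSends(neighProcs);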