ENH: reduce storage and startup communication for processorTopology

- previously built the entire adjacency table (full communication!)
  but this is only strictly needed when using 'scheduled' as the
  default communication mode. For blocking/nonBlocking modes this
  information is not necessary at that point.

  The processorTopology::New now generally creates a smaller amount of
  data at startup: the processor->patch mapping and the patchSchedule.

  If the default communication mode is 'scheduled', the behaviour is
  almost identical to previously.

- Use Map<label> for the processor->patch mapping for a smaller memory
  footprint on large (ie, sparsely connected) cases. It also
  simplifies coding and allows recovery of the list of procNeighbours
  on demand.

- Setup the processor initEvaluate/evaluate states with fewer loops
  over the patches.

========
BREAKING: procNeighbours() method changed definition

- this was previously the entire adjacency table, but is now only the
  processor-local neighbours. Now use procAdjacency() to create or
  recover the entire adjacency table.

  The only known use is within Cloud<ParticleType>::move and there it
  was only used to obtain processor-local information.

  Old:
      const labelList& neighbourProcs =
          mesh.globalData().topology().procNeighbours()[Pstream::myProcNo()];

  New:
      const labelList& neighbourProcs =
          mesh.globalData().topology().procNeighbours();

      // If needed, the old definition (with communication!)
      const labelListList& connectivity =
          mesh.globalData().topology().procAdjacency();
This commit is contained in:
Mark Olesen
2023-02-06 09:20:04 +01:00
parent 3e024d622b
commit 173c9ac163
8 changed files with 322 additions and 210 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -61,7 +61,7 @@ int main(int argc, char *argv[])
// Adjacency table
const labelListList& connectivity =
mesh.globalData().topology().procNeighbours();
mesh.globalData().topology().procAdjacency();
if (Pstream::master())
{

View File

@ -26,6 +26,8 @@ $(fileOps)/collatedFileOperation/hostCollatedFileOperation.C
$(fileOps)/collatedFileOperation/threadedCollatedOFstream.C
$(fileOps)/collatedFileOperation/OFstreamCollator.C
parallel/processorTopology/processorTopology.C
bools = primitives/bools
$(bools)/bool/bool.C
$(bools)/Switch/Switch.C

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -38,38 +38,51 @@ Foam::lduSchedule Foam::lduPrimitiveMesh::nonBlockingSchedule
{
lduSchedule schedule(2*interfaces.size());
// 1. All non-processor patches
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
label patchEvali = 0;
label numProcPatches = 0;
//
// 1. Schedule non-processor patches
//
forAll(interfaces, patchi)
{
if (interfaces.set(patchi) && !isA<ProcPatch>(interfaces[patchi]))
if (interfaces.set(patchi))
{
schedule[patchEvali++].setInitEvaluate(patchi);
schedule[patchEvali++].setEvaluate(patchi);
if (isA<ProcPatch>(interfaces[patchi]))
{
++numProcPatches;
}
else
{
schedule[patchEvali++].setInitEvaluate(patchi);
schedule[patchEvali++].setEvaluate(patchi);
}
}
}
// 2. All processor patches
// ~~~~~~~~~~~~~~~~~~~~~~~~
forAll(interfaces, patchi)
//
// 2. Schedule processor patches
//
if (numProcPatches)
{
if (interfaces.set(patchi) && isA<ProcPatch>(interfaces[patchi]))
forAll(interfaces, patchi)
{
schedule[patchEvali++].setInitEvaluate(patchi);
if (interfaces.set(patchi) && isA<ProcPatch>(interfaces[patchi]))
{
schedule[patchEvali].setInitEvaluate(patchi);
schedule[patchEvali + numProcPatches].setEvaluate(patchi);
++patchEvali;
}
}
}
forAll(interfaces, patchi)
{
if (interfaces.set(patchi) && isA<ProcPatch>(interfaces[patchi]))
{
schedule[patchEvali++].setEvaluate(patchi);
}
}
// Caution:
// The schedule is only valid for a subset of its range
// (where interfaces are defined) but must retain the full list length
// for later (external) bookkeeping
return schedule;
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -27,7 +27,6 @@ License
\*---------------------------------------------------------------------------*/
#include "commSchedule.H"
#include "ListOps.H"
#include "IOstreams.H"
#include "IOmanip.H"
#include "StringStream.H"
@ -49,15 +48,15 @@ namespace Foam
// Count the number of outstanding communications for a single processor
static label outstandingComms
(
const labelList& commToSchedule,
DynamicList<label>& procComms
const labelUList& commToSchedule,
const DynamicList<label>& procComms
)
{
label nOutstanding = 0;
for (const label commi : procComms)
for (const label commPairi : procComms)
{
if (commToSchedule[commi] == -1)
if (commToSchedule[commPairi] == -1)
{
++nOutstanding;
}
@ -82,20 +81,20 @@ Foam::commSchedule::commSchedule
// Determine comms per processor.
List<DynamicList<label>> procToComms(nProcs);
forAll(comms, commI)
forAll(comms, commPairi)
{
label proc0 = comms[commI][0];
label proc1 = comms[commI][1];
const label proc0 = comms[commPairi].first();
const label proc1 = comms[commPairi].second();
if (proc0 < 0 || proc0 >= nProcs || proc1 < 0 || proc1 >= nProcs)
{
FatalErrorInFunction
<< "Illegal processor(s): "
<< comms[commI] << abort(FatalError);
<< comms[commPairi] << abort(FatalError);
}
procToComms[proc0].append(commI);
procToComms[proc1].append(commI);
procToComms[proc0].push_back(commPairi);
procToComms[proc1].push_back(commPairi);
}
// Note: no need to shrink procToComms. Are small.
@ -108,7 +107,7 @@ Foam::commSchedule::commSchedule
const labelPair& twoProcs = comms[i];
Pout<< i << ": "
<< twoProcs[0] << " with " << twoProcs[1] << endl;
<< twoProcs.first() << " <-> " << twoProcs.second() << endl;
}
Pout<< endl;
@ -158,44 +157,46 @@ Foam::commSchedule::commSchedule
while (true)
{
label maxCommI = -1;
label maxComm = -1;
label maxNeed = labelMin;
forAll(comms, commI)
forAll(comms, commPairi)
{
label proc0 = comms[commI][0];
label proc1 = comms[commI][1];
const label proc0 = comms[commPairi].first();
const label proc1 = comms[commPairi].second();
if
(
commToSchedule[commI] == -1 // unscheduled
&& !busy[proc0]
&& !busy[proc1]
commToSchedule[commPairi] == -1 // unscheduled
&& !busy[proc0]
&& !busy[proc1]
)
{
label need =
(
outstandingComms(commToSchedule, procToComms[proc0])
+ outstandingComms(commToSchedule, procToComms[proc1]);
+ outstandingComms(commToSchedule, procToComms[proc1])
);
if (need > maxNeed)
if (maxNeed < need)
{
maxNeed = need;
maxCommI = commI;
maxComm = commPairi;
}
}
}
if (maxCommI == -1)
if (maxComm == -1)
{
// Found no unscheduled procs.
break;
}
// Schedule commI in this iteration
commToSchedule[maxCommI] = nScheduled++;
busy[comms[maxCommI][0]] = true;
busy[comms[maxCommI][1]] = true;
// Schedule commPairi in this iteration
commToSchedule[maxComm] = nScheduled++;
busy[comms[maxComm].first()] = true;
busy[comms[maxComm].second()] = true;
}
if (debug && UPstream::master())
@ -206,16 +207,16 @@ Foam::commSchedule::commSchedule
{
labelList procToComm(nProcs, -1);
forAll(commToSchedule, commI)
forAll(commToSchedule, commPairi)
{
label sched = commToSchedule[commI];
const label sched = commToSchedule[commPairi];
if (sched >= oldNScheduled && sched < nScheduled)
{
label proc0 = comms[commI][0];
procToComm[proc0] = commI;
label proc1 = comms[commI][1];
procToComm[proc1] = commI;
const label proc0 = comms[commPairi].first();
const label proc1 = comms[commPairi].second();
procToComm[proc0] = commPairi;
procToComm[proc1] = commPairi;
}
}
@ -255,31 +256,32 @@ Foam::commSchedule::commSchedule
labelList nProcScheduled(nProcs, Zero);
// Count
forAll(schedule_, i)
for (const label commPairi : schedule_)
{
label commI = schedule_[i];
const labelPair& twoProcs = comms[commI];
const labelPair& twoProcs = comms[commPairi];
nProcScheduled[twoProcs[0]]++;
nProcScheduled[twoProcs[1]]++;
nProcScheduled[twoProcs.first()]++;
nProcScheduled[twoProcs.second()]++;
}
// Allocate
forAll(procSchedule_, proci)
{
procSchedule_[proci].setSize(nProcScheduled[proci]);
procSchedule_[proci].resize_nocopy(nProcScheduled[proci]);
}
nProcScheduled = 0;
// Fill
forAll(schedule_, i)
for (const label commPairi : schedule_)
{
label commI = schedule_[i];
const labelPair& twoProcs = comms[commI];
const labelPair& twoProcs = comms[commPairi];
label proc0 = twoProcs[0];
procSchedule_[proc0][nProcScheduled[proc0]++] = commI;
const label proc0 = twoProcs.first();
const label proc1 = twoProcs.second();
label proc1 = twoProcs[1];
procSchedule_[proc1][nProcScheduled[proc1]++] = commI;
procSchedule_[proc0][nProcScheduled[proc0]++] = commPairi;
procSchedule_[proc1][nProcScheduled[proc1]++] = commPairi;
}
if (debug && UPstream::master())
@ -292,13 +294,13 @@ Foam::commSchedule::commSchedule
Pout<< "Processor " << proci << " talks to processors:" << endl;
forAll(procComms, i)
for (const label commPairi : procComms)
{
const labelPair& twoProcs = comms[procComms[i]];
const labelPair& twoProcs = comms[commPairi];
label nbr = (twoProcs[1] == proci ? twoProcs[0] : twoProcs[1]);
Pout<< " " << nbr << endl;
Pout<< " "
<< (proci == twoProcs[1] ? twoProcs[0] : twoProcs[1])
<< endl;
}
}
Pout<< endl;

View File

@ -0,0 +1,120 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "processorTopology.H"
#include "Pstream.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::processorTopology::processorTopology()
:
procPatchMap_(0),
comm_(UPstream::worldComm)
{}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
const Foam::labelList& Foam::processorTopology::procNeighbours() const
{
if (procNeighbours_.empty() && !procPatchMap_.empty())
{
// My neighbouring procs in ascending sorted order
procNeighbours_ = procPatchMap_.sortedToc();
}
return procNeighbours_;
}
// May be useful in the future...
// ------------------------------
//
// const Foam::labelUList Foam::processorTopology::below() const
// {
// const auto& all = procNeighbours();
//
// const auto* pivot = std::upper_bound
// (
// all.begin(),
// all.end(),
// UPstream::myProcNo(comm_)
// );
//
// if (pivot != all.end())
// {
// return UList<label>
// (
// const_cast<label*>(all.begin()),
// (pivot - all.begin())
// );
// }
// return UList<label>();
// }
//
//
// const Foam::labelUList Foam::processorTopology::above() const
// {
// const auto& all = procNeighbours();
//
// const auto* pivot = std::upper_bound
// (
// all.begin(),
// all.end(),
// UPstream::myProcNo(comm_)
// );
// if (pivot != all.end())
// {
// return UList<label>
// (
// const_cast<label*>(pivot),
// (all.end() - pivot)
// );
// }
// return UList<label>();
// }
const Foam::labelListList& Foam::processorTopology::procAdjacency() const
{
if (UPstream::parRun() && procAdjacencyTable_.empty())
{
procAdjacencyTable_.resize(UPstream::nProcs(comm_));
// My neighbouring procs in ascending sorted order
procAdjacencyTable_[UPstream::myProcNo(comm_)]
= procPatchMap_.sortedToc();
// Synchronize on all processors
Pstream::allGatherList(procAdjacencyTable_, UPstream::msgType(), comm_);
}
return procAdjacencyTable_;
}
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -51,6 +51,8 @@ SourceFiles
#include "labelList.H"
#include "lduSchedule.H"
#include "Map.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -65,48 +67,41 @@ class processorTopology
{
// Private Data
//- The processor to processor connection (adjacency) table
labelListList procNeighbours_;
//- Map from neighbour proc to patch index
//- (processor-local information!)
Map<label> procPatchMap_;
//- The neighbour processor connections (ascending order) associated
//- with the local rank.
// Possibly demand-driven data.
mutable labelList procNeighbours_;
//- The complete processor to processor connection adjacency table.
//- Globally synchronized information.
// Likely demand-driven data.
mutable labelListList procAdjacencyTable_;
//- Order in which the patches should be initialised/evaluated
//- corresponding to the schedule
lduSchedule patchSchedule_;
//- Local map from neighbour proc to patchi. Different per processor!
// -1 or patchi for connection to procID
labelList procPatchMap_;
//- The communicator used during creation of the topology
label comm_;
// Private Member Functions
//- From neighbour processor to index in our local boundary.
// Local information (so not same over all processors)
///const labelList& procPatchMap() const noexcept
///{
/// return procPatchMap_;
///}
//- Which \em local boundary is attached to specified processor
// Local information (so not same over all processors)
// \return -1 if not currently connected to specified processor.
label procToLocalPatch(const label proci) const
{
return
(
proci >= 0 && proci < procPatchMap_.size()
? procPatchMap_[proci]
: static_cast<label>(-1)
);
}
// Private Methods Functions
// Could expose as public...
// //- Map of neighbour processor to \em local boundary patch index.
// const Map<label>& procPatchMap() const noexcept
// {
// return procPatchMap_;
// }
public:
// Generated Methods
//- Default construct (empty)
processorTopology() = default;
//- Copy construct
processorTopology(const processorTopology&) = default;
@ -120,6 +115,12 @@ public:
processorTopology& operator=(processorTopology&&) = default;
// Constructors
//- Default construct (empty)
processorTopology();
// Static Functions
//- Factory method to create topology, schedule and proc/patch maps.
@ -135,17 +136,24 @@ public:
// Member Functions
//- The number of processors used by the topology
label nProcs() const noexcept
{
return procNeighbours_.size();
}
//- The communicator used during creation of the topology
label comm() const noexcept { return comm_; }
//- The processor to processor connection topology
//- (like an adjacency list). Globally synchronized information
const labelListList& procNeighbours() const noexcept
//- The neighbour processor connections (ascending order) associated
//- with the \em local rank.
const labelList& procNeighbours() const;
//- The complete processor to processor connection adjacency table.
//- Globally synchronized information.
// Likely demand-driven data.
const labelListList& procAdjacency() const;
//- Which \em local boundary is attached to specified neighbour
//- processor.
// \return -1 if not currently connected to specified processor.
label procPatchLookup(const label proci) const
{
return procNeighbours_;
return procPatchMap_.lookup(proci, -1);
}
//- Order in which the patches should be initialised/evaluated
@ -154,14 +162,6 @@ public:
{
return patchSchedule_;
}
//- Which \em local boundary is attached to specified processor
// Local information (so not same over all processors)
// \return -1 if not currently connected to specified processor.
label procPatchLookup(const label proci) const
{
return procToLocalPatch(proci);
}
};

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -44,10 +44,8 @@ Warning
#define Foam_processorTopologyNew_H
#include "processorTopology.H"
#include "bitSet.H"
#include "commSchedule.H"
#include "DynamicList.H"
#include "Pstream.H"
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
@ -58,73 +56,31 @@ Foam::processorTopology Foam::processorTopology::New
const label comm
)
{
const label myProci = Pstream::myProcNo(comm);
const label nProcs = Pstream::nProcs(comm);
processorTopology topo;
processorTopology procTopo;
topo.comm_ = comm; // The communicator
auto& schedule = topo.patchSchedule_;
auto& procToProcs = procTopo.procNeighbours_;
auto& procToPatch = procTopo.procPatchMap_;
auto& schedule = procTopo.patchSchedule_;
procToProcs.resize(nProcs);
schedule.resize(2*patches.size());
if (Pstream::parRun())
{
// Fill my 'slot' with my neighbours
auto& procSlot = procToProcs[myProci];
bitSet isNeighbour(procToProcs.size());
forAll(patches, patchi)
{
const auto* cpp = isA<ProcPatch>(patches[patchi]);
if (cpp)
{
const label nbrProci = cpp->neighbProcNo();
isNeighbour.set(nbrProci);
}
}
// The neighbouring procs in sorted (ascending) order
procSlot = isNeighbour.sortedToc();
const label maxNbrProci = procSlot.empty() ? -1 : procSlot.last();
// Note could also use Map<label> if desired
procToPatch.resize_nocopy(maxNbrProci + 1);
procToPatch = -1;
forAll(patches, patchi)
{
const auto* cpp = isA<ProcPatch>(patches[patchi]);
if (cpp)
{
const label nbrProci = cpp->neighbProcNo();
// Reverse map
procToPatch[nbrProci] = patchi;
}
}
// Synchronize on all processors
Pstream::allGatherList(procToProcs, UPstream::msgType(), comm);
}
// Define the schedule
// The evaluation number within the schedule
label patchEvali = 0;
// 1. All non-processor patches
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Number of processor patches
label numProcPatches = 0;
//
// 1. Schedule all non-processor patches
//
forAll(patches, patchi)
{
if (!isA<ProcPatch>(patches[patchi]))
if (isA<ProcPatch>(patches[patchi]))
{
++numProcPatches;
}
else
{
schedule[patchEvali++].setInitEvaluate(patchi);
schedule[patchEvali++].setEvaluate(patchi);
@ -132,15 +88,42 @@ Foam::processorTopology Foam::processorTopology::New
}
// 2. All processor patches
// ~~~~~~~~~~~~~~~~~~~~~~~~
// Assemble processor patch information
if (UPstream::parRun() && numProcPatches)
{
// Create reverse map (from proc to patch)
// - assumes single connections between processors
auto& patchMap = topo.procPatchMap_;
patchMap.resize(2*numProcPatches);
forAll(patches, patchi)
{
const auto* cpp = isA<ProcPatch>(patches[patchi]);
if (cpp)
{
patchMap.set(cpp->neighbProcNo(), patchi);
}
}
}
//
// 2. Handle processor patches
//
if
(
Pstream::parRun()
&& Pstream::defaultCommsType == Pstream::commsTypes::scheduled
UPstream::parRun()
&& UPstream::defaultCommsType == UPstream::commsTypes::scheduled
)
{
const label myProci = UPstream::myProcNo(comm);
const label nProcs = UPstream::nProcs(comm);
// Synchronized on all processors
const auto& procToProcs = topo.procAdjacency();
// Determine the schedule for all processor patches.
// Insert processor pair once to determine the schedule.
// Each processor pair stands for both send and receive.
@ -158,30 +141,28 @@ Foam::processorTopology Foam::processorTopology::New
{
if (proci < nbrProci)
{
comms.append(labelPair(proci, nbrProci));
// Owner to neighbour connection
comms.push_back(labelPair(proci, nbrProci));
}
}
}
// Determine a schedule.
labelList mySchedule
(
commSchedule
(
nProcs,
comms
).procSchedule()[myProci]
commSchedule(nProcs, comms).procSchedule()[myProci]
);
for (const label scheduleIndex : mySchedule)
{
// Get the other processor
label nbrProci = comms[scheduleIndex][0];
label nbrProci = comms[scheduleIndex].first();
if (nbrProci == myProci)
{
nbrProci = comms[scheduleIndex][1];
nbrProci = comms[scheduleIndex].second();
}
const label patchi = procToPatch[nbrProci];
const label patchi = topo.procPatchLookup(nbrProci);
if (myProci > nbrProci)
{
@ -199,26 +180,21 @@ Foam::processorTopology Foam::processorTopology::New
{
// Non-blocking schedule for processor patches
// initEvaluate
forAll(patches, patchi)
if (numProcPatches)
{
if (isA<ProcPatch>(patches[patchi]))
forAll(patches, patchi)
{
schedule[patchEvali++].setInitEvaluate(patchi);
}
}
// evaluate
forAll(patches, patchi)
{
if (isA<ProcPatch>(patches[patchi]))
{
schedule[patchEvali++].setEvaluate(patchi);
if (isA<ProcPatch>(patches[patchi]))
{
schedule[patchEvali].setInitEvaluate(patchi);
schedule[patchEvali + numProcPatches].setEvaluate(patchi);
++patchEvali;
}
}
}
}
return procTopo;
return topo;
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017, 2020 OpenFOAM Foundation
Copyright (C) 2020-2022 OpenCFD Ltd.
Copyright (C) 2020-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -161,8 +161,7 @@ void Foam::Cloud<ParticleType>::move
const labelList& procPatchNeighbours = pData.processorPatchNeighbours();
// Which processors this processor is connected to
const labelList& neighbourProcs =
pData.topology().procNeighbours()[Pstream::myProcNo()];
const labelList& neighbourProcs = pData.topology().procNeighbours();
// Initialise the stepFraction moved for the particles
for (ParticleType& p : *this)