ENH: reduce overhead for particle transfer

- For v2112 and earlier: pre-assembled lists of the particles to be
  transferred and of the target patch indices, on a per-processor basis.
  Apart from the memory overhead of assembling these lists, this adds
  allocations/de-allocations when building the linked lists.

- Now stream the particle transfer tuples directly into PstreamBuffers.
  Use a local cache of UOPstream wrappers for the formatters
  (since there are potentially many particles being shifted about).
  On the receiving side, read the tuples back one by one
  (a minimal send-side sketch follows below).

- Communication for the transfers is now restricted to the immediate
  neighbours instead of using an all-to-all exchange of sizes
  (see the exchange/receive sketch below).

  Applied to Cloud::move and RecycleInteraction
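
A minimal send-side sketch of the pattern described above. It is not part
of the commit: the helper name streamTuple, the vector payload (a stand-in
for the particle itself) and the chosen headers are assumptions for
illustration; the PstreamBuffers, PtrList::get/set and UOPstream calls are
the ones used in the Cloud::move() changes below.

#include "PstreamBuffers.H"
#include "UOPstream.H"
#include "PtrList.H"
#include "vector.H"

using namespace Foam;

// Stream one (patchi payload) tuple to a destination rank, creating and
// caching the UOPstream wrapper on first use so that subsequent tuples
// for the same rank reuse the same formatter.
void streamTuple
(
    PstreamBuffers& pBufs,
    PtrList<UOPstream>& streamCache,   // sized to Pstream::nProcs()
    const label toProci,
    const label patchi,
    const vector& payload              // stand-in for the particle data
)
{
    // Get/create the cached output stream for this destination rank
    auto* osptr = streamCache.get(toProci);

    if (!osptr)
    {
        osptr = new UOPstream(toProci, pBufs);
        streamCache.set(toProci, osptr);
    }

    // Tuple: (patchi payload) goes straight into the send buffer
    (*osptr) << patchi << payload;
}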
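
The matching exchange and receive side, continuing the same sketch (same
translation unit and assumptions as above; neighbourProcs would be the list
of directly connected ranks from the processor topology, as in
Cloud::move()). Restricting the size exchange to the known neighbours is
what replaces the previous all-to-all.

#include "UIPstream.H"
#include "labelList.H"

// Exchange sizes only with the immediate neighbours, then read each
// (patchi payload) tuple back until the receive buffer is exhausted.
void exchangeAndRead
(
    PstreamBuffers& pBufs,
    const labelList& neighbourProcs
)
{
    pBufs.finishedNeighbourSends(neighbourProcs);

    for (const label proci : neighbourProcs)
    {
        if (pBufs.hasRecvData(proci))
        {
            UIPstream is(proci, pBufs);

            while (!is.eof())
            {
                const label patchi = pTraits<label>(is);
                const vector payload(is);

                // ... reconstruct/handle (patchi, payload) here
            }
        }
    }
}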
Author: Mark Olesen (2022-03-02 17:00:43 +01:00)
Committed by: Andrew Heather
parent 2b54d86152
commit 4540b064ee
2 changed files with 126 additions and 159 deletions


@@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017, 2020 OpenFOAM Foundation
Copyright (C) 2020-2021 OpenCFD Ltd.
Copyright (C) 2020-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -89,7 +89,7 @@ Foam::Cloud<ParticleType>::Cloud
// Ask for the tetBasePtIs to trigger all processors to build
// them, otherwise, if some processors have no particles then
// there is a comms mismatch.
polyMesh_.tetBasePtIs();
(void)polyMesh_.tetBasePtIs();
if (particles.size())
{
@@ -163,54 +163,55 @@ void Foam::Cloud<ParticleType>::move
// Which processors this processor is connected to
const labelList& neighbourProcs = pData[Pstream::myProcNo()];
// Indexing from the processor number into the neighbourProcs list
labelList neighbourProcIndices(Pstream::nProcs(), -1);
forAll(neighbourProcs, i)
{
neighbourProcIndices[neighbourProcs[i]] = i;
}
// Initialise the stepFraction moved for the particles
forAllIters(*this, pIter)
for (ParticleType& p : *this)
{
pIter().reset();
p.reset();
}
// List of lists of particles to be transferred for all of the
// neighbour processors
List<IDLList<ParticleType>> particleTransferLists
(
neighbourProcs.size()
);
// List of destination processorPatches indices for all of the
// neighbour processors
List<DynamicList<label>> patchIndexTransferLists
(
neighbourProcs.size()
);
// Allocate transfer buffers
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Clear the global positions as there are about to change
// Clear the global positions as these are about to change
globalPositionsPtr_.clear();
// For v2112 and earlier: pre-assembled lists of particles
// to be transferred and target patch on a per processor basis.
// Apart from memory overhead of assembling the lists this adds
// allocations/de-allocation when building linked-lists.
// Now stream particle transfer tuples directly into PstreamBuffers.
// Use a local cache of UOPstream wrappers for the formatters
// (since there are potentially many particles being shifted about).
// Allocate transfer buffers,
// automatic clearStorage when UIPstream closes is disabled.
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
pBufs.allowClearRecv(false);
// Cache of opened UOPstream wrappers
PtrList<UOPstream> UOPstreamPtrs(Pstream::nProcs());
// While there are particles to transfer
while (true)
{
particleTransferLists = IDLList<ParticleType>();
forAll(patchIndexTransferLists, i)
// Reset transfer buffers
pBufs.clear();
// Rewind existing streams
forAll(UOPstreamPtrs, proci)
{
patchIndexTransferLists[i].clear();
auto* osptr = UOPstreamPtrs.get(proci);
if (osptr)
{
osptr->rewind();
}
}
// Loop over all particles
for (ParticleType& p : *this)
{
// Move the particle
bool keepParticle = p.move(cloud, td, trackTime);
const bool keepParticle = p.move(cloud, td, trackTime);
// If the particle is to be kept
// (i.e. it hasn't passed through an inlet or outlet)
@@ -235,22 +236,27 @@ void Foam::Cloud<ParticleType>::move
const label patchi = p.patch();
const label n = neighbourProcIndices
[
refCast<const processorPolyPatch>
(
pbm[patchi]
).neighbProcNo()
];
const label toProci =
(
refCast<const processorPolyPatch>(pbm[patchi])
.neighbProcNo()
);
// Get/create output stream
auto* osptr = UOPstreamPtrs.get(toProci);
if (!osptr)
{
osptr = new UOPstream(toProci, pBufs);
UOPstreamPtrs.set(toProci, osptr);
}
p.prepareForParallelTransfer();
particleTransferLists[n].append(this->remove(&p));
// Tuple: (patchi particle)
(*osptr) << procPatchNeighbours[patchi] << p;
patchIndexTransferLists[n].append
(
procPatchNeighbours[patchi]
);
// Can now remove from my list
deleteParticle(p);
}
}
else
@@ -264,29 +270,7 @@ void Foam::Cloud<ParticleType>::move
break;
}
// Clear transfer buffers
pBufs.clear();
// Stream into send buffers
forAll(particleTransferLists, i)
{
if (particleTransferLists[i].size())
{
UOPstream particleStream
(
neighbourProcs[i],
pBufs
);
particleStream
<< patchIndexTransferLists[i]
<< particleTransferLists[i];
}
}
pBufs.finishedSends();
pBufs.finishedNeighbourSends(neighbourProcs);
if (!returnReduce(pBufs.hasRecvData(), orOp<bool>()))
{
@@ -295,29 +279,23 @@ void Foam::Cloud<ParticleType>::move
}
// Retrieve from receive buffers
for (const label neighbProci : neighbourProcs)
for (const label proci : neighbourProcs)
{
if (pBufs.hasRecvData(neighbProci))
if (pBufs.hasRecvData(proci))
{
UIPstream particleStream(neighbProci, pBufs);
UIPstream is(proci, pBufs);
labelList receivePatchIndex(particleStream);
IDLList<ParticleType> newParticles
(
particleStream,
typename ParticleType::iNew(polyMesh_)
);
label pI = 0;
for (ParticleType& newp : newParticles)
// Read out each (patchi particle) tuple
while (!is.eof())
{
label patchi = procPatches[receivePatchIndex[pI++]];
label patchi = pTraits<label>(is);
auto* newp = new ParticleType(polyMesh_, is);
newp.correctAfterParallelTransfer(patchi, td);
// The real patch index
patchi = procPatches[patchi];
addParticle(newParticles.remove(&newp));
(*newp).correctAfterParallelTransfer(patchi, td);
addParticle(newp);
}
}
}
@@ -349,9 +327,9 @@ void Foam::Cloud<ParticleType>::autoMap(const mapPolyMesh& mapper)
const vectorField& positions = globalPositionsPtr_();
label i = 0;
forAllIters(*this, iter)
for (ParticleType& p : *this)
{
iter().autoMap(positions[i], mapper);
p.autoMap(positions[i], mapper);
++i;
}
}
@@ -360,20 +338,19 @@ void Foam::Cloud<ParticleType>::autoMap(const mapPolyMesh& mapper)
template<class ParticleType>
void Foam::Cloud<ParticleType>::writePositions() const
{
OFstream pObj
OFstream os
(
this->db().time().path()/this->name() + "_positions.obj"
);
forAllConstIters(*this, pIter)
for (const ParticleType& p : *this)
{
const ParticleType& p = pIter();
const point position(p.position());
pObj<< "v " << position.x() << " " << position.y() << " "
os << "v "
<< position.x() << ' '
<< position.y() << ' '
<< position.z() << nl;
}
pObj.flush();
}
@@ -386,13 +363,12 @@ void Foam::Cloud<ParticleType>::storeGlobalPositions() const
// within autoMap, and this pre-processing would not be necessary.
globalPositionsPtr_.reset(new vectorField(this->size()));
vectorField& positions = globalPositionsPtr_();
label i = 0;
forAllConstIters(*this, iter)
for (const ParticleType& p : *this)
{
positions[i] = iter().position();
positions[i] = p.position();
++i;
}
}


@@ -205,10 +205,13 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
{
if (Pstream::parRun())
{
// Relocate the recycled parcels into slots for each receiving processor
List<IDLList<parcelType>> transferParcels(Pstream::nProcs());
List<DynamicList<scalar>> fractions(Pstream::nProcs());
List<DynamicList<label>> patchAddr(Pstream::nProcs());
// See comments in Cloud::move() about transfer particles handling
// Allocate transfer buffers
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Cache of opened UOPstream wrappers
PtrList<UOPstream> UOPstreamPtrs(Pstream::nProcs());
auto& rnd = this->owner().rndGen();
@@ -217,7 +220,7 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
auto& patchParcels = recycledParcels_[addri];
auto& injectionPatch = injectionPatchPtr_[addri];
forAllIters(patchParcels, pIter)
for (parcelType& p : patchParcels)
{
// Choose a random location to insert the parcel
const scalar fraction01 = rnd.template sample01<scalar>();
@@ -225,32 +228,19 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
// Identify the processor that owns the location
const label toProci = injectionPatch.whichProc(fraction01);
// Store info in slot for target processor
transferParcels[toProci].append(patchParcels.remove(pIter));
fractions[toProci].append(fraction01);
patchAddr[toProci].append(addri);
}
}
// Get/create output stream
auto* osptr = UOPstreamPtrs.get(toProci);
if (!osptr)
{
osptr = new UOPstream(toProci, pBufs);
UOPstreamPtrs.set(toProci, osptr);
}
// Set-up the sends
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Tuple: (address fraction particle)
(*osptr) << addri << fraction01 << p;
// Clear transfer buffers
pBufs.clear();
// Stream into send buffers
forAll(transferParcels, proci)
{
if (transferParcels[proci].size())
{
UOPstream particleStream(proci, pBufs);
particleStream
<< transferParcels[proci]
<< fractions[proci]
<< patchAddr[proci];
transferParcels[proci].clear();
// Can now remove from list and delete
delete(patchParcels.remove(&p));
}
}
@@ -258,7 +248,7 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
if (!returnReduce(pBufs.hasRecvData(), orOp<bool>()))
{
// No parcels to transfer
// No parcels to recycle
return;
}
@@ -267,48 +257,47 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
{
if (pBufs.hasRecvData(proci))
{
UIPstream particleStream(proci, pBufs);
IDLList<parcelType> newParticles
(
particleStream,
typename parcelType::iNew(this->owner().mesh())
);
scalarList fractions(particleStream);
labelList patchAddr(particleStream);
UIPstream is(proci, pBufs);
label parceli = 0;
for (parcelType& newp : newParticles)
// Read out each (address fraction particle) tuple
while (!is.eof())
{
const label addri = pTraits<label>(is);
const scalar fraction01 = pTraits<scalar>(is);
auto* newp = new parcelType(this->owner().mesh(), is);
// Parcel to be recycled
vector newPosition;
label cellOwner;
label dummy;
const label addri = patchAddr[parceli];
injectionPatchPtr_[addri].setPositionAndCell
(
mesh_,
fractions[parceli],
fraction01,
this->owner().rndGen(),
newPosition,
cellOwner,
dummy,
dummy
);
newp.relocate(newPosition, cellOwner);
newp.U() = this->owner().U()[cellOwner];
newp.nParticle() *= recycleFraction_;
newp->relocate(newPosition, cellOwner);
newp->nParticle() *= recycleFraction_;
// Assume parcel velocity is same as the carrier velocity
newp->U() = this->owner().U()[cellOwner];
// Injector ID
const label idx =
(
injIdToIndex_.size()
? injIdToIndex_.lookup(newp.typeId(), 0)
: 0
);
++nInjected_[addri][idx];
massInjected_[addri][idx] += newp.nParticle()*newp.mass();
(
injIdToIndex_.size()
? injIdToIndex_.lookup(newp->typeId(), 0)
: 0
);
this->owner().addParticle(newParticles.remove(&newp));
++parceli;
++nInjected_[addri][idx];
massInjected_[addri][idx] += newp->nParticle()*newp->mass();
this->owner().addParticle(newp);
}
}
}
@@ -317,8 +306,10 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
{
forAll(recycledParcels_, addri)
{
forAllIters(recycledParcels_[addri], iter)
for (parcelType& p : recycledParcels_[addri])
{
parcelType* newp = recycledParcels_[addri].remove(&p);
// Parcel to be recycled
vector newPosition;
label cellOwner;
@@ -334,19 +325,19 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
);
// Update parcel properties
parcelType* newp = recycledParcels_[addri].remove(iter);
newp->relocate(newPosition, cellOwner);
newp->nParticle() *= recycleFraction_;
// Assume parcel velocity is same as the carrier velocity
newp->U() = this->owner().U()[cellOwner];
// Injector ID
const label idx =
(
injIdToIndex_.size()
? injIdToIndex_.lookup(newp->typeId(), 0)
: 0
);
(
injIdToIndex_.size()
? injIdToIndex_.lookup(newp->typeId(), 0)
: 0
);
++nInjected_[addri][idx];
massInjected_[addri][idx] += newp->nParticle()*newp->mass();
@@ -369,7 +360,7 @@ void Foam::RecycleInteraction<CloudType>::info(Ostream& os)
forAll(nRemoved_, patchi)
{
label lsd = nRemoved_[patchi].size();
const label lsd = nRemoved_[patchi].size();
npr0[patchi].setSize(lsd, Zero);
mpr0[patchi].setSize(lsd, Zero);
npi0[patchi].setSize(lsd, Zero);