ENH: additional control and access methods for PstreamBuffers

- PstreamBuffers nProcs() and allProcs() methods to recover the rank
  information consistent with the communicator used for construction
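
  For example (a sketch; the tag/comm arguments and the loop body are
  illustrative only):

      PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking, tag, comm);

      Pout<< "buffers span " << pBufs.nProcs() << " ranks" << nl;

      for (const int proci : pBufs.allProcs())
      {
          // proci: 0 .. nProcs()-1, relative to the construction
          // communicator
      }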

- allowClearRecv() methods for more control over buffer reuse
  For example,

      pBufs.allowClearRecv(false);

      forAll(particles, particlei)
      {
          pBufs.clear();

          fill...

          read via IPstream(..., pBufs);
      }

  This preserves the receive buffers' memory allocation between calls.

- finishedNeighbourSends() method as compact wrapper for
  finishedSends() when the send/recv ranks are identical
  (eg, neighbours)
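
  For example (a sketch, assuming neighProcs holds the neighbour
  ranks):

      // Equivalent to: pBufs.finishedSends(neighProcs, neighProcs);
      pBufs.finishedNeighbourSends(neighProcs);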

- hasSendData()/hasRecvData() methods for PstreamBuffers.

  Can be useful in some situations to skip reading entirely.
  For example,

      pBufs.finishedNeighbourSends(neighProcs);

      if (!returnReduce(pBufs.hasRecvData(), orOp<bool>()))
      {
          // Nothing to do
          continue;
      }
      ...

  On an individual basis:

      for (const int proci : pBufs.allProcs())
      {
          if (pBufs.hasRecvData(proci))
          {
             ...
          }
      }

  Also conceivable to do the following instead (nonBlocking only):

      if (!returnReduce(pBufs.hasSendData(), orOp<bool>()))
      {
          // Nothing to do
          pBufs.clear();
          continue;
      }

      pBufs.finishedNeighbourSends(neighProcs);
      ...
Mark Olesen
2022-02-28 13:31:52 +01:00
committed by Andrew Heather
parent bfca84d11a
commit 14631984df
20 changed files with 411 additions and 141 deletions


@@ -0,0 +1,3 @@
+Test-processorTopology.C
+
+EXE = $(FOAM_USER_APPBIN)/Test-processorTopology


@@ -0,0 +1,2 @@
+/* EXE_INC = */
+/* EXE_LIBS = */


@@ -0,0 +1,113 @@
+/*---------------------------------------------------------------------------*\
+  =========                 |
+  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
+   \\    /   O peration     |
+    \\  /    A nd           | www.openfoam.com
+     \\/     M anipulation  |
+-------------------------------------------------------------------------------
+    Copyright (C) 2022 OpenCFD Ltd.
+-------------------------------------------------------------------------------
+License
+    This file is part of OpenFOAM.
+
+    OpenFOAM is free software: you can redistribute it and/or modify it
+    under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
+    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+    for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
+
+Description
+    Test/output processor topology
+
+\*---------------------------------------------------------------------------*/
+
+#include "argList.H"
+#include "Time.H"
+#include "polyMesh.H"
+#include "globalMeshData.H"
+#include "OFstream.H"
+
+using namespace Foam;
+
+// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
+
+// Main program:
+
+int main(int argc, char *argv[])
+{
+    argList::noFunctionObjects();
+    argList::addNote
+    (
+        "Create a metis graph file representation for an OpenFOAM mesh"
+    );
+
+    #include "setRootCase.H"
+
+    if (!Pstream::parRun())
+    {
+        FatalErrorInFunction
+            << "Only meaningful in parallel"
+            << exit(FatalError);
+    }
+
+    #include "createTime.H"
+    #include "createPolyMesh.H"
+
+    const globalMeshData& globData = mesh.globalData();
+    const labelListList& connectivity = globData;
+
+    if (Pstream::master())
+    {
+        OFstream os("processorTopology.dot");
+
+        os << "// processorTopology" << nl << nl;
+        os.beginBlock("graph");
+
+        forAll(connectivity, proci)
+        {
+            label nconn = 0;
+
+            for (const label neighProci : connectivity[proci])
+            {
+                if (proci < neighProci)
+                {
+                    if (nconn++)
+                    {
+                        os << "  ";
+                    }
+                    else
+                    {
+                        os << indent;
+                    }
+                    os << proci << " -- " << neighProci;
+                }
+            }
+
+            if (nconn)
+            {
+                os << nl;
+            }
+        }
+
+        os.endBlock();
+
+        Info<< "Wrote processorTopology graph: "
+            << runTime.relativePath(os.name()) << nl;
+
+        Info<< nl
+            << "Use neato, circo or fdp graphviz tools" << nl;
+    }
+
+    Info<< nl << "End\n" << endl;
+
+    return 0;
+}
+
+// ************************************************************************* //


@@ -6,7 +6,7 @@
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
     Copyright (C) 2015 OpenFOAM Foundation
-    Copyright (C) 2015-2018 OpenCFD Ltd.
+    Copyright (C) 2015-2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -185,9 +185,8 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
     }

-    // Start sending. Sets number of bytes transferred
-    labelList allNTrans(Pstream::nProcs());
-    pBufs.finishedSends(allNTrans);
+    // Start sending
+    pBufs.finishedSends();

     {
@@ -205,18 +204,15 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
             IDLList<passivePositionParticle>()
         );

         // Retrieve from receive buffers
-        forAll(allNTrans, procI)
+        for (const int proci : pBufs.allProcs())
         {
-            const label nRec = allNTrans[procI];
-
-            //Pout<< "From processor " << procI << " receiving bytes " << nRec
-            //    << endl;
-
-            if (nRec)
+            //Pout<< "Receive from processor" << proci << " : "
+            //    << pBufs.hasRecvData(proci) << endl;
+            if (pBufs.hasRecvData(proci))
             {
-                UIPstream particleStream(procI, pBufs);
+                UIPstream particleStream(proci, pBufs);

                 // Receive particles and locate them
                 IDLList<passivePositionParticle> newParticles


@@ -394,8 +394,7 @@ bool Foam::decomposedBlockData::readBlocks
         }
     }

-    labelList recvSizes;
-    pBufs.finishedSends(recvSizes);
+    pBufs.finishedSends();

     if (!UPstream::master(comm))
     {
@@ -524,8 +523,7 @@ Foam::autoPtr<Foam::ISstream> Foam::decomposedBlockData::readBlocks
         ok = is.good();
     }

-    labelList recvSizes;
-    pBufs.finishedSends(recvSizes);
+    pBufs.finishedSends();

     if (!UPstream::master(comm))
     {


@@ -33,10 +33,11 @@ License
 void Foam::PstreamBuffers::finalExchange
 (
     labelList& recvSizes,
-    const bool block
+    const bool wait
 )
 {
     // Could also check that it is not called twice
+    // but that is used for overlapping send/recv (eg, overset)
     finishedSendsCalled_ = true;

     if (commsType_ == UPstream::commsTypes::nonBlocking)
@@ -51,7 +52,7 @@ void Foam::PstreamBuffers::finalExchange
             recvBuf_,
             tag_,
             comm_,
-            block
+            wait
         );
     }
 }
@@ -62,10 +63,11 @@ void Foam::PstreamBuffers::finalExchange
     const labelUList& sendProcs,
     const labelUList& recvProcs,
     labelList& recvSizes,
-    const bool block
+    const bool wait
 )
 {
     // Could also check that it is not called twice
+    // but that is used for overlapping send/recv (eg, overset)
     finishedSendsCalled_ = true;

     if (commsType_ == UPstream::commsTypes::nonBlocking)
@@ -87,7 +89,7 @@ void Foam::PstreamBuffers::finalExchange
             recvBuf_,
             tag_,
             comm_,
-            block
+            wait
         );
     }
 }
@@ -104,6 +106,7 @@ Foam::PstreamBuffers::PstreamBuffers
 )
 :
     finishedSendsCalled_(false),
+    allowClearRecv_(true),
     format_(fmt),
     commsType_(commsType),
     tag_(tag),
@@ -151,20 +154,107 @@ void Foam::PstreamBuffers::clear()
 }

-void Foam::PstreamBuffers::finishedSends(const bool block)
+void Foam::PstreamBuffers::clearStorage()
+{
+    // Could also clear out entire sendBuf_, recvBuf_ and reallocate.
+    // Not sure if it makes much difference
+    for (DynamicList<char>& buf : sendBuf_)
+    {
+        buf.clearStorage();
+    }
+    for (DynamicList<char>& buf : recvBuf_)
+    {
+        buf.clearStorage();
+    }
+    recvBufPos_ = 0;
+
+    finishedSendsCalled_ = false;
+}
+
+
+bool Foam::PstreamBuffers::hasSendData() const
+{
+    for (const DynamicList<char>& buf : sendBuf_)
+    {
+        if (!buf.empty())
+        {
+            return true;
+        }
+    }
+    return false;
+}
+
+
+bool Foam::PstreamBuffers::hasSendData(const label proci) const
+{
+    return !sendBuf_[proci].empty();
+}
+
+
+bool Foam::PstreamBuffers::hasRecvData() const
+{
+    if (finishedSendsCalled_)
+    {
+        for (const DynamicList<char>& buf : recvBuf_)
+        {
+            if (!buf.empty())
+            {
+                return true;
+            }
+        }
+    }
+    #ifdef FULLDEBUG
+    else
+    {
+        FatalErrorInFunction
+            << "Call finishedSends first" << exit(FatalError);
+    }
+    #endif
+
+    return false;
+}
+
+
+bool Foam::PstreamBuffers::hasRecvData(const label proci) const
+{
+    if (finishedSendsCalled_)
+    {
+        return !recvBuf_[proci].empty();
+    }
+    #ifdef FULLDEBUG
+    else
+    {
+        FatalErrorInFunction
+            << "Call finishedSends first" << exit(FatalError);
+    }
+    #endif
+
+    return false;
+}
+
+
+bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
+{
+    bool old(allowClearRecv_);
+    allowClearRecv_ = on;
+    return old;
+}
+
+
+void Foam::PstreamBuffers::finishedSends(const bool wait)
 {
     labelList recvSizes;
-    finalExchange(recvSizes, block);
+    finalExchange(recvSizes, wait);
 }

 void Foam::PstreamBuffers::finishedSends
 (
     labelList& recvSizes,
-    const bool block
+    const bool wait
 )
 {
-    finalExchange(recvSizes, block);
+    finalExchange(recvSizes, wait);

     if (commsType_ != UPstream::commsTypes::nonBlocking)
     {
@@ -184,11 +274,11 @@ void Foam::PstreamBuffers::finishedSends
 (
     const labelUList& sendProcs,
     const labelUList& recvProcs,
-    const bool block
+    const bool wait
 )
 {
     labelList recvSizes;
-    finalExchange(sendProcs, recvProcs, recvSizes, block);
+    finalExchange(sendProcs, recvProcs, recvSizes, wait);
 }
@@ -197,10 +287,10 @@ void Foam::PstreamBuffers::finishedSends
     const labelUList& sendProcs,
     const labelUList& recvProcs,
     labelList& recvSizes,
-    const bool block
+    const bool wait
 )
 {
-    finalExchange(sendProcs, recvProcs, recvSizes, block);
+    finalExchange(sendProcs, recvProcs, recvSizes, wait);

     if (commsType_ != UPstream::commsTypes::nonBlocking)
     {


@@ -90,7 +90,7 @@ class PstreamBuffers
 {
     // Friendship
     friend class UOPstreamBase;  // Access to sendBuf_
-    friend class UIPstreamBase;  // Access to recvBuf_ recvBufPos_;
+    friend class UIPstreamBase;  // Access to recvBuf_, recvBufPos_

     // Private Data
@@ -98,6 +98,9 @@ class PstreamBuffers
         //- Track if sends are complete
         bool finishedSendsCalled_;

+        //- Permit clear of individual receive buffer by external access
+        bool allowClearRecv_;
+
         //- Buffer format (ascii | binary)
         const IOstreamOption::streamFormat format_;
@@ -110,30 +113,31 @@ class PstreamBuffers
         //- Communicator
         const label comm_;

-        //- Send buffer
+        //- Send buffer. Size is nProcs()
         List<DynamicList<char>> sendBuf_;

-        //- Receive buffer
+        //- Receive buffer. Size is nProcs()
         List<DynamicList<char>> recvBuf_;

-        //- Current read positions within recvBuf_
+        //- Current read positions within recvBuf_. Size is nProcs()
         labelList recvBufPos_;


     // Private Member Functions

         //- Mark all sends as having been done.
-        //  This will start receives in non-blocking mode.
-        void finalExchange(labelList& recvSizes, const bool block);
+        //  This will start receives (nonBlocking comms).
+        void finalExchange(labelList& recvSizes, const bool wait);

-        //- Mark all sends as having been done.
-        //  Only exchange sizes between sendProcs/recvProcs
+        //- Mark sends as done.
+        //  Only exchange sizes using the sendProcs/recvProcs subset
+        //  (nonBlocking comms).
         void finalExchange
         (
             const labelUList& sendProcs,
             const labelUList& recvProcs,
             labelList& recvSizes,
-            const bool block
+            const bool wait
         );
@@ -183,54 +187,157 @@ public:
             return comm_;
         }

-        //- True if finishedSends has been called
+
+    // Sizing
+
+        //- Number of ranks associated with PstreamBuffers
+        label nProcs() const noexcept
+        {
+            return recvBufPos_.size();
+        }
+
+        //- Range of ranks indices associated with PstreamBuffers
+        UPstream::rangeType allProcs() const noexcept
+        {
+            // Proc 0 -> nProcs (int value)
+            return UPstream::rangeType(static_cast<int>(nProcs()));
+        }
+
+
+    // Queries
+
+        //- True if finishedSends() or finishedNeighbourSends() has been called
         bool finished() const noexcept
         {
             return finishedSendsCalled_;
         }

+        //- True if any (local) send buffers have data
+        bool hasSendData() const;
+
+        //- True if (local) send buffer has data on specified processor.
+        bool hasSendData(const label proci) const;
+
+        //- True if any (local) recv buffers have data.
+        //- Must call finishedSends() or finishedNeighbourSends() first!
+        bool hasRecvData() const;
+
+        //- True if (local) recv buffer has data on specified processor.
+        //- Must call finishedSends() or finishedNeighbourSends() first!
+        bool hasRecvData(const label proci) const;
+
+        //- Is clearStorage of individual receive buffer by external hooks
+        //- allowed? (default: true)
+        bool allowClearRecv() const noexcept
+        {
+            return allowClearRecv_;
+        }
+

     // Edit

-        //- Reset (clear) individual buffers and reset state.
-        //  Does not clear buffer storage
+        //- Clear individual buffers and reset states.
+        //  Does not clear individual buffer storage.
         void clear();

+        //- Clear individual buffer storage and reset states.
+        void clearStorage();
+
+        //- Change allowClearRecv, return previous value
+        bool allowClearRecv(bool on) noexcept;
+

     // Functions

-        //- Mark all sends as having been done.
-        //  This will start receives in non-blocking mode.
-        //  If block will wait for all transfers to finish
-        //  (only relevant for nonBlocking mode)
-        void finishedSends(const bool block = true);
+        //- Mark sends as done
+        //
+        //  Non-blocking mode: populates receive buffers (all-to-all).
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        void finishedSends(const bool wait = true);

-        //- Mark all sends as having been done.
-        //  Same as above but also returns sizes (bytes) received.
-        //  \note currently only valid for non-blocking.
-        void finishedSends(labelList& recvSizes, const bool block = true);
+        //- Mark sends as done.
+        //- Recovers the sizes (bytes) received.
+        //
+        //  Non-blocking mode: populates receive buffers (all-to-all).
+        //  \param[out] recvSizes the sizes (bytes) received
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        //
+        //  \warning currently only valid for nonBlocking comms.
+        void finishedSends(labelList& recvSizes, const bool wait = true);

-        //- Mark all sends as having been done, with limited
-        //- send/recv processor connections.
-        //  \note currently only valid for non-blocking.
+        //- Mark sends as done using subset of send/recv ranks
+        //- to exchange data on.
+        //
+        //  Non-blocking mode: populates receive buffers.
+        //  \param sendProcs ranks used for sends
+        //  \param recvProcs ranks used for recvs
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        //
+        //  \warning currently only valid for nonBlocking comms.
         void finishedSends
         (
             const labelUList& sendProcs,
             const labelUList& recvProcs,
-            const bool block = true
+            const bool wait = true
         );

-        //- Mark all sends as having been done, with limited
-        //- send/recv processor connections.
-        //  Same as above but also returns sizes (bytes) received.
-        //  \note currently only valid for non-blocking.
+        //- Mark sends as done using subset of send/recv ranks
+        //- to exchange data on. Recovers the sizes (bytes) received.
+        //
+        //  Non-blocking mode: populates receive buffers.
+        //
+        //  \param sendProcs ranks used for sends
+        //  \param recvProcs ranks used for recvs
+        //  \param[out] recvSizes the sizes (bytes) received
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        //
+        //  \warning currently only valid for nonBlocking comms.
         void finishedSends
         (
             const labelUList& sendProcs,
             const labelUList& recvProcs,
             labelList& recvSizes,
-            const bool block = true
+            const bool wait = true
         );

+        //- Mark sends as done using subset of send/recv ranks
+        //- and recover the sizes (bytes) received.
+        //
+        //  Non-blocking mode: populates receive buffers.
+        //
+        //  \param neighProcs ranks used for sends/recvs
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        //
+        //  \warning currently only valid for nonBlocking comms.
+        //  \note Same as finishedSends with identical sendProcs/recvProcs
+        void finishedNeighbourSends
+        (
+            const labelUList& neighProcs,
+            const bool wait = true
+        )
+        {
+            finishedSends(neighProcs, neighProcs, wait);
+        }
+
+        //- Mark sends as done using subset of send/recv ranks
+        //- and recover the sizes (bytes) received.
+        //
+        //  Non-blocking mode: it will populate receive buffers.
+        //
+        //  \param neighProcs ranks used for sends/recvs
+        //  \param[out] recvSizes the sizes (bytes) received
+        //  \param wait wait for requests to complete (in nonBlocking mode)
+        //
+        //  \warning currently only valid for nonBlocking mode.
+        void finishedNeighbourSends
+        (
+            const labelUList& neighProcs,
+            labelList& recvSizes,
+            const bool wait = true
+        )
+        {
+            finishedSends(neighProcs, neighProcs, recvSizes, wait);
+        }
 };


@@ -184,7 +184,7 @@ Foam::UIPstreamBase::UIPstreamBase
     recvBufPos_(buffers.recvBufPos_[fromProcNo]),
     tag_(buffers.tag()),
     comm_(buffers.comm()),
-    clearAtEnd_(true),
+    clearAtEnd_(buffers.allowClearRecv()),
     messageSize_(0)
 {
     if


@@ -127,7 +127,7 @@ void Foam::syncTools::syncPointMap
     if (Pstream::parRun())
     {
-        DynamicList<label> sendRecvProcs;
+        DynamicList<label> neighbProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

         // Send
@@ -159,14 +159,14 @@ void Foam::syncTools::syncPointMap
                 }
             }

-            sendRecvProcs.append(nbrProci);
+            neighbProcs.append(nbrProci);

             UOPstream toNbr(nbrProci, pBufs);
             toNbr << patchInfo;
         }
     }

     // Limit exchange to involved procs
-    pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+    pBufs.finishedNeighbourSends(neighbProcs);

     // Receive and combine.
     for (const polyPatch& pp : patches)
@@ -370,7 +370,7 @@ void Foam::syncTools::syncEdgeMap
     if (Pstream::parRun())
     {
-        DynamicList<label> sendRecvProcs;
+        DynamicList<label> neighbProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

         // Send
@@ -404,14 +404,14 @@ void Foam::syncTools::syncEdgeMap
                 }
             }

-            sendRecvProcs.append(nbrProci);
+            neighbProcs.append(nbrProci);

             UOPstream toNbr(nbrProci, pBufs);
             toNbr << patchInfo;
         }
     }

     // Limit exchange to involved procs
-    pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+    pBufs.finishedNeighbourSends(neighbProcs);

     // Receive and combine.
@@ -1102,7 +1102,7 @@ void Foam::syncTools::syncBoundaryFaceList
     }
     else
     {
-        DynamicList<label> sendRecvProcs;
+        DynamicList<label> neighbProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

         // Send
@@ -1122,14 +1122,14 @@ void Foam::syncTools::syncBoundaryFaceList
                 pp.start()-boundaryOffset
             );

-            sendRecvProcs.append(nbrProci);
+            neighbProcs.append(nbrProci);

             UOPstream toNbr(nbrProci, pBufs);
             toNbr << fld;
         }
     }

     // Limit exchange to involved procs
-    pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+    pBufs.finishedNeighbourSends(neighbProcs);

     // Receive and combine.


@@ -871,15 +871,14 @@ void Foam::globalIndex::get
             UOPstream os(proci, sendBufs);
             os << localIDs;
         }
-        labelList recvSizes;
-        sendBufs.finishedSends(recvSizes);
+        sendBufs.finishedSends();

         PstreamBuffers returnBufs(Pstream::commsTypes::nonBlocking, tag, comm);

-        forAll(recvSizes, proci)
+        for (const int proci : sendBufs.allProcs())
         {
-            if (recvSizes[proci])
+            if (sendBufs.hasRecvData(proci))
             {
                 UIPstream is(proci, sendBufs);
                 labelList localIDs(is);


@@ -171,7 +171,6 @@ void Foam::zoneDistribute::setUpCommforZone
     PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

-    labelList recvSizes;

     // Stream data into buffer
     for (const int proci : UPstream::allProcs())
@@ -205,12 +204,12 @@ void Foam::zoneDistribute::setUpCommforZone
     if (returnReduce(fullUpdate, orOp<bool>()))
     {
-        pBufs.finishedSends(recvSizes);
+        pBufs.finishedSends();

         // Update which ones receive
         for (const int proci : UPstream::allProcs())
         {
-            recvFrom_[proci] = (recvSizes[proci] > 0);
+            recvFrom_[proci] = pBufs.hasRecvData(proci);
         }
     }
     else
@@ -233,18 +232,14 @@ void Foam::zoneDistribute::setUpCommforZone
         }

         // Wait until everything is written
-        pBufs.finishedSends(sendProcs, recvProcs, recvSizes);
+        pBufs.finishedSends(sendProcs, recvProcs);
     }

     for (const int proci : UPstream::allProcs())
     {
         send_[proci].clear();

-        if
-        (
-            proci != UPstream::myProcNo()
-         && recvFrom_[proci]  // Or: (recvSizes[proci] > 0)
-        )
+        if (proci != UPstream::myProcNo() && pBufs.hasRecvData(proci))
         {
             UIPstream fromProc(proci, pBufs);
             fromProc >> send_[proci];


@@ -191,7 +191,6 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
     PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);

-    labelList recvSizes;

     // Stream data into buffer
     for (const int proci : UPstream::allProcs())
@@ -222,13 +221,13 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
             }
         }

-        pBufs.finishedSends(sendProcs, recvProcs, recvSizes);
+        pBufs.finishedSends(sendProcs, recvProcs);
     }
     else
     {
         // Fallback to all-to-all communication
-        pBufs.finishedSends(recvSizes);
+        pBufs.finishedSends();
     }
@@ -237,11 +236,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
         // Do not use recvFrom_[proci] here
         // - could be incorrect if comms are not prepared!

-        if
-        (
-            proci != UPstream::myProcNo()
-         && (recvSizes[proci] > 0)
-        )
+        if (proci != UPstream::myProcNo() && pBufs.hasRecvData(proci))
         {
             UIPstream fromProc(proci, pBufs);
             Map<Type> tmpValue(fromProc);


@@ -223,7 +223,7 @@ bool Foam::functionObjects::energySpectrum::write()
     {
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

-        UOPstream toProc(0, pBufs);
+        UOPstream toProc(Pstream::masterNo(), pBufs);
         toProc << Uc << C << cellAddr_;


@@ -5,7 +5,7 @@
     \\  /    A nd           | www.openfoam.com
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
-    Copyright (C) 2020-2021 OpenCFD Ltd.
+    Copyright (C) 2020-2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -96,8 +96,7 @@ void Foam::functionObjects::syncObjects::sync()
     );

-    const label nProcs = Pstream::nProcs(pBufs.comm());
-    for (label proci = 0; proci < nProcs; proci++)
+    for (const int proci : pBufs.allProcs())
     {
         // Get database to send
         const objectRegistry& sendObr = mappedPatchBase::subRegistry
@@ -123,7 +122,7 @@ void Foam::functionObjects::syncObjects::sync()
     // Start sending and receiving and block
     pBufs.finishedSends();

-    for (label proci = 0; proci < nProcs; proci++)
+    for (const int proci : pBufs.allProcs())
     {
         // Get database to receive data into
         const objectRegistry& receiveObr = mappedPatchBase::subRegistry


@@ -286,34 +286,18 @@ void Foam::Cloud<ParticleType>::move
             }

-            // Start sending. Sets number of bytes transferred
-            labelList allNTrans(Pstream::nProcs());
-            pBufs.finishedSends(allNTrans);
-
-            bool transferred = false;
-            for (const label n : allNTrans)
-            {
-                if (n)
-                {
-                    transferred = true;
-                    break;
-                }
-            }
-            reduce(transferred, orOp<bool>());
-
-            if (!transferred)
+            pBufs.finishedSends();
+
+            if (!returnReduce(pBufs.hasRecvData(), orOp<bool>()))
             {
+                // No parcels to transfer
                 break;
             }

             // Retrieve from receive buffers
             for (const label neighbProci : neighbourProcs)
             {
-                label nRec = allNTrans[neighbProci];
-
-                if (nRec)
+                if (pBufs.hasRecvData(neighbProci))
                 {
                     UIPstream particleStream(neighbProci, pBufs);


@@ -5,7 +5,7 @@
     \\  /    A nd           | www.openfoam.com
      \\/     M anipulation  |
 -------------------------------------------------------------------------------
-    Copyright (C) 2020 OpenCFD Ltd.
+    Copyright (C) 2020-2022 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -254,29 +254,18 @@ void Foam::RecycleInteraction<CloudType>::postEvolve()
         }
     }

-    // Start sending. Sets number of bytes transferred
-    labelList allNTrans(Pstream::nProcs());
-    pBufs.finishedSends(allNTrans);
-
-    bool transferred = false;
-    for (const label n : allNTrans)
-    {
-        if (n)
-        {
-            transferred = true;
-            break;
-        }
-    }
-    reduce(transferred, orOp<bool>());
-
-    if (!transferred)
+    pBufs.finishedSends();
+
+    if (!returnReduce(pBufs.hasRecvData(), orOp<bool>()))
     {
         // No parcels to transfer
         return;
     }

     // Retrieve from receive buffers
-    for (label proci = 0; proci < Pstream::nProcs(); ++proci)
+    for (const int proci : pBufs.allProcs())
     {
-        if (allNTrans[proci])
+        if (pBufs.hasRecvData(proci))
         {
             UIPstream particleStream(proci, pBufs);
             IDLList<parcelType> newParticles


@@ -162,7 +162,7 @@ void Foam::faMeshReconstructor::calcAddressing
     {
         labelList localOrder;
-        UIPstream fromProc(Pstream::master(), pBufs);
+        UIPstream fromProc(Pstream::masterNo(), pBufs);
         fromProc >> localOrder;

         faFaceProcAddr_ = labelList(faFaceProcAddr_, localOrder);


@@ -182,7 +182,7 @@ void surfaceNoise::readSurfaceData
             pBufs.finishedSends();

             // Receive data from the master
-            UIPstream fromProc(0, pBufs);
+            UIPstream fromProc(Pstream::masterNo(), pBufs);
             scalarList pSlice(fromProc);
@@ -260,7 +260,7 @@ scalar surfaceNoise::writeSurfaceData
         if (!Pstream::master())
         {
-            UOPstream toProc(0, pBufs);
+            UOPstream toProc(Pstream::masterNo(), pBufs);
             toProc << data;
         }
@@ -375,7 +375,7 @@ scalar surfaceNoise::surfaceAverage
         if (!Pstream::master())
         {
-            UOPstream toProc(0, pBufs);
+            UOPstream toProc(Pstream::masterNo(), pBufs);
             toProc << data;
         }


@@ -304,8 +304,6 @@ void Foam::meshToMesh::distributeCells
     List<labelList>& procLocalFaceIDs
 ) const
 {
-    PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
-
     points.setSize(Pstream::nProcs());
     nInternalFaces.setSize(Pstream::nProcs(), 0);
     faces.setSize(Pstream::nProcs());
@@ -317,6 +315,8 @@ void Foam::meshToMesh::distributeCells
     procLocalFaceIDs.setSize(Pstream::nProcs());;

+    PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
+
     for (const int domain : Pstream::allProcs())
     {
         const labelList& sendElems = map.subMap()[domain];


@@ -497,7 +497,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
     if (Pstream::parRun())
     {
-        DynamicList<label> sendRecvProcs;
+        DynamicList<label> neighProcs;
         PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

         // Send
@@ -507,7 +507,7 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
                 refCast<const processorPolyPatch>(patches[patchi]);
             const label nbrProci = procPatch.neighbProcNo();

-            sendRecvProcs.append(nbrProci);
+            neighProcs.append(nbrProci);

             UOPstream toNbr(nbrProci, pBufs);
             const scalarField& pFlux = dVf.boundaryField()[patchi];
@@ -523,8 +523,8 @@ Foam::DynamicList<Foam::label> Foam::isoAdvection::syncProcPatches
             toNbr << surfCellFacesOnProcPatch << dVfPatch;
         }

-        // Limit exchange to involved procs
-        pBufs.finishedSends(sendRecvProcs, sendRecvProcs);
+        // Limited to involved neighbour procs
+        pBufs.finishedNeighbourSends(neighProcs);

         // Receive and combine