diff --git a/applications/test/processorTopology/Make/files b/applications/test/processorTopology/Make/files
new file mode 100644
index 0000000000..c9d5a6bbe0
--- /dev/null
+++ b/applications/test/processorTopology/Make/files
@@ -0,0 +1,3 @@
+Test-processorTopology.C
+
+EXE = $(FOAM_USER_APPBIN)/Test-processorTopology
diff --git a/applications/test/processorTopology/Make/options b/applications/test/processorTopology/Make/options
new file mode 100644
index 0000000000..18e6fe47af
--- /dev/null
+++ b/applications/test/processorTopology/Make/options
@@ -0,0 +1,2 @@
+/* EXE_INC = */
+/* EXE_LIBS = */
diff --git a/applications/test/processorTopology/Test-processorTopology.C b/applications/test/processorTopology/Test-processorTopology.C
new file mode 100644
index 0000000000..1d8d7fab1a
--- /dev/null
+++ b/applications/test/processorTopology/Test-processorTopology.C
@@ -0,0 +1,113 @@
+/*---------------------------------------------------------------------------*\
+ ========= |
+ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
+ \\ / O peration |
+ \\ / A nd | www.openfoam.com
+ \\/ M anipulation |
+-------------------------------------------------------------------------------
+ Copyright (C) 2022 OpenCFD Ltd.
+-------------------------------------------------------------------------------
+License
+ This file is part of OpenFOAM.
+
+ OpenFOAM is free software: you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with OpenFOAM. If not, see .
+
+Description
+ Test/output processor topology
+
+\*---------------------------------------------------------------------------*/
+
+#include "argList.H"
+#include "Time.H"
+#include "polyMesh.H"
+#include "globalMeshData.H"
+#include "OFstream.H"
+
+using namespace Foam;
+
+// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
+// Main program:
+
+int main(int argc, char *argv[])
+{
+ argList::noFunctionObjects();
+ argList::addNote
+ (
+ "Create a metis graph file representation for an OpenFOAM mesh"
+ );
+
+ #include "setRootCase.H"
+
+ if (!Pstream::parRun())
+ {
+ FatalErrorInFunction
+ << "Only meaningful in parallel"
+ << exit(FatalError);
+ }
+
+ #include "createTime.H"
+ #include "createPolyMesh.H"
+
+ const globalMeshData& globData = mesh.globalData();
+
+ const labelListList& connectivity = globData;
+
+ if (Pstream::master())
+ {
+ OFstream os("processorTopology.dot");
+
+ os << "// processorTopology" << nl << nl;
+ os.beginBlock("graph");
+
+ forAll(connectivity, proci)
+ {
+ label nconn = 0;
+
+ for (const label neighProci : connectivity[proci])
+ {
+ if (proci < neighProci)
+ {
+ if (nconn++)
+ {
+ os << " ";
+ }
+ else
+ {
+ os << indent;
+ }
+ os << proci << " -- " << neighProci;
+ }
+ }
+
+ if (nconn)
+ {
+ os << nl;
+ }
+ }
+
+ os.endBlock();
+
+ Info<< "Wrote processorTopology graph: "
+ << runTime.relativePath(os.name()) << nl;
+
+ Info<< nl
+ << "Use neato, circo or fdp graphviz tools" << nl;
+ }
+
+ Info<< nl << "End\n" << endl;
+
+ return 0;
+}
+
+// ************************************************************************* //
diff --git a/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C b/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C
index 73eaf42958..e7e0bfe0fd 100644
--- a/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C
+++ b/applications/utilities/parallelProcessing/redistributePar/parLagrangianRedistributor.C
@@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015 OpenFOAM Foundation
- Copyright (C) 2015-2018 OpenCFD Ltd.
+ Copyright (C) 2015-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -185,9 +185,8 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
}
- // Start sending. Sets number of bytes transferred
- labelList allNTrans(Pstream::nProcs());
- pBufs.finishedSends(allNTrans);
+ // Start sending
+ pBufs.finishedSends();
{
@@ -205,18 +204,15 @@ Foam::parLagrangianRedistributor::redistributeLagrangianPositions
IDLList()
);
-
// Retrieve from receive buffers
- forAll(allNTrans, procI)
+ for (const int proci : pBufs.allProcs())
{
- const label nRec = allNTrans[procI];
+ //Pout<< "Receive from processor" << proci << " : "
+ // << pBufs.hasRecvData(proci) << endl;
- //Pout<< "From processor " << procI << " receiving bytes " << nRec
- // << endl;
-
- if (nRec)
+ if (pBufs.hasRecvData(proci))
{
- UIPstream particleStream(procI, pBufs);
+ UIPstream particleStream(proci, pBufs);
// Receive particles and locate them
IDLList newParticles
diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
index f91e5d83a9..0c05da957a 100644
--- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
+++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C
@@ -394,8 +394,7 @@ bool Foam::decomposedBlockData::readBlocks
}
}
- labelList recvSizes;
- pBufs.finishedSends(recvSizes);
+ pBufs.finishedSends();
if (!UPstream::master(comm))
{
@@ -524,8 +523,7 @@ Foam::autoPtr Foam::decomposedBlockData::readBlocks
ok = is.good();
}
- labelList recvSizes;
- pBufs.finishedSends(recvSizes);
+ pBufs.finishedSends();
if (!UPstream::master(comm))
{
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
index e2f73c5e03..07e7eccc49 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C
@@ -33,10 +33,11 @@ License
void Foam::PstreamBuffers::finalExchange
(
labelList& recvSizes,
- const bool block
+ const bool wait
)
{
// Could also check that it is not called twice
+ // but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
@@ -51,7 +52,7 @@ void Foam::PstreamBuffers::finalExchange
recvBuf_,
tag_,
comm_,
- block
+ wait
);
}
}
@@ -62,10 +63,11 @@ void Foam::PstreamBuffers::finalExchange
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
- const bool block
+ const bool wait
)
{
// Could also check that it is not called twice
+ // but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
@@ -87,7 +89,7 @@ void Foam::PstreamBuffers::finalExchange
recvBuf_,
tag_,
comm_,
- block
+ wait
);
}
}
@@ -104,6 +106,7 @@ Foam::PstreamBuffers::PstreamBuffers
)
:
finishedSendsCalled_(false),
+ allowClearRecv_(true),
format_(fmt),
commsType_(commsType),
tag_(tag),
@@ -151,20 +154,107 @@ void Foam::PstreamBuffers::clear()
}
-void Foam::PstreamBuffers::finishedSends(const bool block)
+void Foam::PstreamBuffers::clearStorage()
+{
+ // Could also clear out entire sendBuf_, recvBuf_ and reallocate.
+ // Not sure if it makes much difference
+ for (DynamicList& buf : sendBuf_)
+ {
+ buf.clearStorage();
+ }
+ for (DynamicList& buf : recvBuf_)
+ {
+ buf.clearStorage();
+ }
+ recvBufPos_ = 0;
+
+ finishedSendsCalled_ = false;
+}
+
+
+bool Foam::PstreamBuffers::hasSendData() const
+{
+ for (const DynamicList& buf : sendBuf_)
+ {
+ if (!buf.empty())
+ {
+ return true;
+ }
+ }
+ return false;
+}
+
+
+bool Foam::PstreamBuffers::hasSendData(const label proci) const
+{
+ return !sendBuf_[proci].empty();
+}
+
+
+bool Foam::PstreamBuffers::hasRecvData() const
+{
+ if (finishedSendsCalled_)
+ {
+ for (const DynamicList& buf : recvBuf_)
+ {
+ if (!buf.empty())
+ {
+ return true;
+ }
+ }
+ }
+ #ifdef FULLDEBUG
+ else
+ {
+ FatalErrorInFunction
+ << "Call finishedSends first" << exit(FatalError);
+ }
+ #endif
+
+ return false;
+}
+
+
+bool Foam::PstreamBuffers::hasRecvData(const label proci) const
+{
+ if (finishedSendsCalled_)
+ {
+ return !recvBuf_[proci].empty();
+ }
+ #ifdef FULLDEBUG
+ else
+ {
+ FatalErrorInFunction
+ << "Call finishedSends first" << exit(FatalError);
+ }
+ #endif
+
+ return false;
+}
+
+
+bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
+{
+ bool old(allowClearRecv_);
+ allowClearRecv_ = on;
+ return old;
+}
+
+
+void Foam::PstreamBuffers::finishedSends(const bool wait)
{
labelList recvSizes;
- finalExchange(recvSizes, block);
+ finalExchange(recvSizes, wait);
}
void Foam::PstreamBuffers::finishedSends
(
labelList& recvSizes,
- const bool block
+ const bool wait
)
{
- finalExchange(recvSizes, block);
+ finalExchange(recvSizes, wait);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@@ -184,11 +274,11 @@ void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
- const bool block
+ const bool wait
)
{
labelList recvSizes;
- finalExchange(sendProcs, recvProcs, recvSizes, block);
+ finalExchange(sendProcs, recvProcs, recvSizes, wait);
}
@@ -197,10 +287,10 @@ void Foam::PstreamBuffers::finishedSends
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
- const bool block
+ const bool wait
)
{
- finalExchange(sendProcs, recvProcs, recvSizes, block);
+ finalExchange(sendProcs, recvProcs, recvSizes, wait);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
index 193567813b..2fec28c518 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H
@@ -90,7 +90,7 @@ class PstreamBuffers
{
// Friendship
friend class UOPstreamBase; // Access to sendBuf_
- friend class UIPstreamBase; // Access to recvBuf_ recvBufPos_;
+ friend class UIPstreamBase; // Access to recvBuf_, recvBufPos_
// Private Data
@@ -98,6 +98,9 @@ class PstreamBuffers
//- Track if sends are complete
bool finishedSendsCalled_;
+ //- Permit clear of individual receive buffer by external access
+ bool allowClearRecv_;
+
//- Buffer format (ascii | binary)
const IOstreamOption::streamFormat format_;
@@ -110,30 +113,31 @@ class PstreamBuffers
//- Communicator
const label comm_;
- //- Send buffer
+ //- Send buffer. Size is nProcs()
List> sendBuf_;
- //- Receive buffer
+ //- Receive buffer. Size is nProcs()
List> recvBuf_;
- //- Current read positions within recvBuf_
+ //- Current read positions within recvBuf_. Size is nProcs()
labelList recvBufPos_;
// Private Member Functions
//- Mark all sends as having been done.
- // This will start receives in non-blocking mode.
- void finalExchange(labelList& recvSizes, const bool block);
+ // This will start receives (nonBlocking comms).
+ void finalExchange(labelList& recvSizes, const bool wait);
- //- Mark all sends as having been done.
- // Only exchange sizes between sendProcs/recvProcs
+ //- Mark sends as done.
+ // Only exchange sizes using the sendProcs/recvProcs subset
+ // (nonBlocking comms).
void finalExchange
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
- const bool block
+ const bool wait
);
@@ -183,54 +187,157 @@ public:
return comm_;
}
- //- True if finishedSends has been called
+
+ // Sizing
+
+ //- Number of ranks associated with PstreamBuffers
+ label nProcs() const noexcept
+ {
+ return recvBufPos_.size();
+ }
+
+ //- Range of ranks indices associated with PstreamBuffers
+ UPstream::rangeType allProcs() const noexcept
+ {
+ // Proc 0 -> nProcs (int value)
+ return UPstream::rangeType(static_cast(nProcs()));
+ }
+
+
+ // Queries
+
+ //- True if finishedSends() or finishedNeighbourSends() has been called
bool finished() const noexcept
{
return finishedSendsCalled_;
}
+ //- True if any (local) send buffers have data
+ bool hasSendData() const;
+
+ //- True if (local) send buffer has data on specified processor.
+ bool hasSendData(const label proci) const;
+
+ //- True if any (local) recv buffers have data.
+ //- Must call finishedSends() or finishedNeighbourSends() first!
+ bool hasRecvData() const;
+
+ //- True if (local) recv buffer has data on specified processor.
+ //- Must call finishedSends() or finishedNeighbourSends() first!
+ bool hasRecvData(const label proci) const;
+
+ //- Is clearStorage of individual receive buffer by external hooks
+ //- allowed? (default: true)
+ bool allowClearRecv() const noexcept
+ {
+ return allowClearRecv_;
+ }
+
// Edit
- //- Reset (clear) individual buffers and reset state.
- // Does not clear buffer storage
+ //- Clear individual buffers and reset states.
+ // Does not clear individual buffer storage.
void clear();
+ //- Clear individual buffer storage and reset states.
+ void clearStorage();
+
+ //- Change allowClearRecv, return previous value
+ bool allowClearRecv(bool on) noexcept;
+
// Functions
- //- Mark all sends as having been done.
- // This will start receives in non-blocking mode.
- // If block will wait for all transfers to finish
- // (only relevant for nonBlocking mode)
- void finishedSends(const bool block = true);
+ //- Mark sends as done
+ //
+ // Non-blocking mode: populates receive buffers (all-to-all).
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ void finishedSends(const bool wait = true);
- //- Mark all sends as having been done.
- // Same as above but also returns sizes (bytes) received.
- // \note currently only valid for non-blocking.
- void finishedSends(labelList& recvSizes, const bool block = true);
+ //- Mark sends as done.
+ //- Recovers the sizes (bytes) received.
+ //
+ // Non-blocking mode: populates receive buffers (all-to-all).
+ // \param[out] recvSizes the sizes (bytes) received
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ //
+ // \warning currently only valid for nonBlocking comms.
+ void finishedSends(labelList& recvSizes, const bool wait = true);
- //- Mark all sends as having been done, with limited
- //- send/recv processor connections.
- // \note currently only valid for non-blocking.
+ //- Mark sends as done using subset of send/recv ranks
+ //- to exchange data on.
+ //
+ // Non-blocking mode: populates receive buffers.
+ // \param sendProcs ranks used for sends
+ // \param recvProcs ranks used for recvs
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ //
+ // \warning currently only valid for nonBlocking comms.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
- const bool block = true
+ const bool wait = true
);
- //- Mark all sends as having been done, with limited
- //- send/recv processor connections.
- // Same as above but also returns sizes (bytes) received.
- // \note currently only valid for non-blocking.
+ //- Mark sends as done using subset of send/recv ranks
+ //- to exchange data on. Recovers the sizes (bytes) received.
+ //
+ // Non-blocking mode: populates receive buffers.
+ //
+ // \param sendProcs ranks used for sends
+ // \param recvProcs ranks used for recvs
+ // \param[out] recvSizes the sizes (bytes) received
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ //
+ // \warning currently only valid for nonBlocking comms.
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
- const bool block = true
+ const bool wait = true
);
+
+ //- Mark sends as done using subset of send/recv ranks
+ //- and recover the sizes (bytes) received.
+ //
+ // Non-blocking mode: populates receive buffers.
+ //
+ // \param neighProcs ranks used for sends/recvs
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ //
+ // \warning currently only valid for nonBlocking comms.
+ // \note Same as finishedSends with identical sendProcs/recvProcs
+ void finishedNeighbourSends
+ (
+ const labelUList& neighProcs,
+ const bool wait = true
+ )
+ {
+ finishedSends(neighProcs, neighProcs, wait);
+ }
+
+ //- Mark sends as done using subset of send/recv ranks
+ //- and recover the sizes (bytes) received.
+ //
+ // Non-blocking mode: it will populate receive buffers.
+ //
+ // \param neighProcs ranks used for sends/recvs
+ // \param[out] recvSizes the sizes (bytes) received
+ // \param wait wait for requests to complete (in nonBlocking mode)
+ //
+ // \warning currently only valid for nonBlocking mode.
+ void finishedNeighbourSends
+ (
+ const labelUList& neighProcs,
+ labelList& recvSizes,
+ const bool wait = true
+ )
+ {
+ finishedSends(neighProcs, neighProcs, recvSizes, wait);
+ }
};
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
index e7d8266f32..e733ba8124 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
@@ -184,7 +184,7 @@ Foam::UIPstreamBase::UIPstreamBase
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag()),
comm_(buffers.comm()),
- clearAtEnd_(true),
+ clearAtEnd_(buffers.allowClearRecv()),
messageSize_(0)
{
if
diff --git a/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C b/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
index bbbf34f0ad..9f16a7c17b 100644
--- a/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
+++ b/src/OpenFOAM/meshes/polyMesh/syncTools/syncToolsTemplates.C
@@ -127,7 +127,7 @@ void Foam::syncTools::syncPointMap
if (Pstream::parRun())
{
- DynamicList