diff --git a/applications/test/parallel-nbx2/Make/files b/applications/test/parallel-nbx2/Make/files
new file mode 100644
index 0000000000..262936c21f
--- /dev/null
+++ b/applications/test/parallel-nbx2/Make/files
@@ -0,0 +1,3 @@
+Test-parallel-nbx2.C
+
+EXE = $(FOAM_USER_APPBIN)/Test-parallel-nbx2
diff --git a/applications/test/parallel-nbx2/Make/options b/applications/test/parallel-nbx2/Make/options
new file mode 100644
index 0000000000..18e6fe47af
--- /dev/null
+++ b/applications/test/parallel-nbx2/Make/options
@@ -0,0 +1,2 @@
+/* EXE_INC = */
+/* EXE_LIBS = */
diff --git a/applications/test/parallel-nbx2/Test-parallel-nbx2.C b/applications/test/parallel-nbx2/Test-parallel-nbx2.C
new file mode 100644
index 0000000000..27bb661e0d
--- /dev/null
+++ b/applications/test/parallel-nbx2/Test-parallel-nbx2.C
@@ -0,0 +1,227 @@
+/*---------------------------------------------------------------------------*\
+ ========= |
+ \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
+ \\ / O peration |
+ \\ / A nd | www.openfoam.com
+ \\/ M anipulation |
+-------------------------------------------------------------------------------
+ Copyright (C) 2023 OpenCFD Ltd.
+-------------------------------------------------------------------------------
+License
+ This file is part of OpenFOAM.
+
+ OpenFOAM is free software: you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with OpenFOAM. If not, see .
+
+Application
+ Test-parallel-nbx2
+
+Description
+ Test for send/receive data
+
+\*---------------------------------------------------------------------------*/
+
+#include "List.H"
+#include "argList.H"
+#include "Time.H"
+#include "IPstream.H"
+#include "OPstream.H"
+#include "IOstreams.H"
+#include "Random.H"
+
+using namespace Foam;
+
+// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
+
+int main(int argc, char *argv[])
+{
+ argList::noCheckProcessorDirectories();
+ argList::addBoolOption("non-blocking", "Test with non-blocking receives");
+
+ #include "setRootCase.H"
+
+ const bool optNonBlocking = args.found("non-blocking");
+
+ if (!Pstream::parRun())
+ {
+ Info<< "\nWarning: not parallel - skipping further tests\n" << endl;
+ return 0;
+ }
+
+ Info<< "\nTesting with non-blocking receives: " << optNonBlocking << nl;
+
+
+ const int tag = (UPstream::msgType() + 314159);
+ const label comm = UPstream::worldComm;
+
+ Random rnd(20*UPstream::myProcNo());
+
+ // Looks a bit like a DIY PstreamBuffers...
+ Map> sendBufs;
+ Map> recvBufs;
+
+ DynamicList sendRequests(10);
+ DynamicList recvRequests(10);
+
+ if (!Pstream::master())
+ {
+ // Send some random length to master
+
+ const int toProci = UPstream::masterNo();
+
+ label len = rnd.position(10, 20);
+ if (UPstream::myProcNo() && (UPstream::myProcNo() % 3) == 0) len = 0;
+
+ scalarField fld(len, scalar(UPstream::myProcNo()));
+
+ // Format for sending
+ if (!fld.empty())
+ {
+ auto& buf = sendBufs(toProci);
+ UOPstream os(buf);
+ os << fld;
+ }
+
+ // Start nonblocking synchronous send to process dest
+
+ if (sendBufs.found(toProci) && !sendBufs[toProci].empty())
+ {
+ Pout<< "send: [" << sendBufs[toProci].size() << " bytes] "
+ << flatOutput(fld) << endl;
+
+ // Has data to send
+ UOPstream::write
+ (
+ sendRequests.emplace_back(),
+ UPstream::masterNo(),
+ sendBufs[toProci],
+ tag,
+ comm,
+ UPstream::sendModes::sync
+ );
+ }
+ }
+
+
+ // Probe and receive
+
+ UPstream::Request barrierReq;
+
+ for (bool barrier_active = false, done = false; !done; /*nil*/)
+ {
+ std::pair probed =
+ UPstream::probeMessage
+ (
+ UPstream::commsTypes::nonBlocking,
+ -1, // ANY_SOURCE
+ tag,
+ comm
+ );
+
+ if (probed.second > 0)
+ {
+ // Message found and had size: receive it
+
+ const label proci = probed.first;
+ const label count = probed.second;
+
+ if (optNonBlocking)
+ {
+ recvBufs(proci).resize_nocopy(count);
+
+ // Non-blocking read
+ UIPstream::read
+ (
+ recvRequests.emplace_back(),
+ proci,
+ recvBufs[proci],
+ tag,
+ comm
+ );
+ // Pout<< "Done: "
+ // << UPstream::finishedRequests(recvRequests) << endl;
+ }
+ else
+ {
+ IPstream is
+ (
+ UPstream::commsTypes::scheduled,
+ probed.first,
+ probed.second,
+ tag,
+ comm
+ );
+
+ scalarField fld(is);
+
+ Info<< "from [" << probed.first
+ << "] : " << flatOutput(fld) << endl;
+ }
+ }
+
+ if (barrier_active)
+ {
+ // Test barrier for completion
+ if (UPstream::finishedRequest(barrierReq))
+ {
+ done = true;
+ }
+ }
+ else
+ {
+ // Check if all sends have arrived
+ if (UPstream::finishedRequests(sendRequests))
+ {
+ UPstream::barrier(comm, &barrierReq);
+ barrier_active = true;
+ }
+ }
+ }
+
+ Pout<< "pending receives: " << recvRequests.size() << endl;
+
+ // Wait for receives to complete
+ UPstream::waitRequests(recvRequests);
+
+ // It could be we need this type of synchronization point
+ // if the receives are non-blocking
+ if (optNonBlocking)
+ {
+ UPstream::barrier(comm);
+ }
+
+ if (!recvBufs.empty())
+ {
+ Pout<< "Receives from: " << flatOutput(recvBufs.sortedToc()) << endl;
+
+ forAllConstIters(recvBufs, iter)
+ {
+ Pout<< "proc:" << iter.key() << " len:" << iter.val().size() << nl;
+
+ if (!iter.val().empty())
+ {
+ UIPstream is(iter.val());
+ scalarField fld(is);
+
+ Pout<< "recv:" << iter.key()
+ << " : " << flatOutput(fld) << nl;
+ }
+ }
+ }
+
+ Info<< "\nEnd\n" << endl;
+ return 0;
+}
+
+
+// ************************************************************************* //
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C
index 5540777f09..a43f73116e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C
@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
- Copyright (C) 2022 OpenCFD Ltd.
+ Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -75,13 +75,12 @@ Foam::IPBstream::IPBstream
commsType,
fromProcNo,
Pstream::transferBuf_,
- transferBufPosition_,
+ UIPstreamBase::storedRecvBufPos_, // Internal only
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
- ),
- transferBufPosition_(0)
+ )
{}
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H
index 43152570b1..5dda9b6e6e 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H
@@ -56,11 +56,6 @@ class IPstream
public Pstream,
public UIPstream
{
- // Private Data
-
- //- Receive index into Pstream::transferBuf_
- label transferBufPosition_;
-
public:
// Constructors
@@ -90,11 +85,6 @@ class IPBstream
public Pstream,
public UIPBstream
{
- // Private Data
-
- //- Receive index into Pstream::transferBuf_
- label transferBufPosition_;
-
public:
// Constructors
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C
index caabf931d7..d82a270238 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C
@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
- Copyright (C) 2022 OpenCFD Ltd.
+ Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -91,6 +91,16 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
}
+Foam::UIPstream::UIPstream
+(
+ const DynamicList& recvBuf,
+ IOstreamOption::streamFormat fmt
+)
+:
+ UIPstreamBase(recvBuf, fmt)
+{}
+
+
Foam::IPstream::IPstream
(
const UPstream::commsTypes commsType,
@@ -107,13 +117,12 @@ Foam::IPstream::IPstream
commsType,
fromProcNo,
Pstream::transferBuf_,
- transferBufPosition_,
+ UIPstreamBase::storedRecvBufPos_, // Internal only
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
- ),
- transferBufPosition_(0)
+ )
{}
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C
index 9f1d70f42f..fa913a868c 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C
@@ -52,6 +52,16 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
{}
+Foam::UOPstream::UOPstream
+(
+ DynamicList& sendBuf,
+ IOstreamOption::streamFormat fmt
+)
+:
+ UOPstreamBase(sendBuf, fmt)
+{}
+
+
Foam::OPstream::OPstream
(
const UPstream::commsTypes commsType,
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H
index bcb989cf3a..68e3f26bdc 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H
@@ -85,19 +85,30 @@ protected:
// Protected Data
- int fromProcNo_;
-
- DynamicList& recvBuf_;
-
- label& recvBufPos_;
+ //- Source rank for the data
+ const int fromProcNo_;
+ //- Message tag for communication
const int tag_;
- const label comm_;
+ //- The communicator index
+ const int comm_;
+ //- The message size, read on bufferIPCrecv or set directly
+ int messageSize_;
+
+ //- Receive position in buffer data, if ony
+ //- If there is no external location for recvBufPos_
+ label storedRecvBufPos_;
+
+ //- Clear the receive buffer on termination (in the destructor)
const bool clearAtEnd_;
- int messageSize_;
+ //- Reference to the receive buffer data
+ DynamicList& recvBuf_;
+
+ //- Reference to the receive position in buffer data
+ label& recvBufPos_;
// Protected Constructors
@@ -120,10 +131,17 @@ protected:
//- Construct given buffers
UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
+ //- Construct for an externally obtained buffer.
+ // The parameter is allowed to be const (since reading will not
+ // affect it), but must reference a concrete variable.
+ UIPstreamBase
+ (
+ const DynamicList& receiveBuf,
+ IOstreamOption::streamFormat fmt
+ );
public:
-
//- Destructor. Optionally clears external receive buffer.
virtual ~UIPstreamBase();
@@ -238,6 +256,16 @@ public:
//- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers& buffers);
+ //- Construct for reading from a standalone buffer that has
+ //- been obtained externally by the caller.
+ // The parameter is allowed to be const (since reading will not
+ // affect it), but must reference a concrete variable.
+ explicit UIPstream
+ (
+ const DynamicList& recvBuf,
+ IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
+ );
+
//- Destructor
virtual ~UIPstream() = default;
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
index 228332528f..97685b3022 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C
@@ -159,12 +159,13 @@ Foam::UIPstreamBase::UIPstreamBase
UPstream(commsType),
Istream(fmt),
fromProcNo_(fromProcNo),
- recvBuf_(receiveBuf),
- recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
+ messageSize_(0),
+ storedRecvBufPos_(0),
clearAtEnd_(clearAtEnd),
- messageSize_(0)
+ recvBuf_(receiveBuf),
+ recvBufPos_(receiveBufPosition)
{
setOpened();
setGood();
@@ -180,12 +181,13 @@ Foam::UIPstreamBase::UIPstreamBase
UPstream(buffers.commsType()),
Istream(buffers.format()),
fromProcNo_(fromProcNo),
- recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
- recvBufPos_(buffers.accessRecvPosition(fromProcNo)),
tag_(buffers.tag()),
comm_(buffers.comm()),
+ messageSize_(0),
+ storedRecvBufPos_(0),
clearAtEnd_(buffers.allowClearRecv()),
- messageSize_(0)
+ recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
+ recvBufPos_(buffers.accessRecvPosition(fromProcNo))
{
if
(
@@ -205,6 +207,32 @@ Foam::UIPstreamBase::UIPstreamBase
}
+Foam::UIPstreamBase::UIPstreamBase
+(
+ const DynamicList& receiveBuf,
+ IOstreamOption::streamFormat fmt
+)
+:
+ UPstream(UPstream::commsTypes::nonBlocking), // placeholder
+ Istream(fmt),
+ fromProcNo_(UPstream::masterNo()), // placeholder
+ tag_(UPstream::msgType()), // placeholder
+ comm_(UPstream::selfComm), // placeholder
+ messageSize_(receiveBuf.size()), // Message == buffer
+ storedRecvBufPos_(0),
+ clearAtEnd_(false), // Do not clear recvBuf if at end!!
+ recvBuf_
+ (
+ // The receive buffer is never modified with this code path
+ const_cast&>(receiveBuf)
+ ),
+ recvBufPos_(storedRecvBufPos_) // Internal reference
+{
+ setOpened();
+ setGood();
+}
+
+
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UIPstreamBase::~UIPstreamBase()
@@ -517,8 +545,7 @@ void Foam::UIPstreamBase::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " using communicator " << comm_
- << " and tag " << tag_
- << Foam::endl;
+ << " and tag " << tag_ << Foam::endl;
}
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H
index a3f86d3281..02f6f85037 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H
@@ -92,16 +92,21 @@ protected:
// Protected Data
- int toProcNo_;
-
- DynamicList& sendBuf_;
+ //- Destination rank for the data
+ const int toProcNo_;
+ //- Message tag for communication
const int tag_;
- const label comm_;
+ //- The communicator index
+ const int comm_;
+ //- Call bufferIPCsend on termination (in the destructor)
const bool sendAtDestruct_;
+ //- Reference to the send buffer data
+ DynamicList& sendBuf_;
+
// Protected Constructors
@@ -122,6 +127,12 @@ protected:
//- Construct given buffers
UOPstreamBase(const int toProcNo, PstreamBuffers& buffers);
+ //- Construct for externally obtained buffers
+ UOPstreamBase
+ (
+ DynamicList& sendBuf,
+ IOstreamOption::streamFormat fmt
+ );
public:
@@ -310,6 +321,14 @@ public:
//- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers& buffers);
+ //- Construct for writing into a standalone buffer.
+ //- Data transfer is handled externally by the caller.
+ explicit UOPstream
+ (
+ DynamicList& sendBuf,
+ IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
+ );
+
//- Destructor, usually sends buffer on destruct.
virtual ~UOPstream();
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C
index 73d4c83939..b865858bad 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C
@@ -145,10 +145,10 @@ Foam::UOPstreamBase::UOPstreamBase
UPstream(commsType),
Ostream(fmt),
toProcNo_(toProcNo),
- sendBuf_(sendBuf),
tag_(tag),
comm_(comm),
- sendAtDestruct_(sendAtDestruct)
+ sendAtDestruct_(sendAtDestruct),
+ sendBuf_(sendBuf)
{
setOpened();
setGood();
@@ -160,16 +160,36 @@ Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers)
UPstream(buffers.commsType()),
Ostream(buffers.format()),
toProcNo_(toProcNo),
- sendBuf_(buffers.accessSendBuffer(toProcNo)),
tag_(buffers.tag()),
comm_(buffers.comm()),
- sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking)
+ sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking),
+ sendBuf_(buffers.accessSendBuffer(toProcNo))
{
setOpened();
setGood();
}
+Foam::UOPstreamBase::UOPstreamBase
+(
+ DynamicList& sendBuf,
+ IOstreamOption::streamFormat fmt
+)
+:
+ UPstream(UPstream::commsTypes::nonBlocking), // placeholder
+ Ostream(fmt),
+ toProcNo_(UPstream::masterNo()), // placeholder
+ tag_(UPstream::msgType()), // placeholder
+ comm_(UPstream::selfComm), // placeholder
+ sendAtDestruct_(false), // Never sendAtDestruct!!
+ sendBuf_(sendBuf)
+{
+ sendBuf_.clear(); // Overwrite into buffer
+ setOpened();
+ setGood();
+}
+
+
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPstreamBase::~UOPstreamBase()
@@ -394,8 +414,8 @@ void Foam::UOPstreamBase::rewind()
void Foam::UOPstreamBase::print(Ostream& os) const
{
- os << "Writing from processor " << toProcNo_
- << " to processor " << myProcNo() << " in communicator " << comm_
+ os << "Writing to processor " << toProcNo_
+ << " from processor " << myProcNo() << " in communicator " << comm_
<< " and tag " << tag_ << Foam::endl;
}