ENH: support UOPstream and UIPstream as formatters with an external buffer

- can split serialise/de-serialise and send/recv actions
This commit is contained in:
Mark Olesen
2023-02-08 17:13:44 +01:00
parent 70d310329c
commit add61ca273
11 changed files with 378 additions and 44 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -75,13 +75,12 @@ Foam::IPBstream::IPBstream
commsType,
fromProcNo,
Pstream::transferBuf_,
transferBufPosition_,
UIPstreamBase::storedRecvBufPos_, // Internal only
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
),
transferBufPosition_(0)
)
{}

View File

@ -56,11 +56,6 @@ class IPstream
public Pstream,
public UIPstream
{
// Private Data
//- Receive index into Pstream::transferBuf_
label transferBufPosition_;
public:
// Constructors
@ -90,11 +85,6 @@ class IPBstream
public Pstream,
public UIPBstream
{
// Private Data
//- Receive index into Pstream::transferBuf_
label transferBufPosition_;
public:
// Constructors

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -91,6 +91,16 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
}
Foam::UIPstream::UIPstream
(
const DynamicList<char>& recvBuf,
IOstreamOption::streamFormat fmt
)
:
UIPstreamBase(recvBuf, fmt)
{}
Foam::IPstream::IPstream
(
const UPstream::commsTypes commsType,
@ -107,13 +117,12 @@ Foam::IPstream::IPstream
commsType,
fromProcNo,
Pstream::transferBuf_,
transferBufPosition_,
UIPstreamBase::storedRecvBufPos_, // Internal only
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
),
transferBufPosition_(0)
)
{}

View File

@ -52,6 +52,16 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
{}
Foam::UOPstream::UOPstream
(
DynamicList<char>& sendBuf,
IOstreamOption::streamFormat fmt
)
:
UOPstreamBase(sendBuf, fmt)
{}
Foam::OPstream::OPstream
(
const UPstream::commsTypes commsType,

View File

@ -85,19 +85,30 @@ protected:
// Protected Data
int fromProcNo_;
DynamicList<char>& recvBuf_;
label& recvBufPos_;
//- Source rank for the data
const int fromProcNo_;
//- Message tag for communication
const int tag_;
const label comm_;
//- The communicator index
const int comm_;
//- The message size, read on bufferIPCrecv or set directly
int messageSize_;
//- Receive position in buffer data, if ony
//- If there is no external location for recvBufPos_
label storedRecvBufPos_;
//- Clear the receive buffer on termination (in the destructor)
const bool clearAtEnd_;
int messageSize_;
//- Reference to the receive buffer data
DynamicList<char>& recvBuf_;
//- Reference to the receive position in buffer data
label& recvBufPos_;
// Protected Constructors
@ -120,10 +131,17 @@ protected:
//- Construct given buffers
UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
//- Construct for an externally obtained buffer.
// The parameter is allowed to be const (since reading will not
// affect it), but must reference a concrete variable.
UIPstreamBase
(
const DynamicList<char>& receiveBuf,
IOstreamOption::streamFormat fmt
);
public:
//- Destructor. Optionally clears external receive buffer.
virtual ~UIPstreamBase();
@ -238,6 +256,16 @@ public:
//- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers& buffers);
//- Construct for reading from a standalone buffer that has
//- been obtained externally by the caller.
// The parameter is allowed to be const (since reading will not
// affect it), but must reference a concrete variable.
explicit UIPstream
(
const DynamicList<char>& recvBuf,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Destructor
virtual ~UIPstream() = default;

View File

@ -159,12 +159,13 @@ Foam::UIPstreamBase::UIPstreamBase
UPstream(commsType),
Istream(fmt),
fromProcNo_(fromProcNo),
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
messageSize_(0),
storedRecvBufPos_(0),
clearAtEnd_(clearAtEnd),
messageSize_(0)
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition)
{
setOpened();
setGood();
@ -180,12 +181,13 @@ Foam::UIPstreamBase::UIPstreamBase
UPstream(buffers.commsType()),
Istream(buffers.format()),
fromProcNo_(fromProcNo),
recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
recvBufPos_(buffers.accessRecvPosition(fromProcNo)),
tag_(buffers.tag()),
comm_(buffers.comm()),
messageSize_(0),
storedRecvBufPos_(0),
clearAtEnd_(buffers.allowClearRecv()),
messageSize_(0)
recvBuf_(buffers.accessRecvBuffer(fromProcNo)),
recvBufPos_(buffers.accessRecvPosition(fromProcNo))
{
if
(
@ -205,6 +207,32 @@ Foam::UIPstreamBase::UIPstreamBase
}
Foam::UIPstreamBase::UIPstreamBase
(
const DynamicList<char>& receiveBuf,
IOstreamOption::streamFormat fmt
)
:
UPstream(UPstream::commsTypes::nonBlocking), // placeholder
Istream(fmt),
fromProcNo_(UPstream::masterNo()), // placeholder
tag_(UPstream::msgType()), // placeholder
comm_(UPstream::selfComm), // placeholder
messageSize_(receiveBuf.size()), // Message == buffer
storedRecvBufPos_(0),
clearAtEnd_(false), // Do not clear recvBuf if at end!!
recvBuf_
(
// The receive buffer is never modified with this code path
const_cast<DynamicList<char>&>(receiveBuf)
),
recvBufPos_(storedRecvBufPos_) // Internal reference
{
setOpened();
setGood();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UIPstreamBase::~UIPstreamBase()
@ -517,8 +545,7 @@ void Foam::UIPstreamBase::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " using communicator " << comm_
<< " and tag " << tag_
<< Foam::endl;
<< " and tag " << tag_ << Foam::endl;
}

View File

@ -92,16 +92,21 @@ protected:
// Protected Data
int toProcNo_;
DynamicList<char>& sendBuf_;
//- Destination rank for the data
const int toProcNo_;
//- Message tag for communication
const int tag_;
const label comm_;
//- The communicator index
const int comm_;
//- Call bufferIPCsend on termination (in the destructor)
const bool sendAtDestruct_;
//- Reference to the send buffer data
DynamicList<char>& sendBuf_;
// Protected Constructors
@ -122,6 +127,12 @@ protected:
//- Construct given buffers
UOPstreamBase(const int toProcNo, PstreamBuffers& buffers);
//- Construct for externally obtained buffers
UOPstreamBase
(
DynamicList<char>& sendBuf,
IOstreamOption::streamFormat fmt
);
public:
@ -310,6 +321,14 @@ public:
//- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers& buffers);
//- Construct for writing into a standalone buffer.
//- Data transfer is handled externally by the caller.
explicit UOPstream
(
DynamicList<char>& sendBuf,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Destructor, usually sends buffer on destruct.
virtual ~UOPstream();

View File

@ -145,10 +145,10 @@ Foam::UOPstreamBase::UOPstreamBase
UPstream(commsType),
Ostream(fmt),
toProcNo_(toProcNo),
sendBuf_(sendBuf),
tag_(tag),
comm_(comm),
sendAtDestruct_(sendAtDestruct)
sendAtDestruct_(sendAtDestruct),
sendBuf_(sendBuf)
{
setOpened();
setGood();
@ -160,16 +160,36 @@ Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers)
UPstream(buffers.commsType()),
Ostream(buffers.format()),
toProcNo_(toProcNo),
sendBuf_(buffers.accessSendBuffer(toProcNo)),
tag_(buffers.tag()),
comm_(buffers.comm()),
sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking)
sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking),
sendBuf_(buffers.accessSendBuffer(toProcNo))
{
setOpened();
setGood();
}
Foam::UOPstreamBase::UOPstreamBase
(
DynamicList<char>& sendBuf,
IOstreamOption::streamFormat fmt
)
:
UPstream(UPstream::commsTypes::nonBlocking), // placeholder
Ostream(fmt),
toProcNo_(UPstream::masterNo()), // placeholder
tag_(UPstream::msgType()), // placeholder
comm_(UPstream::selfComm), // placeholder
sendAtDestruct_(false), // Never sendAtDestruct!!
sendBuf_(sendBuf)
{
sendBuf_.clear(); // Overwrite into buffer
setOpened();
setGood();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPstreamBase::~UOPstreamBase()
@ -394,8 +414,8 @@ void Foam::UOPstreamBase::rewind()
void Foam::UOPstreamBase::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << " in communicator " << comm_
os << "Writing to processor " << toProcNo_
<< " from processor " << myProcNo() << " in communicator " << comm_
<< " and tag " << tag_ << Foam::endl;
}