ENH: improved isolation of MPI access in Pstreams

- refactor as an MPI-independent base class.

  Add bufferIPC{send,recv} private methods for construct/destruct.
  Eliminates code duplication from two constructor forms and reduces
  additional constructor definitions in dummy library.

- add PstreamBuffers access methods, refactor common finish sends
  code, tweak member packing

ENH: resize_nocopy for processorLduInterface buffers

- content is immediately overwritten

STYLE: cull unneeded includes in processorFa*

- handled by processorLduInterface
This commit is contained in:
Mark Olesen
2022-02-20 20:01:16 +01:00
committed by Andrew Heather
parent 1bbe558dc5
commit b95b24e4e7
22 changed files with 642 additions and 502 deletions

View File

@ -266,14 +266,14 @@ StringStreams = $(Streams)/StringStreams
$(StringStreams)/StringStream.C $(StringStreams)/StringStream.C
Pstreams = $(Streams)/Pstreams Pstreams = $(Streams)/Pstreams
$(Pstreams)/UIPstream.C /* $(Pstreams)/UPstream.C in global.C */
$(Pstreams)/IPstream.C
/* $(Pstreams)/UPstream.C in global.Cver */
$(Pstreams)/UPstreamCommsStruct.C $(Pstreams)/UPstreamCommsStruct.C
$(Pstreams)/Pstream.C $(Pstreams)/Pstream.C
$(Pstreams)/UOPstream.C
$(Pstreams)/OPstream.C
$(Pstreams)/PstreamBuffers.C $(Pstreams)/PstreamBuffers.C
$(Pstreams)/UIPstreamBase.C
$(Pstreams)/UOPstreamBase.C
$(Pstreams)/IPstreams.C
$(Pstreams)/OPstreams.C
dictionary = db/dictionary dictionary = db/dictionary
$(dictionary)/dictionary.C $(dictionary)/dictionary.C

View File

@ -5,8 +5,7 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2015 OpenFOAM Foundation Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2021 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -24,12 +23,11 @@ License
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Read from UIPstream
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UIPstream.H" #include "UIPstream.H"
#include "IPstream.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
@ -45,51 +43,78 @@ Foam::UIPstream::UIPstream
IOstreamOption::streamFormat fmt IOstreamOption::streamFormat fmt
) )
: :
UPstream(commsType), UIPstreamBase
Istream(fmt, IOstreamOption::currentVersion), (
fromProcNo_(fromProcNo), commsType,
recvBuf_(receiveBuf), fromProcNo,
recvBufPos_(receiveBufPosition), receiveBuf,
tag_(tag), receiveBufPosition,
comm_(comm), tag,
clearAtEnd_(clearAtEnd), comm,
messageSize_(0) clearAtEnd,
fmt
)
{ {
NotImplemented; if (commsType == commsTypes::nonBlocking)
{
// Message is already received into buffer
}
else
{
bufferIPCrecv();
}
} }
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
: :
UPstream(buffers.commsType_), UIPstreamBase(fromProcNo, buffers)
Istream(buffers.format_, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
recvBuf_(buffers.recvBuf_[fromProcNo]),
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),
messageSize_(0)
{ {
NotImplemented; if (commsType() == commsTypes::nonBlocking)
{
// Message is already received into buffer
messageSize_ = recvBuf_.size();
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
<< " fromProcNo:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " receive buffer size:" << messageSize_
<< Foam::endl;
}
}
else
{
bufferIPCrecv();
}
} }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // Foam::IPstream::IPstream
Foam::label Foam::UIPstream::read
( (
const commsTypes commsType, const commsTypes commsType,
const int fromProcNo, const int fromProcNo,
char* buf, const label bufSize,
const std::streamsize bufSize,
const int tag, const int tag,
const label communicator const label comm,
IOstreamOption::streamFormat fmt
) )
{ :
NotImplemented; Pstream(commsType, bufSize),
return 0; UIPstream
} (
commsType,
fromProcNo,
Pstream::transferBuf_,
transferBufPosition_,
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
),
transferBufPosition_(0)
{}
// ************************************************************************* // // ************************************************************************* //

View File

@ -5,8 +5,8 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -26,14 +26,36 @@ License
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "IPstream.H" #include "UOPstream.H"
#include "OPstream.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::IPstream::IPstream Foam::UOPstream::UOPstream
( (
const commsTypes commsType, const commsTypes commsType,
const int fromProcNo, const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,
const label comm,
const bool sendAtDestruct,
IOstreamOption::streamFormat fmt
)
:
UOPstreamBase(commsType, toProcNo, sendBuf, tag, comm, sendAtDestruct, fmt)
{}
Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
:
UOPstreamBase(toProcNo, buffers)
{}
Foam::OPstream::OPstream
(
const commsTypes commsType,
const int toProcNo,
const label bufSize, const label bufSize,
const int tag, const int tag,
const label comm, const label comm,
@ -41,19 +63,34 @@ Foam::IPstream::IPstream
) )
: :
Pstream(commsType, bufSize), Pstream(commsType, bufSize),
UIPstream UOPstream
( (
commsType, commsType,
fromProcNo, toProcNo,
Pstream::transferBuf_, Pstream::transferBuf_,
transferBufPosition_,
tag, tag,
comm, comm,
false, // Do not clear Pstream::transferBuf_ if at end true, // sendAtDestruct
fmt fmt
), )
transferBufPosition_(0)
{} {}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPstream::~UOPstream()
{
if (sendAtDestruct_)
{
if (!bufferIPCsend())
{
FatalErrorInFunction
<< "Failed sending outgoing message of size "
<< sendBuf_.size() << " to processor " << toProcNo_
<< Foam::abort(FatalError);
}
}
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -28,6 +28,35 @@ License
#include "PstreamBuffers.H" #include "PstreamBuffers.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::PstreamBuffers::finalExchange
(
labelList& recvSizes,
const bool block
)
{
// Could also check that it is not called twice
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// all-to-all
Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvSizes,
recvBuf_,
tag_,
comm_,
block
);
}
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers Foam::PstreamBuffers::PstreamBuffers
@ -38,14 +67,14 @@ Foam::PstreamBuffers::PstreamBuffers
IOstreamOption::streamFormat fmt IOstreamOption::streamFormat fmt
) )
: :
finishedSendsCalled_(false),
format_(fmt),
commsType_(commsType), commsType_(commsType),
tag_(tag), tag_(tag),
comm_(comm), comm_(comm),
format_(fmt),
sendBuf_(UPstream::nProcs(comm)), sendBuf_(UPstream::nProcs(comm)),
recvBuf_(UPstream::nProcs(comm)), recvBuf_(UPstream::nProcs(comm)),
recvBufPos_(UPstream::nProcs(comm), Zero), recvBufPos_(UPstream::nProcs(comm), Zero)
finishedSendsCalled_(false)
{} {}
@ -70,45 +99,38 @@ Foam::PstreamBuffers::~PstreamBuffers()
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::finishedSends(const bool block) void Foam::PstreamBuffers::clear()
{ {
// Could also check that it is not called twice for (DynamicList<char>& buf : sendBuf_)
finishedSendsCalled_ = true;
if (commsType_ == UPstream::commsTypes::nonBlocking)
{ {
Pstream::exchange<DynamicList<char>, char> buf.clear();
(
sendBuf_,
recvBuf_,
tag_,
comm_,
block
);
} }
for (DynamicList<char>& buf : recvBuf_)
{
buf.clear();
}
recvBufPos_ = 0;
finishedSendsCalled_ = false;
} }
void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block) void Foam::PstreamBuffers::finishedSends(const bool block)
{ {
// Could also check that it is not called twice labelList recvSizes;
finishedSendsCalled_ = true; finalExchange(recvSizes, block);
}
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
Pstream::exchange<DynamicList<char>, char> void Foam::PstreamBuffers::finishedSends
( (
sendBuf_, labelList& recvSizes,
recvSizes, const bool block
recvBuf_, )
tag_, {
comm_, finalExchange(recvSizes, block);
block
); if (commsType_ != UPstream::commsTypes::nonBlocking)
}
else
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Obtaining sizes not supported in " << "Obtaining sizes not supported in "
@ -122,19 +144,4 @@ void Foam::PstreamBuffers::finishedSends(labelList& recvSizes, const bool block)
} }
void Foam::PstreamBuffers::clear()
{
for (DynamicList<char>& buf : sendBuf_)
{
buf.clear();
}
for (DynamicList<char>& buf : recvBuf_)
{
buf.clear();
}
recvBufPos_ = 0;
finishedSendsCalled_ = false;
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -70,8 +70,8 @@ SourceFiles
#include "Pstream.H" #include "Pstream.H"
#ifndef PstreamBuffers_H #ifndef Foam_PstreamBuffers_H
#define PstreamBuffers_H #define Foam_PstreamBuffers_H
#include "DynamicList.H" #include "DynamicList.H"
#include "UPstream.H" #include "UPstream.H"
@ -83,16 +83,24 @@ namespace Foam
{ {
/*---------------------------------------------------------------------------*\ /*---------------------------------------------------------------------------*\
Class PstreamBuffers Declaration Class PstreamBuffers Declaration
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
class PstreamBuffers class PstreamBuffers
{ {
friend class UOPstream; // Friendship
friend class UIPstream; friend class UOPstreamBase; // Access to sendBuf_
friend class UIPstreamBase; // Access to recvBuf_ recvBufPos_;
// Private Data // Private Data
//- Track if sends are complete
bool finishedSendsCalled_;
//- Buffer format (ascii | binary)
const IOstreamOption::streamFormat format_;
//- Communications type of this stream //- Communications type of this stream
const UPstream::commsTypes commsType_; const UPstream::commsTypes commsType_;
@ -102,9 +110,6 @@ class PstreamBuffers
//- Communicator //- Communicator
const label comm_; const label comm_;
//- Buffer format (ascii | binary)
const IOstreamOption::streamFormat format_;
//- Send buffer //- Send buffer
List<DynamicList<char>> sendBuf_; List<DynamicList<char>> sendBuf_;
@ -114,8 +119,12 @@ class PstreamBuffers
//- Current read positions within recvBuf_ //- Current read positions within recvBuf_
labelList recvBufPos_; labelList recvBufPos_;
//- Track if sends are complete
bool finishedSendsCalled_; // Private Member Functions
//- Mark all sends as having been done.
// This will start receives in non-blocking mode.
void finalExchange(labelList& recvSizes, const bool block);
public: public:
@ -138,6 +147,20 @@ public:
// Member Functions // Member Functions
// Access
//- The associated buffer format (ascii | binary)
IOstreamOption::streamFormat format() const noexcept
{
return format_;
}
//- The communications type of the stream
UPstream::commsTypes commsType() const noexcept
{
return commsType_;
}
//- The transfer message type //- The transfer message type
int tag() const noexcept int tag() const noexcept
{ {
@ -150,6 +173,22 @@ public:
return comm_; return comm_;
} }
//- True if finishedSends has been called
bool finished() const noexcept
{
return finishedSendsCalled_;
}
// Edit
//- Reset (clear) individual buffers and reset state.
// Does not clear buffer storage
void clear();
// Functions
//- Mark all sends as having been done. //- Mark all sends as having been done.
// This will start receives in non-blocking mode. // This will start receives in non-blocking mode.
// If block will wait for all transfers to finish // If block will wait for all transfers to finish
@ -160,10 +199,6 @@ public:
// Same as above but also returns sizes (bytes) received. // Same as above but also returns sizes (bytes) received.
// \note currently only valid for non-blocking. // \note currently only valid for non-blocking.
void finishedSends(labelList& recvSizes, const bool block = true); void finishedSends(labelList& recvSizes, const bool block = true);
//- Reset (clear) individual buffers and reset state.
// Does not clear buffer storage
void clear();
}; };

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2017-2021 OpenCFD Ltd. Copyright (C) 2017-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -25,24 +25,26 @@ License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Class Class
Foam::UIPstream Foam::UIPstreamBase
Description Description
Input inter-processor communications stream operating on external Base class for input inter-processor communications stream
buffer. (ie, parallel streams).
Not to be used directly, thus contructors are protected.
SourceFiles SourceFiles
UIPstream.C UIPstreamBase.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "Pstream.H" #include "Pstream.H"
#ifndef UIPstream_H #ifndef Foam_UIPstream_H
#define UIPstream_H #define Foam_UIPstream_H
#include "UPstream.H" #include "UPstream.H"
#include "Istream.H" #include "Istream.H"
#include "DynamicList.H"
#include "PstreamBuffers.H" #include "PstreamBuffers.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -51,31 +53,14 @@ namespace Foam
{ {
/*---------------------------------------------------------------------------*\ /*---------------------------------------------------------------------------*\
Class UIPstream Declaration Class UIPstreamBase Declaration
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
class UIPstream class UIPstreamBase
: :
public UPstream, public UPstream,
public Istream public Istream
{ {
// Private Data
int fromProcNo_;
DynamicList<char>& recvBuf_;
label& recvBufPos_;
const int tag_;
const label comm_;
const bool clearAtEnd_;
int messageSize_;
// Private Member Functions // Private Member Functions
//- Check buffer position against messageSize_ for EOF //- Check buffer position against messageSize_ for EOF
@ -96,14 +81,31 @@ class UIPstream
inline Istream& readString(std::string& str); inline Istream& readString(std::string& str);
public: protected:
// Constructors // Protected Data
int fromProcNo_;
DynamicList<char>& recvBuf_;
label& recvBufPos_;
const int tag_;
const label comm_;
const bool clearAtEnd_;
int messageSize_;
// Protected Constructors
//- Construct given process index to read from using the given //- Construct given process index to read from using the given
//- attached receive buffer, optional communication characteristics //- attached receive buffer, optional communication characteristics
//- and IO format //- and IO format
UIPstream UIPstreamBase
( (
const commsTypes commsType, const commsTypes commsType,
const int fromProcNo, const int fromProcNo,
@ -116,11 +118,14 @@ public:
); );
//- Construct given buffers //- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers& buffers); UIPstreamBase(const int fromProcNo, PstreamBuffers& buffers);
//- Destructor public:
~UIPstream();
//- Destructor. Optionally clears external receive buffer.
virtual ~UIPstreamBase();
// Member Functions // Member Functions
@ -128,7 +133,7 @@ public:
// Inquiry // Inquiry
//- Return flags of output stream //- Return flags of output stream
ios_base::fmtflags flags() const virtual ios_base::fmtflags flags() const
{ {
return ios_base::fmtflags(0); return ios_base::fmtflags(0);
} }
@ -136,18 +141,6 @@ public:
// Read Functions // Read Functions
//- Read into given buffer from given processor
// \return the message size
static label read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label communicator = UPstream::worldComm
);
//- Return next token from stream //- Return next token from stream
Istream& read(token& t); Istream& read(token& t);
@ -191,7 +184,7 @@ public:
// Edit // Edit
//- Set flags of stream //- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags) virtual ios_base::fmtflags flags(const ios_base::fmtflags)
{ {
return ios_base::fmtflags(0); return ios_base::fmtflags(0);
} }
@ -204,6 +197,71 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class UIPstream Declaration
\*---------------------------------------------------------------------------*/
//- Input inter-processor communications stream
//- using MPI send/recv etc. - operating on external buffer.
class UIPstream
:
public UIPstreamBase
{
// Private Member Functions
//- Initial buffer recv, called by constructor (blocking | scheduled)
void bufferIPCrecv();
public:
// Constructors
//- Construct given process index to read from using the given
//- attached receive buffer, optional communication characteristics
//- and IO format
UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool clearAtEnd = false, // destroy receiveBuf if at end
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers& buffers);
//- Destructor
virtual ~UIPstream() = default;
// Member Functions
//- Use all read methods from base
using UIPstreamBase::read;
// Static Functions
//- Read buffer contents from given processor
// \return the message size
static label read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -74,7 +74,7 @@ inline static label byteAlign(const label pos, const size_t align)
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::UIPstream::checkEof() inline void Foam::UIPstreamBase::checkEof()
{ {
if (recvBufPos_ == messageSize_) if (recvBufPos_ == messageSize_)
{ {
@ -83,14 +83,14 @@ inline void Foam::UIPstream::checkEof()
} }
inline void Foam::UIPstream::prepareBuffer(const size_t align) inline void Foam::UIPstreamBase::prepareBuffer(const size_t align)
{ {
recvBufPos_ = byteAlign(recvBufPos_, align); recvBufPos_ = byteAlign(recvBufPos_, align);
} }
template<class T> template<class T>
inline void Foam::UIPstream::readFromBuffer(T& val) inline void Foam::UIPstreamBase::readFromBuffer(T& val)
{ {
prepareBuffer(sizeof(T)); prepareBuffer(sizeof(T));
@ -100,7 +100,7 @@ inline void Foam::UIPstream::readFromBuffer(T& val)
} }
inline void Foam::UIPstream::readFromBuffer inline void Foam::UIPstreamBase::readFromBuffer
( (
void* data, void* data,
const size_t count const size_t count
@ -119,7 +119,7 @@ inline void Foam::UIPstream::readFromBuffer
} }
inline Foam::Istream& Foam::UIPstream::readString(std::string& str) inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str)
{ {
// Use std::string::assign() to copy content, including '\0'. // Use std::string::assign() to copy content, including '\0'.
// Stripping (when desired) is the responsibility of the sending side. // Stripping (when desired) is the responsibility of the sending side.
@ -142,15 +142,78 @@ inline Foam::Istream& Foam::UIPstream::readString(std::string& str)
} }
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UIPstreamBase::UIPstreamBase
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
IOstreamOption::streamFormat fmt
)
:
UPstream(commsType),
Istream(fmt, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
messageSize_(0)
{
setOpened();
setGood();
}
Foam::UIPstreamBase::UIPstreamBase
(
const int fromProcNo,
PstreamBuffers& buffers
)
:
UPstream(buffers.commsType()),
Istream(buffers.format(), IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
recvBuf_(buffers.recvBuf_[fromProcNo]),
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag()),
comm_(buffers.comm()),
clearAtEnd_(true),
messageSize_(0)
{
if
(
commsType() != UPstream::commsTypes::scheduled
&& !buffers.finished()
)
{
FatalErrorInFunction
<< "PstreamBuffers::finishedSends() never called." << endl
<< "Please call PstreamBuffers::finishedSends() after doing"
<< " all your sends (using UOPstream) and before doing any"
<< " receives (using UIPstream)" << Foam::exit(FatalError);
}
setOpened();
setGood();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UIPstream::~UIPstream() Foam::UIPstreamBase::~UIPstreamBase()
{ {
if (clearAtEnd_ && eof()) if (clearAtEnd_ && eof())
{ {
if (debug) if (debug)
{ {
Pout<< "UIPstream::~UIPstream() : tag:" << tag_ Pout<< "UIPstreamBase Destructor : tag:" << tag_
<< " fromProcNo:" << fromProcNo_ << " fromProcNo:" << fromProcNo_
<< " clearing receive buffer of size " << " clearing receive buffer of size "
<< recvBuf_.size() << recvBuf_.size()
@ -163,7 +226,7 @@ Foam::UIPstream::~UIPstream()
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Istream& Foam::UIPstream::read(token& t) Foam::Istream& Foam::UIPstreamBase::read(token& t)
{ {
// Return the put back token if it exists // Return the put back token if it exists
// - with additional handling for special stream flags // - with additional handling for special stream flags
@ -341,7 +404,7 @@ Foam::Istream& Foam::UIPstream::read(token& t)
} }
Foam::Istream& Foam::UIPstream::read(char& c) Foam::Istream& Foam::UIPstreamBase::read(char& c)
{ {
c = recvBuf_[recvBufPos_]; c = recvBuf_[recvBufPos_];
++recvBufPos_; ++recvBufPos_;
@ -350,40 +413,40 @@ Foam::Istream& Foam::UIPstream::read(char& c)
} }
Foam::Istream& Foam::UIPstream::read(word& str) Foam::Istream& Foam::UIPstreamBase::read(word& str)
{ {
return readString(str); return readString(str);
} }
Foam::Istream& Foam::UIPstream::read(string& str) Foam::Istream& Foam::UIPstreamBase::read(string& str)
{ {
return readString(str); return readString(str);
} }
Foam::Istream& Foam::UIPstream::read(label& val) Foam::Istream& Foam::UIPstreamBase::read(label& val)
{ {
readFromBuffer(val); readFromBuffer(val);
return *this; return *this;
} }
Foam::Istream& Foam::UIPstream::read(floatScalar& val) Foam::Istream& Foam::UIPstreamBase::read(floatScalar& val)
{ {
readFromBuffer(val); readFromBuffer(val);
return *this; return *this;
} }
Foam::Istream& Foam::UIPstream::read(doubleScalar& val) Foam::Istream& Foam::UIPstreamBase::read(doubleScalar& val)
{ {
readFromBuffer(val); readFromBuffer(val);
return *this; return *this;
} }
Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count) Foam::Istream& Foam::UIPstreamBase::read(char* data, std::streamsize count)
{ {
if (count) if (count)
{ {
@ -398,7 +461,7 @@ Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count)
} }
Foam::Istream& Foam::UIPstream::readRaw(char* data, std::streamsize count) Foam::Istream& Foam::UIPstreamBase::readRaw(char* data, std::streamsize count)
{ {
// No check for format() == BINARY since this is either done in the // No check for format() == BINARY since this is either done in the
// beginRawRead() method, or the caller knows what they are doing. // beginRawRead() method, or the caller knows what they are doing.
@ -409,7 +472,7 @@ Foam::Istream& Foam::UIPstream::readRaw(char* data, std::streamsize count)
} }
bool Foam::UIPstream::beginRawRead() bool Foam::UIPstreamBase::beginRawRead()
{ {
if (format() != BINARY) if (format() != BINARY)
{ {
@ -427,13 +490,13 @@ bool Foam::UIPstream::beginRawRead()
} }
void Foam::UIPstream::rewind() void Foam::UIPstreamBase::rewind()
{ {
recvBufPos_ = 0; recvBufPos_ = 0;
} }
void Foam::UIPstream::print(Ostream& os) const void Foam::UIPstreamBase::print(Ostream& os) const
{ {
os << "Reading from processor " << fromProcNo_ os << "Reading from processor " << fromProcNo_
<< " using communicator " << comm_ << " using communicator " << comm_

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2014 OpenFOAM Foundation Copyright (C) 2011-2014 OpenFOAM Foundation
Copyright (C) 2017-2021 OpenCFD Ltd. Copyright (C) 2017-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -25,21 +25,22 @@ License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Class Class
Foam::UOPstream Foam::UOPstreamBase
Description Description
Output inter-processor communications stream operating on external Base class for output inter-processor communications stream
buffer. (ie, parallel streams).
Not to be used directly, thus contructors are protected.
SourceFiles SourceFiles
UOPstream.C UOPstreamBase.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "Pstream.H" #include "Pstream.H"
#ifndef UOPstream_H #ifndef Foam_UOPstream_H
#define UOPstream_H #define Foam_UOPstream_H
#include "UPstream.H" #include "UPstream.H"
#include "Ostream.H" #include "Ostream.H"
@ -52,27 +53,14 @@ namespace Foam
{ {
/*---------------------------------------------------------------------------*\ /*---------------------------------------------------------------------------*\
Class UOPstream Declaration Class UOPstreamBase Declaration
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
class UOPstream class UOPstreamBase
: :
public UPstream, public UPstream,
public Ostream public Ostream
{ {
// Private Data
int toProcNo_;
DynamicList<char>& sendBuf_;
const int tag_;
const label comm_;
const bool sendAtDestruct_;
// Private Member Functions // Private Member Functions
//- Prepare send buffer for count bytes of output, //- Prepare send buffer for count bytes of output,
@ -100,14 +88,27 @@ class UOPstream
inline void putString(const std::string& str); inline void putString(const std::string& str);
public: protected:
// Constructors // Protected Data
int toProcNo_;
DynamicList<char>& sendBuf_;
const int tag_;
const label comm_;
const bool sendAtDestruct_;
// Protected Constructors
//- Construct given process index to write to using the given //- Construct given process index to write to using the given
//- attached send buffer, optional communication characteristics //- attached send buffer, optional communication characteristics
//- and IO format //- and IO format
UOPstream UOPstreamBase
( (
const commsTypes commsType, const commsTypes commsType,
const int toProcNo, const int toProcNo,
@ -119,11 +120,13 @@ public:
); );
//- Construct given buffers //- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers& buffers); UOPstreamBase(const int toProcNo, PstreamBuffers& buffers);
public:
//- Destructor. //- Destructor.
~UOPstream(); virtual ~UOPstreamBase();
// Member Functions // Member Functions
@ -139,17 +142,6 @@ public:
// Write Functions // Write Functions
//- Write given buffer to given processor
static bool write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label communicator = UPstream::worldComm
);
//- Write token to stream or otherwise handle it. //- Write token to stream or otherwise handle it.
// \return false if the token type was not handled by this method // \return false if the token type was not handled by this method
virtual bool write(const token& tok); virtual bool write(const token& tok);
@ -262,7 +254,7 @@ public:
// Edit // Edit
//- Set flags of stream //- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags) virtual ios_base::fmtflags flags(const ios_base::fmtflags)
{ {
return ios_base::fmtflags(0); return ios_base::fmtflags(0);
} }
@ -275,6 +267,70 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class UOPstream Declaration
\*---------------------------------------------------------------------------*/
//- Output inter-processor communications stream
//- using MPI send/recv etc. - operating on external buffer.
class UOPstream
:
public UOPstreamBase
{
// Private Member Functions
//- Final buffer send, called by destructor
bool bufferIPCsend();
public:
// Constructors
//- Construct given process index to write to using the given
//- attached send buffer, optional communication characteristics
//- and IO format
UOPstream
(
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool sendAtDestruct = true,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers& buffers);
//- Destructor, usually sends buffer on destruct.
virtual ~UOPstream();
// Member Functions
//- Use all write methods from base class
using UOPstreamBase::write;
// Static Functions
//- Write buffer contents to given processor
// \return True on success
static bool write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2021 OpenCFD Ltd. Copyright (C) 2016-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -52,7 +52,7 @@ inline static label byteAlign(const label pos, const size_t align)
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::UOPstream::prepareBuffer inline void Foam::UOPstreamBase::prepareBuffer
( (
const size_t count, const size_t count,
const size_t align const size_t align
@ -75,13 +75,13 @@ inline void Foam::UOPstream::prepareBuffer
template<class T> template<class T>
inline void Foam::UOPstream::writeToBuffer(const T& val) inline void Foam::UOPstreamBase::writeToBuffer(const T& val)
{ {
writeToBuffer(&val, sizeof(T), sizeof(T)); writeToBuffer(&val, sizeof(T), sizeof(T));
} }
inline void Foam::UOPstream::writeToBuffer inline void Foam::UOPstreamBase::writeToBuffer
( (
const void* data, const void* data,
const size_t count, const size_t count,
@ -111,7 +111,7 @@ inline void Foam::UOPstream::writeToBuffer
} }
inline void Foam::UOPstream::putChar(const char c) inline void Foam::UOPstreamBase::putChar(const char c)
{ {
if (!sendBuf_.capacity()) if (!sendBuf_.capacity())
{ {
@ -121,7 +121,7 @@ inline void Foam::UOPstream::putChar(const char c)
} }
inline void Foam::UOPstream::putString(const std::string& str) inline void Foam::UOPstreamBase::putString(const std::string& str)
{ {
const size_t len = str.size(); const size_t len = str.size();
writeToBuffer(len); writeToBuffer(len);
@ -131,7 +131,7 @@ inline void Foam::UOPstream::putString(const std::string& str)
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UOPstream::UOPstream Foam::UOPstreamBase::UOPstreamBase
( (
const commsTypes commsType, const commsTypes commsType,
const int toProcNo, const int toProcNo,
@ -155,15 +155,15 @@ Foam::UOPstream::UOPstream
} }
Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) Foam::UOPstreamBase::UOPstreamBase(const int toProcNo, PstreamBuffers& buffers)
: :
UPstream(buffers.commsType_), UPstream(buffers.commsType()),
Ostream(buffers.format_, IOstreamOption::currentVersion), Ostream(buffers.format(), IOstreamOption::currentVersion),
toProcNo_(toProcNo), toProcNo_(toProcNo),
sendBuf_(buffers.sendBuf_[toProcNo]), sendBuf_(buffers.sendBuf_[toProcNo]),
tag_(buffers.tag_), tag_(buffers.tag()),
comm_(buffers.comm_), comm_(buffers.comm()),
sendAtDestruct_(buffers.commsType_ != UPstream::commsTypes::nonBlocking) sendAtDestruct_(buffers.commsType() != UPstream::commsTypes::nonBlocking)
{ {
setOpened(); setOpened();
setGood(); setGood();
@ -172,35 +172,13 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPstream::~UOPstream() Foam::UOPstreamBase::~UOPstreamBase()
{ {}
if (sendAtDestruct_)
{
if
(
!UOPstream::write
(
commsType_,
toProcNo_,
sendBuf_.cdata(),
sendBuf_.size(),
tag_,
comm_
)
)
{
FatalErrorInFunction
<< "Failed sending outgoing message of size " << sendBuf_.size()
<< " to processor " << toProcNo_
<< Foam::abort(FatalError);
}
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
bool Foam::UOPstream::write(const token& tok) bool Foam::UOPstreamBase::write(const token& tok)
{ {
// Direct token handling only for some types // Direct token handling only for some types
@ -244,7 +222,7 @@ bool Foam::UOPstream::write(const token& tok)
} }
Foam::Ostream& Foam::UOPstream::write(const char c) Foam::Ostream& Foam::UOPstreamBase::write(const char c)
{ {
if (!isspace(c)) if (!isspace(c))
{ {
@ -255,7 +233,7 @@ Foam::Ostream& Foam::UOPstream::write(const char c)
} }
Foam::Ostream& Foam::UOPstream::write(const char* str) Foam::Ostream& Foam::UOPstreamBase::write(const char* str)
{ {
const word nonWhiteChars(string::validate<word>(str)); const word nonWhiteChars(string::validate<word>(str));
@ -272,7 +250,7 @@ Foam::Ostream& Foam::UOPstream::write(const char* str)
} }
Foam::Ostream& Foam::UOPstream::write(const word& str) Foam::Ostream& Foam::UOPstreamBase::write(const word& str)
{ {
putChar(token::tokenType::WORD); putChar(token::tokenType::WORD);
putString(str); putString(str);
@ -281,7 +259,7 @@ Foam::Ostream& Foam::UOPstream::write(const word& str)
} }
Foam::Ostream& Foam::UOPstream::write(const string& str) Foam::Ostream& Foam::UOPstreamBase::write(const string& str)
{ {
putChar(token::tokenType::STRING); putChar(token::tokenType::STRING);
putString(str); putString(str);
@ -290,7 +268,7 @@ Foam::Ostream& Foam::UOPstream::write(const string& str)
} }
Foam::Ostream& Foam::UOPstream::writeQuoted Foam::Ostream& Foam::UOPstreamBase::writeQuoted
( (
const std::string& str, const std::string& str,
const bool quoted const bool quoted
@ -310,7 +288,7 @@ Foam::Ostream& Foam::UOPstream::writeQuoted
} }
Foam::Ostream& Foam::UOPstream::write(const int32_t val) Foam::Ostream& Foam::UOPstreamBase::write(const int32_t val)
{ {
putChar(token::tokenType::LABEL); putChar(token::tokenType::LABEL);
writeToBuffer(val); writeToBuffer(val);
@ -318,7 +296,7 @@ Foam::Ostream& Foam::UOPstream::write(const int32_t val)
} }
Foam::Ostream& Foam::UOPstream::write(const int64_t val) Foam::Ostream& Foam::UOPstreamBase::write(const int64_t val)
{ {
putChar(token::tokenType::LABEL); putChar(token::tokenType::LABEL);
writeToBuffer(val); writeToBuffer(val);
@ -326,7 +304,7 @@ Foam::Ostream& Foam::UOPstream::write(const int64_t val)
} }
Foam::Ostream& Foam::UOPstream::write(const floatScalar val) Foam::Ostream& Foam::UOPstreamBase::write(const floatScalar val)
{ {
putChar(token::tokenType::FLOAT); putChar(token::tokenType::FLOAT);
writeToBuffer(val); writeToBuffer(val);
@ -334,7 +312,7 @@ Foam::Ostream& Foam::UOPstream::write(const floatScalar val)
} }
Foam::Ostream& Foam::UOPstream::write(const doubleScalar val) Foam::Ostream& Foam::UOPstreamBase::write(const doubleScalar val)
{ {
putChar(token::tokenType::DOUBLE); putChar(token::tokenType::DOUBLE);
writeToBuffer(val); writeToBuffer(val);
@ -342,7 +320,7 @@ Foam::Ostream& Foam::UOPstream::write(const doubleScalar val)
} }
Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count) Foam::Ostream& Foam::UOPstreamBase::write(const char* data, std::streamsize count)
{ {
if (format() != BINARY) if (format() != BINARY)
{ {
@ -358,7 +336,7 @@ Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count)
} }
Foam::Ostream& Foam::UOPstream::writeRaw Foam::Ostream& Foam::UOPstreamBase::writeRaw
( (
const char* data, const char* data,
std::streamsize count std::streamsize count
@ -374,7 +352,7 @@ Foam::Ostream& Foam::UOPstream::writeRaw
} }
bool Foam::UOPstream::beginRawWrite(std::streamsize count) bool Foam::UOPstreamBase::beginRawWrite(std::streamsize count)
{ {
if (format() != BINARY) if (format() != BINARY)
{ {
@ -391,7 +369,7 @@ bool Foam::UOPstream::beginRawWrite(std::streamsize count)
} }
void Foam::UOPstream::print(Ostream& os) const void Foam::UOPstreamBase::print(Ostream& os) const
{ {
os << "Writing from processor " << toProcNo_ os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << " in communicator " << comm_ << " to processor " << myProcNo() << " in communicator " << comm_

View File

@ -38,11 +38,12 @@ namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::processorLduInterface::resizeBuf(List<char>& buf, const label size) void Foam::processorLduInterface::resizeBuf(List<char>& buf, const label len)
{ {
if (buf.size() < size) if (buf.size() < len)
{ {
buf.resize(size); // Use nocopy variant since it will be overwritten
buf.resize_nocopy(len);
} }
} }

View File

@ -36,8 +36,8 @@ SourceFiles
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#ifndef processorLduInterface_H #ifndef Foam_processorLduInterface_H
#define processorLduInterface_H #define Foam_processorLduInterface_H
#include "lduInterface.H" #include "lduInterface.H"
#include "primitiveFieldsFwd.H" #include "primitiveFieldsFwd.H"
@ -67,7 +67,7 @@ class processorLduInterface
// Private Member Functions // Private Member Functions
//- Increase buffer size if required //- Increase buffer size if required
static void resizeBuf(List<char>& buf, const label size); static void resizeBuf(List<char>& buf, const label len);
public: public:
@ -78,7 +78,7 @@ public:
// Constructors // Constructors
//- Construct null //- Default construct
processorLduInterface() = default; processorLduInterface() = default;

View File

@ -39,7 +39,7 @@ void Foam::processorLduInterface::send
const UList<Type>& f const UList<Type>& f
) const ) const
{ {
label nBytes = f.byteSize(); const label nBytes = f.byteSize();
if if
( (

View File

@ -1,5 +1,6 @@
UPstream.C UPstream.C
UIPread.C
UOPwrite.C UIPstreamRead.C
UOPstreamWrite.C
LIB = $(FOAM_LIBBIN)/dummy/libPstream LIB = $(FOAM_LIBBIN)/dummy/libPstream

View File

@ -5,8 +5,8 @@
\\ / A nd | www.openfoam.com \\ / A nd | www.openfoam.com
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011-2015 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -26,32 +26,31 @@ License
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "OPstream.H" #include "UIPstream.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
Foam::OPstream::OPstream void Foam::UIPstream::bufferIPCrecv()
{
NotImplemented;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UIPstream::read
( (
const commsTypes commsType, const commsTypes commsType,
const int toProcNo, const int fromProcNo,
const label bufSize, char* buf,
const std::streamsize bufSize,
const int tag, const int tag,
const label comm, const label communicator
IOstreamOption::streamFormat fmt
) )
: {
Pstream(commsType, bufSize), NotImplemented;
UOPstream return 0;
( }
commsType,
toProcNo,
Pstream::transferBuf_,
tag,
comm,
true, // sendAtDestruct
fmt
)
{}
// ************************************************************************* // // ************************************************************************* //

View File

@ -6,6 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2015 OpenFOAM Foundation Copyright (C) 2011-2015 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -23,13 +24,19 @@ License
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Write primitive and binary block from OPstream
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UOPstream.H" #include "UOPstream.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPstream::bufferIPCsend()
{
NotImplemented;
return false;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UOPstream::write bool Foam::UOPstream::write
@ -43,7 +50,6 @@ bool Foam::UOPstream::write
) )
{ {
NotImplemented; NotImplemented;
return false; return false;
} }

View File

@ -1,6 +1,7 @@
UOPwrite.C
UIPread.C
UPstream.C
PstreamGlobals.C PstreamGlobals.C
UPstream.C
UIPstreamRead.C
UOPstreamWrite.C
LIB = $(FOAM_MPI_LIBBIN)/libPstream LIB = $(FOAM_MPI_LIBBIN)/libPstream

View File

@ -24,9 +24,6 @@ License
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Read from UIPstream
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UIPstream.H" #include "UIPstream.H"
@ -36,195 +33,64 @@ Description
#include <mpi.h> #include <mpi.h>
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
Foam::UIPstream::UIPstream void Foam::UIPstream::bufferIPCrecv()
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
IOstreamOption::streamFormat fmt
)
:
UPstream(commsType),
Istream(fmt, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
recvBuf_(receiveBuf),
recvBufPos_(receiveBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
messageSize_(0)
{ {
setOpened(); // Called by constructor
setGood(); if (debug)
if (commsType == commsTypes::nonBlocking)
{ {
// Message is already received into buffer Pout<< "UIPstream IPC read buffer :"
<< " from:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
} }
else
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{ {
if (debug) profilingPstream::beginTiming();
{
Pout<< "UIPstream::UIPstream :"
<< " read from:" << fromProcNo
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// No buffer size allocated/specified - probe size of incoming message MPI_Status status;
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status; MPI_Probe
MPI_Probe
(
fromProcNo_,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&status
);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
// Assume these are from gathers ...
profilingPstream::addGatherTime();
recvBuf_.resize(messageSize_);
if (debug)
{
Pout<< "UIPstream::UIPstream : probed size:"
<< messageSize_ << Foam::endl;
}
}
messageSize_ = UIPstream::read
( (
commsType,
fromProcNo_, fromProcNo_,
recvBuf_.data(),
recvBuf_.capacity(),
tag_, tag_,
comm_ PstreamGlobals::MPICommunicators_[comm_],
&status
); );
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
// Assume these are from gathers ...
profilingPstream::addGatherTime();
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_); recvBuf_.resize(messageSize_);
if (!messageSize_) if (debug)
{ {
setEof(); Pout<< "UIPstream::UIPstream : probed size:"
<< messageSize_ << Foam::endl;
} }
} }
}
messageSize_ = UIPstream::read
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, IOstreamOption::currentVersion),
fromProcNo_(fromProcNo),
recvBuf_(buffers.recvBuf_[fromProcNo]),
recvBufPos_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),
messageSize_(0)
{
if
( (
commsType() != UPstream::commsTypes::scheduled commsType_,
&& !buffers.finishedSendsCalled_ fromProcNo_,
) recvBuf_.data(),
recvBuf_.capacity(),
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_);
if (!messageSize_)
{ {
FatalErrorInFunction setEof();
<< "PstreamBuffers::finishedSends() never called." << endl
<< "Please call PstreamBuffers::finishedSends() after doing"
<< " all your sends (using UOPstream) and before doing any"
<< " receives (using UIPstream)" << Foam::exit(FatalError);
}
setOpened();
setGood();
if (commsType() == commsTypes::nonBlocking)
{
// Message is already received into buffer
messageSize_ = recvBuf_.size();
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
<< " fromProcNo:" << fromProcNo
<< " tag:" << tag_ << " comm:" << comm_
<< " receive buffer size:" << messageSize_
<< Foam::endl;
}
}
else
{
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
<< " read from:" << fromProcNo
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status;
MPI_Probe
(
fromProcNo_,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&status
);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
// Assume these are from gathers ...
profilingPstream::addGatherTime();
recvBuf_.resize(messageSize_);
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
<< messageSize_ << Foam::endl;
}
}
messageSize_ = UIPstream::read
(
commsType(),
fromProcNo_,
recvBuf_.data(),
recvBuf_.capacity(),
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
setEof();
}
} }
} }
@ -287,7 +153,6 @@ Foam::label Foam::UIPstream::read
FatalErrorInFunction FatalErrorInFunction
<< "MPI_Recv cannot receive incoming message" << "MPI_Recv cannot receive incoming message"
<< Foam::abort(FatalError); << Foam::abort(FatalError);
return 0; return 0;
} }
@ -336,7 +201,7 @@ Foam::label Foam::UIPstream::read
) )
{ {
FatalErrorInFunction FatalErrorInFunction
<< "MPI_Recv cannot start non-blocking receive" << "MPI_Irecv cannot start non-blocking receive"
<< Foam::abort(FatalError); << Foam::abort(FatalError);
return 0; return 0;

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2021 OpenCFD Ltd. Copyright (C) 2019-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -24,9 +24,6 @@ License
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Write primitive and binary block from OPstream
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UOPstream.H" #include "UOPstream.H"
@ -35,6 +32,22 @@ Description
#include <mpi.h> #include <mpi.h>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPstream::bufferIPCsend()
{
return UOPstream::write
(
commsType_,
toProcNo_,
sendBuf_.cdata(),
sendBuf_.size(),
tag_,
comm_
);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UOPstream::write bool Foam::UOPstream::write
@ -70,13 +83,13 @@ bool Foam::UOPstream::write
PstreamGlobals::checkCommunicator(communicator, toProcNo); PstreamGlobals::checkCommunicator(communicator, toProcNo);
bool transferFailed = true; bool failed = true;
profilingPstream::beginTiming(); profilingPstream::beginTiming();
if (commsType == commsTypes::blocking) if (commsType == commsTypes::blocking)
{ {
transferFailed = MPI_Bsend failed = MPI_Bsend
( (
const_cast<char*>(buf), const_cast<char*>(buf),
bufSize, bufSize,
@ -99,7 +112,7 @@ bool Foam::UOPstream::write
} }
else if (commsType == commsTypes::scheduled) else if (commsType == commsTypes::scheduled)
{ {
transferFailed = MPI_Send failed = MPI_Send
( (
const_cast<char*>(buf), const_cast<char*>(buf),
bufSize, bufSize,
@ -124,7 +137,7 @@ bool Foam::UOPstream::write
{ {
MPI_Request request; MPI_Request request;
transferFailed = MPI_Isend failed = MPI_Isend
( (
const_cast<char*>(buf), const_cast<char*>(buf),
bufSize, bufSize,
@ -151,12 +164,11 @@ bool Foam::UOPstream::write
else else
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Unsupported communications type " << "Unsupported communications type " << int(commsType)
<< UPstream::commsTypeNames[commsType]
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
return !transferFailed; return !failed;
} }

View File

@ -28,8 +28,6 @@ License
#include "processorFaPatch.H" #include "processorFaPatch.H"
#include "addToRunTimeSelectionTable.H" #include "addToRunTimeSelectionTable.H"
#include "IPstream.H"
#include "OPstream.H"
#include "transformField.H" #include "transformField.H"
#include "faBoundaryMesh.H" #include "faBoundaryMesh.H"
#include "faMesh.H" #include "faMesh.H"

View File

@ -28,8 +28,6 @@ License
#include "processorFaPatchField.H" #include "processorFaPatchField.H"
#include "processorFaPatch.H" #include "processorFaPatch.H"
#include "IPstream.H"
#include "OPstream.H"
#include "transformField.H" #include "transformField.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //

View File

@ -31,8 +31,8 @@ License
#include "processorPolyPatch.H" #include "processorPolyPatch.H"
#include "cyclicPolyPatch.H" #include "cyclicPolyPatch.H"
#include "cyclicAMIPolyPatch.H" #include "cyclicAMIPolyPatch.H"
#include "OPstream.H" #include "UIPstream.H"
#include "IPstream.H" #include "UOPstream.H"
#include "PstreamReduceOps.H" #include "PstreamReduceOps.H"
#include "debug.H" #include "debug.H"
#include "typeInfo.H" #include "typeInfo.H"

View File

@ -30,8 +30,8 @@ License
#include "polyMesh.H" #include "polyMesh.H"
#include "processorPolyPatch.H" #include "processorPolyPatch.H"
#include "cyclicPolyPatch.H" #include "cyclicPolyPatch.H"
#include "OPstream.H" #include "UIPstream.H"
#include "IPstream.H" #include "UOPstream.H"
#include "PstreamCombineReduceOps.H" #include "PstreamCombineReduceOps.H"
#include "debug.H" #include "debug.H"
#include "typeInfo.H" #include "typeInfo.H"