mirror of
https://develop.openfoam.com/Development/openfoam.git
synced 2025-11-28 03:28:01 +00:00
ENH: use Mprobe/Mrecv for IPstream constructor
- retains the queried message without secondary polling.
This commit is contained in:
@ -31,83 +31,30 @@ License
|
||||
#include "profilingPstream.H"
|
||||
#include "IOstreams.H"
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
|
||||
|
||||
void Foam::UIPstream::bufferIPCrecv()
|
||||
{
|
||||
// Called by constructor
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "UIPstream IPC read buffer :"
|
||||
<< " from:" << fromProcNo_
|
||||
<< " tag:" << tag_ << " comm:" << comm_
|
||||
<< " wanted size:" << recvBuf_.capacity()
|
||||
<< Foam::endl;
|
||||
}
|
||||
|
||||
// No buffer size allocated/specified - probe size of incoming message
|
||||
if (!recvBuf_.capacity())
|
||||
{
|
||||
profilingPstream::beginTiming();
|
||||
|
||||
MPI_Status status;
|
||||
|
||||
MPI_Probe
|
||||
(
|
||||
fromProcNo_,
|
||||
tag_,
|
||||
PstreamGlobals::MPICommunicators_[comm_],
|
||||
&status
|
||||
);
|
||||
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
|
||||
|
||||
profilingPstream::addProbeTime();
|
||||
|
||||
recvBuf_.resize(messageSize_);
|
||||
|
||||
if (debug)
|
||||
{
|
||||
Pout<< "UIPstream::UIPstream : probed size:"
|
||||
<< messageSize_ << Foam::endl;
|
||||
}
|
||||
}
|
||||
|
||||
messageSize_ = UIPstream::read
|
||||
(
|
||||
commsType(),
|
||||
fromProcNo_,
|
||||
recvBuf_.data(),
|
||||
recvBuf_.capacity(),
|
||||
tag_,
|
||||
comm_
|
||||
);
|
||||
|
||||
// Set addressed size. Leave actual allocated memory intact.
|
||||
recvBuf_.resize(messageSize_);
|
||||
|
||||
if (!messageSize_)
|
||||
{
|
||||
setEof();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
Foam::label Foam::UIPstream::read
|
||||
// General blocking/non-blocking MPI receive, optionally with probed
|
||||
// message information.
|
||||
static Foam::label UPstream_mpi_receive
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int fromProcNo,
|
||||
const Foam::UPstream::commsTypes commsType,
|
||||
char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const int fromProcNo,
|
||||
const int tag,
|
||||
const label communicator,
|
||||
UPstream::Request* req
|
||||
const Foam::label communicator,
|
||||
Foam::UPstream::Request* req
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||
, MPI_Message* message = nullptr
|
||||
#endif
|
||||
)
|
||||
{
|
||||
using namespace Foam;
|
||||
|
||||
PstreamGlobals::reset_request(req);
|
||||
|
||||
if (debug)
|
||||
if (UPstream::debug)
|
||||
{
|
||||
Pout<< "UIPstream::read : starting read from:" << fromProcNo
|
||||
<< " tag:" << tag << " comm:" << communicator
|
||||
@ -134,11 +81,26 @@ Foam::label Foam::UIPstream::read
|
||||
|| commsType == UPstream::commsTypes::scheduled
|
||||
)
|
||||
{
|
||||
int returnCode = 0;
|
||||
MPI_Status status;
|
||||
|
||||
if
|
||||
(
|
||||
MPI_Recv
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||
if (message)
|
||||
{
|
||||
returnCode = MPI_Mrecv
|
||||
(
|
||||
buf,
|
||||
bufSize,
|
||||
MPI_BYTE,
|
||||
message,
|
||||
&status
|
||||
);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
returnCode = MPI_Recv
|
||||
(
|
||||
buf,
|
||||
bufSize,
|
||||
@ -147,8 +109,10 @@ Foam::label Foam::UIPstream::read
|
||||
tag,
|
||||
PstreamGlobals::MPICommunicators_[communicator],
|
||||
&status
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (returnCode != MPI_SUCCESS)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Recv cannot receive incoming message"
|
||||
@ -163,7 +127,7 @@ Foam::label Foam::UIPstream::read
|
||||
int messageSize;
|
||||
MPI_Get_count(&status, MPI_BYTE, &messageSize);
|
||||
|
||||
if (debug)
|
||||
if (UPstream::debug)
|
||||
{
|
||||
Pout<< "UIPstream::read : finished read from:" << fromProcNo
|
||||
<< " tag:" << tag << " read size:" << label(bufSize)
|
||||
@ -184,11 +148,26 @@ Foam::label Foam::UIPstream::read
|
||||
}
|
||||
else if (commsType == UPstream::commsTypes::nonBlocking)
|
||||
{
|
||||
int returnCode = 0;
|
||||
MPI_Request request;
|
||||
|
||||
if
|
||||
(
|
||||
MPI_Irecv
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||
if (message)
|
||||
{
|
||||
returnCode = MPI_Imrecv
|
||||
(
|
||||
buf,
|
||||
bufSize,
|
||||
MPI_BYTE,
|
||||
message,
|
||||
&request
|
||||
);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
{
|
||||
returnCode = MPI_Irecv
|
||||
(
|
||||
buf,
|
||||
bufSize,
|
||||
@ -197,8 +176,10 @@ Foam::label Foam::UIPstream::read
|
||||
tag,
|
||||
PstreamGlobals::MPICommunicators_[communicator],
|
||||
&request
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (returnCode != MPI_SUCCESS)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "MPI_Irecv cannot start non-blocking receive"
|
||||
@ -207,7 +188,7 @@ Foam::label Foam::UIPstream::read
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (debug)
|
||||
if (UPstream::debug)
|
||||
{
|
||||
Pout<< "UIPstream::read : started read from:" << fromProcNo
|
||||
<< " tag:" << tag << " read size:" << label(bufSize)
|
||||
@ -220,7 +201,7 @@ Foam::label Foam::UIPstream::read
|
||||
PstreamGlobals::push_request(request, req);
|
||||
profilingPstream::addRequestTime();
|
||||
|
||||
// Assume the message is completely received.
|
||||
// Assume the message will be completely received.
|
||||
return bufSize;
|
||||
}
|
||||
|
||||
@ -232,4 +213,114 @@ Foam::label Foam::UIPstream::read
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||
|
||||
void Foam::UIPstream::bufferIPCrecv()
|
||||
{
|
||||
// Called by constructor
|
||||
if (UPstream::debug)
|
||||
{
|
||||
Pout<< "UIPstream IPC read buffer :"
|
||||
<< " from:" << fromProcNo_
|
||||
<< " tag:" << tag_ << " comm:" << comm_
|
||||
<< " wanted size:" << recvBuf_.capacity()
|
||||
<< Foam::endl;
|
||||
}
|
||||
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||
MPI_Message message;
|
||||
MPI_Message* messagePtr = nullptr;
|
||||
#endif
|
||||
|
||||
// No buffer size allocated/specified - probe size of incoming message
|
||||
if (!recvBuf_.capacity())
|
||||
{
|
||||
profilingPstream::beginTiming();
|
||||
|
||||
MPI_Status status;
|
||||
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||
messagePtr = &message;
|
||||
MPI_Mprobe
|
||||
(
|
||||
fromProcNo_,
|
||||
tag_,
|
||||
PstreamGlobals::MPICommunicators_[comm_],
|
||||
&message,
|
||||
&status
|
||||
);
|
||||
#else
|
||||
MPI_Probe
|
||||
(
|
||||
fromProcNo_,
|
||||
tag_,
|
||||
PstreamGlobals::MPICommunicators_[comm_],
|
||||
&status
|
||||
);
|
||||
#endif
|
||||
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
|
||||
|
||||
profilingPstream::addProbeTime();
|
||||
|
||||
recvBuf_.resize(messageSize_);
|
||||
|
||||
if (UPstream::debug)
|
||||
{
|
||||
Pout<< "UIPstream::UIPstream : probed size:"
|
||||
<< messageSize_ << Foam::endl;
|
||||
}
|
||||
}
|
||||
|
||||
messageSize_ = UPstream_mpi_receive
|
||||
(
|
||||
commsType(),
|
||||
recvBuf_.data(),
|
||||
recvBuf_.capacity(),
|
||||
fromProcNo_,
|
||||
tag_,
|
||||
comm_,
|
||||
nullptr // UPstream::Request
|
||||
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||
, messagePtr
|
||||
#endif
|
||||
);
|
||||
|
||||
// Set addressed size. Leave actual allocated memory intact.
|
||||
recvBuf_.resize(messageSize_);
|
||||
|
||||
if (!messageSize_)
|
||||
{
|
||||
setEof();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
Foam::label Foam::UIPstream::read
|
||||
(
|
||||
const UPstream::commsTypes commsType,
|
||||
const int fromProcNo,
|
||||
char* buf,
|
||||
const std::streamsize bufSize,
|
||||
const int tag,
|
||||
const label communicator,
|
||||
UPstream::Request* req
|
||||
)
|
||||
{
|
||||
return UPstream_mpi_receive
|
||||
(
|
||||
commsType,
|
||||
buf,
|
||||
bufSize,
|
||||
fromProcNo,
|
||||
tag,
|
||||
communicator,
|
||||
req
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
|
||||
Reference in New Issue
Block a user