ENH: use Mprobe/Mrecv for IPstream constructor

- retains the queried message without secondary polling.
This commit is contained in:
Mark Olesen
2023-05-08 09:53:06 +02:00
parent 2eb1f5678d
commit c6f528588b

View File

@ -31,83 +31,30 @@ License
#include "profilingPstream.H" #include "profilingPstream.H"
#include "IOstreams.H" #include "IOstreams.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
void Foam::UIPstream::bufferIPCrecv() // General blocking/non-blocking MPI receive, optionally with probed
{ // message information.
// Called by constructor static Foam::label UPstream_mpi_receive
if (debug)
{
Pout<< "UIPstream IPC read buffer :"
<< " from:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status;
MPI_Probe
( (
fromProcNo_, const Foam::UPstream::commsTypes commsType,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&status
);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
profilingPstream::addProbeTime();
recvBuf_.resize(messageSize_);
if (debug)
{
Pout<< "UIPstream::UIPstream : probed size:"
<< messageSize_ << Foam::endl;
}
}
messageSize_ = UIPstream::read
(
commsType(),
fromProcNo_,
recvBuf_.data(),
recvBuf_.capacity(),
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
setEof();
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UIPstream::read
(
const UPstream::commsTypes commsType,
const int fromProcNo,
char* buf, char* buf,
const std::streamsize bufSize, const std::streamsize bufSize,
const int fromProcNo,
const int tag, const int tag,
const label communicator, const Foam::label communicator,
UPstream::Request* req Foam::UPstream::Request* req
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
, MPI_Message* message = nullptr
#endif
) )
{ {
using namespace Foam;
PstreamGlobals::reset_request(req); PstreamGlobals::reset_request(req);
if (debug) if (UPstream::debug)
{ {
Pout<< "UIPstream::read : starting read from:" << fromProcNo Pout<< "UIPstream::read : starting read from:" << fromProcNo
<< " tag:" << tag << " comm:" << communicator << " tag:" << tag << " comm:" << communicator
@ -134,11 +81,26 @@ Foam::label Foam::UIPstream::read
|| commsType == UPstream::commsTypes::scheduled || commsType == UPstream::commsTypes::scheduled
) )
{ {
int returnCode = 0;
MPI_Status status; MPI_Status status;
if #if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (message)
{
returnCode = MPI_Mrecv
( (
MPI_Recv buf,
bufSize,
MPI_BYTE,
message,
&status
);
}
else
#endif
{
returnCode = MPI_Recv
( (
buf, buf,
bufSize, bufSize,
@ -147,8 +109,10 @@ Foam::label Foam::UIPstream::read
tag, tag,
PstreamGlobals::MPICommunicators_[communicator], PstreamGlobals::MPICommunicators_[communicator],
&status &status
) );
) }
if (returnCode != MPI_SUCCESS)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "MPI_Recv cannot receive incoming message" << "MPI_Recv cannot receive incoming message"
@ -163,7 +127,7 @@ Foam::label Foam::UIPstream::read
int messageSize; int messageSize;
MPI_Get_count(&status, MPI_BYTE, &messageSize); MPI_Get_count(&status, MPI_BYTE, &messageSize);
if (debug) if (UPstream::debug)
{ {
Pout<< "UIPstream::read : finished read from:" << fromProcNo Pout<< "UIPstream::read : finished read from:" << fromProcNo
<< " tag:" << tag << " read size:" << label(bufSize) << " tag:" << tag << " read size:" << label(bufSize)
@ -184,11 +148,26 @@ Foam::label Foam::UIPstream::read
} }
else if (commsType == UPstream::commsTypes::nonBlocking) else if (commsType == UPstream::commsTypes::nonBlocking)
{ {
int returnCode = 0;
MPI_Request request; MPI_Request request;
if #if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (message)
{
returnCode = MPI_Imrecv
( (
MPI_Irecv buf,
bufSize,
MPI_BYTE,
message,
&request
);
}
else
#endif
{
returnCode = MPI_Irecv
( (
buf, buf,
bufSize, bufSize,
@ -197,8 +176,10 @@ Foam::label Foam::UIPstream::read
tag, tag,
PstreamGlobals::MPICommunicators_[communicator], PstreamGlobals::MPICommunicators_[communicator],
&request &request
) );
) }
if (returnCode != MPI_SUCCESS)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "MPI_Irecv cannot start non-blocking receive" << "MPI_Irecv cannot start non-blocking receive"
@ -207,7 +188,7 @@ Foam::label Foam::UIPstream::read
return 0; return 0;
} }
if (debug) if (UPstream::debug)
{ {
Pout<< "UIPstream::read : started read from:" << fromProcNo Pout<< "UIPstream::read : started read from:" << fromProcNo
<< " tag:" << tag << " read size:" << label(bufSize) << " tag:" << tag << " read size:" << label(bufSize)
@ -220,7 +201,7 @@ Foam::label Foam::UIPstream::read
PstreamGlobals::push_request(request, req); PstreamGlobals::push_request(request, req);
profilingPstream::addRequestTime(); profilingPstream::addRequestTime();
// Assume the message is completely received. // Assume the message will be completely received.
return bufSize; return bufSize;
} }
@ -232,4 +213,114 @@ Foam::label Foam::UIPstream::read
} }
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UIPstream::bufferIPCrecv()
{
// Called by constructor
if (UPstream::debug)
{
Pout<< "UIPstream IPC read buffer :"
<< " from:" << fromProcNo_
<< " tag:" << tag_ << " comm:" << comm_
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
MPI_Message message;
MPI_Message* messagePtr = nullptr;
#endif
// No buffer size allocated/specified - probe size of incoming message
if (!recvBuf_.capacity())
{
profilingPstream::beginTiming();
MPI_Status status;
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
messagePtr = &message;
MPI_Mprobe
(
fromProcNo_,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&message,
&status
);
#else
MPI_Probe
(
fromProcNo_,
tag_,
PstreamGlobals::MPICommunicators_[comm_],
&status
);
#endif
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
profilingPstream::addProbeTime();
recvBuf_.resize(messageSize_);
if (UPstream::debug)
{
Pout<< "UIPstream::UIPstream : probed size:"
<< messageSize_ << Foam::endl;
}
}
messageSize_ = UPstream_mpi_receive
(
commsType(),
recvBuf_.data(),
recvBuf_.capacity(),
fromProcNo_,
tag_,
comm_,
nullptr // UPstream::Request
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
, messagePtr
#endif
);
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
setEof();
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UIPstream::read
(
const UPstream::commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator,
UPstream::Request* req
)
{
return UPstream_mpi_receive
(
commsType,
buf,
bufSize,
fromProcNo,
tag,
communicator,
req
);
}
// ************************************************************************* // // ************************************************************************* //