mirror of
https://develop.openfoam.com/Development/openfoam.git
synced 2025-11-28 03:28:01 +00:00
ENH: use Mprobe/Mrecv for IPstream constructor
- retains the queried message without secondary polling.
This commit is contained in:
@ -31,83 +31,30 @@ License
|
|||||||
#include "profilingPstream.H"
|
#include "profilingPstream.H"
|
||||||
#include "IOstreams.H"
|
#include "IOstreams.H"
|
||||||
|
|
||||||
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
|
||||||
|
|
||||||
void Foam::UIPstream::bufferIPCrecv()
|
// General blocking/non-blocking MPI receive, optionally with probed
|
||||||
{
|
// message information.
|
||||||
// Called by constructor
|
static Foam::label UPstream_mpi_receive
|
||||||
if (debug)
|
|
||||||
{
|
|
||||||
Pout<< "UIPstream IPC read buffer :"
|
|
||||||
<< " from:" << fromProcNo_
|
|
||||||
<< " tag:" << tag_ << " comm:" << comm_
|
|
||||||
<< " wanted size:" << recvBuf_.capacity()
|
|
||||||
<< Foam::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
// No buffer size allocated/specified - probe size of incoming message
|
|
||||||
if (!recvBuf_.capacity())
|
|
||||||
{
|
|
||||||
profilingPstream::beginTiming();
|
|
||||||
|
|
||||||
MPI_Status status;
|
|
||||||
|
|
||||||
MPI_Probe
|
|
||||||
(
|
(
|
||||||
fromProcNo_,
|
const Foam::UPstream::commsTypes commsType,
|
||||||
tag_,
|
|
||||||
PstreamGlobals::MPICommunicators_[comm_],
|
|
||||||
&status
|
|
||||||
);
|
|
||||||
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
|
|
||||||
|
|
||||||
profilingPstream::addProbeTime();
|
|
||||||
|
|
||||||
recvBuf_.resize(messageSize_);
|
|
||||||
|
|
||||||
if (debug)
|
|
||||||
{
|
|
||||||
Pout<< "UIPstream::UIPstream : probed size:"
|
|
||||||
<< messageSize_ << Foam::endl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
messageSize_ = UIPstream::read
|
|
||||||
(
|
|
||||||
commsType(),
|
|
||||||
fromProcNo_,
|
|
||||||
recvBuf_.data(),
|
|
||||||
recvBuf_.capacity(),
|
|
||||||
tag_,
|
|
||||||
comm_
|
|
||||||
);
|
|
||||||
|
|
||||||
// Set addressed size. Leave actual allocated memory intact.
|
|
||||||
recvBuf_.resize(messageSize_);
|
|
||||||
|
|
||||||
if (!messageSize_)
|
|
||||||
{
|
|
||||||
setEof();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
|
||||||
|
|
||||||
Foam::label Foam::UIPstream::read
|
|
||||||
(
|
|
||||||
const UPstream::commsTypes commsType,
|
|
||||||
const int fromProcNo,
|
|
||||||
char* buf,
|
char* buf,
|
||||||
const std::streamsize bufSize,
|
const std::streamsize bufSize,
|
||||||
|
const int fromProcNo,
|
||||||
const int tag,
|
const int tag,
|
||||||
const label communicator,
|
const Foam::label communicator,
|
||||||
UPstream::Request* req
|
Foam::UPstream::Request* req
|
||||||
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||||
|
, MPI_Message* message = nullptr
|
||||||
|
#endif
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
using namespace Foam;
|
||||||
|
|
||||||
PstreamGlobals::reset_request(req);
|
PstreamGlobals::reset_request(req);
|
||||||
|
|
||||||
if (debug)
|
if (UPstream::debug)
|
||||||
{
|
{
|
||||||
Pout<< "UIPstream::read : starting read from:" << fromProcNo
|
Pout<< "UIPstream::read : starting read from:" << fromProcNo
|
||||||
<< " tag:" << tag << " comm:" << communicator
|
<< " tag:" << tag << " comm:" << communicator
|
||||||
@ -134,11 +81,26 @@ Foam::label Foam::UIPstream::read
|
|||||||
|| commsType == UPstream::commsTypes::scheduled
|
|| commsType == UPstream::commsTypes::scheduled
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
int returnCode = 0;
|
||||||
MPI_Status status;
|
MPI_Status status;
|
||||||
|
|
||||||
if
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||||
|
if (message)
|
||||||
|
{
|
||||||
|
returnCode = MPI_Mrecv
|
||||||
(
|
(
|
||||||
MPI_Recv
|
buf,
|
||||||
|
bufSize,
|
||||||
|
MPI_BYTE,
|
||||||
|
message,
|
||||||
|
&status
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
returnCode = MPI_Recv
|
||||||
(
|
(
|
||||||
buf,
|
buf,
|
||||||
bufSize,
|
bufSize,
|
||||||
@ -147,8 +109,10 @@ Foam::label Foam::UIPstream::read
|
|||||||
tag,
|
tag,
|
||||||
PstreamGlobals::MPICommunicators_[communicator],
|
PstreamGlobals::MPICommunicators_[communicator],
|
||||||
&status
|
&status
|
||||||
)
|
);
|
||||||
)
|
}
|
||||||
|
|
||||||
|
if (returnCode != MPI_SUCCESS)
|
||||||
{
|
{
|
||||||
FatalErrorInFunction
|
FatalErrorInFunction
|
||||||
<< "MPI_Recv cannot receive incoming message"
|
<< "MPI_Recv cannot receive incoming message"
|
||||||
@ -163,7 +127,7 @@ Foam::label Foam::UIPstream::read
|
|||||||
int messageSize;
|
int messageSize;
|
||||||
MPI_Get_count(&status, MPI_BYTE, &messageSize);
|
MPI_Get_count(&status, MPI_BYTE, &messageSize);
|
||||||
|
|
||||||
if (debug)
|
if (UPstream::debug)
|
||||||
{
|
{
|
||||||
Pout<< "UIPstream::read : finished read from:" << fromProcNo
|
Pout<< "UIPstream::read : finished read from:" << fromProcNo
|
||||||
<< " tag:" << tag << " read size:" << label(bufSize)
|
<< " tag:" << tag << " read size:" << label(bufSize)
|
||||||
@ -184,11 +148,26 @@ Foam::label Foam::UIPstream::read
|
|||||||
}
|
}
|
||||||
else if (commsType == UPstream::commsTypes::nonBlocking)
|
else if (commsType == UPstream::commsTypes::nonBlocking)
|
||||||
{
|
{
|
||||||
|
int returnCode = 0;
|
||||||
MPI_Request request;
|
MPI_Request request;
|
||||||
|
|
||||||
if
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||||
|
if (message)
|
||||||
|
{
|
||||||
|
returnCode = MPI_Imrecv
|
||||||
(
|
(
|
||||||
MPI_Irecv
|
buf,
|
||||||
|
bufSize,
|
||||||
|
MPI_BYTE,
|
||||||
|
message,
|
||||||
|
&request
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
returnCode = MPI_Irecv
|
||||||
(
|
(
|
||||||
buf,
|
buf,
|
||||||
bufSize,
|
bufSize,
|
||||||
@ -197,8 +176,10 @@ Foam::label Foam::UIPstream::read
|
|||||||
tag,
|
tag,
|
||||||
PstreamGlobals::MPICommunicators_[communicator],
|
PstreamGlobals::MPICommunicators_[communicator],
|
||||||
&request
|
&request
|
||||||
)
|
);
|
||||||
)
|
}
|
||||||
|
|
||||||
|
if (returnCode != MPI_SUCCESS)
|
||||||
{
|
{
|
||||||
FatalErrorInFunction
|
FatalErrorInFunction
|
||||||
<< "MPI_Irecv cannot start non-blocking receive"
|
<< "MPI_Irecv cannot start non-blocking receive"
|
||||||
@ -207,7 +188,7 @@ Foam::label Foam::UIPstream::read
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (debug)
|
if (UPstream::debug)
|
||||||
{
|
{
|
||||||
Pout<< "UIPstream::read : started read from:" << fromProcNo
|
Pout<< "UIPstream::read : started read from:" << fromProcNo
|
||||||
<< " tag:" << tag << " read size:" << label(bufSize)
|
<< " tag:" << tag << " read size:" << label(bufSize)
|
||||||
@ -220,7 +201,7 @@ Foam::label Foam::UIPstream::read
|
|||||||
PstreamGlobals::push_request(request, req);
|
PstreamGlobals::push_request(request, req);
|
||||||
profilingPstream::addRequestTime();
|
profilingPstream::addRequestTime();
|
||||||
|
|
||||||
// Assume the message is completely received.
|
// Assume the message will be completely received.
|
||||||
return bufSize;
|
return bufSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,4 +213,114 @@ Foam::label Foam::UIPstream::read
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
||||||
|
|
||||||
|
void Foam::UIPstream::bufferIPCrecv()
|
||||||
|
{
|
||||||
|
// Called by constructor
|
||||||
|
if (UPstream::debug)
|
||||||
|
{
|
||||||
|
Pout<< "UIPstream IPC read buffer :"
|
||||||
|
<< " from:" << fromProcNo_
|
||||||
|
<< " tag:" << tag_ << " comm:" << comm_
|
||||||
|
<< " wanted size:" << recvBuf_.capacity()
|
||||||
|
<< Foam::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||||
|
MPI_Message message;
|
||||||
|
MPI_Message* messagePtr = nullptr;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// No buffer size allocated/specified - probe size of incoming message
|
||||||
|
if (!recvBuf_.capacity())
|
||||||
|
{
|
||||||
|
profilingPstream::beginTiming();
|
||||||
|
|
||||||
|
MPI_Status status;
|
||||||
|
|
||||||
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
// MPI-3 : eg, openmpi-1.7 (2013) and later
|
||||||
|
messagePtr = &message;
|
||||||
|
MPI_Mprobe
|
||||||
|
(
|
||||||
|
fromProcNo_,
|
||||||
|
tag_,
|
||||||
|
PstreamGlobals::MPICommunicators_[comm_],
|
||||||
|
&message,
|
||||||
|
&status
|
||||||
|
);
|
||||||
|
#else
|
||||||
|
MPI_Probe
|
||||||
|
(
|
||||||
|
fromProcNo_,
|
||||||
|
tag_,
|
||||||
|
PstreamGlobals::MPICommunicators_[comm_],
|
||||||
|
&status
|
||||||
|
);
|
||||||
|
#endif
|
||||||
|
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
|
||||||
|
|
||||||
|
profilingPstream::addProbeTime();
|
||||||
|
|
||||||
|
recvBuf_.resize(messageSize_);
|
||||||
|
|
||||||
|
if (UPstream::debug)
|
||||||
|
{
|
||||||
|
Pout<< "UIPstream::UIPstream : probed size:"
|
||||||
|
<< messageSize_ << Foam::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
messageSize_ = UPstream_mpi_receive
|
||||||
|
(
|
||||||
|
commsType(),
|
||||||
|
recvBuf_.data(),
|
||||||
|
recvBuf_.capacity(),
|
||||||
|
fromProcNo_,
|
||||||
|
tag_,
|
||||||
|
comm_,
|
||||||
|
nullptr // UPstream::Request
|
||||||
|
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
|
||||||
|
, messagePtr
|
||||||
|
#endif
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set addressed size. Leave actual allocated memory intact.
|
||||||
|
recvBuf_.resize(messageSize_);
|
||||||
|
|
||||||
|
if (!messageSize_)
|
||||||
|
{
|
||||||
|
setEof();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||||
|
|
||||||
|
Foam::label Foam::UIPstream::read
|
||||||
|
(
|
||||||
|
const UPstream::commsTypes commsType,
|
||||||
|
const int fromProcNo,
|
||||||
|
char* buf,
|
||||||
|
const std::streamsize bufSize,
|
||||||
|
const int tag,
|
||||||
|
const label communicator,
|
||||||
|
UPstream::Request* req
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return UPstream_mpi_receive
|
||||||
|
(
|
||||||
|
commsType,
|
||||||
|
buf,
|
||||||
|
bufSize,
|
||||||
|
fromProcNo,
|
||||||
|
tag,
|
||||||
|
communicator,
|
||||||
|
req
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// ************************************************************************* //
|
// ************************************************************************* //
|
||||||
|
|||||||
Reference in New Issue
Block a user