diff --git a/src/Pstream/mpi/UIPstreamRead.C b/src/Pstream/mpi/UIPstreamRead.C index 20fed25c8d..c60f95bb87 100644 --- a/src/Pstream/mpi/UIPstreamRead.C +++ b/src/Pstream/mpi/UIPstreamRead.C @@ -31,83 +31,30 @@ License #include "profilingPstream.H" #include "IOstreams.H" -// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // +// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * // -void Foam::UIPstream::bufferIPCrecv() -{ - // Called by constructor - if (debug) - { - Pout<< "UIPstream IPC read buffer :" - << " from:" << fromProcNo_ - << " tag:" << tag_ << " comm:" << comm_ - << " wanted size:" << recvBuf_.capacity() - << Foam::endl; - } - - // No buffer size allocated/specified - probe size of incoming message - if (!recvBuf_.capacity()) - { - profilingPstream::beginTiming(); - - MPI_Status status; - - MPI_Probe - ( - fromProcNo_, - tag_, - PstreamGlobals::MPICommunicators_[comm_], - &status - ); - MPI_Get_count(&status, MPI_BYTE, &messageSize_); - - profilingPstream::addProbeTime(); - - recvBuf_.resize(messageSize_); - - if (debug) - { - Pout<< "UIPstream::UIPstream : probed size:" - << messageSize_ << Foam::endl; - } - } - - messageSize_ = UIPstream::read - ( - commsType(), - fromProcNo_, - recvBuf_.data(), - recvBuf_.capacity(), - tag_, - comm_ - ); - - // Set addressed size. Leave actual allocated memory intact. - recvBuf_.resize(messageSize_); - - if (!messageSize_) - { - setEof(); - } -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -Foam::label Foam::UIPstream::read +// General blocking/non-blocking MPI receive, optionally with probed +// message information. +static Foam::label UPstream_mpi_receive ( - const UPstream::commsTypes commsType, - const int fromProcNo, + const Foam::UPstream::commsTypes commsType, char* buf, const std::streamsize bufSize, + const int fromProcNo, const int tag, - const label communicator, - UPstream::Request* req + const Foam::label communicator, + Foam::UPstream::Request* req +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + // MPI-3 : eg, openmpi-1.7 (2013) and later + , MPI_Message* message = nullptr +#endif ) { + using namespace Foam; + PstreamGlobals::reset_request(req); - if (debug) + if (UPstream::debug) { Pout<< "UIPstream::read : starting read from:" << fromProcNo << " tag:" << tag << " comm:" << communicator @@ -134,11 +81,26 @@ Foam::label Foam::UIPstream::read || commsType == UPstream::commsTypes::scheduled ) { + int returnCode = 0; MPI_Status status; - if - ( - MPI_Recv +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + // MPI-3 : eg, openmpi-1.7 (2013) and later + if (message) + { + returnCode = MPI_Mrecv + ( + buf, + bufSize, + MPI_BYTE, + message, + &status + ); + } + else +#endif + { + returnCode = MPI_Recv ( buf, bufSize, @@ -147,8 +109,10 @@ Foam::label Foam::UIPstream::read tag, PstreamGlobals::MPICommunicators_[communicator], &status - ) - ) + ); + } + + if (returnCode != MPI_SUCCESS) { FatalErrorInFunction << "MPI_Recv cannot receive incoming message" @@ -163,7 +127,7 @@ Foam::label Foam::UIPstream::read int messageSize; MPI_Get_count(&status, MPI_BYTE, &messageSize); - if (debug) + if (UPstream::debug) { Pout<< "UIPstream::read : finished read from:" << fromProcNo << " tag:" << tag << " read size:" << label(bufSize) @@ -184,11 +148,26 @@ Foam::label Foam::UIPstream::read } else if (commsType == UPstream::commsTypes::nonBlocking) { + int returnCode = 0; MPI_Request request; - if - ( - MPI_Irecv +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + // MPI-3 : eg, openmpi-1.7 (2013) and later + if (message) + { + returnCode = MPI_Imrecv + ( + buf, + bufSize, + MPI_BYTE, + message, + &request + ); + } + else +#endif + { + returnCode = MPI_Irecv ( buf, bufSize, @@ -197,8 +176,10 @@ Foam::label Foam::UIPstream::read tag, PstreamGlobals::MPICommunicators_[communicator], &request - ) - ) + ); + } + + if (returnCode != MPI_SUCCESS) { FatalErrorInFunction << "MPI_Irecv cannot start non-blocking receive" @@ -207,7 +188,7 @@ Foam::label Foam::UIPstream::read return 0; } - if (debug) + if (UPstream::debug) { Pout<< "UIPstream::read : started read from:" << fromProcNo << " tag:" << tag << " read size:" << label(bufSize) @@ -220,7 +201,7 @@ Foam::label Foam::UIPstream::read PstreamGlobals::push_request(request, req); profilingPstream::addRequestTime(); - // Assume the message is completely received. + // Assume the message will be completely received. return bufSize; } @@ -232,4 +213,114 @@ Foam::label Foam::UIPstream::read } +// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // + +void Foam::UIPstream::bufferIPCrecv() +{ + // Called by constructor + if (UPstream::debug) + { + Pout<< "UIPstream IPC read buffer :" + << " from:" << fromProcNo_ + << " tag:" << tag_ << " comm:" << comm_ + << " wanted size:" << recvBuf_.capacity() + << Foam::endl; + } + +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + // MPI-3 : eg, openmpi-1.7 (2013) and later + MPI_Message message; + MPI_Message* messagePtr = nullptr; +#endif + + // No buffer size allocated/specified - probe size of incoming message + if (!recvBuf_.capacity()) + { + profilingPstream::beginTiming(); + + MPI_Status status; + +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + // MPI-3 : eg, openmpi-1.7 (2013) and later + messagePtr = &message; + MPI_Mprobe + ( + fromProcNo_, + tag_, + PstreamGlobals::MPICommunicators_[comm_], + &message, + &status + ); +#else + MPI_Probe + ( + fromProcNo_, + tag_, + PstreamGlobals::MPICommunicators_[comm_], + &status + ); +#endif + MPI_Get_count(&status, MPI_BYTE, &messageSize_); + + profilingPstream::addProbeTime(); + + recvBuf_.resize(messageSize_); + + if (UPstream::debug) + { + Pout<< "UIPstream::UIPstream : probed size:" + << messageSize_ << Foam::endl; + } + } + + messageSize_ = UPstream_mpi_receive + ( + commsType(), + recvBuf_.data(), + recvBuf_.capacity(), + fromProcNo_, + tag_, + comm_, + nullptr // UPstream::Request +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + , messagePtr +#endif + ); + + // Set addressed size. Leave actual allocated memory intact. + recvBuf_.resize(messageSize_); + + if (!messageSize_) + { + setEof(); + } +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +Foam::label Foam::UIPstream::read +( + const UPstream::commsTypes commsType, + const int fromProcNo, + char* buf, + const std::streamsize bufSize, + const int tag, + const label communicator, + UPstream::Request* req +) +{ + return UPstream_mpi_receive + ( + commsType, + buf, + bufSize, + fromProcNo, + tag, + communicator, + req + ); +} + + // ************************************************************************* //