ENH: use Improbe/Mrecv for NBX size exchange

- retains the queried message without secondary polling.
  Purported to be slightly faster. Better thread-safety.
This commit is contained in:
Mark Olesen
2023-05-08 09:13:25 +02:00
parent 50f4d0444c
commit 2eb1f5678d
2 changed files with 81 additions and 59 deletions

View File

@ -131,14 +131,16 @@ int main(int argc, char *argv[])
for (bool barrier_active = false, done = false; !done; /*nil*/)
{
int flag = 0;
MPI_Message message;
MPI_Status status;
MPI_Iprobe
MPI_Improbe
(
MPI_ANY_SOURCE,
tag,
MPI_COMM_WORLD,
&flag,
&message,
&status
);
@ -153,14 +155,12 @@ int main(int argc, char *argv[])
auto& buf = recvBufs(fromProci);
buf.resize_nocopy(count);
MPI_Irecv
MPI_Imrecv
(
buf.data_bytes(),
buf.size_bytes(),
MPI_BYTE,
fromProci,
tag,
MPI_COMM_WORLD,
&message,
&recvRequests.emplace_back()
);
}

View File

@ -157,15 +157,12 @@ void Foam::PstreamDetail::allReduce
error::printStack(Pout);
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -192,9 +189,8 @@ void Foam::PstreamDetail::allReduce
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -281,15 +277,12 @@ void Foam::PstreamDetail::allToAll
return;
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -318,9 +311,8 @@ void Foam::PstreamDetail::allToAll
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -432,15 +424,12 @@ void Foam::PstreamDetail::allToAllv
return;
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -470,9 +459,8 @@ void Foam::PstreamDetail::allToAllv
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -621,6 +609,19 @@ void Foam::PstreamDetail::allToAllConsensus
int flag = 0;
MPI_Status status;
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
MPI_Message message;
MPI_Improbe
(
MPI_ANY_SOURCE,
tag,
PstreamGlobals::MPICommunicators_[comm],
&flag,
&message,
&status
);
#else
MPI_Iprobe
(
MPI_ANY_SOURCE,
@ -629,6 +630,7 @@ void Foam::PstreamDetail::allToAllConsensus
&flag,
&status
);
#endif
if (flag)
{
@ -641,12 +643,24 @@ void Foam::PstreamDetail::allToAllConsensus
if (count != 1)
{
FatalErrorInFunction
<< "Incorrect message size. Expected 1 but had "
<< count << nl
<< "Incorrect message size from proc=" << proci
<< ". Expected 1 but had " << count << nl
<< exit(FatalError);
}
// Regular receive (the data are small)
// Regular blocking receive [the data are small]
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
MPI_Mrecv
(
&recvData[proci],
count, // count=1 (see above)
datatype,
&message,
MPI_STATUS_IGNORE
);
#else
MPI_Recv
(
&recvData[proci],
@ -657,6 +671,7 @@ void Foam::PstreamDetail::allToAllConsensus
PstreamGlobals::MPICommunicators_[comm],
MPI_STATUS_IGNORE
);
#endif
}
if (barrier_active)
@ -800,6 +815,19 @@ void Foam::PstreamDetail::allToAllConsensus
int flag = 0;
MPI_Status status;
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
MPI_Message message;
MPI_Improbe
(
MPI_ANY_SOURCE,
tag,
PstreamGlobals::MPICommunicators_[comm],
&flag,
&message,
&status
);
#else
MPI_Iprobe
(
MPI_ANY_SOURCE,
@ -808,6 +836,7 @@ void Foam::PstreamDetail::allToAllConsensus
&flag,
&status
);
#endif
if (flag)
{
@ -821,14 +850,26 @@ void Foam::PstreamDetail::allToAllConsensus
if (count != 1)
{
FatalErrorInFunction
<< "Incorrect message size. Expected 1 but had "
<< count << nl
<< "Incorrect message size from proc=" << proci
<< ". Expected 1 but had " << count << nl
<< exit(FatalError);
}
auto& recvData = recvBufs(proci);
// Regular receive [the data are small]
// Regular blocking receive [the data are small]
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
MPI_Mrecv
(
&recvData,
count, // count=1 (see above)
datatype,
&message,
MPI_STATUS_IGNORE
);
#else
MPI_Recv
(
&recvData,
@ -839,6 +880,7 @@ void Foam::PstreamDetail::allToAllConsensus
PstreamGlobals::MPICommunicators_[comm],
MPI_STATUS_IGNORE
);
#endif
}
if (barrier_active)
@ -928,15 +970,12 @@ void Foam::PstreamDetail::gather
error::printStack(Pout);
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -965,9 +1004,8 @@ void Foam::PstreamDetail::gather
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -1048,15 +1086,12 @@ void Foam::PstreamDetail::scatter
error::printStack(Pout);
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -1085,9 +1120,8 @@ void Foam::PstreamDetail::scatter
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -1193,15 +1227,12 @@ void Foam::PstreamDetail::gatherv
sendCount = 0;
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -1231,9 +1262,8 @@ void Foam::PstreamDetail::gatherv
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -1333,15 +1363,12 @@ void Foam::PstreamDetail::scatterv
<< Foam::abort(FatalError);
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -1371,9 +1398,8 @@ void Foam::PstreamDetail::scatterv
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();
@ -1445,15 +1471,12 @@ void Foam::PstreamDetail::allGather
error::printStack(Pout);
}
bool handled(false);
#if defined(MPI_VERSION) && (MPI_VERSION >= 3)
// MPI-3 : eg, openmpi-1.7 (2013) and later
if (immediate)
{
// MPI-3 : eg, openmpi-1.7 (2013) and later
profilingPstream::beginTiming();
handled = true;
MPI_Request request;
if
@ -1475,9 +1498,8 @@ void Foam::PstreamDetail::allGather
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
else
#endif
if (!handled)
{
profilingPstream::beginTiming();