ENH: MPI_Waitany / MPI_Waitsome interface for internal list of requests

- checks requests for completion, returning true when some requests
  have completed and false when there are no active requests.
  This allows it to be used in a polling loop to progress MPI
  and then respond as requests become satisfied.

  When used as part of a dispatch loop, waitSomeRequests() is
  probably more efficient than calling waitAnyRequest() and can help
  avoid biasing which client requests are serviced.

  Takes an optional return parameter to retrieve the indices but,
  more importantly, to avoid inner-loop reallocations.

  Example,

      DynamicList<int> indices;
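      // ('startRequest' would typically be the value of
      // UPstream::nRequests() captured before starting the
      // non-blocking operations)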
      while (UPstream::waitSomeRequests(startRequest, &indices))
      {
          // Dispatch something ....
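          // (absolute request index: startRequest + indices[i])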
      }

      // Reset list of outstanding requests with 'Waitall' for safety
      UPstream::waitRequests(startRequest);

  ---

  If only dealing with single items and an index is required for
  dispatching, it can be better to use a list of UPstream::Request
  instead.

  Example,

      List<UPstream::Request> requests = ...;

      label index = -1;
      while ((index = UPstream::waitAnyRequest(requests)) >= 0)
      {
          // Do something at index
      }

ENH: pair-wise wrappers for MPI_Test or MPI_Wait

- for send/recv pairs of requests, both can be bundled together and
  handled with a single MPI_Testsome or MPI_Waitall instead of two
  individual calls.
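
  Example (sketch only - the request indices are placeholders for
  values obtained when starting the non-blocking operations),

      label sendRequest = ...;
      label recvRequest = ...;

      // Single MPI_Testsome for the pair.
      // Each index that has completed is reset to -1 (ie, 'done')
      if (UPstream::finishedRequestPair(sendRequest, recvRequest))
      {
          // Some progress: check the indices for -1
      }

      // Or block for both with a single MPI_Waitall
      UPstream::waitRequestPair(sendRequest, recvRequest);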
Mark Olesen
2023-04-06 14:35:00 +02:00
parent aa002122c2
commit 11065bb029
3 changed files with 422 additions and 5 deletions

View File

@@ -523,6 +523,38 @@ public:
// A no-op if parRun() == false, or the list is empty.
static void waitRequests(UList<UPstream::Request>& requests);
//- Wait until any request (from position onwards) has finished.
//- Corresponds to MPI_Waitany()
// A no-op and returns false if parRun() == false,
// if the position is out-of-range [0 to nRequests()],
// or the internal list of requests is empty.
//
// \returns true if any pending request completed.
// \returns false if all requests have already been handled.
//
// \param pos starting position within the internal list of requests
static bool waitAnyRequest(const label pos);
//- Wait until some requests (from position onwards) have finished.
//- Corresponds to MPI_Waitsome()
// A no-op and returns false if parRun() == false,
// if the position is out-of-range [0 to nRequests()],
// or the internal list of requests is empty.
//
// \returns true if some pending requests completed.
// \returns false if all requests have already been handled
//
// \param pos starting position within the internal list of requests
// \param[out] indices the completed request indices relative to the
// starting position. This is an optional parameter, which can be
// used to recover the indices or simply to avoid reallocations
// when calling within a loop.
static bool waitSomeRequests
(
const label pos,
DynamicList<int>* indices = nullptr
);
//- Wait until any request has finished and return its index.
//- Corresponds to MPI_Waitany()
// Returns -1 if parRun() == false, or the list is empty,
@@ -532,7 +564,7 @@ public:
//- Wait until request i has finished.
//- Corresponds to MPI_Wait()
// A no-op if parRun() == false,
// if there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static void waitRequest(const label i);
@@ -544,7 +576,7 @@ public:
//- Non-blocking comms: has request i finished?
//- Corresponds to MPI_Test()
// A no-op and returns true if parRun() == false,
// if there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static bool finishedRequest(const label i);
@@ -559,6 +591,22 @@ public:
// A no-op and returns true if parRun() == false or list is empty
static bool finishedRequests(UList<UPstream::Request>& requests);
//- Non-blocking comms: have both requests finished?
//- Corresponds to pair of MPI_Test()
// A no-op and returns true if parRun() == false,
// if there are no pending requests,
// or if the indices are out-of-range (0 to nRequests)
// Each finished request parameter is set to -1 (ie, done).
static bool finishedRequestPair(label& req1, label& req2);
//- Non-blocking comms: wait for both requests to finish.
//- Corresponds to pair of MPI_Wait()
// A no-op if parRun() == false,
// if there are no pending requests,
// or if the indices are out-of-range (0 to nRequests)
// Each finished request parameter is set to -1 (ie, done).
static void waitRequestPair(label& req1, label& req2);
// General

View File

@@ -56,6 +56,21 @@ void Foam::UPstream::resetRequests(const label n) {}
void Foam::UPstream::waitRequests(const label pos) {}
void Foam::UPstream::waitRequests(UList<UPstream::Request>&) {}
bool Foam::UPstream::waitAnyRequest(const label pos)
{
return false;
}
bool Foam::UPstream::waitSomeRequests
(
const label pos,
DynamicList<int>* indices
)
{
if (indices) indices->clear();
return false;
}
Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
{
return -1;
@@ -66,10 +81,26 @@ void Foam::UPstream::waitRequest(UPstream::Request&) {}
bool Foam::UPstream::finishedRequest(const label i) { return true; }
bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; }
bool Foam::UPstream::finishedRequests(UList<UPstream::Request>&)
{
return true;
}
bool Foam::UPstream::finishedRequestPair(label& req1, label& req2)
{
req1 = -1;
req2 = -1;
return true;
}
void Foam::UPstream::waitRequestPair(label& req1, label& req2)
{
req1 = -1;
req2 = -1;
}
// ************************************************************************* //

View File

@@ -170,6 +170,156 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
}
bool Foam::UPstream::waitAnyRequest(const label pos)
{
// No-op for non-parallel, no pending requests or out-of-range
if
(
!UPstream::parRun()
|| pos < 0
|| pos >= PstreamGlobals::outstandingRequests_.size()
/// || !len
)
{
return false;
}
label count = (PstreamGlobals::outstandingRequests_.size() - pos);
/// // Treat len < 0 like npos (ie, the rest of the list) but also
/// // apply range checking to avoid bad slices
///
/// if (len >= 0 && len < count)
/// {
/// count = len;
/// }
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
Pout<< "UPstream::waitAnyRequest : starting wait for some of "
<< count << " requests starting at " << pos << endl;
}
profilingPstream::beginTiming();
// On success: sets request to MPI_REQUEST_NULL
int index = MPI_UNDEFINED;
if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitany returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
if (index == MPI_UNDEFINED)
{
// No active request handles
return false;
}
return true;
}
bool Foam::UPstream::waitSomeRequests
(
const label pos,
DynamicList<int>* indices
)
{
// No-op for non-parallel, no pending requests or out-of-range
if
(
!UPstream::parRun()
|| pos < 0
|| pos >= PstreamGlobals::outstandingRequests_.size()
/// || !len
)
{
if (indices)
{
indices->clear();
}
return false;
}
label count = (PstreamGlobals::outstandingRequests_.size() - pos);
/// // Treat len < 0 like npos (ie, the rest of the list) but also
/// // apply range checking to avoid bad slices
///
/// if (len >= 0 && len < count)
/// {
/// count = len;
/// }
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
Pout<< "UPstream:waitSomeRequest : starting wait for any of "
<< count << " requests starting at " << pos << endl;
}
// Local temporary storage, or return via calling parameter
List<int> tmpIndices;
if (indices)
{
indices->resize_nocopy(count);
}
else
{
tmpIndices.resize(count);
}
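// Note: the completed indices returned by MPI_Waitsome are relative
// to the starting position (pos)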
profilingPstream::beginTiming();
// On success: sets non-blocking requests to MPI_REQUEST_NULL
int outcount = 0;
if
(
MPI_Waitsome
(
count,
waitRequests,
&outcount,
(indices ? indices->data() : tmpIndices.data()),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Waitsome returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
if (outcount == MPI_UNDEFINED || outcount < 1)
{
// No active request handles
if (indices)
{
indices->clear();
}
return false;
}
if (indices)
{
indices->resize(outcount);
}
return true;
}
Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
{
// No-op for non-parallel or no pending requests
@@ -196,7 +346,7 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
profilingPstream::beginTiming();
// On success: sets request to MPI_REQUEST_NULL
int index = MPI_UNDEFINED;
if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
@@ -376,7 +526,7 @@ bool Foam::UPstream::finishedRequest(const label i)
auto& request = PstreamGlobals::outstandingRequests_[i];
// Fast-path (no-op) for null request
if (MPI_REQUEST_NULL == request)
{
return true;
@@ -412,7 +562,7 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req)
MPI_Request request = PstreamDetail::Request::get(req);
// Fast-path (no-op) for null request
if (MPI_REQUEST_NULL == request)
{
return true;
@@ -498,4 +648,192 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
}
bool Foam::UPstream::finishedRequestPair(label& req1, label& req2)
{
// No-op for non-parallel
if (!UPstream::parRun())
{
req1 = -1;
req2 = -1;
return true;
}
int count = 0;
MPI_Request waitRequests[2];
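// Fixed two-slot array (slot 0: req1, slot 1: req2).
// Out-of-range indices become MPI_REQUEST_NULL placeholders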
// In range?
if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[0] = PstreamGlobals::outstandingRequests_[req1];
++count;
}
else
{
waitRequests[0] = MPI_REQUEST_NULL;
req1 = -1;
}
// In range? Out-of-range (eg, placeholder) indices are a no-op
if (req2 >= 0 && req2 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[1] = PstreamGlobals::outstandingRequests_[req2];
++count;
}
else
{
waitRequests[1] = MPI_REQUEST_NULL;
req2 = -1;
}
if (!count)
{
return true;
}
profilingPstream::beginTiming();
// On success: sets each request to MPI_REQUEST_NULL
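// Note: MPI_REQUEST_NULL entries are ignored by MPI_Testsome and do
// not count towards outcount (which is MPI_UNDEFINED if there are
// no active handles)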
int indices[2];
int outcount = 0;
if
(
MPI_Testsome
(
2,
waitRequests,
&outcount,
indices,
MPI_STATUSES_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Testsome returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
if (outcount == MPI_UNDEFINED)
{
// No active request handles.
// Slightly pedantic, but copy back requests in case they were altered
if (req1 >= 0)
{
PstreamGlobals::outstandingRequests_[req1] = waitRequests[0];
}
if (req2 >= 0)
{
PstreamGlobals::outstandingRequests_[req2] = waitRequests[1];
}
// Flag indices as 'done'
req1 = -1;
req2 = -1;
return true;
}
// Copy back requests to their 'stack' locations
for (int i = 0; i < outcount; ++i)
{
int reqid = indices[i];
if (reqid == 0)
{
if (req1 >= 0)
{
PstreamGlobals::outstandingRequests_[req1] = waitRequests[0];
req1 = -1;
}
}
if (reqid == 1)
{
if (req2 >= 0)
{
PstreamGlobals::outstandingRequests_[req2] = waitRequests[1];
req2 = -1;
}
}
}
return (outcount > 0);
}
void Foam::UPstream::waitRequestPair(label& req1, label& req2)
{
// No-op for non-parallel. Flag indices as 'done'
if (!UPstream::parRun())
{
req1 = -1;
req2 = -1;
return;
}
int count = 0;
MPI_Request waitRequests[2];
// In range? Out-of-range (eg, placeholder) indices are flagged as 'done'
if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[0] = PstreamGlobals::outstandingRequests_[req1];
++count;
}
else
{
waitRequests[0] = MPI_REQUEST_NULL;
req1 = -1; // Flag as 'done'
}
// In range? Out-of-range (eg, placeholder) indices are flagged as 'done'
if (req2 >= 0 && req2 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[1] = PstreamGlobals::outstandingRequests_[req2];
++count;
}
else
{
waitRequests[1] = MPI_REQUEST_NULL;
req2 = -1; // Flag as 'done'
}
// Early exit
if (!count)
{
return;
}
profilingPstream::beginTiming();
// On success: sets each request to MPI_REQUEST_NULL.
// Always wait on both slots: MPI_REQUEST_NULL entries are ignored by
// MPI_Waitall, and an active request may occupy the second slot only
if (MPI_Waitall(2, waitRequests, MPI_STATUSES_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitall returned with error"
<< Foam::abort(FatalError);
}
profilingPstream::addWaitTime();
// Copy back requests to their 'stack' locations
// and flag index as done
if (req1 >= 0)
{
PstreamGlobals::outstandingRequests_[req1] = waitRequests[0];
}
if (req2 >= 0)
{
PstreamGlobals::outstandingRequests_[req2] = waitRequests[1];
}
// Flag indices as 'done'
req1 = -1;
req2 = -1;
}
// ************************************************************************* //