diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 820ef017d7..395ac02160 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -523,6 +523,38 @@ public: // A no-op if parRun() == false, or the list is empty. static void waitRequests(UList& requests); + //- Wait until any request (from position onwards) has finished. + //- Corresponds to MPI_Waitany() + // A no-op and returns false if parRun() == false, + // if the position is out-of-range [0 to nRequests()], + // or the internal list of requests is empty. + // + // \returns true if any pending request completed. + // \returns false if all requests have already been handled. + // + // \param pos starting position within the internal list of requests + static bool waitAnyRequest(const label pos); + + //- Wait until some requests (from position onwards) have finished. + //- Corresponds to MPI_Waitsome() + // A no-op and returns false if parRun() == false, + // if the position is out-of-range [0 to nRequests], + // or the internal list of requests is empty. + // + // \returns true if some pending requests completed. + // \returns false if all requests have already been handled + // + // \param pos starting position within the internal list of requests + // \param[out] indices the completed request indices relative to the + // starting position. The is an optional parameter, which can be + // used to recover the indices or simply to avoid reallocations + // when calling within a loop. + static bool waitSomeRequests + ( + const label pos, + DynamicList* indices = nullptr + ); + //- Wait until any request has finished and return its index. //- Corresponds to MPI_Waitany() // Returns -1 if parRun() == false, or the list is empty, @@ -532,7 +564,7 @@ public: //- Wait until request i has finished. //- Corresponds to MPI_Wait() // A no-op if parRun() == false, - // there are no pending requests, + // if there are no pending requests, // or if the index is out-of-range (0 to nRequests) static void waitRequest(const label i); @@ -544,7 +576,7 @@ public: //- Non-blocking comms: has request i finished? //- Corresponds to MPI_Test() // A no-op and returns true if parRun() == false, - // there are no pending requests, + // if there are no pending requests, // or if the index is out-of-range (0 to nRequests) static bool finishedRequest(const label i); @@ -559,6 +591,22 @@ public: // A no-op and returns true if parRun() == false or list is empty static bool finishedRequests(UList& requests); + //- Non-blocking comms: have both requests finished? + //- Corresponds to pair of MPI_Test() + // A no-op and returns true if parRun() == false, + // if there are no pending requests, + // or if the indices are out-of-range (0 to nRequests) + // Each finished request parameter is set to -1 (ie, done). + static bool finishedRequestPair(label& req1, label& req2); + + //- Non-blocking comms: wait for both requests to finish. + //- Corresponds to pair of MPI_Wait() + // A no-op if parRun() == false, + // if there are no pending requests, + // or if the indices are out-of-range (0 to nRequests) + // Each finished request parameter is set to -1 (ie, done). + static void waitRequestPair(label& req1, label& req2); + // General diff --git a/src/Pstream/dummy/UPstreamRequest.C b/src/Pstream/dummy/UPstreamRequest.C index b158f9e8b7..74fbe523e1 100644 --- a/src/Pstream/dummy/UPstreamRequest.C +++ b/src/Pstream/dummy/UPstreamRequest.C @@ -56,6 +56,21 @@ void Foam::UPstream::resetRequests(const label n) {} void Foam::UPstream::waitRequests(const label pos) {} void Foam::UPstream::waitRequests(UList&) {} +bool Foam::UPstream::waitAnyRequest(const label pos) +{ + return false; +} + +bool Foam::UPstream::waitSomeRequests +( + const label pos, + DynamicList* indices +) +{ + if (indices) indices->clear(); + return false; +} + Foam::label Foam::UPstream::waitAnyRequest(UList&) { return -1; @@ -66,10 +81,26 @@ void Foam::UPstream::waitRequest(UPstream::Request&) {} bool Foam::UPstream::finishedRequest(const label i) { return true; } bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; } + bool Foam::UPstream::finishedRequests(UList&) { return true; } +bool Foam::UPstream::finishedRequestPair(label& req1, label& req2) +{ + req1 = -1; + req2 = -1; + return true; +} + + +void Foam::UPstream::waitRequestPair(label& req1, label& req2) +{ + req1 = -1; + req2 = -1; +} + + // ************************************************************************* // diff --git a/src/Pstream/mpi/UPstreamRequest.C b/src/Pstream/mpi/UPstreamRequest.C index e719bf56d3..5bb10904e0 100644 --- a/src/Pstream/mpi/UPstreamRequest.C +++ b/src/Pstream/mpi/UPstreamRequest.C @@ -170,6 +170,156 @@ void Foam::UPstream::waitRequests(UList& requests) } +bool Foam::UPstream::waitAnyRequest(const label pos) +{ + // No-op for non-parallel, no pending requests or out-of-range + if + ( + !UPstream::parRun() + || pos < 0 + || pos >= PstreamGlobals::outstandingRequests_.size() + /// || !len + ) + { + return false; + } + + label count = (PstreamGlobals::outstandingRequests_.size() - pos); + + /// // Treat len < 0 like npos (ie, the rest of the list) but also + /// // apply range checking to avoid bad slices + /// + /// if (len >= 0 && len < count) + /// { + /// count = len; + /// } + + auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos); + + if (UPstream::debug) + { + Pout<< "UPstream::waitAnyRequest : starting wait for some of " + << count << " requests starting at " << pos << endl; + } + + profilingPstream::beginTiming(); + + // On success: sets request to MPI_REQUEST_NULL + int index = MPI_UNDEFINED; + if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE)) + { + FatalErrorInFunction + << "MPI_Waitany returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + if (index == MPI_UNDEFINED) + { + // No active request handles + return false; + } + + return true; +} + + +bool Foam::UPstream::waitSomeRequests +( + const label pos, + DynamicList* indices +) +{ + // No-op for non-parallel, no pending requests or out-of-range + if + ( + !UPstream::parRun() + || pos < 0 + || pos >= PstreamGlobals::outstandingRequests_.size() + /// || !len + ) + { + if (indices) + { + indices->clear(); + } + return false; + } + + label count = (PstreamGlobals::outstandingRequests_.size() - pos); + + /// // Treat len < 0 like npos (ie, the rest of the list) but also + /// // apply range checking to avoid bad slices + /// + /// if (len >= 0 && len < count) + /// { + /// count = len; + /// } + + auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos); + + if (UPstream::debug) + { + Pout<< "UPstream:waitSomeRequest : starting wait for any of " + << count << " requests starting at " << pos << endl; + } + + + // Local temporary storage, or return via calling parameter + List tmpIndices; + + if (indices) + { + indices->resize_nocopy(count); + } + else + { + tmpIndices.resize(count); + } + + profilingPstream::beginTiming(); + + // On success: sets non-blocking requests to MPI_REQUEST_NULL + int outcount = 0; + if + ( + MPI_Waitsome + ( + count, + waitRequests, + &outcount, + (indices ? indices->data() : tmpIndices.data()), + MPI_STATUSES_IGNORE + ) + ) + { + FatalErrorInFunction + << "MPI_Waitsome returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + if (outcount == MPI_UNDEFINED || outcount < 1) + { + // No active request handles + if (indices) + { + indices->clear(); + } + return false; + } + + if (indices) + { + indices->resize(outcount); + } + + return true; +} + + Foam::label Foam::UPstream::waitAnyRequest(UList& requests) { // No-op for non-parallel or no pending requests @@ -196,7 +346,7 @@ Foam::label Foam::UPstream::waitAnyRequest(UList& requests) profilingPstream::beginTiming(); // On success: sets request to MPI_REQUEST_NULL - int index = -1; + int index = MPI_UNDEFINED; if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE)) { FatalErrorInFunction @@ -376,7 +526,7 @@ bool Foam::UPstream::finishedRequest(const label i) auto& request = PstreamGlobals::outstandingRequests_[i]; - // No-op for null request + // Fast-path (no-op) for null request if (MPI_REQUEST_NULL == request) { return true; @@ -412,7 +562,7 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req) MPI_Request request = PstreamDetail::Request::get(req); - // No-op for null request + // Fast-path (no-op) for null request if (MPI_REQUEST_NULL == request) { return true; @@ -498,4 +648,192 @@ bool Foam::UPstream::finishedRequests(UList& requests) } +bool Foam::UPstream::finishedRequestPair(label& req1, label& req2) +{ + // No-op for non-parallel + if (!UPstream::parRun()) + { + req1 = -1; + req2 = -1; + return true; + } + + int count = 0; + MPI_Request waitRequests[2]; + + // In range? + if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size()) + { + waitRequests[0] = PstreamGlobals::outstandingRequests_[req1]; + ++count; + } + else + { + waitRequests[0] = MPI_REQUEST_NULL; + req1 = -1; + } + + // No-op for non-parallel, or out-of-range (eg, placeholder indices) + if (req2 >= 0 && req2 < PstreamGlobals::outstandingRequests_.size()) + { + waitRequests[1] = PstreamGlobals::outstandingRequests_[req2]; + ++count; + } + else + { + waitRequests[1] = MPI_REQUEST_NULL; + req2 = -1; + } + + if (!count) + { + return true; + } + + profilingPstream::beginTiming(); + + // On success: sets each request to MPI_REQUEST_NULL + int indices[2]; + int outcount = 0; + if + ( + MPI_Testsome + ( + 2, + waitRequests, + &outcount, + indices, + MPI_STATUSES_IGNORE + ) + ) + { + FatalErrorInFunction + << "MPI_Testsome returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + if (outcount == MPI_UNDEFINED) + { + // No active request handles. + // Slight pedantic, but copy back requests in case they were altered + + if (req1 >= 0) + { + PstreamGlobals:: outstandingRequests_[req1] = waitRequests[0]; + } + + if (req2 >= 0) + { + PstreamGlobals:: outstandingRequests_[req2] = waitRequests[1]; + } + + // Flag indices as 'done' + req1 = -1; + req2 = -1; + return true; + } + + // Copy back requests to their 'stack' locations + for (int i = 0; i < outcount; ++i) + { + int reqid = indices[i]; + + if (reqid == 0) + { + if (req1 >= 0) + { + PstreamGlobals:: outstandingRequests_[req1] = waitRequests[0]; + req1 = -1; + } + } + if (reqid == 1) + { + if (req2 >= 0) + { + PstreamGlobals:: outstandingRequests_[req2] = waitRequests[1]; + req2 = -1; + } + } + } + + return (outcount > 0); +} + + +void Foam::UPstream::waitRequestPair(label& req1, label& req2) +{ + // No-op for non-parallel. Flag indices as 'done' + if (!UPstream::parRun()) + { + req1 = -1; + req2 = -1; + return; + } + + int count = 0; + MPI_Request waitRequests[2]; + + // No-op for non-parallel, or out-of-range (eg, placeholder indices) + if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size()) + { + waitRequests[0] = PstreamGlobals::outstandingRequests_[req1]; + ++count; + } + else + { + waitRequests[0] = MPI_REQUEST_NULL; + req1 = -1; // Flag as 'done' + } + + // No-op for non-parallel, or out-of-range (eg, placeholder indices) + if (req2 >= 0 && req2 < PstreamGlobals::outstandingRequests_.size()) + { + waitRequests[1] = PstreamGlobals::outstandingRequests_[req2]; + ++count; + } + else + { + waitRequests[1] = MPI_REQUEST_NULL; + req2 = -1; // Flag as 'done' + } + + // Early exit + if (!count) + { + return; + } + + profilingPstream::beginTiming(); + + // On success: sets each request to MPI_REQUEST_NULL + if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE)) + { + FatalErrorInFunction + << "MPI_Waitall returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + // Copy back requests to their 'stack' locations + // and flag index as done + + if (req1 >= 0) + { + PstreamGlobals::outstandingRequests_[req1] = waitRequests[0]; + } + + if (req2 >= 0) + { + PstreamGlobals::outstandingRequests_[req2] = waitRequests[1]; + } + + // Flag indices as 'done' + req1 = -1; + req2 = -1; +} + + // ************************************************************************* //