ENH: simplify handling of pushed/freed requests

- previously had an additional stack for freedRequests_,
  which were used to 'remember' locations into the list of
  outstandingRequests_ that were handled by 'waitRequest()'.

  This was principally done for sanity checks on shutdown,
  but we now just test for any outstanding requests that
  are *not* MPI_REQUEST_NULL instead (much simpler).

  The framework with freedRequests_ also had a provision to 'recycle'
  them by popping from that stack, but this is rather fragile since it
  would only be triggered by some collectives
  (MPI_Iallreduce, MPI_Ialltoall, MPI_Igather, MPI_Iscatter)
  with no guarantee that these will all be properly removed again.
  There was also no pruning of extraneous indices.

ENH: consolidate internal reset/push of requests

- replace duplicate code with inline functions
  reset_request(), push_request()

ENH: null out trailing requests

- extra safety (paranoia) for the UPstream::Request versions
  of finishedRequests(), waitAnyRequest()

CONFIG: document nPollProcInterfaces in etc/controlDict

- still experimental, but at least make the keyword known
This commit is contained in:
Mark Olesen
2023-04-06 14:35:00 +02:00
parent 700156b4c5
commit aa002122c2
9 changed files with 149 additions and 209 deletions

View File

@ -1,7 +1,7 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2212 |
| \\ / O peration | Version: v2306 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
@ -146,6 +146,10 @@ OptimisationSwitches
// global reduction, even if multi-pass is not needed)
maxCommsSize 0;
// Optional (quite experimental) feature in lduMatrixUpdate
// to poll (processor) interfaces for individual readiness
// instead of waiting for all to complete first.
nPollProcInterfaces 0;
// Trap floating point exception.
// Can override with FOAM_SIGFPE env variable (true|false)

View File

@ -499,56 +499,69 @@ public:
);
// Non-blocking comms
// Requests (non-blocking comms)
//- Number of outstanding requests
//- Number of outstanding requests (on the internal list of requests)
static label nRequests() noexcept;
//- Truncate outstanding requests to given length, which is
//- expected to be in the range 0 to nRequests.
//- expected to be in the range [0 to nRequests()].
// A no-op for out-of-range values.
static void resetRequests(const label n);
//- Wait until all requests (from position onwards) have finished.
// A no-op if parRun() == false, if there are no pending requests
// or if the start is out-of-range (0 to nRequests)
//- Corresponds to MPI_Waitall()
// A no-op if parRun() == false,
// if the position is out-of-range [0 to nRequests()],
// or the internal list of requests is empty.
//
// \param pos starting position within the internal list of requests
static void waitRequests(const label pos);
//- Wait until all requests have finished.
// A no-op if parRun() == false or the list is empty
//- Corresponds to MPI_Waitall()
// A no-op if parRun() == false, or the list is empty.
static void waitRequests(UList<UPstream::Request>& requests);
//- Wait until any request has finished and return its index.
//- Corresponds to MPI_Waitany()
// Returns -1 if parRun() == false, or the list is empty,
// or if all the requests have already been handled
static label waitAnyRequest(UList<UPstream::Request>& requests);
//- Wait until request i has finished.
//- Corresponds to MPI_Wait()
// A no-op if parRun() == false,
// there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static void waitRequest(const label i);
//- Wait until specified request has finished.
//- Corresponds to MPI_Wait()
// A no-op if parRun() == false or for a null-request
static void waitRequest(UPstream::Request& req);
//- Non-blocking comms: has request i finished?
//- Corresponds to MPI_Test()
// A no-op and returns true if parRun() == false,
// there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static bool finishedRequest(const label i);
//- Non-blocking comms: has request finished?
//- Corresponds to MPI_Test()
// A no-op and returns true if parRun() == false
// or for a null-request
static bool finishedRequest(UPstream::Request& req);
//- Non-blocking comms: have all requests finished?
//- Corresponds to MPI_Testall()
// A no-op and returns true if parRun() == false or list is empty
static bool finishedRequests(UList<UPstream::Request>& requests);
// General
//- Set as parallel run on/off.
// \return the previous value
static bool parRun(const bool on) noexcept

View File

@ -36,7 +36,6 @@ Foam::DynamicList<MPI_Comm> Foam::PstreamGlobals::MPICommunicators_;
Foam::DynamicList<MPI_Group> Foam::PstreamGlobals::MPIGroups_;
Foam::DynamicList<MPI_Request> Foam::PstreamGlobals::outstandingRequests_;
Foam::DynamicList<Foam::label> Foam::PstreamGlobals::freedRequests_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //

View File

@ -29,7 +29,7 @@ Namespace
Description
Global functions and variables for working with parallel streams,
but principally for mpi
but principally for MPI.
SourceFiles
PstreamGlobals.C
@ -40,6 +40,7 @@ SourceFiles
#define Foam_PstreamGlobals_H
#include "DynamicList.H"
#include "UPstream.H" // for UPstream::Request
#include <mpi.h>
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -71,7 +72,6 @@ extern DynamicList<MPI_Group> MPIGroups_;
//- Outstanding non-blocking operations.
extern DynamicList<MPI_Request> outstandingRequests_;
extern DynamicList<label> freedRequests_;
// * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * //
@ -79,28 +79,50 @@ extern DynamicList<label> freedRequests_;
//- Fatal if comm is outside the allocated range
void checkCommunicator(const label comm, const label toProcNo);
//- Push request onto list of outstanding requests,
//- optionally reusing previously freed request locations
//
// \return index of request within outstandingRequests_
inline label push_request(MPI_Request request)
//- Reset UPstream::Request to null and/or the index of the outstanding
//- request to -1.
// Does not affect the stack of outstanding requests.
inline void reset_request
(
UPstream::Request* requestPtr,
label* requestIdx = nullptr
)
{
while (!freedRequests_.empty())
{
const label index = freedRequests_.back();
freedRequests_.pop_back();
if (requestPtr) *requestPtr = UPstream::Request(MPI_REQUEST_NULL);
if (requestIdx) *requestIdx = -1;
}
if (index < outstandingRequests_.size())
//- Transcribe MPI_Request to UPstream::Request
//- (does not affect the stack of outstanding requests)
//- or else push onto list of outstanding requests
//- and (optionally) record its location
inline void push_request
(
MPI_Request request,
UPstream::Request* requestPtr = nullptr,
label* requestIdx = nullptr
)
{
if (requestPtr)
{
outstandingRequests_[index] = request;
return index;
// Transcribe as UPstream::Request
*requestPtr = UPstream::Request(request);
// Not on stack of outstanding requests
if (requestIdx) *requestIdx = -1;
}
else
{
if (requestIdx)
{
// Its index into outstanding requests
*requestIdx = PstreamGlobals::outstandingRequests_.size();
}
const label index = outstandingRequests_.size();
outstandingRequests_.push_back(request);
return index;
PstreamGlobals::outstandingRequests_.push_back(request);
}
}

View File

@ -105,6 +105,8 @@ Foam::label Foam::UIPstream::read
UPstream::Request* req
)
{
PstreamGlobals::reset_request(req);
if (debug)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
@ -205,26 +207,18 @@ Foam::label Foam::UIPstream::read
return 0;
}
profilingPstream::addRequestTime();
if (debug)
{
Pout<< "UIPstream::read : started read from:" << fromProcNo
<< " tag:" << tag << " read size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<<
<< " request:" <<
(req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
<< Foam::endl;
}
if (req)
{
*req = UPstream::Request(request);
}
else
{
PstreamGlobals::outstandingRequests_.push_back(request);
}
PstreamGlobals::push_request(request, req);
profilingPstream::addRequestTime();
// Assume the message is completely received.
return bufSize;

View File

@ -60,6 +60,8 @@ bool Foam::UOPstream::write
const UPstream::sendModes sendMode
)
{
PstreamGlobals::reset_request(req);
if (debug)
{
Pout<< "UOPstream::write : starting write to:" << toProcNo
@ -79,7 +81,6 @@ bool Foam::UOPstream::write
error::printStack(Pout);
}
PstreamGlobals::checkCommunicator(communicator, toProcNo);
@ -179,27 +180,18 @@ bool Foam::UOPstream::write
);
}
profilingPstream::addRequestTime();
if (debug)
{
Pout<< "UOPstream::write : started write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " commType:" << UPstream::commsTypeNames[commsType]
<< " request:"
<<
<< " request:" <<
(req ? label(-1) : PstreamGlobals::outstandingRequests_.size())
<< Foam::endl;
}
if (req)
{
*req = UPstream::Request(request);
}
else
{
PstreamGlobals::outstandingRequests_.push_back(request);
}
PstreamGlobals::push_request(request, req);
profilingPstream::addRequestTime();
}
else
{

View File

@ -442,7 +442,6 @@ void Foam::UPstream::shutdown(int errNo)
}
PstreamGlobals::outstandingRequests_.clear();
PstreamGlobals::freedRequests_.clear();
if (nOutstanding)
{

View File

@ -149,7 +149,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
if (!count)
{
// Early exit: non-NULL requests found
// Early exit: has NULL requests only
return;
}
@ -212,9 +212,18 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
}
// Transcribe MPI_Request back into UPstream::Request
while (--count >= 0)
// - do in reverse order - see note in finishedRequests()
{
requests[count] = UPstream::Request(waitRequests[count]);
for (label i = count-1; i >= 0; --i)
{
requests[i] = UPstream::Request(waitRequests[i]);
}
// Trailing portion
for (label i = count; i < requests.size(); ++i)
{
requests[i] = UPstream::Request(MPI_REQUEST_NULL);
}
}
return index;
@ -286,9 +295,6 @@ void Foam::UPstream::waitRequest(const label i)
return;
}
// Push index onto free cache (for later reuse)
PstreamGlobals::freedRequests_.push_back(i);
auto& request = PstreamGlobals::outstandingRequests_[i];
// No-op for null request
@ -452,7 +458,7 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
if (!count)
{
// Early exit: non-NULL requests found
// Early exit: has NULL requests only
return true;
}
@ -476,9 +482,15 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
// This is uglier than we'd like, but much better than allocating
// and freeing a scratch buffer each time we query things.
while (--count >= 0)
for (label i = count-1; i >= 0; --i)
{
requests[count] = UPstream::Request(waitRequests[count]);
requests[i] = UPstream::Request(waitRequests[i]);
}
// Trailing portion
for (label i = count; i < requests.size(); ++i)
{
requests[i] = UPstream::Request(MPI_REQUEST_NULL);
}
}

View File

@ -125,11 +125,10 @@ void Foam::PstreamDetail::allReduce
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
{
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -189,16 +188,8 @@ void Foam::PstreamDetail::allReduce
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -207,9 +198,6 @@ void Foam::PstreamDetail::allReduce
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Allreduce
@ -246,6 +234,8 @@ void Foam::PstreamDetail::allToAll
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
const bool immediate = (req || requestID);
const label numProc = UPstream::nProcs(comm);
@ -280,10 +270,6 @@ void Foam::PstreamDetail::allToAll
if (!UPstream::parRun() || numProc < 2)
{
recvData.deepCopy(sendData);
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -321,16 +307,7 @@ void Foam::PstreamDetail::allToAll
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -339,9 +316,6 @@ void Foam::PstreamDetail::allToAll
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Alltoall
@ -387,6 +361,8 @@ void Foam::PstreamDetail::allToAllv
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
@ -440,9 +416,6 @@ void Foam::PstreamDetail::allToAllv
recvCounts[0]*sizeof(Type)
);
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -481,16 +454,7 @@ void Foam::PstreamDetail::allToAllv
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -499,9 +463,6 @@ void Foam::PstreamDetail::allToAllv
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Alltoallv
@ -910,13 +871,11 @@ void Foam::PstreamDetail::gather
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -976,16 +935,7 @@ void Foam::PstreamDetail::gather
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -994,9 +944,6 @@ void Foam::PstreamDetail::gather
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Gather
@ -1040,13 +987,11 @@ void Foam::PstreamDetail::scatter
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -1106,16 +1051,7 @@ void Foam::PstreamDetail::scatter
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -1124,9 +1060,6 @@ void Foam::PstreamDetail::scatter
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Scatter
@ -1171,14 +1104,12 @@ void Foam::PstreamDetail::gatherv
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
{
// recvCounts[0] may be invalid - use sendCount instead
std::memmove(recvData, sendData, sendCount*sizeof(Type));
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -1262,16 +1193,7 @@ void Foam::PstreamDetail::gatherv
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -1280,9 +1202,6 @@ void Foam::PstreamDetail::gatherv
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Gatherv
@ -1328,13 +1247,11 @@ void Foam::PstreamDetail::scatterv
label* requestID
)
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
// No requests generated
if (req) req->reset();
if (requestID) *requestID = -1;
return;
}
@ -1412,16 +1329,7 @@ void Foam::PstreamDetail::scatterv
<< Foam::abort(FatalError);
}
if (req)
{
*req = UPstream::Request(request);
if (requestID) *requestID = -1;
}
else
{
*requestID = PstreamGlobals::push_request(request);
}
PstreamGlobals::push_request(request, req, requestID);
profilingPstream::addRequestTime();
}
#endif
@ -1430,9 +1338,6 @@ void Foam::PstreamDetail::scatterv
{
profilingPstream::beginTiming();
if (req) req->reset();
if (requestID) *requestID = -1;
if
(
MPI_Scatterv