ENH: update/refine UPstream request handling

- activeRequest() checks if a given request is still active/pending

- finishedRequest() will now always call MPI_Test when running in
  parallel, instead of taking a fast-path that skips a null request.
  This gives MPI a chance to progress operations even if that
  particular request is not active.
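
  For example (a sketch, assuming 'req' came from an earlier
  non-blocking call):

     while (!UPstream::finishedRequest(req))
     {
         // ... do other local work; each test also progresses MPI
     }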

- sameProcs() is a convenience wrapper that can:
  * check if two communicator indices resolve to the same
    underlying procs
  * check the communicator procs against a list of procs
  * compare two lists of procs

  For example, check if the communicator corresponds to a
  host-local communication:

     if (UPstream::sameProcs(UPstream::commLocalNode(), myComm)) ...
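
  Or (a sketch, with a hypothetical 'subRanks' list) check the
  communicator procs against an explicit list of ranks:

     labelList subRanks({0, 2, 4});

     if (UPstream::sameProcs(myComm, subRanks)) ...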

STYLE: UPstream::Request gets an 'active()' method

- more idiomatic MPI naming than 'good()'
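
  For example (sketch only, with a hypothetical list of requests):

     for (auto& req : requests)
     {
         if (req.active()) req.wait();   // same test as good()
     }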
Mark Olesen
2025-10-06 23:08:18 +02:00
parent a6c924cf8f
commit 06b3e9bd0b
10 changed files with 369 additions and 197 deletions

View File

@ -54,9 +54,12 @@ int main(int argc, char *argv[])
"int",
"Num of cores to simulate (default: 4)"
);
argList::addBoolOption("scatter", "Use scatter arrows");
#include "setRootCase.H"
const bool optScatterGraph = args.found("scatter");
label nProcs = UPstream::nProcs(UPstream::worldComm);
DynamicList<int> fake_interNode_offsets;
@ -128,25 +131,50 @@ int main(int argc, char *argv[])
auto& os = Info.stream();
os << "// node topology graph:" << nl;
os.beginBlock("graph");
std::string arrow;
if (optScatterGraph)
{
arrow = " -> ";
os.beginBlock("digraph");
}
else
{
arrow = " -- ";
os.beginBlock("graph");
}
// Prefer left-to-right layout for large graphs
os << indent << "rankdir=LR" << nl;
os << indent << "rankdir=LR" << nl << nl;
const label numNodes = interNodeOffsets.size()-1;
// The master
os << indent
<< "0 [label=\"master\", style=filled, fillcolor=lightgray];"
<< nl << nl;
// First level are the inter-node connections
{
os << indent << 0 << " -- " << token::LBRACE;
os << indent
<< "// inter-node: " << flatOutput(interNodeOffsets) << nl;
for (label nodei = 1; nodei < numNodes; ++nodei)
os << indent
<< 0 << arrow.data() << nl;
os.beginBlock();
os << indent << "rank=same; node [shape=box];" << nl << nl;
os << indent;
for (label nodei = 0; nodei < numNodes; ++nodei)
{
os << ' ' << interNodeOffsets[nodei];
if (nodei) os << ' ';
os << "node" << nodei;
}
os << token::SPACE << token::RBRACE
<< " // inter-node: " << flatOutput(interNodeOffsets)
<< nl;
os << nl;
os.endBlock() << nl;
}
// Next level are the local-node connections
@ -155,8 +183,9 @@ int main(int argc, char *argv[])
const auto firstProc = interNodeOffsets[nodei];
const auto lastProc = interNodeOffsets[nodei+1];
os << indent << firstProc << " -- " << token::DQUOTE
<< (firstProc+1) << ".." << (lastProc-1)
os << indent << "node" << nodei << arrow.data()
<< token::DQUOTE
<< firstProc << ".." << (lastProc-1)
<< token::DQUOTE << nl;
}

View File

@ -38,6 +38,7 @@ Description
#include "SubField.H"
#include "vector.H"
#include "IOstreams.H"
#include "UPstreamWindow.H"
using namespace Foam;

View File

@ -533,6 +533,52 @@ private:
static void freeCommunicatorComponents(const label index);
// Private implementation helpers
//- Test for communicator equality.
// True if they have the same index or address the same ranks
static bool sameProcs_impl(int comm1, int comm2)
{
return
(
(comm1 == comm2) ||
(
// With guard against bad index
(comm1 >= 0 && comm1 < procIDs_.size())
&& (comm2 >= 0 && comm2 < procIDs_.size())
&& (procIDs_[comm1] == procIDs_[comm2])
)
);
}
//- Test equality of communicator procs with the given list of ranks
template<typename IntType>
static bool sameProcs_impl
(
int communicator,
const UList<IntType>& procs
)
{
return
(
// With guard against bad index
(communicator >= 0 && communicator < procIDs_.size())
&& ListOps::equal(procIDs_[communicator], procs)
);
}
//- Test the equality of two lists of ranks
template<typename Type1, typename Type2>
static bool sameProcs_impl
(
const UList<Type1>& procs1,
const UList<Type2>& procs2
)
{
return ListOps::equal(procs1, procs2);
}
protected:
// Protected Member Functions
@ -1061,7 +1107,7 @@ public:
//
// \param pos starting position within the internal list of requests
// \param len length of slice to remove (negative = until the end)
static void removeRequests(const label pos, label len = -1);
static void removeRequests(label pos, label len = -1);
//- Non-blocking comms: free outstanding request.
//- Corresponds to MPI_Request_free()
@ -1085,7 +1131,7 @@ public:
//
// \param pos starting position within the internal list of requests
// \param len length of slice to check (negative = until the end)
static void waitRequests(const label pos, label len = -1);
static void waitRequests(label pos, label len = -1);
//- Wait until all requests have finished.
//- Corresponds to MPI_Waitall()
@ -1103,7 +1149,7 @@ public:
//
// \param pos starting position within the internal list of requests
// \param len length of slice to check (negative = until the end)
static bool waitAnyRequest(const label pos, label len = -1);
static bool waitAnyRequest(label pos, label len = -1);
//- Wait until some requests (from position onwards) have finished.
//- Corresponds to MPI_Waitsome()
@ -1122,7 +1168,7 @@ public:
// when calling within a loop.
static bool waitSomeRequests
(
const label pos,
label pos,
label len = -1,
DynamicList<int>* indices = nullptr
);
@ -1148,7 +1194,7 @@ public:
//- Corresponds to MPI_Waitany()
// Returns -1 if parRun() == false, or the list is empty,
// or if all the requests have already been handled
static label waitAnyRequest(UList<UPstream::Request>& requests);
static int waitAnyRequest(UList<UPstream::Request>& requests);
//- Wait until request i has finished.
//- Corresponds to MPI_Wait()
@ -1162,6 +1208,14 @@ public:
// A no-op if parRun() == false or for a null-request
static void waitRequest(UPstream::Request& req);
//- Is request \p i active (!= MPI_REQUEST_NULL)?
// False if there are no pending requests,
// or if the index is out-of-range (0 to nRequests)
static bool activeRequest(const label i);
//- Is request active (!= MPI_REQUEST_NULL)?
static bool activeRequest(const UPstream::Request& req);
//- Non-blocking comms: has request i finished?
//- Corresponds to MPI_Test()
// A no-op and returns true if parRun() == false,
@ -1185,7 +1239,7 @@ public:
//
// \param pos starting position within the internal list of requests
// \param len length of slice to check (negative = until the end)
static bool finishedRequests(const label pos, label len = -1);
static bool finishedRequests(label pos, label len = -1);
//- Non-blocking comms: have all requests finished?
//- Corresponds to MPI_Testall()
@ -1291,6 +1345,43 @@ public:
}
//- Test for communicator equality.
// True if they have the same index or address the same ranks
static bool sameProcs(int communicator1, int communicator2)
{
return sameProcs_impl(communicator1, communicator2);
}
//- Test equality of communicator procs with the given list of ranks.
//- Includes a guard for the communicator index.
template
<
typename T1,
typename = std::void_t
<std::enable_if_t<std::is_integral_v<T1>>>
>
static bool sameProcs(int communicator, const UList<T1>& procs)
{
return sameProcs_impl(communicator, procs);
}
//- Test the equality of two lists of ranks
template
<
typename T1,
typename T2,
typename = std::void_t
<
std::enable_if_t<std::is_integral_v<T1>>,
std::enable_if_t<std::is_integral_v<T2>>
>
>
static bool sameProcs(const UList<T1>& procs1, const UList<T2>& procs2)
{
return sameProcs_impl(procs1, procs2);
}
// Worlds
//- All worlds
@ -1997,6 +2088,8 @@ public:
// Member Functions
// Access
//- Return raw value
value_type value() const noexcept { return value_; }
@ -2006,6 +2099,9 @@ public:
return reinterpret_cast<const void*>(value_);
}
// Query
//- True if not equal to MPI_COMM_NULL
bool good() const noexcept;
@ -2094,6 +2190,8 @@ public:
// Member Functions
// Access
//- Return raw value
value_type value() const noexcept { return value_; }
@ -2103,23 +2201,37 @@ public:
return reinterpret_cast<const void*>(value_);
}
// Basics
//- True if not equal to MPI_REQUEST_NULL
bool good() const noexcept;
//- Reset to default constructed value (MPI_REQUEST_NULL)
void reset() noexcept;
//- Same as calling UPstream::cancelRequest()
void cancel() { UPstream::cancelRequest(*this); }
//- Same as calling UPstream::freeRequest()
void free() { UPstream::freeRequest(*this); }
//- True if request is active (!= MPI_REQUEST_NULL)
//- Same as good(). Same as calling UPstream::activeRequest()
bool active() const noexcept { return good(); }
//- Same as calling UPstream::finishedRequest()
// Uses MPI_Test()
bool finished() { return UPstream::finishedRequest(*this); }
//- Same as calling UPstream::waitRequest()
//- Same as calling UPstream::waitRequest().
// Uses MPI_Wait()
void wait() { UPstream::waitRequest(*this); }
// Other
//- Same as calling UPstream::cancelRequest().
// Uses MPI_Cancel(), MPI_Request_free()
void cancel() { UPstream::cancelRequest(*this); }
//- Same as calling UPstream::freeRequest().
// Uses MPI_Request_free()
void free() { UPstream::freeRequest(*this); }
};
@ -2138,6 +2250,9 @@ Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
#include "UPstreamTraits.H"
#include "UPstreamWindow.H"
// Locally include the following where required:
// - UPstreamFile.H
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository

View File

@ -33,6 +33,7 @@ Description
#define Foam_UPstreamMPI_H
#include "UPstream.H"
#include "UPstreamWindow.H"
// Include MPI without any C++ bindings
#ifndef MPICH_SKIP_MPICXX

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
Copyright (C) 2023-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -68,22 +68,22 @@ void Foam::UPstream::cancelRequest(const label i) {}
void Foam::UPstream::cancelRequest(UPstream::Request&) {}
void Foam::UPstream::cancelRequests(UList<UPstream::Request>&) {}
void Foam::UPstream::removeRequests(const label pos, label len) {}
void Foam::UPstream::removeRequests(label pos, label len) {}
void Foam::UPstream::freeRequest(UPstream::Request&) {}
void Foam::UPstream::freeRequests(UList<UPstream::Request>&) {}
void Foam::UPstream::waitRequests(const label pos, label len) {}
void Foam::UPstream::waitRequests(label pos, label len) {}
void Foam::UPstream::waitRequests(UList<UPstream::Request>&) {}
bool Foam::UPstream::waitAnyRequest(const label pos, label len)
bool Foam::UPstream::waitAnyRequest(label pos, label len)
{
return false;
}
bool Foam::UPstream::waitSomeRequests
(
const label pos,
label pos,
label len,
DynamicList<int>* indices
)
@ -102,7 +102,7 @@ bool Foam::UPstream::waitSomeRequests
return false;
}
Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
int Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
{
return -1;
}
@ -110,10 +110,13 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
void Foam::UPstream::waitRequest(const label i) {}
void Foam::UPstream::waitRequest(UPstream::Request&) {}
bool Foam::UPstream::activeRequest(const label i) { return false; }
bool Foam::UPstream::activeRequest(const UPstream::Request&) { return false; }
bool Foam::UPstream::finishedRequest(const label i) { return true; }
bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; }
bool Foam::UPstream::finishedRequests(const label pos, label len)
bool Foam::UPstream::finishedRequests(label pos, label len)
{
return true;
}

View File

@ -25,7 +25,7 @@ License
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include "UPstreamWindow.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //

View File

@ -41,7 +41,7 @@ SourceFiles
#include "DynamicList.H"
#include "FixedList.H"
#include "UPstream.H" // For UPstream::Request
// UPstream, UPstream::Request, UPstream::Window, ...
#include "openfoam_mpi.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -42,7 +42,7 @@ Foam::UPstream::Request::Request() noexcept
bool Foam::UPstream::Request::good() const noexcept
{
return MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(*this);
return (MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(*this));
}
@ -99,7 +99,7 @@ void Foam::UPstream::addRequest(UPstream::Request& req)
{
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request)
if (MPI_REQUEST_NULL != request) // Active handle is mandatory
{
PstreamGlobals::outstandingRequests_.push_back(request);
}
@ -116,8 +116,7 @@ void Foam::UPstream::cancelRequest(const label i)
if
(
!UPstream::parRun()
|| i < 0
|| i >= PstreamGlobals::outstandingRequests_.size()
|| (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
)
{
return;
@ -170,12 +169,14 @@ void Foam::UPstream::cancelRequests(UList<UPstream::Request>& requests)
MPI_Cancel(&request);
MPI_Request_free(&request);
}
req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
}
// Everything handled, reset all to MPI_REQUEST_NULL
requests = UPstream::Request(MPI_REQUEST_NULL);
}
void Foam::UPstream::removeRequests(const label pos, label len)
void Foam::UPstream::removeRequests(label pos, label len)
{
// No-op for non-parallel, no pending requests or out-of-range
if
@ -258,12 +259,14 @@ void Foam::UPstream::freeRequests(UList<UPstream::Request>& requests)
// }
MPI_Request_free(&request);
}
req = UPstream::Request(MPI_REQUEST_NULL); // Now inactive
}
// Everything handled, reset all to MPI_REQUEST_NULL
requests = UPstream::Request(MPI_REQUEST_NULL);
}
void Foam::UPstream::waitRequests(const label pos, label len)
void Foam::UPstream::waitRequests(label pos, label len)
{
// No-op for non-parallel, no pending requests or out-of-range
if
@ -289,7 +292,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
}
// Have count >= 1
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
@ -302,7 +305,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
if (count == 1)
{
// On success: sets request to MPI_REQUEST_NULL
if (MPI_Wait(waitRequests, MPI_STATUS_IGNORE))
if (MPI_Wait(mpiRequests, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
<< "MPI_Wait returned with error"
@ -312,7 +315,7 @@ void Foam::UPstream::waitRequests(const label pos, label len)
else if (count > 1)
{
// On success: sets each request to MPI_REQUEST_NULL
if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitall returned with error"
@ -346,16 +349,17 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
// Looks ugly but is legitimate since UPstream::Request is an intptr_t,
// which is always large enough to hold an MPI_Request (int or pointer)
label count = 0;
auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
int count = 0;
auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
// Transcribe, but pre-filter to eliminate inactive requests
for (auto& req : requests)
{
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
if (MPI_REQUEST_NULL != request) // Apply some prefiltering
{
waitRequests[count] = request;
mpiRequests[count] = request;
++count;
}
}
@ -369,7 +373,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
profilingPstream::beginTiming();
// On success: sets each request to MPI_REQUEST_NULL
if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitall returned with error"
@ -383,7 +387,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
}
bool Foam::UPstream::waitAnyRequest(const label pos, label len)
bool Foam::UPstream::waitAnyRequest(label pos, label len)
{
// No-op for non-parallel, no pending requests or out-of-range
if
@ -407,7 +411,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)
}
// Have count >= 1
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
@ -419,7 +423,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)
// On success: sets request to MPI_REQUEST_NULL
int index = MPI_UNDEFINED;
if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
if (MPI_Waitany(count, mpiRequests, &index, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitany returned with error"
@ -440,7 +444,7 @@ bool Foam::UPstream::waitAnyRequest(const label pos, label len)
bool Foam::UPstream::waitSomeRequests
(
const label pos,
label pos,
label len,
DynamicList<int>* indices
)
@ -468,7 +472,7 @@ bool Foam::UPstream::waitSomeRequests
}
// Have count >= 1
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
if (UPstream::debug)
{
@ -497,7 +501,7 @@ bool Foam::UPstream::waitSomeRequests
MPI_Waitsome
(
count,
waitRequests,
mpiRequests,
&outcount,
(indices ? indices->data() : tmpIndices.data()),
MPI_STATUSES_IGNORE
@ -543,13 +547,13 @@ bool Foam::UPstream::waitSomeRequests
// Looks ugly but is legitimate since UPstream::Request is an intptr_t,
// which is always large enough to hold an MPI_Request (int or pointer)
label count = 0;
auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
const int count = static_cast<int>(requests.size());
auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
for (auto& req : requests)
// Transcribe without changing locations
for (int i = 0; i < count; ++i)
{
waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
++count;
mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
}
// Local temporary storage, or return via calling parameter
@ -578,7 +582,7 @@ bool Foam::UPstream::waitSomeRequests
MPI_Waitsome
(
count,
waitRequests,
mpiRequests,
&outcount,
(indices ? indices->data() : tmpIndices.data()),
MPI_STATUSES_IGNORE
@ -612,7 +616,7 @@ bool Foam::UPstream::waitSomeRequests
{
for (label i = requests.size()-1; i >= 0; --i)
{
requests[i] = UPstream::Request(waitRequests[i]);
requests[i] = UPstream::Request(mpiRequests[i]);
}
}
@ -620,7 +624,7 @@ bool Foam::UPstream::waitSomeRequests
}
Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
int Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
{
// No-op for non-parallel or no pending requests
if (!UPstream::parRun() || requests.empty())
@ -631,23 +635,22 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
// Looks ugly but is legitimate since UPstream::Request is an intptr_t,
// which is always large enough to hold an MPI_Request (int or pointer)
label count = 0;
auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
const int count = static_cast<int>(requests.size());
auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
// Transcribe UPstream::Request into MPI_Request
// - do not change locations within the list since these are relevant
// for the return index.
for (auto& req : requests)
for (int i = 0; i < count; ++i)
{
waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
++count;
mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
}
profilingPstream::beginTiming();
// On success: sets request to MPI_REQUEST_NULL
int index = MPI_UNDEFINED;
if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
if (MPI_Waitany(count, mpiRequests, &index, MPI_STATUS_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitany returned with error"
@ -664,15 +667,9 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
// Transcribe MPI_Request back into UPstream::Request
// - do in reverse order, as sizeof(MPI_Request) can be smaller than the wrapped value
{
for (label i = count-1; i >= 0; --i)
for (label i = requests.size()-1; i >= 0; --i)
{
requests[i] = UPstream::Request(waitRequests[i]);
}
// Trailing portion
for (label i = count; i < requests.size(); ++i)
{
requests[i] = UPstream::Request(MPI_REQUEST_NULL);
requests[i] = UPstream::Request(mpiRequests[i]);
}
}
@ -695,16 +692,16 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
/// }
///
/// int count = 0;
/// MPI_Request waitRequests[2];
/// MPI_Request mpiRequests[2];
///
/// waitRequests[count] = PstreamUtils::Cast::to_mpi(req0);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// mpiRequests[count] = PstreamUtils::Cast::to_mpi(req0);
/// if (MPI_REQUEST_NULL != mpiRequests[count])
/// {
/// ++count;
/// }
///
/// waitRequests[count] = PstreamUtils::Cast::to_mpi(req1);
/// if (MPI_REQUEST_NULL != waitRequests[count])
/// mpiRequests[count] = PstreamUtils::Cast::to_mpi(req1);
/// if (MPI_REQUEST_NULL != mpiRequests[count])
/// {
/// ++count;
/// }
@ -721,11 +718,25 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
/// profilingPstream::beginTiming();
///
/// // On success: sets each request to MPI_REQUEST_NULL
/// if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
/// if (count == 1)
/// {
/// FatalErrorInFunction
/// << "MPI_Waitall returned with error"
/// << Foam::abort(FatalError);
/// // On success: sets request to MPI_REQUEST_NULL
/// if (MPI_Wait(mpiRequests, MPI_STATUS_IGNORE))
/// {
/// FatalErrorInFunction
/// << "MPI_Wait returned with error"
/// << Foam::abort(FatalError);
/// }
/// }
/// else // (count > 1)
/// {
/// // On success: sets each request to MPI_REQUEST_NULL
/// if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
/// {
/// FatalErrorInFunction
/// << "MPI_Waitall returned with error"
/// << Foam::abort(FatalError);
/// }
/// }
///
/// profilingPstream::addWaitTime();
@ -738,8 +749,7 @@ void Foam::UPstream::waitRequest(const label i)
if
(
!UPstream::parRun()
|| i < 0
|| i >= PstreamGlobals::outstandingRequests_.size()
|| (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
)
{
return;
@ -810,15 +820,27 @@ void Foam::UPstream::waitRequest(UPstream::Request& req)
}
bool Foam::UPstream::activeRequest(const label i)
{
return
(
(i >= 0 && i < PstreamGlobals::outstandingRequests_.size())
&& (MPI_REQUEST_NULL != PstreamGlobals::outstandingRequests_[i])
);
}
bool Foam::UPstream::activeRequest(const UPstream::Request& req)
{
// Same as UPstream::Request::active()
return (MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(req));
}
bool Foam::UPstream::finishedRequest(const label i)
{
// No-op for non-parallel, or out-of-range (eg, placeholder indices)
if
(
!UPstream::parRun()
|| i < 0
|| i >= PstreamGlobals::outstandingRequests_.size()
)
// No-op for non-parallel
if (!UPstream::parRun())
{
return true;
}
@ -829,19 +851,25 @@ bool Foam::UPstream::finishedRequest(const label i)
<< i << endl;
}
auto& request = PstreamGlobals::outstandingRequests_[i];
// NB: call MPI_Test() even with out-of-range or an inactive handle.
// This allows MPI to progress behind the scenes if it wishes.
// Fast-path (no-op) for null request
if (MPI_REQUEST_NULL == request)
int flag = 0;
if (i >= 0 && i < PstreamGlobals::outstandingRequests_.size())
{
auto& request = PstreamGlobals::outstandingRequests_[i];
// On success: sets request to MPI_REQUEST_NULL
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
}
else
{
// Pass a dummy request (for progress side-effect)
MPI_Request request = MPI_REQUEST_NULL;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
}
// On success: sets request to MPI_REQUEST_NULL
int flag = 0;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
return flag != 0;
return (flag != 0);
}
@ -855,36 +883,36 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req)
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
// Fast-path (no-op) for null request
if (MPI_REQUEST_NULL == request)
{
return true;
}
// NB: call MPI_Test() even with an inactive handle.
// This allows MPI to progress behind the scenes if it wishes.
int flag = 0;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
if (flag)
{
// Success: now inactive
req = UPstream::Request(MPI_REQUEST_NULL);
}
// Sync values
req = UPstream::Request(request);
return flag != 0;
return (flag != 0);
}
bool Foam::UPstream::finishedRequests(const label pos, label len)
bool Foam::UPstream::finishedRequests(label pos, label len)
{
// No-op for non-parallel, or out-of-range (eg, placeholder indices)
// No-op for non-parallel
if (!UPstream::parRun())
{
return true;
}
// Out-of-range (eg, placeholder indices)
if
(
!UPstream::parRun()
|| (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
(pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
|| !len
)
{
return true;
pos = 0;
len = 0;
}
label count = (PstreamGlobals::outstandingRequests_.size() - pos);
@ -904,29 +932,33 @@ bool Foam::UPstream::finishedRequests(const label pos, label len)
<< " requests starting at " << pos << endl;
}
auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
int flag = 1;
if (count == 1)
if (count <= 0)
{
// Fast-path (no-op) for single null request
if (MPI_REQUEST_NULL == *waitRequests)
{
return true;
}
// No requests
// Pass a dummy request (for progress side-effect)
MPI_Request request = MPI_REQUEST_NULL;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
}
else if (count == 1)
{
// Single request
// On success: sets request to MPI_REQUEST_NULL
MPI_Test(waitRequests, &flag, MPI_STATUS_IGNORE);
MPI_Test(mpiRequests, &flag, MPI_STATUS_IGNORE);
}
else if (count > 1)
else // (count > 1)
{
// On success: sets each request to MPI_REQUEST_NULL
// On failure: no request is modified
MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
MPI_Testall(count, mpiRequests, &flag, MPI_STATUSES_IGNORE);
}
return flag != 0;
return (flag != 0);
}
@ -941,59 +973,53 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
// Looks ugly but is legitimate since UPstream::Request is an intptr_t,
// which is always large enough to hold an MPI_Request (int or pointer)
label count = 0;
auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
const int count = static_cast<int>(requests.size());
auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
for (auto& req : requests)
// Transcribe
for (int i = 0; i < count; ++i)
{
MPI_Request request = PstreamUtils::Cast::to_mpi(req);
mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
}
if (MPI_REQUEST_NULL != request) // Apply some prefiltering
// NB: call MPI_Test() even with an inactive handle.
// This allows MPI to progress behind the scenes if it wishes.
int flag = 1;
if (count <= 0)
{
// No requests
// Pass a dummy request (for progress side-effect)
MPI_Request request = MPI_REQUEST_NULL;
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
}
else if (count == 1)
{
// Single request
// On success: sets request to MPI_REQUEST_NULL
MPI_Test(mpiRequests, &flag, MPI_STATUS_IGNORE);
}
else // (count > 1)
{
// On success: sets each request to MPI_REQUEST_NULL
// On failure: no request is modified
MPI_Testall(count, mpiRequests, &flag, MPI_STATUSES_IGNORE);
}
// Transcribe MPI_Request back into UPstream::Request
// - do in reverse order, as sizeof(MPI_Request) can be smaller than the wrapped value
{
for (label i = requests.size()-1; i >= 0; --i)
{
waitRequests[count] = request;
++count;
requests[i] = UPstream::Request(mpiRequests[i]);
}
}
if (!count)
{
// No active handles
return true;
}
// On success: sets each request to MPI_REQUEST_NULL
// On failure: no request is modified
int flag = 0;
MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
if (flag)
{
// Success: reset all requests to MPI_REQUEST_NULL
requests = UPstream::Request(MPI_REQUEST_NULL);
}
else
{
// Not all done. Recover wrapped representation but in reverse order
// since sizeof(MPI_Request) can be smaller than
// sizeof(UPstream::Request::value_type)
// eg, mpich has MPI_Request as 'int'
//
// This is uglier than we'd like, but much better than allocating
// and freeing a scratch buffer each time we query things.
for (label i = count-1; i >= 0; --i)
{
requests[i] = UPstream::Request(waitRequests[i]);
}
// Trailing portion
for (label i = count; i < requests.size(); ++i)
{
requests[i] = UPstream::Request(MPI_REQUEST_NULL);
}
}
return flag != 0;
return (flag != 0);
}
@ -1008,29 +1034,29 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
}
bool anyActive = false;
MPI_Request waitRequests[2];
MPI_Request mpiRequests[2];
// No-op for out-of-range (eg, placeholder indices)
if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[0] = PstreamGlobals::outstandingRequests_[req0];
mpiRequests[0] = PstreamGlobals::outstandingRequests_[req0];
}
else
{
waitRequests[0] = MPI_REQUEST_NULL;
mpiRequests[0] = MPI_REQUEST_NULL;
}
if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[1] = PstreamGlobals::outstandingRequests_[req1];
mpiRequests[1] = PstreamGlobals::outstandingRequests_[req1];
}
else
{
waitRequests[1] = MPI_REQUEST_NULL;
mpiRequests[1] = MPI_REQUEST_NULL;
}
if (MPI_REQUEST_NULL != waitRequests[0]) // An active handle
if (MPI_REQUEST_NULL != mpiRequests[0]) // An active handle
{
anyActive = true;
}
@ -1039,7 +1065,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
req0 = -1;
}
if (MPI_REQUEST_NULL != waitRequests[1]) // An active handle
if (MPI_REQUEST_NULL != mpiRequests[1]) // An active handle
{
anyActive = true;
}
@ -1064,7 +1090,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
MPI_Testsome
(
2,
waitRequests,
mpiRequests,
&outcount,
indices,
MPI_STATUSES_IGNORE
@ -1085,12 +1111,12 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
if (req0 >= 0)
{
PstreamGlobals::outstandingRequests_[req0] = waitRequests[0];
PstreamGlobals::outstandingRequests_[req0] = mpiRequests[0];
}
if (req1 >= 0)
{
PstreamGlobals::outstandingRequests_[req1] = waitRequests[1];
PstreamGlobals::outstandingRequests_[req1] = mpiRequests[1];
}
// Flag indices as 'done'
@ -1108,7 +1134,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
{
if (req0 >= 0)
{
PstreamGlobals::outstandingRequests_[req0] = waitRequests[0];
PstreamGlobals::outstandingRequests_[req0] = mpiRequests[0];
req0 = -1;
}
}
@ -1116,7 +1142,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
{
if (req1 >= 0)
{
PstreamGlobals::outstandingRequests_[req1] = waitRequests[1];
PstreamGlobals::outstandingRequests_[req1] = mpiRequests[1];
req1 = -1;
}
}
@ -1137,17 +1163,17 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
}
int count = 0;
MPI_Request waitRequests[2];
MPI_Request mpiRequests[2];
// No-op for out-of-range (eg, placeholder indices)
// Prefilter inactive handles
if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[count] = PstreamGlobals::outstandingRequests_[req0];
mpiRequests[count] = PstreamGlobals::outstandingRequests_[req0];
PstreamGlobals::outstandingRequests_[req0] = MPI_REQUEST_NULL;
if (MPI_REQUEST_NULL != waitRequests[count]) // An active handle
if (MPI_REQUEST_NULL != mpiRequests[count]) // An active handle
{
++count;
}
@ -1155,10 +1181,10 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
{
waitRequests[count] = PstreamGlobals::outstandingRequests_[req1];
mpiRequests[count] = PstreamGlobals::outstandingRequests_[req1];
PstreamGlobals::outstandingRequests_[req1] = MPI_REQUEST_NULL;
if (MPI_REQUEST_NULL != waitRequests[count]) // An active handle
if (MPI_REQUEST_NULL != mpiRequests[count]) // An active handle
{
++count;
}
@ -1177,7 +1203,7 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
profilingPstream::beginTiming();
// On success: sets each request to MPI_REQUEST_NULL
if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
{
FatalErrorInFunction
<< "MPI_Waitall returned with error"

View File

@ -25,6 +25,7 @@ License
\*---------------------------------------------------------------------------*/
#include "UPstreamWindow.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"

View File

@ -185,11 +185,7 @@ Foam::label Foam::AMIInterpolation::calcDistribution
}
else if (inCommGroup)
{
if
(
currComm >= 0
&& ListOps::equal(subProcs, UPstream::procID(currComm))
)
if (UPstream::sameProcs(currComm, subProcs))
{
// Keep geomComm
if (debug)