From 06b3e9bd0b443adac676ec145697929131bcc781 Mon Sep 17 00:00:00 2001
From: Mark Olesen
Date: Mon, 6 Oct 2025 23:08:18 +0200
Subject: [PATCH] ENH: update/refine UPstream request handling

- activeRequest() checks if the given request is still active/pending

- finishedRequest() will now always call MPI_Test when running in
  parallel instead of using a fast-path for skipping a null request.
  This gives MPI a chance to progress operations even if that
  particular request is not active.

- sameProcs() is a convenience wrapper for the following:
  * check if indices resolve to the same underlying procs
  * check the communicator procs against a list of procs
  * compare two lists of procs

  For example, check if the communicator corresponds to a host-local
  communication:

      if (UPstream::sameProcs(UPstream::commLocalNode(), myComm)) ...

STYLE: UPstream::Request gets an 'active()' method

- more MPI-idiomatic than 'good'
---
 .../test/nodeTopology/Test-nodeTopology.cxx   |  49 ++-
 .../test/one-sided1/Test-one-sided1.cxx       |   1 +
 src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H | 139 ++++++-
 src/OpenFOAM/include/openfoam_mpi.H           |   1 +
 src/Pstream/dummy/UPstreamRequest.C           |  17 +-
 src/Pstream/dummy/UPstreamWindow.C            |   2 +-
 src/Pstream/mpi/PstreamGlobals.H              |   2 +-
 src/Pstream/mpi/UPstreamRequest.C             | 348 ++++++++++--------
 src/Pstream/mpi/UPstreamWindow.C              |   1 +
 .../AMIInterpolation/AMIInterpolation.C       |   6 +-
 10 files changed, 369 insertions(+), 197 deletions(-)

diff --git a/applications/test/nodeTopology/Test-nodeTopology.cxx b/applications/test/nodeTopology/Test-nodeTopology.cxx
index 58b3d56cde..2f08c1160d 100644
--- a/applications/test/nodeTopology/Test-nodeTopology.cxx
+++ b/applications/test/nodeTopology/Test-nodeTopology.cxx
@@ -54,9 +54,12 @@ int main(int argc, char *argv[])
         "int",
         "Num of cores to simulate (default: 4)"
     );
+    argList::addBoolOption("scatter", "Use scatter arrows");
 
     #include "setRootCase.H"
 
+    const bool optScatterGraph = args.found("scatter");
+
     label nProcs = UPstream::nProcs(UPstream::worldComm);
 
     DynamicList<label> fake_interNode_offsets;
@@ -128,25 +131,50 @@
     auto& os = Info.stream();
 
     os << "// node topology graph:" << nl;
-    os.beginBlock("graph");
+
+    std::string arrow;
+    if (optScatterGraph)
+    {
+        arrow = " -> ";
+        os.beginBlock("digraph");
+    }
+    else
+    {
+        arrow = " -- ";
+        os.beginBlock("graph");
+    }
 
     // Prefer left-to-right layout for large graphs
-    os << indent << "rankdir=LR" << nl;
+    os << indent << "rankdir=LR" << nl << nl;
 
     const label numNodes = interNodeOffsets.size()-1;
 
+    // The master
+    os << indent
+        << "0 [label=\"master\", style=filled, fillcolor=lightgray];"
+        << nl << nl;
+
     // First level are the inter-node connections
     {
-        os << indent << 0 << " -- " << token::LBRACE;
+        os << indent
+            << "// inter-node: " << flatOutput(interNodeOffsets) << nl;
 
-        for (label nodei = 1; nodei < numNodes; ++nodei)
+        os << indent
+            << 0 << arrow.data() << nl;
+
+        os.beginBlock();
+
+        os << indent << "rank=same; node [shape=box];" << nl << nl;
+
+        os << indent;
+        for (label nodei = 0; nodei < numNodes; ++nodei)
         {
-            os << ' ' << interNodeOffsets[nodei];
+            if (nodei) os << ' ';
+            os << "node" << nodei;
         }
-        os << token::SPACE << token::RBRACE
-            << " // inter-node: " << flatOutput(interNodeOffsets)
-            << nl;
+        os << nl;
+        os.endBlock() << nl;
     }
 
     // Next level are the local-node connections
@@ -155,8 +183,9 @@
         const auto firstProc = interNodeOffsets[nodei];
        const auto lastProc = interNodeOffsets[nodei+1];
 
-        os << indent << firstProc << " -- " << token::DQUOTE
-            << (firstProc+1) << ".." << (lastProc-1)
+        os << indent << "node" << nodei << arrow.data()
+            << token::DQUOTE
+            << firstProc << ".." << (lastProc-1)
             << token::DQUOTE << nl;
     }
 
diff --git a/applications/test/one-sided1/Test-one-sided1.cxx b/applications/test/one-sided1/Test-one-sided1.cxx
index 9a0af7114e..aa6a8b4e2d 100644
--- a/applications/test/one-sided1/Test-one-sided1.cxx
+++ b/applications/test/one-sided1/Test-one-sided1.cxx
@@ -38,6 +38,7 @@ Description
 #include "SubField.H"
 #include "vector.H"
 #include "IOstreams.H"
+#include "UPstreamWindow.H"
 
 using namespace Foam;
 
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
index 835e653544..1c3b6a4bad 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H
@@ -533,6 +533,52 @@ private:
         static void freeCommunicatorComponents(const label index);
 
 
+    // Private implementation helpers
+
+        //- Test for communicator equality.
+        //  True if they have the same index or address the same ranks
+        static bool sameProcs_impl(int comm1, int comm2)
+        {
+            return
+            (
+                (comm1 == comm2) ||
+                (
+                    // With guard against bad index
+                    (comm1 >= 0 && comm1 < procIDs_.size())
+                 && (comm2 >= 0 && comm2 < procIDs_.size())
+                 && (procIDs_[comm1] == procIDs_[comm2])
+                )
+            );
+        }
+
+        //- Test equality of communicator procs with the given list of ranks
+        template<class T>
+        static bool sameProcs_impl
+        (
+            int communicator,
+            const UList<T>& procs
+        )
+        {
+            return
+            (
+                // With guard against bad index
+                (communicator >= 0 && communicator < procIDs_.size())
+             && ListOps::equal(procIDs_[communicator], procs)
+            );
+        }
+
+        //- Test the equality of two lists of ranks
+        template<class T1, class T2>
+        static bool sameProcs_impl
+        (
+            const UList<T1>& procs1,
+            const UList<T2>& procs2
+        )
+        {
+            return ListOps::equal(procs1, procs2);
+        }
+
+
 protected:
 
     // Protected Member Functions
@@ -1061,7 +1107,7 @@ public:
         //
         //  \param pos starting position within the internal list of requests
         //  \param len length of slice to remove (negative = until the end)
-        static void removeRequests(const label pos, label len = -1);
+        static void removeRequests(label pos, label len = -1);
 
         //- Non-blocking comms: free outstanding request.
         //- Corresponds to MPI_Request_free()
@@ -1085,7 +1131,7 @@ public:
         //
         //  \param pos starting position within the internal list of requests
         //  \param len length of slice to check (negative = until the end)
-        static void waitRequests(const label pos, label len = -1);
+        static void waitRequests(label pos, label len = -1);
 
         //- Wait until all requests have finished.
         //- Corresponds to MPI_Waitall()
@@ -1103,7 +1149,7 @@ public:
         //
         //  \param pos starting position within the internal list of requests
         //  \param len length of slice to check (negative = until the end)
-        static bool waitAnyRequest(const label pos, label len = -1);
+        static bool waitAnyRequest(label pos, label len = -1);
 
         //- Wait until some requests (from position onwards) have finished.
         //- Corresponds to MPI_Waitsome()
@@ -1122,7 +1168,7 @@ public:
         //  when calling within a loop.
         static bool waitSomeRequests
         (
-            const label pos,
+            label pos,
             label len = -1,
            DynamicList<int>* indices = nullptr
         );
@@ -1148,7 +1194,7 @@ public:
         //- Corresponds to MPI_Waitany()
         //  Returns -1 if parRun() == false, or the list is empty,
         //  or if all the requests have already been handled
-        static label waitAnyRequest(UList<UPstream::Request>& requests);
+        static int waitAnyRequest(UList<UPstream::Request>& requests);
 
         //- Wait until request i has finished.
        //- Corresponds to MPI_Wait()
@@ -1162,6 +1208,14 @@ public:
         //  A no-op if parRun() == false or for a null-request
         static void waitRequest(UPstream::Request& req);
 
+        //- Is request \p i active (!= MPI_REQUEST_NULL)?
+        //  False if there are no pending requests,
+        //  or if the index is out-of-range (0 to nRequests)
+        static bool activeRequest(const label i);
+
+        //- Is request active (!= MPI_REQUEST_NULL)?
+        static bool activeRequest(const UPstream::Request& req);
+
         //- Non-blocking comms: has request i finished?
         //- Corresponds to MPI_Test()
         //  A no-op and returns true if parRun() == false,
@@ -1185,7 +1239,7 @@ public:
         //
         //  \param pos starting position within the internal list of requests
         //  \param len length of slice to check (negative = until the end)
-        static bool finishedRequests(const label pos, label len = -1);
+        static bool finishedRequests(label pos, label len = -1);
 
         //- Non-blocking comms: have all requests finished?
         //- Corresponds to MPI_Testall()
@@ -1291,6 +1345,43 @@ public:
         }
 
 
+        //- Test for communicator equality.
+        //  True if they have the same index or address the same ranks
+        static bool sameProcs(int communicator1, int communicator2)
+        {
+            return sameProcs_impl(communicator1, communicator2);
+        }
+
+        //- Test equality of communicator procs with the given list of ranks.
+        //- Includes a guard for the communicator index.
+        template
+        <
+            typename T1,
+            typename = std::void_t
+            <
+                std::enable_if_t<std::is_integral_v<T1>>
+            >
+        >
+        static bool sameProcs(int communicator, const UList<T1>& procs)
+        {
+            return sameProcs_impl(communicator, procs);
+        }
+
+        //- Test the equality of two lists of ranks
+        template
+        <
+            typename T1,
+            typename T2,
+            typename = std::void_t
+            <
+                std::enable_if_t<std::is_integral_v<T1>>,
+                std::enable_if_t<std::is_integral_v<T2>>
+            >
+        >
+        static bool sameProcs(const UList<T1>& procs1, const UList<T2>& procs2)
+        {
+            return sameProcs_impl(procs1, procs2);
+        }
+
+
     // Worlds
 
         //- All worlds
@@ -1997,6 +2088,8 @@ public:
 
     // Member Functions
 
+        // Access
+
             //- Return raw value
             value_type value() const noexcept { return value_; }
 
@@ -2006,6 +2099,9 @@ public:
                 return reinterpret_cast<void*>(value_);
             }
 
+
+        // Query
+
             //- True if not equal to MPI_COMM_NULL
             bool good() const noexcept;
 
@@ -2094,6 +2190,8 @@ public:
 
     // Member Functions
 
+        // Access
+
             //- Return raw value
             value_type value() const noexcept { return value_; }
 
@@ -2103,23 +2201,37 @@ public:
                 return reinterpret_cast<void*>(value_);
             }
 
+
+        // Basics
+
             //- True if not equal to MPI_REQUEST_NULL
             bool good() const noexcept;
 
             //- Reset to default constructed value (MPI_REQUEST_NULL)
             void reset() noexcept;
 
-            //- Same as calling UPstream::cancelRequest()
-            void cancel() { UPstream::cancelRequest(*this); }
-
-            //- Same as calling UPstream::freeRequest()
-            void free() { UPstream::freeRequest(*this); }
+            //- True if request is active (!= MPI_REQUEST_NULL)
+            //- Same as good(). Same as calling UPstream::activeRequest()
+            bool active() const noexcept { return good(); }
 
             //- Same as calling UPstream::finishedRequest()
+            //  Uses MPI_Test()
             bool finished() { return UPstream::finishedRequest(*this); }
 
-            //- Same as calling UPstream::waitRequest()
+            //- Same as calling UPstream::waitRequest().
+            //  Uses MPI_Wait()
             void wait() { UPstream::waitRequest(*this); }
+
+
+        // Other
+
+            //- Same as calling UPstream::cancelRequest().
+            //  Uses MPI_Cancel(), MPI_Request_free()
+            void cancel() { UPstream::cancelRequest(*this); }
+
+            //- Same as calling UPstream::freeRequest().
+            //  Uses MPI_Request_free()
+            void free() { UPstream::freeRequest(*this); }
 };
 
 
@@ -2138,6 +2250,9 @@ Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
 
 #include "UPstreamTraits.H"
 #include "UPstreamWindow.H"
 
+// Locally include the following where required:
+// - UPstreamFile.H
+
 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
 
 #ifdef NoRepository
diff --git a/src/OpenFOAM/include/openfoam_mpi.H b/src/OpenFOAM/include/openfoam_mpi.H
index a7e3249810..ad4b09a6be 100644
--- a/src/OpenFOAM/include/openfoam_mpi.H
+++ b/src/OpenFOAM/include/openfoam_mpi.H
@@ -33,6 +33,7 @@ Description
 #define Foam_UPstreamMPI_H
 
 #include "UPstream.H"
+#include "UPstreamWindow.H"
 
 // Include MPI without any C++ bindings
 #ifndef MPICH_SKIP_MPICXX
diff --git a/src/Pstream/dummy/UPstreamRequest.C b/src/Pstream/dummy/UPstreamRequest.C
index 4e67be8594..cb0dc3bb81 100644
--- a/src/Pstream/dummy/UPstreamRequest.C
+++ b/src/Pstream/dummy/UPstreamRequest.C
@@ -5,7 +5,7 @@
    \\  /    A nd           | www.openfoam.com
     \\/     M anipulation  |
 -------------------------------------------------------------------------------
-    Copyright (C) 2023 OpenCFD Ltd.
+    Copyright (C) 2023-2025 OpenCFD Ltd.
 -------------------------------------------------------------------------------
 License
     This file is part of OpenFOAM.
@@ -68,22 +68,22 @@ void Foam::UPstream::cancelRequest(const label i) {}
 void Foam::UPstream::cancelRequest(UPstream::Request&) {}
 void Foam::UPstream::cancelRequests(UList<UPstream::Request>&) {}
 
-void Foam::UPstream::removeRequests(const label pos, label len) {}
+void Foam::UPstream::removeRequests(label pos, label len) {}
 
 void Foam::UPstream::freeRequest(UPstream::Request&) {}
 void Foam::UPstream::freeRequests(UList<UPstream::Request>&) {}
 
-void Foam::UPstream::waitRequests(const label pos, label len) {}
+void Foam::UPstream::waitRequests(label pos, label len) {}
 void Foam::UPstream::waitRequests(UList<UPstream::Request>&) {}
 
-bool Foam::UPstream::waitAnyRequest(const label pos, label len)
+bool Foam::UPstream::waitAnyRequest(label pos, label len)
 {
     return false;
 }
 
 bool Foam::UPstream::waitSomeRequests
 (
-    const label pos,
+    label pos,
     label len,
     DynamicList<int>* indices
 )
@@ -102,7 +102,7 @@ bool Foam::UPstream::waitSomeRequests
     return false;
 }
 
-Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
+int Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
 {
     return -1;
 }
@@ -110,10 +110,13 @@ Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>&)
 void Foam::UPstream::waitRequest(const label i) {}
 void Foam::UPstream::waitRequest(UPstream::Request&) {}
 
+bool Foam::UPstream::activeRequest(const label i) { return false; }
+bool Foam::UPstream::activeRequest(const UPstream::Request&) { return false; }
+
 bool Foam::UPstream::finishedRequest(const label i) { return true; }
 bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; }
 
-bool Foam::UPstream::finishedRequests(const label pos, label len)
+bool Foam::UPstream::finishedRequests(label pos, label len)
 {
     return true;
 }
diff --git a/src/Pstream/dummy/UPstreamWindow.C b/src/Pstream/dummy/UPstreamWindow.C
index e7281d3bba..5982d8d20c 100644
--- a/src/Pstream/dummy/UPstreamWindow.C
+++ b/src/Pstream/dummy/UPstreamWindow.C
@@ -25,7 +25,7 @@ License
 
 \*---------------------------------------------------------------------------*/
 
-#include "UPstream.H"
+#include "UPstreamWindow.H"
 
 // * * * * * * * * * * * * * * * * Constructors  * * * * * * * * * * * * * * //
 
diff --git a/src/Pstream/mpi/PstreamGlobals.H b/src/Pstream/mpi/PstreamGlobals.H
index e895883992..83c22f1bae 100644
--- a/src/Pstream/mpi/PstreamGlobals.H
+++ b/src/Pstream/mpi/PstreamGlobals.H
@@ -41,7 +41,7 @@ SourceFiles
 
 #include "DynamicList.H"
 #include "FixedList.H"
-#include "UPstream.H"   // For UPstream::Request
+// UPstream, UPstream::Request, UPstream::Window, ...
 #include "openfoam_mpi.H"
 
 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
 
diff --git a/src/Pstream/mpi/UPstreamRequest.C b/src/Pstream/mpi/UPstreamRequest.C
index afbad051d6..af557cd096 100644
--- a/src/Pstream/mpi/UPstreamRequest.C
+++ b/src/Pstream/mpi/UPstreamRequest.C
@@ -42,7 +42,7 @@ Foam::UPstream::Request::Request() noexcept
 
 bool Foam::UPstream::Request::good() const noexcept
 {
-    return MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(*this);
+    return (MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(*this));
 }
 
 
@@ -99,7 +99,7 @@ void Foam::UPstream::addRequest(UPstream::Request& req)
 {
     MPI_Request request = PstreamUtils::Cast::to_mpi(req);
 
-    if (MPI_REQUEST_NULL != request)
+    if (MPI_REQUEST_NULL != request)  // Active handle is mandatory
     {
         PstreamGlobals::outstandingRequests_.push_back(request);
     }
@@ -116,8 +116,7 @@ void Foam::UPstream::cancelRequest(const label i)
     if
     (
         !UPstream::parRun()
-        || i < 0
-        || i >= PstreamGlobals::outstandingRequests_.size()
+        || (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
     )
     {
         return;
@@ -170,12 +169,14 @@
             MPI_Cancel(&request);
             MPI_Request_free(&request);
         }
-        req = UPstream::Request(MPI_REQUEST_NULL);  // Now inactive
     }
+
+    // Everything handled, reset all to MPI_REQUEST_NULL
+    requests = UPstream::Request(MPI_REQUEST_NULL);
 }
 
 
-void Foam::UPstream::removeRequests(const label pos, label len)
+void Foam::UPstream::removeRequests(label pos, label len)
 {
     // No-op for non-parallel, no pending requests or out-of-range
     if
@@ -258,12 +259,14 @@ void Foam::UPstream::freeRequests(UList<UPstream::Request>& requests)
             //     MPI_Request_free(&request);
             // }
             MPI_Request_free(&request);
         }
-        req = UPstream::Request(MPI_REQUEST_NULL);  // Now inactive
     }
+
+    // Everything handled, reset all to MPI_REQUEST_NULL
+    requests = UPstream::Request(MPI_REQUEST_NULL);
 }
 
 
-void Foam::UPstream::waitRequests(const label pos, label len)
+void Foam::UPstream::waitRequests(label pos, label len)
 {
     // No-op for non-parallel, no pending requests or out-of-range
     if
@@ -289,7 +292,7 @@ void Foam::UPstream::waitRequests(label pos, label len)
     }
 
     // Have count >= 1
-    auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
+    auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
 
     if (UPstream::debug)
     {
@@ -302,7 +305,7 @@ void Foam::UPstream::waitRequests(label pos, label len)
     if (count == 1)
     {
         // On success: sets request to MPI_REQUEST_NULL
-        if (MPI_Wait(waitRequests, MPI_STATUS_IGNORE))
+        if (MPI_Wait(mpiRequests, MPI_STATUS_IGNORE))
         {
             FatalErrorInFunction
                 << "MPI_Wait returned with error"
@@ -312,7 +315,7 @@ void Foam::UPstream::waitRequests(label pos, label len)
     else if (count > 1)
     {
         // On success: sets each request to MPI_REQUEST_NULL
-        if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
+        if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
         {
             FatalErrorInFunction
                 << "MPI_Waitall returned with error"
@@ -346,16 +349,17 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
 
     // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
     // which is always large enough to hold an MPI_Request (int or pointer)
 
-    label count = 0;
-    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
+    int count = 0;
+    auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
 
+    // Transcribe, but pre-filter to eliminate inactive requests
     for (auto& req : requests)
     {
         MPI_Request request = PstreamUtils::Cast::to_mpi(req);
 
         if (MPI_REQUEST_NULL != request)  // Apply some prefiltering
         {
-            waitRequests[count] = request;
+            mpiRequests[count] = request;
             ++count;
         }
     }
@@ -369,7 +373,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
     profilingPstream::beginTiming();
 
     // On success: sets each request to MPI_REQUEST_NULL
-    if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
+    if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
     {
         FatalErrorInFunction
             << "MPI_Waitall returned with error"
@@ -383,7 +387,7 @@ void Foam::UPstream::waitRequests(UList<UPstream::Request>& requests)
 }
 
 
-bool Foam::UPstream::waitAnyRequest(const label pos, label len)
+bool Foam::UPstream::waitAnyRequest(label pos, label len)
 {
     // No-op for non-parallel, no pending requests or out-of-range
     if
@@ -407,7 +411,7 @@ bool Foam::UPstream::waitAnyRequest(label pos, label len)
     }
 
     // Have count >= 1
-    auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
+    auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
 
     if (UPstream::debug)
     {
@@ -419,7 +423,7 @@ bool Foam::UPstream::waitAnyRequest(label pos, label len)
 
     // On success: sets request to MPI_REQUEST_NULL
     int index = MPI_UNDEFINED;
-    if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
+    if (MPI_Waitany(count, mpiRequests, &index, MPI_STATUS_IGNORE))
     {
         FatalErrorInFunction
             << "MPI_Waitany returned with error"
@@ -440,7 +444,7 @@ bool Foam::UPstream::waitAnyRequest(label pos, label len)
 
 bool Foam::UPstream::waitSomeRequests
 (
-    const label pos,
+    label pos,
     label len,
     DynamicList<int>* indices
 )
@@ -468,7 +472,7 @@ bool Foam::UPstream::waitSomeRequests
     }
 
     // Have count >= 1
-    auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
+    auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
 
     if (UPstream::debug)
     {
@@ -497,7 +501,7 @@ bool Foam::UPstream::waitSomeRequests
         MPI_Waitsome
         (
             count,
-            waitRequests,
+            mpiRequests,
             &outcount,
             (indices ? indices->data() : tmpIndices.data()),
             MPI_STATUSES_IGNORE
@@ -543,13 +547,13 @@ bool Foam::UPstream::waitSomeRequests
 
     // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
     // which is always large enough to hold an MPI_Request (int or pointer)
 
-    label count = 0;
-    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
+    const int count = static_cast<int>(requests.size());
+    auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
 
-    for (auto& req : requests)
+    // Transcribe without changing locations
+    for (int i = 0; i < count; ++i)
     {
-        waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
-        ++count;
+        mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
     }
 
     // Local temporary storage, or return via calling parameter
 
@@ -578,7 +582,7 @@ bool Foam::UPstream::waitSomeRequests
         MPI_Waitsome
         (
             count,
-            waitRequests,
+            mpiRequests,
             &outcount,
             (indices ? indices->data() : tmpIndices.data()),
             MPI_STATUSES_IGNORE
@@ -612,7 +616,7 @@ bool Foam::UPstream::waitSomeRequests
     {
         for (label i = requests.size()-1; i >= 0; --i)
         {
-            requests[i] = UPstream::Request(waitRequests[i]);
+            requests[i] = UPstream::Request(mpiRequests[i]);
         }
     }
 
@@ -620,7 +624,7 @@ bool Foam::UPstream::waitSomeRequests
 }
 
 
-Foam::label Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
+int Foam::UPstream::waitAnyRequest(UList<UPstream::Request>& requests)
 {
     // No-op for non-parallel or no pending requests
     if (!UPstream::parRun() || requests.empty())
@@ -631,23 +635,22 @@
 
     // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
     // which is always large enough to hold an MPI_Request (int or pointer)
 
-    label count = 0;
-    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
+    const int count = static_cast<int>(requests.size());
+    auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
 
     // Transcribe UPstream::Request into MPI_Request
     // - do not change locations within the list since these are relevant
     //   for the return index.
-    for (auto& req : requests)
+    for (int i = 0; i < count; ++i)
     {
-        waitRequests[count] = PstreamUtils::Cast::to_mpi(req);
-        ++count;
+        mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
     }
 
     profilingPstream::beginTiming();
 
     // On success: sets request to MPI_REQUEST_NULL
     int index = MPI_UNDEFINED;
-    if (MPI_Waitany(count, waitRequests, &index, MPI_STATUS_IGNORE))
+    if (MPI_Waitany(count, mpiRequests, &index, MPI_STATUS_IGNORE))
     {
         FatalErrorInFunction
             << "MPI_Waitany returned with error"
@@ -664,15 +667,9 @@
 
     // Transcribe MPI_Request back into UPstream::Request
     // - do in reverse order - see note in finishedRequests()
     {
-        for (label i = count-1; i >= 0; --i)
+        for (label i = requests.size()-1; i >= 0; --i)
         {
-            requests[i] = UPstream::Request(waitRequests[i]);
-        }
-
-        // Trailing portion
-        for (label i = count; i < requests.size(); ++i)
-        {
-            requests[i] = UPstream::Request(MPI_REQUEST_NULL);
+            requests[i] = UPstream::Request(mpiRequests[i]);
         }
     }
 
@@ -695,16 +692,16 @@
 ///     }
 ///
 ///     int count = 0;
-///     MPI_Request waitRequests[2];
+///     MPI_Request mpiRequests[2];
 ///
-///     waitRequests[count] = PstreamUtils::Cast::to_mpi(req0);
-///     if (MPI_REQUEST_NULL != waitRequests[count])
+///     mpiRequests[count] = PstreamUtils::Cast::to_mpi(req0);
+///     if (MPI_REQUEST_NULL != mpiRequests[count])
 ///     {
 ///         ++count;
 ///     }
 ///
-///     waitRequests[count] = PstreamUtils::Cast::to_mpi(req1);
-///     if (MPI_REQUEST_NULL != waitRequests[count])
+///     mpiRequests[count] = PstreamUtils::Cast::to_mpi(req1);
+///     if (MPI_REQUEST_NULL != mpiRequests[count])
 ///     {
 ///         ++count;
 ///     }
@@ -721,11 +718,25 @@
 ///     profilingPstream::beginTiming();
 ///
 ///     // On success: sets each request to MPI_REQUEST_NULL
-///     if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
+///     if (count == 1)
 ///     {
-///         FatalErrorInFunction
-///             << "MPI_Waitall returned with error"
-///             << Foam::abort(FatalError);
+///         // On success: sets request to MPI_REQUEST_NULL
+///         if (MPI_Wait(mpiRequests, MPI_STATUS_IGNORE))
+///         {
+///             FatalErrorInFunction
+///                 << "MPI_Wait returned with error"
+///                 << Foam::abort(FatalError);
+///         }
+///     }
+///     else  // (count > 1)
+///     {
+///         // On success: sets each request to MPI_REQUEST_NULL
+///         if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
+///         {
+///             FatalErrorInFunction
+///                 << "MPI_Waitall returned with error"
+///                 << Foam::abort(FatalError);
+///         }
 ///     }
 ///
 ///     profilingPstream::addWaitTime();
 
@@ -738,8 +749,7 @@ void Foam::UPstream::waitRequest(const label i)
     if
     (
         !UPstream::parRun()
-        || i < 0
-        || i >= PstreamGlobals::outstandingRequests_.size()
+        || (i < 0 || i >= PstreamGlobals::outstandingRequests_.size())
     )
     {
         return;
@@ -810,15 +820,27 @@ void Foam::UPstream::waitRequest(UPstream::Request& req)
 }
 
 
+bool Foam::UPstream::activeRequest(const label i)
+{
+    return
+    (
+        (i >= 0 && i < PstreamGlobals::outstandingRequests_.size())
+     && (MPI_REQUEST_NULL != PstreamGlobals::outstandingRequests_[i])
+    );
+}
+
+
+bool Foam::UPstream::activeRequest(const UPstream::Request& req)
+{
+    // Same as UPstream::Request::active()
+    return (MPI_REQUEST_NULL != PstreamUtils::Cast::to_mpi(req));
+}
+
+
 bool Foam::UPstream::finishedRequest(const label i)
 {
-    // No-op for non-parallel, or out-of-range (eg, placeholder indices)
-    if
-    (
-        !UPstream::parRun()
-        || i < 0
-        || i >= PstreamGlobals::outstandingRequests_.size()
-    )
+    // No-op for non-parallel
+    if (!UPstream::parRun())
     {
         return true;
     }
@@ -829,19 +851,25 @@ bool Foam::UPstream::finishedRequest(const label i)
             << i << endl;
     }
 
-    auto& request = PstreamGlobals::outstandingRequests_[i];
+    // NB: call MPI_Test() even with out-of-range or an inactive handle.
+    // This allows MPI to progress behind the scenes if it wishes.
 
-    // Fast-path (no-op) for null request
-    if (MPI_REQUEST_NULL == request)
+    int flag = 0;
+    if (i >= 0 && i < PstreamGlobals::outstandingRequests_.size())
     {
-        return true;
+        auto& request = PstreamGlobals::outstandingRequests_[i];
+
+        // On success: sets request to MPI_REQUEST_NULL
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+    }
+    else
+    {
+        // Pass a dummy request (for progress side-effect)
+        MPI_Request request = MPI_REQUEST_NULL;
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
     }
 
-    // On success: sets request to MPI_REQUEST_NULL
-    int flag = 0;
-    MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
-
-    return flag != 0;
+    return (flag != 0);
 }
 
 
@@ -855,36 +883,36 @@ bool Foam::UPstream::finishedRequest(UPstream::Request& req)
 
     MPI_Request request = PstreamUtils::Cast::to_mpi(req);
 
-    // Fast-path (no-op) for null request
-    if (MPI_REQUEST_NULL == request)
-    {
-        return true;
-    }
+    // NB: call MPI_Test() even with an inactive handle.
+    // This allows MPI to progress behind the scenes if it wishes.
 
     int flag = 0;
     MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
 
-    if (flag)
-    {
-        // Success: now inactive
-        req = UPstream::Request(MPI_REQUEST_NULL);
-    }
+    // Sync values
+    req = UPstream::Request(request);
 
-    return flag != 0;
+    return (flag != 0);
 }
 
 
-bool Foam::UPstream::finishedRequests(const label pos, label len)
+bool Foam::UPstream::finishedRequests(label pos, label len)
 {
-    // No-op for non-parallel, or out-of-range (eg, placeholder indices)
+    // No-op for non-parallel
+    if (!UPstream::parRun())
+    {
+        return true;
+    }
+
+    // Out-of-range (eg, placeholder indices)
     if
     (
-        !UPstream::parRun()
-        || (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
+        (pos < 0 || pos >= PstreamGlobals::outstandingRequests_.size())
         || !len
     )
     {
-        return true;
+        pos = 0;
+        len = 0;
     }
 
     label count = (PstreamGlobals::outstandingRequests_.size() - pos);
@@ -904,29 +932,33 @@ bool Foam::UPstream::finishedRequests(label pos, label len)
             << " requests starting at " << pos << endl;
     }
 
-    auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
+    auto* mpiRequests = (PstreamGlobals::outstandingRequests_.data() + pos);
 
     int flag = 1;
 
-    if (count == 1)
+    if (count <= 0)
     {
-        // Fast-path (no-op) for single null request
-        if (MPI_REQUEST_NULL == *waitRequests)
-        {
-            return true;
-        }
+        // No requests
+
+        // Pass a dummy request (for progress side-effect)
+        MPI_Request request = MPI_REQUEST_NULL;
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+    }
+    else if (count == 1)
+    {
+        // Single request
 
         // On success: sets request to MPI_REQUEST_NULL
-        MPI_Test(waitRequests, &flag, MPI_STATUS_IGNORE);
+        MPI_Test(mpiRequests, &flag, MPI_STATUS_IGNORE);
     }
-    else if (count > 1)
+    else  // (count > 1)
     {
         // On success: sets each request to MPI_REQUEST_NULL
         // On failure: no request is modified
-        MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
+        MPI_Testall(count, mpiRequests, &flag, MPI_STATUSES_IGNORE);
     }
 
-    return flag != 0;
+    return (flag != 0);
 }
 
 
@@ -941,59 +973,53 @@ bool Foam::UPstream::finishedRequests(UList<UPstream::Request>& requests)
 
     // Looks ugly but is legitimate since UPstream::Request is an intptr_t,
     // which is always large enough to hold an MPI_Request (int or pointer)
 
-    label count = 0;
-    auto* waitRequests = reinterpret_cast<MPI_Request*>(requests.data());
+    const int count = static_cast<int>(requests.size());
+    auto* mpiRequests = reinterpret_cast<MPI_Request*>(requests.data());
 
-    for (auto& req : requests)
+    // Transcribe
+    for (int i = 0; i < count; ++i)
     {
-        MPI_Request request = PstreamUtils::Cast::to_mpi(req);
+        mpiRequests[i] = PstreamUtils::Cast::to_mpi(requests[i]);
     }
 
-        if (MPI_REQUEST_NULL != request)  // Apply some prefiltering
+
+    // NB: call MPI_Test() even with an inactive handle.
+    // This allows MPI to progress behind the scenes if it wishes.
+
+    int flag = 1;
+    if (count <= 0)
+    {
+        // No requests
+
+        // Pass a dummy request (for progress side-effect)
+        MPI_Request request = MPI_REQUEST_NULL;
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+    }
+    else if (count == 1)
+    {
+        // Single request
+
+        // On success: sets request to MPI_REQUEST_NULL
+        MPI_Test(mpiRequests, &flag, MPI_STATUS_IGNORE);
+    }
+    else  // (count > 1)
+    {
+        // On success: sets each request to MPI_REQUEST_NULL
+        // On failure: no request is modified
+        MPI_Testall(count, mpiRequests, &flag, MPI_STATUSES_IGNORE);
+    }
+
+
+    // Transcribe MPI_Request back into UPstream::Request
+    // - in reverse order, since sizeof(MPI_Request) can be smaller than
+    //   sizeof(UPstream::Request::value_type)
+    {
+        for (label i = requests.size()-1; i >= 0; --i)
         {
-            waitRequests[count] = request;
-            ++count;
+            requests[i] = UPstream::Request(mpiRequests[i]);
         }
     }
 
-    if (!count)
-    {
-        // No active handles
-        return true;
-    }
-
-    // On success: sets each request to MPI_REQUEST_NULL
-    // On failure: no request is modified
-    int flag = 0;
-    MPI_Testall(count, waitRequests, &flag, MPI_STATUSES_IGNORE);
-
-    if (flag)
-    {
-        // Success: reset all requests to MPI_REQUEST_NULL
-        requests = UPstream::Request(MPI_REQUEST_NULL);
-    }
-    else
-    {
-        // Not all done. Recover wrapped representation but in reverse order
-        // since sizeof(MPI_Request) can be smaller than
-        // sizeof(UPstream::Request::value_type)
-        // eg, mpich has MPI_Request as 'int'
-        //
-        // This is uglier that we'd like, but much better than allocating
-        // and freeing a scratch buffer each time we query things.
-
-        for (label i = count-1; i >= 0; --i)
-        {
-            requests[i] = UPstream::Request(waitRequests[i]);
-        }
-
-        // Trailing portion
-        for (label i = count; i < requests.size(); ++i)
-        {
-            requests[i] = UPstream::Request(MPI_REQUEST_NULL);
-        }
-    }
-
-    return flag != 0;
+    return (flag != 0);
 }
 
 
@@ -1008,29 +1034,29 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
     }
 
     bool anyActive = false;
-    MPI_Request waitRequests[2];
+    MPI_Request mpiRequests[2];
 
     // No-op for out-of-range (eg, placeholder indices)
 
     if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
     {
-        waitRequests[0] = PstreamGlobals::outstandingRequests_[req0];
+        mpiRequests[0] = PstreamGlobals::outstandingRequests_[req0];
     }
     else
     {
-        waitRequests[0] = MPI_REQUEST_NULL;
+        mpiRequests[0] = MPI_REQUEST_NULL;
     }
 
     if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
     {
-        waitRequests[1] = PstreamGlobals::outstandingRequests_[req1];
+        mpiRequests[1] = PstreamGlobals::outstandingRequests_[req1];
     }
     else
     {
-        waitRequests[1] = MPI_REQUEST_NULL;
+        mpiRequests[1] = MPI_REQUEST_NULL;
     }
 
-    if (MPI_REQUEST_NULL != waitRequests[0])  // An active handle
+    if (MPI_REQUEST_NULL != mpiRequests[0])  // An active handle
     {
         anyActive = true;
     }
@@ -1039,7 +1065,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
         req0 = -1;
     }
 
-    if (MPI_REQUEST_NULL != waitRequests[1])  // An active handle
+    if (MPI_REQUEST_NULL != mpiRequests[1])  // An active handle
     {
         anyActive = true;
     }
@@ -1064,7 +1090,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
         MPI_Testsome
         (
             2,
-            waitRequests,
+            mpiRequests,
             &outcount,
             indices,
             MPI_STATUSES_IGNORE
@@ -1085,12 +1111,12 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
 
         if (req0 >= 0)
         {
-            PstreamGlobals::outstandingRequests_[req0] = waitRequests[0];
+            PstreamGlobals::outstandingRequests_[req0] = mpiRequests[0];
         }
 
         if (req1 >= 0)
         {
-            PstreamGlobals::outstandingRequests_[req1] = waitRequests[1];
+            PstreamGlobals::outstandingRequests_[req1] = mpiRequests[1];
         }
 
         // Flag indices as 'done'
@@ -1108,7 +1134,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
     {
         if (req0 >= 0)
         {
-            PstreamGlobals::outstandingRequests_[req0] = waitRequests[0];
+            PstreamGlobals::outstandingRequests_[req0] = mpiRequests[0];
             req0 = -1;
         }
     }
@@ -1116,7 +1142,7 @@ bool Foam::UPstream::finishedRequestPair(label& req0, label& req1)
     {
         if (req1 >= 0)
        {
-            PstreamGlobals::outstandingRequests_[req1] = waitRequests[1];
+            PstreamGlobals::outstandingRequests_[req1] = mpiRequests[1];
             req1 = -1;
         }
     }
@@ -1137,17 +1163,17 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
     }
 
     int count = 0;
-    MPI_Request waitRequests[2];
+    MPI_Request mpiRequests[2];
 
     // No-op for out-of-range (eg, placeholder indices)
     // Prefilter inactive handles
 
     if (req0 >= 0 && req0 < PstreamGlobals::outstandingRequests_.size())
     {
-        waitRequests[count] = PstreamGlobals::outstandingRequests_[req0];
+        mpiRequests[count] = PstreamGlobals::outstandingRequests_[req0];
         PstreamGlobals::outstandingRequests_[req0] = MPI_REQUEST_NULL;
 
-        if (MPI_REQUEST_NULL != waitRequests[count])  // An active handle
+        if (MPI_REQUEST_NULL != mpiRequests[count])  // An active handle
         {
             ++count;
         }
@@ -1155,10 +1181,10 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
 
     if (req1 >= 0 && req1 < PstreamGlobals::outstandingRequests_.size())
     {
-        waitRequests[count] = PstreamGlobals::outstandingRequests_[req1];
+        mpiRequests[count] = PstreamGlobals::outstandingRequests_[req1];
         PstreamGlobals::outstandingRequests_[req1] = MPI_REQUEST_NULL;
 
-        if (MPI_REQUEST_NULL != waitRequests[count])  // An active handle
+        if (MPI_REQUEST_NULL != mpiRequests[count])  // An active handle
         {
             ++count;
         }
@@ -1177,7 +1203,7 @@ void Foam::UPstream::waitRequestPair(label& req0, label& req1)
     profilingPstream::beginTiming();
 
     // On success: sets each request to MPI_REQUEST_NULL
-    if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE))
+    if (MPI_Waitall(count, mpiRequests, MPI_STATUSES_IGNORE))
     {
         FatalErrorInFunction
             << "MPI_Waitall returned with error"
diff --git a/src/Pstream/mpi/UPstreamWindow.C b/src/Pstream/mpi/UPstreamWindow.C
index bf623e42e7..bf19725572 100644
--- a/src/Pstream/mpi/UPstreamWindow.C
+++ b/src/Pstream/mpi/UPstreamWindow.C
@@ -25,6 +25,7 @@ License
 
 \*---------------------------------------------------------------------------*/
 
+#include "UPstreamWindow.H"
 #include "PstreamGlobals.H"
 #include "profilingPstream.H"
 
diff --git a/src/meshTools/AMIInterpolation/AMIInterpolation/AMIInterpolation.C b/src/meshTools/AMIInterpolation/AMIInterpolation/AMIInterpolation.C
index 5ed3d99f94..3fdc581593 100644
--- a/src/meshTools/AMIInterpolation/AMIInterpolation/AMIInterpolation.C
+++ b/src/meshTools/AMIInterpolation/AMIInterpolation/AMIInterpolation.C
@@ -185,11 +185,7 @@ Foam::label Foam::AMIInterpolation::calcDistribution
     }
     else if (inCommGroup)
     {
-        if
-        (
-            currComm >= 0
-         && ListOps::equal(subProcs, UPstream::procID(currComm))
-        )
+        if (UPstream::sameProcs(currComm, subProcs))
         {
             // Keep geomComm
             if (debug)
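
For illustration only (not part of the patch): a minimal usage sketch of the
revised API, in the same spirit as the example in the commit message. The
helper name 'pollExample', the parameter 'myComm', and the surrounding
non-blocking calls are assumptions, not code from this changeset:

    #include "UPstream.H"

    using namespace Foam;

    void pollExample(const label myComm)
    {
        // sameProcs(): replaces a manual procID() comparison, with a
        // built-in guard against bad communicator indices
        if (UPstream::sameProcs(UPstream::commLocalNode(), myComm))
        {
            // ... node-local specialisation
        }

        // Record the current request count before issuing non-blocking ops
        const label firstRequest = UPstream::nRequests();

        // ... issue non-blocking sends/receives here (assumed) ...

        // finishedRequests() now always calls MPI_Test()/MPI_Testall()
        // when running in parallel, so MPI can progress operations even
        // while polling inactive (null) requests
        while (!UPstream::finishedRequests(firstRequest))
        {
            // ... overlap with useful local work
        }
    }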