From 568ced68e270ce1260de4aafd46eeaeb1b145759 Mon Sep 17 00:00:00 2001 From: Mark Olesen Date: Tue, 10 Jan 2023 12:14:31 +0100 Subject: [PATCH] ENH: support independent handling of MPI requests (#2674) - UPstream::Request wrapping class provides an opaque wrapper for vendor MPI_Request values, independent of global lists. ENH: support for MPI barrier (blocking or non-blocking) ENH: support for MPI sync-send variants STYLE: deprecate waitRequests() without a position parameter - in many cases this can indicate a problem in the program logic since normally the startOfRequests should be tracked locally. --- .../Test-parallel-nonBlocking.C | 49 +++-- .../db/IOstreams/Pstreams/IPBstreams.C | 4 +- src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H | 6 +- .../db/IOstreams/Pstreams/IPstreams.C | 8 +- .../db/IOstreams/Pstreams/OPBstreams.C | 6 +- src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H | 6 +- .../db/IOstreams/Pstreams/OPstreams.C | 6 +- src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 4 +- .../db/IOstreams/Pstreams/PstreamReduceOps.H | 59 ++++- .../db/IOstreams/Pstreams/UIPstream.H | 43 +++- .../db/IOstreams/Pstreams/UIPstreamBase.C | 4 +- .../db/IOstreams/Pstreams/UOPstream.H | 44 +++- .../db/IOstreams/Pstreams/UOPstreamBase.C | 6 +- src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C | 3 + src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H | 161 +++++++++++++- .../IOstreams/Pstreams/UPstreamCommsStruct.C | 9 - src/Pstream/dummy/UIPBstreamRead.C | 6 +- src/Pstream/dummy/UIPstreamRead.C | 7 +- src/Pstream/dummy/UOPBstreamWrite.C | 6 +- src/Pstream/dummy/UOPstreamWrite.C | 6 +- src/Pstream/dummy/UPstream.C | 3 + src/Pstream/dummy/UPstreamReduce.C | 25 ++- src/Pstream/dummy/UPstreamRequest.C | 25 ++- src/Pstream/mpi/UIPBstreamRead.C | 4 +- src/Pstream/mpi/UIPstreamRead.C | 27 ++- src/Pstream/mpi/UOPBstreamWrite.C | 6 +- src/Pstream/mpi/UOPstreamWrite.C | 96 +++++--- src/Pstream/mpi/UPstream.C | 46 ++++ src/Pstream/mpi/UPstreamAllToAll.C | 1 - src/Pstream/mpi/UPstreamBroadcast.C | 2 - src/Pstream/mpi/UPstreamGatherScatter.C | 1 - src/Pstream/mpi/UPstreamReduce.C | 40 +++- src/Pstream/mpi/UPstreamRequest.C | 205 +++++++++++++++++- src/Pstream/mpi/UPstreamWrapping.H | 50 +++-- src/Pstream/mpi/UPstreamWrappingTemplates.C | 188 +++++++++++----- 35 files changed, 939 insertions(+), 223 deletions(-) diff --git a/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C index fc13771c73..ce00f4d8cb 100644 --- a/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C +++ b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C @@ -50,9 +50,8 @@ using namespace Foam; int main(int argc, char *argv[]) { - #include "setRootCase.H" - #include "createTime.H" - + argList::noCheckProcessorDirectories(); + argList args(argc, argv); // Test PstreamBuffers // ~~~~~~~~~~~~~~~~~~~ @@ -83,13 +82,13 @@ int main(int argc, char *argv[]) if (Pstream::master()) { // Collect my own data - allData.append(data); + allData.push_back(data); for (const int proci : Pstream::subProcs()) { Perr << "master receiving from " << proci << endl; UIPstream fromProc(proci, pBufs); - allData.append(vector(fromProc)); + allData.push_back(vector(fromProc)); } } @@ -102,7 +101,7 @@ int main(int argc, char *argv[]) { Perr << "master sending to " << proci << endl; UOPstream toProc(proci, pBufs); - toSlave << allData; + toProc << allData; } } @@ -125,13 +124,27 @@ int main(int argc, char *argv[]) scalar data1 = 1.0; label request1 = -1; { - Foam::reduce(data1, sumOp(), UPstream::msgType(), request1); + Foam::reduce + ( + data1, + sumOp(), + UPstream::msgType(), + UPstream::worldComm, + request1 + ); } scalar data2 = 0.1; - label request2 = -1; + UPstream::Request request2; { - Foam::reduce(data2, sumOp(), UPstream::msgType(), request2); + Foam::reduce + ( + data2, + sumOp(), + UPstream::msgType(), + UPstream::worldComm, + request2 + ); } @@ -168,23 +181,23 @@ int main(int argc, char *argv[]) if (request1 != -1) { - Pout<< "Waiting for non-blocking reduce with request " << request1 - << endl; - Pstream::waitRequest(request1); + Pout<< "Waiting for non-blocking reduce with request " + << request1 << endl; + UPstream::waitRequest(request1); } Info<< "Reduced data1:" << data1 << endl; - if (request2 != -1) + if (request2.good()) { - Pout<< "Waiting for non-blocking reduce with request " << request1 - << endl; - Pstream::waitRequest(request2); + Pout<< "Waiting for non-blocking reduce with request " + << Foam::name(request2.pointer()) << endl; + UPstream::waitRequest(request2); } Info<< "Reduced data2:" << data2 << endl; - // Clear any outstanding requests - Pstream::resetRequests(0); + // Clear all outstanding requests + UPstream::resetRequests(0); Info<< "End\n" << endl; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C index 3bcb9142e8..5540777f09 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPBstreams.C @@ -33,7 +33,7 @@ License Foam::UIPBstream::UIPBstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, DynamicList& receiveBuf, label& receiveBufPosition, @@ -61,7 +61,7 @@ Foam::UIPBstream::UIPBstream Foam::IPBstream::IPBstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, const label bufSize, const int tag, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H index fa577369a2..43152570b1 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -69,7 +69,7 @@ public: //- and optional buffer size, read format IPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, const label bufSize = 0, const int tag = UPstream::msgType(), @@ -102,7 +102,7 @@ public: //- Construct for broadcast root, optional buffer size, read format IPBstream ( - const commsTypes commsType, //!< ignored + const UPstream::commsTypes, //!< ignored const int rootProcNo, //!< normally UPstream::masterNo() const label bufSize = 0, const int tag = UPstream::msgType(), //!< ignored diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C index 232a8d0754..caabf931d7 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstreams.C @@ -33,7 +33,7 @@ License Foam::UIPstream::UIPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, DynamicList& receiveBuf, label& receiveBufPosition, @@ -55,7 +55,7 @@ Foam::UIPstream::UIPstream fmt ) { - if (commsType == commsTypes::nonBlocking) + if (commsType == UPstream::commsTypes::nonBlocking) { // Message is already received into buffer } @@ -70,7 +70,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) : UIPstreamBase(fromProcNo, buffers) { - if (commsType() == commsTypes::nonBlocking) + if (commsType() == UPstream::commsTypes::nonBlocking) { // Message is already received into buffer messageSize_ = recvBuf_.size(); @@ -93,7 +93,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) Foam::IPstream::IPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, const label bufSize, const int tag, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C index 182597b193..487cdf6716 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPBstreams.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -33,7 +33,7 @@ License Foam::UOPBstream::UOPBstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, DynamicList& sendBuf, const int tag, @@ -48,7 +48,7 @@ Foam::UOPBstream::UOPBstream Foam::OPBstream::OPBstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, const label bufSize, const int tag, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H index dd071d9b35..154ed87d31 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -64,7 +64,7 @@ public: //- and optional buffer size, write format OPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, const label bufSize = 0, const int tag = UPstream::msgType(), @@ -92,7 +92,7 @@ public: //- Construct for broadcast root, optional buffer size, write format OPBstream ( - const commsTypes commsType, //!< ignored + const UPstream::commsTypes, //!< ignored const int rootProcNo, //!< normally UPstream::masterNo() const label bufSize = 0, const int tag = UPstream::msgType(), //!< ignored diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C index 70e557ab3f..9f1d70f42f 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstreams.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011 OpenFOAM Foundation - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -33,7 +33,7 @@ License Foam::UOPstream::UOPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, DynamicList& sendBuf, const int tag, @@ -54,7 +54,7 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) Foam::OPstream::OPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, const label bufSize, const int tag, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 0af7d9fa6a..d4fd86c480 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -102,13 +102,13 @@ protected: public: - // Declare name of the class and its debug switch + //- Declare name of the class and its debug switch ClassName("Pstream"); // Constructors - //- Construct for given commsTypes, with optional buffer size + //- Construct for given communication type, with optional buffer size explicit Pstream ( const UPstream::commsTypes commsType, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index bc8dbfc965..e047d399bb 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2022 OpenCFD Ltd. + Copyright (C) 2016-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -101,6 +101,21 @@ void reduce NotImplemented; } +//- Non-blocking reduce inplace (cf. MPI Iallreduce) +//- single value. Sets request. +template +void reduce +( + T& Value, + const BinaryOp&, + const int tag, + const label comm, + UPstream::Request& req +) +{ + NotImplemented; +} + //- Non-blocking reduce inplace (cf. MPI Iallreduce) //- single value. Sets request. template @@ -117,8 +132,23 @@ void reduce } //- Non-blocking reduce inplace (cf. MPI Iallreduce) -//- of multiple values (same size on all processes!) -// Sets request. +//- of multiple values (same size on all processes!). Sets request. +template +void reduce +( + T values[], + const int size, + const BinaryOp&, + const int tag, + const label comm, + UPstream::Request& req +) +{ + NotImplemented; +} + +//- Non-blocking reduce inplace (cf. MPI Iallreduce) +//- of multiple values (same size on all processes!). Sets request. template void reduce ( @@ -272,6 +302,18 @@ Pstream_CommonReductions(Native); \ \ /*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \ void reduce \ +( \ + Native values[], \ + const int size, \ + const sumOp&, \ + const int tag, /*!< (ignored) */ \ + const label comm, \ + UPstream::Request& req /*!< [out] request information */ \ +); \ + \ +/*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \ +/*! \deprecated prefer version with UPstream::Request */ \ +void reduce \ ( \ Native values[], \ const int size, \ @@ -283,6 +325,17 @@ void reduce \ \ /*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \ void reduce \ +( \ + Native& value, \ + const sumOp&, \ + const int tag, /*!< (ignored) */ \ + const label comm, \ + UPstream::Request& req /*!< [out] request information */ \ +); \ + \ +/*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \ +/*! \deprecated prefer version with UPstream::Request */ \ +void reduce \ ( \ Native& value, \ const sumOp&, \ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H index e51b0ac9f6..b29042280f 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2013 OpenFOAM Foundation - Copyright (C) 2017-2022 OpenCFD Ltd. + Copyright (C) 2017-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -107,7 +107,7 @@ protected: //- and IO format UIPstreamBase ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, DynamicList& receiveBuf, label& receiveBufPosition, @@ -225,7 +225,7 @@ public: //- and IO format UIPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, DynamicList& receiveBuf, label& receiveBufPosition, @@ -255,13 +255,40 @@ public: // \return the message size static label read ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, char* buf, const std::streamsize bufSize, const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm + const label comm = UPstream::worldComm, + //! [out] request information (for non-blocking) + UPstream::Request* req = nullptr ); + + //- Read buffer contents (non-blocking) from given processor + // \return the message size + inline static label read + ( + //! [out] request information + UPstream::Request& req, + const int fromProcNo, + char* buf, + const std::streamsize bufSize, + const int tag, + const label communicator + ) + { + return UIPstream::read + ( + UPstream::commsTypes::nonBlocking, + fromProcNo, + buf, + bufSize, + tag, + communicator, + &req + ); + } }; @@ -290,7 +317,7 @@ public: //- and IO format UIPBstream ( - const commsTypes commsType, //!< ignored + const UPstream::commsTypes, //!< ignored const int rootProcNo, //!< normally UPstream::masterNo() DynamicList& receiveBuf, label& receiveBufPosition, @@ -317,11 +344,9 @@ public: // \return the message size static label read ( - const commsTypes commsTypes, //!< ignored - const int rootProcNo, //!< normally UPstream::masterNo() + const int rootProcNo, //!< normally UPstream::masterNo() char* buf, const std::streamsize bufSize, - const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C index 9df7380c52..21adb26831 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstreamBase.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2015 OpenFOAM Foundation - Copyright (C) 2017-2022 OpenCFD Ltd. + Copyright (C) 2017-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -146,7 +146,7 @@ inline Foam::Istream& Foam::UIPstreamBase::readString(std::string& str) Foam::UIPstreamBase::UIPstreamBase ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, DynamicList& receiveBuf, label& receiveBufPosition, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H index 80d5684b81..d2c344da43 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2014 OpenFOAM Foundation - Copyright (C) 2017-2022 OpenCFD Ltd. + Copyright (C) 2017-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -110,7 +110,7 @@ protected: //- and IO format UOPstreamBase ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, DynamicList& sendBuf, const int tag = UPstream::msgType(), @@ -298,7 +298,7 @@ public: //- and IO format UOPstream ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, DynamicList& sendBuf, const int tag = UPstream::msgType(), @@ -332,8 +332,38 @@ public: const char* buf, const std::streamsize bufSize, const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm + const label comm = UPstream::worldComm, + //! [out] request information (for non-blocking) + UPstream::Request* req = nullptr, + const UPstream::sendModes sendMode = UPstream::sendModes::normal ); + + //- Write buffer contents (non-blocking) to given processor + // \return True on success + inline static bool write + ( + //! [out] request information + UPstream::Request& req, + const int toProcNo, + const char* buf, + const std::streamsize bufSize, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, + const UPstream::sendModes sendMode = UPstream::sendModes::normal + ) + { + return UOPstream::write + ( + UPstream::commsTypes::nonBlocking, + toProcNo, + buf, + bufSize, + tag, + comm, + &req, + sendMode + ); + } }; @@ -364,7 +394,7 @@ public: //- and IO format UOPBstream ( - const commsTypes commsType, //!< ignored + const UPstream::commsTypes, //!< ignored const int toProcNo, //!< normally UPstream::masterNo() DynamicList& sendBuf, const int tag = UPstream::msgType(), //!< ignored @@ -390,11 +420,9 @@ public: // \return True on success static bool write ( - const commsTypes commsType, //!< ignored - const int rootProcNo, //!< normally UPstream::masterNo() + const int rootProcNo, //!< normally UPstream::masterNo() const char* buf, const std::streamsize bufSize, - const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); }; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C index 871bef8a01..631818b772 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstreamBase.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2016-2022 OpenCFD Ltd. + Copyright (C) 2016-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -117,7 +117,7 @@ inline void Foam::UOPstreamBase::putChar(const char c) { sendBuf_.setCapacity(1000); } - sendBuf_.append(c); + sendBuf_.push_back(c); } @@ -133,7 +133,7 @@ inline void Foam::UOPstreamBase::putString(const std::string& str) Foam::UOPstreamBase::UOPstreamBase ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int toProcNo, DynamicList& sendBuf, const int tag, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index d4ef48da5e..d1be6447e6 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -519,6 +519,8 @@ Foam::UPstream::commsTypes Foam::UPstream::defaultCommsType ) ); + +//! \cond file_scope namespace Foam { //- Registered reader for UPstream::defaultCommsType @@ -549,6 +551,7 @@ namespace Foam addcommsTypeToOpt addcommsTypeToOpt_("commsType"); } +//! \endcond int Foam::UPstream::nPollProcInterfaces ( diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index ea1b63a7fe..ed16d3a770 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -52,6 +52,10 @@ SourceFiles namespace Foam { +//- Implementation details for UPstream/Pstream/MPI etc. +namespace PstreamDetail {} + + /*---------------------------------------------------------------------------*\ Class UPstream Declaration \*---------------------------------------------------------------------------*/ @@ -63,7 +67,7 @@ public: //- Int ranges are used for MPI ranks (processes) typedef IntRange rangeType; - //- Types of communications + //- Communications types enum class commsTypes : char { blocking, //!< "blocking" : (MPI_Bsend, MPI_Recv) @@ -74,9 +78,21 @@ public: //- Enumerated names for the communication types static const Enum commsTypeNames; + //- Different MPI-send modes (ignored for commsTypes::blocking) + enum class sendModes : char + { + normal, //!< (MPI_Send, MPI_Isend) + sync //!< (MPI_Ssend, MPI_Issend) + }; + // Public Classes + // Forward Declarations + + //- Wrapper for MPI_Request + class Request; + //- Structure for communicating between processors class commsStruct { @@ -100,8 +116,8 @@ public: // Constructors - //- Default construct. Above == -1 - commsStruct() noexcept; + //- Default construct with above == -1 + commsStruct() noexcept : above_(-1) {} //- Construct from components commsStruct @@ -166,7 +182,6 @@ public: friend Ostream& operator<<(Ostream&, const commsStruct&); }; - //- combineReduce operator for lists. Used for counting. struct listEq { @@ -305,7 +320,7 @@ public: // Constructors //- Construct for given communication type - explicit UPstream(const commsTypes commsType) + explicit UPstream(const commsTypes commsType) noexcept : commsType_(commsType) {} @@ -441,6 +456,13 @@ public: // Fatal if MPI has already been finalized. static bool initNull(); + //- Impose a synchronisation barrier (optionally non-blocking) + static void barrier + ( + const label communicator, + UPstream::Request* req = nullptr + ); + // Non-blocking comms @@ -452,10 +474,14 @@ public: // A no-op for out-of-range values. static void resetRequests(const label n); - //- Wait until all requests (from start onwards) have finished. + //- Wait until all requests (from position onwards) have finished. // A no-op if parRun() == false, if there are no pending requests // or if the start is out-of-range (0 to nRequests) - static void waitRequests(const label start = 0); + static void waitRequests(const label pos); + + //- Wait until all requests have finished. + // A no-op if parRun() == false or the list is empty + static void waitRequests(UList& requests); //- Wait until request i has finished. // A no-op if parRun() == false, @@ -463,12 +489,21 @@ public: // or if the index is out-of-range (0 to nRequests) static void waitRequest(const label i); + //- Wait until specified request has finished. + // A no-op if parRun() == false or for a null-request + static void waitRequest(UPstream::Request& req); + //- Non-blocking comms: has request i finished? // A no-op and returns true if parRun() == false, // there are no pending requests, // or if the index is out-of-range (0 to nRequests) static bool finishedRequest(const label i); + //- Non-blocking comms: has request finished? + // A no-op and returns true if parRun() == false + // or for a null-request + static bool finishedRequest(UPstream::Request& req); + static int allocateTag(const char* const msg = nullptr); static void freeTag(const int tag, const char* const msg = nullptr); @@ -789,6 +824,12 @@ public: // Housekeeping + //- Wait for all requests to finish. + // \deprecated(2023-01) Probably not what you want. + // Should normally be restricted to a particular starting request. + FOAM_DEPRECATED_FOR(2023-01, "waitRequests(int) method") + static void waitRequests() { waitRequests(0); } + //- Process index of first sub-process // \deprecated(2020-09) use subProcs() method instead static constexpr int firstSlave() noexcept @@ -805,8 +846,114 @@ public: }; +/*---------------------------------------------------------------------------*\ + Class UPstream::Request Declaration +\*---------------------------------------------------------------------------*/ + +//- An opaque wrapper for MPI_Request with a vendor-independent +//- representation independent of any \c header +// +// The MPI standard states that MPI_Request is always an opaque object. +// Generally it is either an integer (eg, mpich) or a pointer (eg, openmpi). +class UPstream::Request +{ +public: + + // Public Types + + //- Storage for MPI_Request (as integer or pointer) + typedef std::intptr_t value_type; + + +private: + + // Private Data + + //- The MPI_Request (as wrapped value) + value_type value_; + +public: + + // Generated Methods + + //- Copy construct + Request(const Request&) noexcept = default; + + //- Move construct + Request(Request&&) noexcept = default; + + //- Copy assignment + Request& operator=(const Request&) noexcept = default; + + //- Move assignment + Request& operator=(Request&&) noexcept = default; + + + // Member Operators + + //- Test for equality + bool operator==(const Request& rhs) const noexcept + { + return (value_ == rhs.value_); + } + + //- Test for inequality + bool operator!=(const Request& rhs) const noexcept + { + return (value_ != rhs.value_); + } + + + // Constructors + + //- Default construct as MPI_REQUEST_NULL + Request() noexcept; + + //- Construct from MPI_Request (as pointer type) + explicit Request(const void* p) noexcept + : + value_(reinterpret_cast(p)) + {} + + //- Construct from MPI_Request (as integer type) + explicit Request(value_type val) noexcept + : + value_(val) + {} + + + // Member Functions + + //- Return raw value + value_type value() const noexcept { return value_; } + + //- Return as pointer value + const void* pointer() const noexcept + { + return reinterpret_cast(value_); + } + + //- True if not equal to MPI_REQUEST_NULL + bool good() const noexcept; + + //- Reset to default constructed value (MPI_REQUEST_NULL) + void reset() noexcept; + + //- Same as calling UPstream::waitRequest() + void wait() { UPstream::waitRequest(*this); } + + //- Same as calling UPstream::finishedRequest() + bool finished() { return UPstream::finishedRequest(*this); } +}; + + +// * * * * * * * * * * * * * * * IOstream Operators * * * * * * * * * * * * // + Ostream& operator<<(Ostream&, const UPstream::commsStruct&); + +// * * * * * * * * * * * * Template Specialisations * * * * * * * * * * * * // + // Template specialisation for access of commsStruct template<> UPstream::commsStruct& diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C index e854bc7182..760c77d2bf 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C @@ -30,15 +30,6 @@ License // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // -Foam::UPstream::commsStruct::commsStruct() noexcept -: - above_(-1), - below_(), - allBelow_(), - allNotBelow_() -{} - - Foam::UPstream::commsStruct::commsStruct ( const label above, diff --git a/src/Pstream/dummy/UIPBstreamRead.C b/src/Pstream/dummy/UIPBstreamRead.C index 94e4dcd7c4..ac23ba916d 100644 --- a/src/Pstream/dummy/UIPBstreamRead.C +++ b/src/Pstream/dummy/UIPBstreamRead.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -39,12 +39,10 @@ void Foam::UIPBstream::bufferIPCrecv() Foam::label Foam::UIPBstream::read ( - const commsTypes commsType, const int rootProcNo, char* buf, const std::streamsize bufSize, - const int tag, - const label communicator + const label comm ) { NotImplemented; diff --git a/src/Pstream/dummy/UIPstreamRead.C b/src/Pstream/dummy/UIPstreamRead.C index 3fef73615b..316550769e 100644 --- a/src/Pstream/dummy/UIPstreamRead.C +++ b/src/Pstream/dummy/UIPstreamRead.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2015 OpenFOAM Foundation - Copyright (C) 2021-2022 OpenCFD Ltd. + Copyright (C) 2021-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -40,12 +40,13 @@ void Foam::UIPstream::bufferIPCrecv() Foam::label Foam::UIPstream::read ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, char* buf, const std::streamsize bufSize, const int tag, - const label communicator + const label communicator, + UPstream::Request* req ) { NotImplemented; diff --git a/src/Pstream/dummy/UOPBstreamWrite.C b/src/Pstream/dummy/UOPBstreamWrite.C index 3951b48a0d..72c702f034 100644 --- a/src/Pstream/dummy/UOPBstreamWrite.C +++ b/src/Pstream/dummy/UOPBstreamWrite.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -40,12 +40,10 @@ bool Foam::UOPBstream::bufferIPCsend() bool Foam::UOPBstream::write ( - const commsTypes commsType, const int rootProcNo, const char* buf, const std::streamsize bufSize, - const int tag, - const label communicator + const label comm ) { NotImplemented; diff --git a/src/Pstream/dummy/UOPstreamWrite.C b/src/Pstream/dummy/UOPstreamWrite.C index 7589556b5d..24078ca924 100644 --- a/src/Pstream/dummy/UOPstreamWrite.C +++ b/src/Pstream/dummy/UOPstreamWrite.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2015 OpenFOAM Foundation - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -46,7 +46,9 @@ bool Foam::UOPstream::write const char* buf, const std::streamsize bufSize, const int tag, - const label communicator + const label communicator, + UPstream::Request* req, + const UPstream::sendModes sendMode ) { NotImplemented; diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index db934c00e3..381d7d3c22 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -94,5 +94,8 @@ void Foam::UPstream::freeTag(const int tag, const char* const msg) {} +void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) +{} + // ************************************************************************* // diff --git a/src/Pstream/dummy/UPstreamReduce.C b/src/Pstream/dummy/UPstreamReduce.C index 00dda3899f..ce1e19e2a0 100644 --- a/src/Pstream/dummy/UPstreamReduce.C +++ b/src/Pstream/dummy/UPstreamReduce.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -133,6 +133,18 @@ void Foam::reduce \ Pstream_CommonReductions(Native); \ \ void Foam::reduce \ +( \ + Native values[], \ + const int size, \ + const sumOp&, \ + const int tag, \ + const label comm, \ + UPstream::Request& req \ +) \ +{} \ + \ +/* Deprecated: prefer version with UPstream::Request */ \ +void Foam::reduce \ ( \ Native values[], \ const int size, \ @@ -144,6 +156,17 @@ void Foam::reduce \ {} \ \ void Foam::reduce \ +( \ + Native& value, \ + const sumOp&, \ + const int tag, \ + const label comm, \ + UPstream::Request& req \ +) \ +{} \ + \ +/* Deprecated: prefer version with UPstream::Request */ \ +void Foam::reduce \ ( \ Native& value, \ const sumOp&, \ diff --git a/src/Pstream/dummy/UPstreamRequest.C b/src/Pstream/dummy/UPstreamRequest.C index ab104e0b3e..efd2427c14 100644 --- a/src/Pstream/dummy/UPstreamRequest.C +++ b/src/Pstream/dummy/UPstreamRequest.C @@ -27,17 +27,40 @@ License #include "UPstream.H" +// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // + +Foam::UPstream::Request::Request() noexcept +: + UPstream::Request(nullptr) +{} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +bool Foam::UPstream::Request::good() const noexcept +{ + return false; +} + + +void Foam::UPstream::Request::reset() noexcept +{} + + // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // Foam::label Foam::UPstream::nRequests() noexcept { return 0; } void Foam::UPstream::resetRequests(const label n) {} -void Foam::UPstream::waitRequests(const label start) {} +void Foam::UPstream::waitRequests(const label pos) {} +void Foam::UPstream::waitRequests(UList&) {} void Foam::UPstream::waitRequest(const label i) {} +void Foam::UPstream::waitRequest(UPstream::Request&) {} bool Foam::UPstream::finishedRequest(const label i) { return true; } +bool Foam::UPstream::finishedRequest(UPstream::Request&) { return true; } // ************************************************************************* // diff --git a/src/Pstream/mpi/UIPBstreamRead.C b/src/Pstream/mpi/UIPBstreamRead.C index f71a66826e..2cb5dd78bf 100644 --- a/src/Pstream/mpi/UIPBstreamRead.C +++ b/src/Pstream/mpi/UIPBstreamRead.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -113,11 +113,9 @@ void Foam::UIPBstream::bufferIPCrecv() Foam::label Foam::UIPBstream::read ( - const commsTypes commsType, const int rootProcNo, char* buf, const std::streamsize bufSize, - const int tag, const label comm ) { diff --git a/src/Pstream/mpi/UIPstreamRead.C b/src/Pstream/mpi/UIPstreamRead.C index 40d67eabc9..e2ecbc0da8 100644 --- a/src/Pstream/mpi/UIPstreamRead.C +++ b/src/Pstream/mpi/UIPstreamRead.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2021 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -31,8 +31,6 @@ License #include "profilingPstream.H" #include "IOstreams.H" -#include - // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // void Foam::UIPstream::bufferIPCrecv() @@ -99,12 +97,13 @@ void Foam::UIPstream::bufferIPCrecv() Foam::label Foam::UIPstream::read ( - const commsTypes commsType, + const UPstream::commsTypes commsType, const int fromProcNo, char* buf, const std::streamsize bufSize, const int tag, - const label communicator + const label communicator, + UPstream::Request* req ) { if (debug) @@ -130,8 +129,8 @@ Foam::label Foam::UIPstream::read if ( - commsType == commsTypes::blocking - || commsType == commsTypes::scheduled + commsType == UPstream::commsTypes::blocking + || commsType == UPstream::commsTypes::scheduled ) { MPI_Status status; @@ -182,7 +181,7 @@ Foam::label Foam::UIPstream::read return messageSize; } - else if (commsType == commsTypes::nonBlocking) + else if (commsType == UPstream::commsTypes::nonBlocking) { MPI_Request request; @@ -214,11 +213,19 @@ Foam::label Foam::UIPstream::read Pout<< "UIPstream::read : started read from:" << fromProcNo << " tag:" << tag << " read size:" << label(bufSize) << " commsType:" << UPstream::commsTypeNames[commsType] - << " request:" << PstreamGlobals::outstandingRequests_.size() + << + (req ? label(-1) : PstreamGlobals::outstandingRequests_.size()) << Foam::endl; } - PstreamGlobals::outstandingRequests_.push_back(request); + if (req) + { + *req = UPstream::Request(request); + } + else + { + PstreamGlobals::outstandingRequests_.push_back(request); + } // Assume the message is completely received. return bufSize; diff --git a/src/Pstream/mpi/UOPBstreamWrite.C b/src/Pstream/mpi/UOPBstreamWrite.C index 059965e72a..c01bff64b3 100644 --- a/src/Pstream/mpi/UOPBstreamWrite.C +++ b/src/Pstream/mpi/UOPBstreamWrite.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -28,8 +28,6 @@ License #include "UOPstream.H" #include "PstreamGlobals.H" -#include - // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // bool Foam::UOPBstream::bufferIPCsend() @@ -92,11 +90,9 @@ bool Foam::UOPBstream::bufferIPCsend() bool Foam::UOPBstream::write ( - const commsTypes commsType, /* unused */ const int rootProcNo, const char* buf, const std::streamsize bufSize, - const int tag, /* unused */ const label comm ) { diff --git a/src/Pstream/mpi/UOPstreamWrite.C b/src/Pstream/mpi/UOPstreamWrite.C index 3c61afecbe..5982e99cae 100644 --- a/src/Pstream/mpi/UOPstreamWrite.C +++ b/src/Pstream/mpi/UOPstreamWrite.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2022 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -30,8 +30,6 @@ License #include "PstreamGlobals.H" #include "profilingPstream.H" -#include - // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // bool Foam::UOPstream::bufferIPCsend() @@ -57,7 +55,9 @@ bool Foam::UOPstream::write const char* buf, const std::streamsize bufSize, const int tag, - const label communicator + const label communicator, + UPstream::Request* req, + const UPstream::sendModes sendMode ) { if (debug) @@ -87,7 +87,7 @@ bool Foam::UOPstream::write profilingPstream::beginTiming(); - if (commsType == commsTypes::blocking) + if (commsType == UPstream::commsTypes::blocking) { failed = MPI_Bsend ( @@ -110,17 +110,32 @@ bool Foam::UOPstream::write << Foam::endl; } } - else if (commsType == commsTypes::scheduled) + else if (commsType == UPstream::commsTypes::scheduled) { - failed = MPI_Send - ( - const_cast(buf), - bufSize, - MPI_BYTE, - toProcNo, - tag, - PstreamGlobals::MPICommunicators_[communicator] - ); + if (UPstream::sendModes::sync == sendMode) + { + failed = MPI_Ssend + ( + const_cast(buf), + bufSize, + MPI_BYTE, + toProcNo, + tag, + PstreamGlobals::MPICommunicators_[communicator] + ); + } + else + { + failed = MPI_Send + ( + const_cast(buf), + bufSize, + MPI_BYTE, + toProcNo, + tag, + PstreamGlobals::MPICommunicators_[communicator] + ); + } // Assume these are from scatters ... profilingPstream::addScatterTime(); @@ -133,20 +148,36 @@ bool Foam::UOPstream::write << Foam::endl; } } - else if (commsType == commsTypes::nonBlocking) + else if (commsType == UPstream::commsTypes::nonBlocking) { MPI_Request request; - failed = MPI_Isend - ( - const_cast(buf), - bufSize, - MPI_BYTE, - toProcNo, - tag, - PstreamGlobals::MPICommunicators_[communicator], - &request - ); + if (UPstream::sendModes::sync == sendMode) + { + failed = MPI_Issend + ( + const_cast(buf), + bufSize, + MPI_BYTE, + toProcNo, + tag, + PstreamGlobals::MPICommunicators_[communicator], + &request + ); + } + else + { + failed = MPI_Isend + ( + const_cast(buf), + bufSize, + MPI_BYTE, + toProcNo, + tag, + PstreamGlobals::MPICommunicators_[communicator], + &request + ); + } profilingPstream::addWaitTime(); @@ -155,11 +186,20 @@ bool Foam::UOPstream::write Pout<< "UOPstream::write : started write to:" << toProcNo << " tag:" << tag << " size:" << label(bufSize) << " commType:" << UPstream::commsTypeNames[commsType] - << " request:" << PstreamGlobals::outstandingRequests_.size() + << " request:" + << + (req ? label(-1) : PstreamGlobals::outstandingRequests_.size()) << Foam::endl; } - PstreamGlobals::outstandingRequests_.push_back(request); + if (req) + { + *req = UPstream::Request(request); + } + else + { + PstreamGlobals::outstandingRequests_.push_back(request); + } } else { diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index 81be44c3ca..cfb0c2ebdc 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -693,4 +693,50 @@ void Foam::UPstream::freeTag(const int tag, const char* const msg) } +void Foam::UPstream::barrier(const label communicator, UPstream::Request* req) +{ + // No-op for non-parallel + if (!UPstream::parRun()) + { + return; + } + + if (req) + { + MPI_Request request; + + if + ( + MPI_Ibarrier + ( + PstreamGlobals::MPICommunicators_[communicator], + &request + ) + ) + { + FatalErrorInFunction + << "MPI_Ibarrier returned with error" + << Foam::abort(FatalError); + } + + *req = UPstream::Request(request); + } + else + { + if + ( + MPI_Barrier + ( + PstreamGlobals::MPICommunicators_[communicator] + ) + ) + { + FatalErrorInFunction + << "MPI_Barrier returned with error" + << Foam::abort(FatalError); + } + } +} + + // ************************************************************************* // diff --git a/src/Pstream/mpi/UPstreamAllToAll.C b/src/Pstream/mpi/UPstreamAllToAll.C index c2551d3e59..8bc5b5ebac 100644 --- a/src/Pstream/mpi/UPstreamAllToAll.C +++ b/src/Pstream/mpi/UPstreamAllToAll.C @@ -28,7 +28,6 @@ License #include "Pstream.H" #include "UPstreamWrapping.H" -#include #include // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/Pstream/mpi/UPstreamBroadcast.C b/src/Pstream/mpi/UPstreamBroadcast.C index cdb09bf2a4..af061bd76c 100644 --- a/src/Pstream/mpi/UPstreamBroadcast.C +++ b/src/Pstream/mpi/UPstreamBroadcast.C @@ -29,8 +29,6 @@ License #include "PstreamGlobals.H" #include "profilingPstream.H" -#include - // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // bool Foam::UPstream::broadcast diff --git a/src/Pstream/mpi/UPstreamGatherScatter.C b/src/Pstream/mpi/UPstreamGatherScatter.C index cf14937ae9..4937be2e1d 100644 --- a/src/Pstream/mpi/UPstreamGatherScatter.C +++ b/src/Pstream/mpi/UPstreamGatherScatter.C @@ -28,7 +28,6 @@ License #include "Pstream.H" #include "UPstreamWrapping.H" -#include #include // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/Pstream/mpi/UPstreamReduce.C b/src/Pstream/mpi/UPstreamReduce.C index dafa6d42c2..5314dd17ce 100644 --- a/src/Pstream/mpi/UPstreamReduce.C +++ b/src/Pstream/mpi/UPstreamReduce.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -29,7 +29,6 @@ License #include "PstreamReduceOps.H" #include "UPstreamWrapping.H" -#include #include // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -177,6 +176,23 @@ void Foam::reduce \ Pstream_CommonReductions(Native, TaggedType); \ \ void Foam::reduce \ +( \ + Native values[], \ + const int size, \ + const sumOp&, \ + const int tag, /* (unused) */ \ + const label comm, \ + UPstream::Request& req \ +) \ +{ \ + PstreamDetail::allReduce \ + ( \ + values, size, TaggedType, MPI_SUM, comm, &req, nullptr \ + ); \ +} \ + \ +/* Deprecated: prefer version with UPstream::Request */ \ +void Foam::reduce \ ( \ Native values[], \ const int size, \ @@ -188,11 +204,27 @@ void Foam::reduce \ { \ PstreamDetail::allReduce \ ( \ - values, size, TaggedType, MPI_SUM, comm, &requestID \ + values, size, TaggedType, MPI_SUM, comm, nullptr, &requestID \ ); \ } \ \ void Foam::reduce \ +( \ + Native& value, \ + const sumOp&, \ + const int tag, /* (unused) */ \ + const label comm, \ + UPstream::Request& req \ +) \ +{ \ + PstreamDetail::allReduce \ + ( \ + &value, 1, TaggedType, MPI_SUM, comm, &req, nullptr \ + ); \ +} \ + \ +/* Deprecated: prefer version with UPstream::Request */ \ +void Foam::reduce \ ( \ Native& value, \ const sumOp&, \ @@ -203,7 +235,7 @@ void Foam::reduce \ { \ PstreamDetail::allReduce \ ( \ - &value, 1, TaggedType, MPI_SUM, comm, &requestID \ + &value, 1, TaggedType, MPI_SUM, comm, nullptr, &requestID \ ); \ } \ \ diff --git a/src/Pstream/mpi/UPstreamRequest.C b/src/Pstream/mpi/UPstreamRequest.C index 3d72acc1b6..d3f6884f96 100644 --- a/src/Pstream/mpi/UPstreamRequest.C +++ b/src/Pstream/mpi/UPstreamRequest.C @@ -30,6 +30,28 @@ License #include "PstreamGlobals.H" #include "profilingPstream.H" +// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // + +Foam::UPstream::Request::Request() noexcept +: + UPstream::Request(MPI_REQUEST_NULL) +{} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +bool Foam::UPstream::Request::good() const noexcept +{ + return MPI_REQUEST_NULL != PstreamDetail::Request::get(*this); +} + + +void Foam::UPstream::Request::reset() noexcept +{ + *this = UPstream::Request(MPI_REQUEST_NULL); +} + + // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // Foam::label Foam::UPstream::nRequests() noexcept @@ -47,26 +69,35 @@ void Foam::UPstream::resetRequests(const label n) } -void Foam::UPstream::waitRequests(const label start) +void Foam::UPstream::waitRequests(const label pos) { // No-op for non-parallel, no pending requests or out-of-range if ( !UPstream::parRun() - || start < 0 - || start >= PstreamGlobals::outstandingRequests_.size() + || pos < 0 + || pos >= PstreamGlobals::outstandingRequests_.size() + /// || !len ) { return; } - const label count = (PstreamGlobals::outstandingRequests_.size() - start); - auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + start); + label count = (PstreamGlobals::outstandingRequests_.size() - pos); + + /// // Treat len < 0 like npos (ie, the rest of the list) but also + /// // apply range checking to avoid bad slices + /// if (len > 0 && len < count) + /// { + /// count = len; + /// } + + auto* waitRequests = (PstreamGlobals::outstandingRequests_.data() + pos); if (UPstream::debug) { Pout<< "UPstream::waitRequests : starting wait for " - << count << " requests starting at " << start << endl; + << count << " requests starting at " << pos << endl; } profilingPstream::beginTiming(); @@ -81,8 +112,8 @@ void Foam::UPstream::waitRequests(const label start) profilingPstream::addWaitTime(); - // ie, resetRequests(start) - PstreamGlobals::outstandingRequests_.resize(start); + // ie, resetRequests(pos) + PstreamGlobals::outstandingRequests_.resize(pos); if (UPstream::debug) { @@ -91,6 +122,104 @@ void Foam::UPstream::waitRequests(const label start) } +void Foam::UPstream::waitRequests(UList& requests) +{ + // No-op for non-parallel or no pending requests + if (!UPstream::parRun() || requests.empty()) + { + return; + } + + // Looks ugly but is legitimate since UPstream::Request is an intptr_t, + // which is always large enough to hold an MPI_Request (int or pointer) + + label count = 0; + auto* waitRequests = reinterpret_cast(requests.data()); + + for (auto& req : requests) + { + if (req.good()) + { + waitRequests[count] = PstreamDetail::Request::get(req); + ++count; + } + } + + if (!count) + { + return; + } + + profilingPstream::beginTiming(); + + // On success: sets request to MPI_REQUEST_NULL + if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE)) + { + FatalErrorInFunction + << "MPI_Waitall returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + // Everything handled, reset all to MPI_REQUEST_NULL + for (auto& req : requests) + { + req.reset(); + } +} + + +// FUTURE? +// +/// void Foam::UPstream::waitRequests +/// ( +/// UPstream::Request& req1, +/// UPstream::Request& req2 +/// ) +/// { +/// // No-op for non-parallel +/// if (!UPstream::parRun()) +/// { +/// return; +/// } +/// +/// int count = 0; +/// MPI_Request waitRequests[2]; +/// +/// waitRequests[count] = PstreamDetail::Request::get(req1); +/// if (MPI_REQUEST_NULL != waitRequests[count]) +/// { +/// req1.reset(); +/// ++count; +/// } +/// +/// waitRequests[count] = PstreamDetail::Request::get(req2); +/// if (MPI_REQUEST_NULL != waitRequests[count]) +/// { +/// req2.reset(); +/// ++count; +/// } +/// +/// if (!count) +/// { +/// return; +/// } +/// +/// profilingPstream::beginTiming(); +/// +/// // On success: sets request to MPI_REQUEST_NULL +/// if (MPI_Waitall(count, waitRequests, MPI_STATUSES_IGNORE)) +/// { +/// FatalErrorInFunction +/// << "MPI_Waitall returned with error" +/// << Foam::abort(FatalError); +/// } +/// +/// profilingPstream::addWaitTime(); +/// } + + void Foam::UPstream::waitRequest(const label i) { // No-op for non-parallel, or out-of-range (eg, placeholder indices) @@ -141,6 +270,37 @@ void Foam::UPstream::waitRequest(const label i) } +void Foam::UPstream::waitRequest(UPstream::Request& req) +{ + // No-op for non-parallel + if (!UPstream::parRun()) + { + return; + } + + MPI_Request request = PstreamDetail::Request::get(req); + + // No-op for null request + if (MPI_REQUEST_NULL == request) + { + return; + } + + profilingPstream::beginTiming(); + + if (MPI_Wait(&request, MPI_STATUS_IGNORE)) + { + FatalErrorInFunction + << "MPI_Wait returned with error" + << Foam::abort(FatalError); + } + + profilingPstream::addWaitTime(); + + req.reset(); // Handled, reset to MPI_REQUEST_NULL +} + + bool Foam::UPstream::finishedRequest(const label i) { // No-op for non-parallel, or out-of-range (eg, placeholder indices) @@ -182,4 +342,33 @@ bool Foam::UPstream::finishedRequest(const label i) } +bool Foam::UPstream::finishedRequest(UPstream::Request& req) +{ + // No-op for non-parallel + if (!UPstream::parRun()) + { + return true; + } + + MPI_Request request = PstreamDetail::Request::get(req); + + // No-op for null request + if (MPI_REQUEST_NULL == request) + { + return true; + } + + int flag = 0; + MPI_Test(&request, &flag, MPI_STATUS_IGNORE); + + if (flag) + { + // Done - reset to MPI_REQUEST_NULL + req.reset(); + } + + return flag != 0; +} + + // ************************************************************************* // diff --git a/src/Pstream/mpi/UPstreamWrapping.H b/src/Pstream/mpi/UPstreamWrapping.H index f1553125e3..355bf3bf86 100644 --- a/src/Pstream/mpi/UPstreamWrapping.H +++ b/src/Pstream/mpi/UPstreamWrapping.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2012-2016 OpenFOAM Foundation - Copyright (C) 2022 OpenCFD Ltd. + Copyright (C) 2022-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -24,12 +24,6 @@ License You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see . -Namespace - Foam::PstreamDetail - -Description - Some implementation details for Pstream and/or MPI. - InNamespace Foam::PstreamDetail @@ -54,6 +48,27 @@ namespace Foam namespace PstreamDetail { +// Helper for casting to MPI_Request +struct Request +{ + // To pointer + template + static typename std::enable_if::value, Type>::type + get(const UPstream::Request& req) noexcept + { + return reinterpret_cast(req.value()); + } + + // To integer + template + static typename std::enable_if::value, Type>::type + get(const UPstream::Request& req) noexcept + { + return static_cast(req.value()); + } +}; + + // MPI_Bcast, using root=0 template void broadcast0 @@ -84,7 +99,8 @@ void allReduce MPI_Datatype datatype, MPI_Op optype, const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Iallreduce + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -96,7 +112,8 @@ void allToAll UList& recvData, MPI_Datatype datatype, const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Ialltoall + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -114,7 +131,8 @@ void allToAllv MPI_Datatype datatype, const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Ialltoallv + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -130,7 +148,8 @@ void gather MPI_Datatype datatype, // The send/recv data type const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Igather + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -146,7 +165,8 @@ void scatter MPI_Datatype datatype, // The send/recv data type const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Iscatter + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -163,7 +183,8 @@ void gatherv MPI_Datatype datatype, // The send/recv data type const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Igatherv + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); @@ -180,7 +201,8 @@ void scatterv MPI_Datatype datatype, // The send/recv data type const label comm, // Communicator - label* requestID = nullptr // Non-null for MPI_Igatherv + UPstream::Request* req = nullptr, // Non-null for non-blocking + label* requestID = nullptr // (alternative to UPstream::Request) ); diff --git a/src/Pstream/mpi/UPstreamWrappingTemplates.C b/src/Pstream/mpi/UPstreamWrappingTemplates.C index 65139268f4..109e6b7f45 100644 --- a/src/Pstream/mpi/UPstreamWrappingTemplates.C +++ b/src/Pstream/mpi/UPstreamWrappingTemplates.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2012-2015 OpenFOAM Foundation - Copyright (C) 2019-2022 OpenCFD Ltd. + Copyright (C) 2019-2023 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -119,6 +119,8 @@ void Foam::PstreamDetail::allReduce MPI_Datatype datatype, MPI_Op optype, const label comm, + + UPstream::Request* req, label* requestID ) { @@ -127,9 +129,11 @@ void Foam::PstreamDetail::allReduce return; } + const bool immediate = (req || requestID); + if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Iallreduce (non-blocking):"; } @@ -155,10 +159,11 @@ void Foam::PstreamDetail::allReduce bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Iallreduce @@ -179,16 +184,23 @@ void Foam::PstreamDetail::allReduce << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Allreduce @@ -220,14 +232,18 @@ void Foam::PstreamDetail::allToAll UList& recvData, MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Ialltoall (non-blocking):"; } @@ -262,11 +278,12 @@ void Foam::PstreamDetail::allToAll bool handled(false); - #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) +#if defined(MPI_VERSION) && (MPI_VERSION >= 3) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Ialltoall @@ -290,17 +307,23 @@ void Foam::PstreamDetail::allToAll << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif - if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Alltoall @@ -341,14 +364,18 @@ void Foam::PstreamDetail::allToAllv MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Ialltoallv (non-blocking):"; } @@ -402,7 +429,7 @@ void Foam::PstreamDetail::allToAllv bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; @@ -431,16 +458,23 @@ void Foam::PstreamDetail::allToAllv << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Alltoallv @@ -480,6 +514,8 @@ void Foam::PstreamDetail::gather MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { @@ -489,11 +525,13 @@ void Foam::PstreamDetail::gather return; } + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Igather (non-blocking):"; } @@ -514,10 +552,11 @@ void Foam::PstreamDetail::gather bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Igather @@ -541,16 +580,23 @@ void Foam::PstreamDetail::gather << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Gather @@ -589,6 +635,8 @@ void Foam::PstreamDetail::scatter MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { @@ -598,11 +646,13 @@ void Foam::PstreamDetail::scatter return; } + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Iscatter (non-blocking):"; } @@ -623,10 +673,11 @@ void Foam::PstreamDetail::scatter bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Iscatter @@ -650,16 +701,23 @@ void Foam::PstreamDetail::scatter << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Scatter @@ -676,7 +734,7 @@ void Foam::PstreamDetail::scatter ) { FatalErrorInFunction - << "MPI_Iscatter [comm: " << comm << "] failed." + << "MPI_Scatter [comm: " << comm << "] failed." << " sendCount " << sendCount << " recvCount " << recvCount << Foam::abort(FatalError); @@ -699,6 +757,8 @@ void Foam::PstreamDetail::gatherv MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { @@ -709,11 +769,13 @@ void Foam::PstreamDetail::gatherv return; } + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Igatherv (non-blocking):"; } @@ -757,10 +819,11 @@ void Foam::PstreamDetail::gatherv bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Igatherv @@ -785,16 +848,23 @@ void Foam::PstreamDetail::gatherv << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Gatherv @@ -835,6 +905,8 @@ void Foam::PstreamDetail::scatterv MPI_Datatype datatype, const label comm, + + UPstream::Request* req, label* requestID ) { @@ -844,11 +916,13 @@ void Foam::PstreamDetail::scatterv return; } + const bool immediate = (req || requestID); + const label np = UPstream::nProcs(comm); if (UPstream::warnComm != -1 && comm != UPstream::warnComm) { - if (requestID != nullptr) + if (immediate) { Pout<< "** MPI_Iscatterv (non-blocking):"; } @@ -886,10 +960,11 @@ void Foam::PstreamDetail::scatterv bool handled(false); #if defined(MPI_VERSION) && (MPI_VERSION >= 3) - if (requestID != nullptr) + if (immediate) { handled = true; MPI_Request request; + if ( MPI_Iscatterv @@ -914,16 +989,23 @@ void Foam::PstreamDetail::scatterv << Foam::abort(FatalError); } - *requestID = PstreamGlobals::push_request(request); + if (req) + { + *req = UPstream::Request(request); + if (requestID) *requestID = -1; + } + else + { + *requestID = PstreamGlobals::push_request(request); + } } #endif if (!handled) { - if (requestID != nullptr) - { - *requestID = -1; - } + if (req) req->reset(); + if (requestID) *requestID = -1; + if ( MPI_Scatterv