diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx index 085a877833..5a3de56ad5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx @@ -433,14 +433,8 @@ void Foam::Pstream::gatherList << Foam::abort(FatalError); } - // In-place gather for contiguous types - UPstream::mpiGather - ( - nullptr, - values.data_bytes(), - sizeof(T), - communicator - ); + // In-place gather for contiguous types - one element per rank + UPstream::mpiGather(nullptr, values.data(), 1, communicator); } else { @@ -467,14 +461,8 @@ void Foam::Pstream::scatterList } else if constexpr (is_contiguous_v) { - // In-place scatter for contiguous types - UPstream::mpiScatter - ( - nullptr, - values.data_bytes(), - sizeof(T), - communicator - ); + // In-place scatter for contiguous types - one element per rank + UPstream::mpiScatter(nullptr, values.data(), 1, communicator); } else { @@ -491,33 +479,34 @@ void Foam::Pstream::allGatherList ( UList& values, [[maybe_unused]] const int tag, - const int comm + const int communicator ) { - if (!UPstream::is_parallel(comm)) + if (!UPstream::is_parallel(communicator)) { // Nothing to do return; } else if constexpr (is_contiguous_v) { - if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(comm))) + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) { FatalErrorInFunction << "List of values is too small:" << values.size() - << " vs numProcs:" << UPstream::nProcs(comm) << nl + << " vs numProcs:" << UPstream::nProcs(communicator) << nl << Foam::abort(FatalError); } - UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm); + // Allgather for contiguous types - one element per rank + UPstream::mpiAllGather(values.data(), 1, communicator); } else { // Communication order - const auto& commOrder = UPstream::whichCommunication(comm); + const auto& commOrder = UPstream::whichCommunication(communicator); - Pstream::gatherList_algorithm(commOrder, values, tag, comm); - Pstream::scatterList_algorithm(commOrder, values, tag, comm); + Pstream::gatherList_algorithm(commOrder, values, tag, communicator); + Pstream::scatterList_algorithm(commOrder, values, tag, communicator); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 2f6bdb85e4..b80eba3f34 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -1579,46 +1579,50 @@ public: // Low-level gather/scatter routines + //- Receive identically-sized (contiguous) data from all ranks + template + static void mpiGather + ( + //! On rank: individual value to send (or nullptr for inplace) + const Type* sendData, + //! Master: receive buffer with all values + //! Or for in-place send/recv when sendData is nullptr + Type* recvData, + //! Number of send/recv data per rank. Globally consistent! + int count, + const int communicator = UPstream::worldComm + ); + + //- Send identically-sized (contiguous) data to all ranks + template + static void mpiScatter + ( + //! On master: send buffer with all values (nullptr for inplace) + const Type* sendData, + //! On rank: individual value to receive + //! Or for in-place send/recv when sendData is nullptr + Type* recvData, + //! Number of send/recv data per rank. Globally consistent! + int count, + const int communicator = UPstream::worldComm + ); + + //- Gather/scatter identically-sized data + // Send data from proc slot, receive into all slots + template + static void mpiAllGather + ( + //! On all ranks: the base of the data locations + Type* allData, + //! Number of send/recv data per rank. Globally consistent! + int count, + const int communicator = UPstream::worldComm + ); + + #undef Pstream_CommonRoutines #define Pstream_CommonRoutines(Type) \ \ - /*! \brief Receive identically-sized \c Type data from all ranks */ \ - static void mpiGather \ - ( \ - /*! On rank: individual value to send (or nullptr for inplace) */ \ - const Type* sendData, \ - /*! Master: receive buffer with all values */ \ - /*! Or for in-place send/recv when sendData is nullptr */ \ - Type* recvData, \ - /*! Number of send/recv data per rank. Globally consistent! */ \ - int count, \ - const label communicator = worldComm \ - ); \ - \ - /*! \brief Send identically-sized \c Type data to all ranks */ \ - static void mpiScatter \ - ( \ - /*! Master: send buffer with all values (nullptr for inplace) */ \ - const Type* sendData, \ - /*! On rank: individual value to receive */ \ - /*! Or for in-place send/recv when sendData is nullptr */ \ - Type* recvData, \ - /*! Number of send/recv data per rank. Globally consistent! */ \ - int count, \ - const label communicator = worldComm \ - ); \ - \ - /*! \brief Gather/scatter identically-sized \c Type data */ \ - /*! Send data from proc slot, receive into all slots */ \ - static void mpiAllGather \ - ( \ - /*! On all ranks: the base of the data locations */ \ - Type* allData, \ - /*! Number of send/recv data per rank. Globally consistent! */ \ - int count, \ - const label communicator = worldComm \ - ); \ - \ /*! \brief Receive variable length \c Type data from all ranks */ \ static void mpiGatherv \ ( \ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx index 9aa479d817..58aae735e9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx @@ -65,39 +65,159 @@ bool Foam::UPstream::broadcast } +template +void Foam::UPstream::mpiGather +( + const Type* sendData, + Type* recvData, + int count, + const int communicator +) +{ + if (!count || !UPstream::is_rank(communicator)) + { + // Nothing to do + return; + } + else if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Invalid for non-contiguous data types" + << Foam::abort(FatalError); + } + else if (!UPstream::is_parallel(communicator)) + { + // Perform any fallback copying here, while we still know the Type + if (sendData && recvData && (sendData != recvData)) + { + std::memmove(recvData, sendData, count*sizeof(Type)); + } + } + else + { + // Use element or component type (or byte-wise) for data type + UPstream::mpi_gather + ( + sendData, // The data or cmpt pointer + recvData, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id, + communicator + ); + } +} + + +template +void Foam::UPstream::mpiScatter +( + const Type* sendData, + Type* recvData, + int count, + const int communicator +) +{ + if (!count || !UPstream::is_rank(communicator)) + { + // Nothing to do + return; + } + else if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Invalid for non-contiguous data types" + << Foam::abort(FatalError); + } + else if (!UPstream::is_parallel(communicator)) + { + // Perform any fallback copying here, while we still know the Type + if (sendData && recvData && (sendData != recvData)) + { + std::memmove(recvData, sendData, count*sizeof(Type)); + } + } + else + { + // Use element or component type (or byte-wise) for data type + UPstream::mpi_scatter + ( + sendData, // The data or cmpt pointer + recvData, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id, + communicator + ); + } +} + + +template +void Foam::UPstream::mpiAllGather +( + Type* allData, + int count, + const int communicator +) +{ + if (!count || !UPstream::is_parallel(communicator)) + { + // Nothing sensible to do + return; + } + else if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Invalid for non-contiguous data types" + << Foam::abort(FatalError); + } + else + { + // Use element or component type (or byte-wise) for data type + UPstream::mpi_allgather + ( + allData, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id, + communicator + ); + } +} + + template Foam::List Foam::UPstream::allGatherValues ( const T& localValue, - const int comm + const int communicator ) { - if constexpr (!is_contiguous_v) - { - FatalErrorInFunction - << "Cannot all-gather values for non-contiguous types" - " - consider Pstream variant instead" << endl - << Foam::abort(FatalError); - } - - List allValues; - - if (UPstream::is_parallel(comm)) - { - allValues.resize(UPstream::nProcs(comm)); - allValues[UPstream::myProcNo(comm)] = localValue; - - UPstream::mpiAllGather(allValues.data_bytes(), sizeof(T), comm); - } - else + if (!UPstream::is_parallel(communicator)) { // non-parallel: return own value // TBD: only when UPstream::is_rank(comm) as well? - allValues.resize(1); + List allValues(1); allValues[0] = localValue; + return allValues; } + else if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Cannot all-gather values for non-contiguous types" + << " - consider Pstream variant instead" << endl + << Foam::abort(FatalError); + return List(); + } + else + { + // Standard gather with direct MPI communication + List allValues; - return allValues; + allValues.resize(UPstream::nProcs(communicator)); + allValues[UPstream::myProcNo(communicator)] = localValue; + + UPstream::mpiAllGather(allValues.data(), 1, communicator); + return allValues; + } } @@ -107,43 +227,38 @@ template Foam::List Foam::UPstream::listGatherValues ( const T& localValue, - const int comm + const int communicator ) { - if constexpr (!is_contiguous_v) + if (!UPstream::is_parallel(communicator)) + { + // non-parallel: return own value + // TBD: only when UPstream::is_rank(communicator) as well? + List allValues(1); + allValues[0] = localValue; + return allValues; + } + else if constexpr (!is_contiguous_v) { FatalErrorInFunction << "Cannot gather values for non-contiguous types" " - consider Pstream variant instead" << endl << Foam::abort(FatalError); - } - - List allValues; - - if (UPstream::is_parallel(comm)) - { - if (UPstream::master(comm)) - { - allValues.resize(UPstream::nProcs(comm)); - } - - UPstream::mpiGather - ( - reinterpret_cast(&localValue), - allValues.data_bytes(), - sizeof(T), // The send/recv size per rank - comm - ); + return List(); } else { - // non-parallel: return own value - // TBD: only when UPstream::is_rank(comm) as well? - allValues.resize(1); - allValues[0] = localValue; - } + // Local sizes are identical, can use MPI_Gather + List allValues; - return allValues; + if (UPstream::master(communicator)) + { + allValues.resize(UPstream::nProcs(communicator)); + } + + UPstream::mpiGather(&localValue, allValues.data(), 1, communicator); + return allValues; + } } @@ -151,51 +266,55 @@ template T Foam::UPstream::listScatterValues ( const UList& allValues, - const int comm + const int communicator ) { - if constexpr (!is_contiguous_v) + if (!UPstream::is_parallel(communicator)) { - FatalErrorInFunction - << "Cannot scatter values for non-contiguous types" - " - consider Pstream variant instead" << endl - << Foam::abort(FatalError); - } - - T localValue{}; - - if (UPstream::is_parallel(comm)) - { - const label numProc = UPstream::nProcs(comm); - - if (UPstream::master(comm) && allValues.size() < numProc) - { - FatalErrorInFunction - << "Attempting to send " << allValues.size() - << " values to " << numProc << " processors" << endl - << Foam::abort(FatalError); - } - - UPstream::mpiScatter - ( - allValues.cdata_bytes(), - reinterpret_cast(&localValue), - sizeof(T), // The send/recv size per rank - comm - ); - } - else - { - // non-parallel: return first value - // TBD: only when UPstream::is_rank(comm) as well? + // non-parallel: return own value + // TBD: only when UPstream::is_rank(communicator) as well? if (!allValues.empty()) { return allValues[0]; } - } - return localValue; + return T{}; // Fallback value + } + else if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Cannot scatter non-contiguous values" + " - consider Pstream variant instead" << endl + << Foam::abort(FatalError); + + return T{}; // Fallback value + } + else + { + // Local sizes are identical, can use MPI_Scatter + + const label nProcs = UPstream::nProcs(communicator); + + if + ( + FOAM_UNLIKELY + ( + UPstream::master(communicator) + && allValues.size() < nProcs + ) + ) + { + FatalErrorInFunction + << "Attempting to send " << allValues.size() + << " values to " << nProcs << " processors" << endl + << Foam::abort(FatalError); + } + + T localValue{}; + UPstream::mpiScatter(allValues.cdata(), &localValue, 1, communicator); + return localValue; + } } diff --git a/src/OpenFOAM/global/profiling/profilingPstream.C b/src/OpenFOAM/global/profiling/profilingPstream.C index e7ad9050e0..9189cd86ff 100644 --- a/src/OpenFOAM/global/profiling/profilingPstream.C +++ b/src/OpenFOAM/global/profiling/profilingPstream.C @@ -228,9 +228,9 @@ void Foam::profilingPstream::report(const int reportLevel) UPstream::mpiGather ( - procValues.cdata_bytes(), // Send - allTimes.data_bytes(), // Recv - procValues.size_bytes(), // Num send/recv data per rank + procValues.cdata(), // Send + allTimes.data(), // Recv + procValues.size(), // Num send/recv data per rank UPstream::commWorld() ); } @@ -247,9 +247,9 @@ void Foam::profilingPstream::report(const int reportLevel) UPstream::mpiGather ( - procValues.cdata_bytes(), // Send - allCounts.data_bytes(), // Recv - procValues.size_bytes(), // Num send/recv data per rank + procValues.cdata(), // Send + allCounts.data(), // Recv + procValues.size(), // Num send/recv data per rank UPstream::commWorld() ); } diff --git a/src/Pstream/dummy/UPstreamGatherScatter.C b/src/Pstream/dummy/UPstreamGatherScatter.C index 346b3e1768..e5f4fcbe84 100644 --- a/src/Pstream/dummy/UPstreamGatherScatter.C +++ b/src/Pstream/dummy/UPstreamGatherScatter.C @@ -67,6 +67,8 @@ void Foam::UPstream::mpi_allgather {} +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + void Foam::UPstream::mpi_gatherv ( const void* sendData, @@ -101,45 +103,6 @@ void Foam::UPstream::mpi_scatterv #undef Pstream_CommonRoutines #define Pstream_CommonRoutines(Type) \ \ -void Foam::UPstream::mpiGather \ -( \ - const Type* sendData, \ - Type* recvData, \ - int count, \ - const label comm \ -) \ -{ \ - if (sendData && recvData) \ - { \ - std::memmove(recvData, sendData, count*sizeof(Type)); \ - } \ -} \ - \ - \ -void Foam::UPstream::mpiScatter \ -( \ - const Type* sendData, \ - Type* recvData, \ - int count, \ - const label comm \ -) \ -{ \ - if (sendData && recvData) \ - { \ - std::memmove(recvData, sendData, count*sizeof(Type)); \ - } \ -} \ - \ - \ -void Foam::UPstream::mpiAllGather \ -( \ - Type* allData, \ - int count, \ - const label comm \ -) \ -{} \ - \ - \ void Foam::UPstream::mpiGatherv \ ( \ const Type* sendData, \ diff --git a/src/Pstream/mpi/UPstreamGatherScatter.C b/src/Pstream/mpi/UPstreamGatherScatter.C index 39470d5488..0a39fd71d6 100644 --- a/src/Pstream/mpi/UPstreamGatherScatter.C +++ b/src/Pstream/mpi/UPstreamGatherScatter.C @@ -262,52 +262,6 @@ void Foam::UPstream::mpi_scatterv #undef Pstream_CommonRoutines #define Pstream_CommonRoutines(Native, TaggedType) \ \ -void Foam::UPstream::mpiGather \ -( \ - const Native* sendData, \ - Native* recvData, \ - int count, \ - const label comm \ -) \ -{ \ - PstreamDetail::gather \ - ( \ - sendData, recvData, count, \ - TaggedType, comm \ - ); \ -} \ - \ - \ -void Foam::UPstream::mpiScatter \ -( \ - const Native* sendData, \ - Native* recvData, \ - int count, \ - const label comm \ -) \ -{ \ - PstreamDetail::scatter \ - ( \ - sendData, recvData, count, \ - TaggedType, comm \ - ); \ -} \ - \ - \ -void Foam::UPstream::mpiAllGather \ -( \ - Native* allData, \ - int count, \ - const label comm \ -) \ -{ \ - PstreamDetail::allGather \ - ( \ - allData, count, \ - TaggedType, comm \ - ); \ -} \ - \ void Foam::UPstream::mpiGatherv \ ( \ const Native* sendData, \