mirror of
https://develop.openfoam.com/Development/openfoam.git
synced 2025-12-28 03:37:59 +00:00
ENH: extend Pstream gather templates to support combine or assign operation
- eliminates nearly identical code between 'gather' and 'combineGather'
* Normal gather updates by assigning the result of the binary operation.
* Combine gather updates by using a binary operator that modifies
its first parameter in-place
By-product of this refactoring are these new variants:
listGather(), listGatherReduce()
mapGather(), mapGatherReduce()
that mirror the previously existing ones
listCombineGather(), listCombineReduce()
mapCombineGather(), mapCombineReduce()
except that they use the 'regular' binary operator
This commit is contained in:
@ -1,3 +1,3 @@
|
||||
Test-Tuple2.C
|
||||
Test-Tuple2.cxx
|
||||
|
||||
EXE = $(FOAM_USER_APPBIN)/Test-Tuple2
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011 OpenFOAM Foundation
|
||||
Copyright (C) 2019-2020 OpenCFD Ltd.
|
||||
Copyright (C) 2019-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
@ -32,6 +32,7 @@ Description
|
||||
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "argList.H"
|
||||
#include "labelPair.H"
|
||||
#include "Tuple2.H"
|
||||
#include "label.H"
|
||||
@ -102,8 +103,12 @@ void printTuple2(const Pair<word>& t)
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
// Main program:
|
||||
|
||||
int main()
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
argList::noCheckProcessorDirectories();
|
||||
|
||||
#include "setRootCase.H"
|
||||
|
||||
typedef Tuple2<label, scalar> indexedScalar;
|
||||
|
||||
Info<< "Default constructed Tuple: " << indexedScalar() << nl;
|
||||
@ -34,7 +34,6 @@ SourceFiles
|
||||
Pstream.C
|
||||
PstreamBroadcast.C
|
||||
PstreamGather.C
|
||||
PstreamCombineGather.C
|
||||
PstreamGatherList.C
|
||||
PstreamExchangeConsensus.C
|
||||
PstreamExchange.C
|
||||
@ -125,14 +124,18 @@ public:
|
||||
);
|
||||
|
||||
|
||||
// Gather
|
||||
// Gather/scatter : single value
|
||||
|
||||
//- Gather (reduce) data, applying \c bop to combine \c value
|
||||
//- from different processors. The basis for Foam::reduce().
|
||||
// Uses linear/tree communication (with parallel guard).
|
||||
template<class T, class BinaryOp>
|
||||
// A no-op for non-parallel.
|
||||
//
|
||||
// \tparam InplaceMode indicates that the binary operator
|
||||
// modifies values in-place, not using assignment
|
||||
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||
static void gather
|
||||
(
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
T& value,
|
||||
const BinaryOp& bop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -168,17 +171,13 @@ public:
|
||||
);
|
||||
|
||||
|
||||
// Gather/combine data
|
||||
// Inplace combine values from processors.
|
||||
// (Uses construct from Istream instead of \c << operator)
|
||||
// Inplace combine (gather) : single value
|
||||
|
||||
//- Gather data, applying \c cop to inplace combine \c value
|
||||
//- from different processors.
|
||||
// Uses linear/tree communication (with parallel guard).
|
||||
//- Forwards to Pstream::gather with an \em in-place \c cop
|
||||
template<class T, class CombineOp>
|
||||
static void combineGather
|
||||
(
|
||||
//! [in,out]
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -188,13 +187,10 @@ public:
|
||||
//- Reduce inplace (cf. MPI Allreduce)
|
||||
//- applying \c cop to inplace combine \c value
|
||||
//- from different processors.
|
||||
//- After completion all processors have the same data.
|
||||
// Uses linear/tree communication.
|
||||
// Wraps combineGather/broadcast (may change in the future).
|
||||
template<class T, class CombineOp>
|
||||
static void combineReduce
|
||||
(
|
||||
//! [in,out]
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -205,37 +201,65 @@ public:
|
||||
template<class T, class CombineOp>
|
||||
static void combineAllGather
|
||||
(
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
)
|
||||
{
|
||||
Pstream::combineReduce(value, cop, tag, comm);
|
||||
Pstream::listCombineReduce(value, cop, tag, comm);
|
||||
}
|
||||
|
||||
|
||||
// Combine variants working on whole List at a time.
|
||||
// Gather/combine variants working on entire List
|
||||
|
||||
//- Combines List elements.
|
||||
// Uses linear/tree communication (with parallel guard).
|
||||
//- Gather (reduce) list elements,
|
||||
//- applying \c bop to each list element
|
||||
//
|
||||
// \tparam InplaceMode indicates that the binary operator
|
||||
// modifies values in-place, not using assignment
|
||||
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||
static void listGather
|
||||
(
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
UList<T>& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Forwards to Pstream::listGather with an \em in-place \c cop
|
||||
template<class T, class CombineOp>
|
||||
static void listCombineGather
|
||||
(
|
||||
//! [in,out]
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
UList<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Combines List elements.
|
||||
//- After completion all processors have the same data.
|
||||
// Uses linear/tree communication (with parallel guard).
|
||||
//- Gather (reduce) list elements,
|
||||
//- applying \c bop to combine each list element.
|
||||
//
|
||||
// \tparam InplaceMode indicates that the binary operator
|
||||
// modifies values in-place, not using assignment
|
||||
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||
static void listGatherReduce
|
||||
(
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
List<T>& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Forwards to Pstream::listGatherReduce with an \em in-place \c cop
|
||||
template<class T, class CombineOp>
|
||||
static void listCombineReduce
|
||||
(
|
||||
//! [in,out] - List (not UList) due to broadcast()
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
List<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -246,7 +270,7 @@ public:
|
||||
template<class T, class CombineOp>
|
||||
static void listCombineAllGather
|
||||
(
|
||||
//! [in,out] - List (not UList) due to broadcast()
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
List<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -257,14 +281,28 @@ public:
|
||||
}
|
||||
|
||||
|
||||
// Combine variants working on whole map at a time.
|
||||
// Container needs iterators, find() and insert methods defined.
|
||||
// Gather/combine variants working on Map/HashTable containers
|
||||
|
||||
//- Combine Map elements.
|
||||
// Uses linear/tree communication (with parallel guard).
|
||||
//- Gather (reduce) Map/HashTable containers,
|
||||
//- applying \c bop to combine entries from different processors.
|
||||
//
|
||||
// \tparam InplaceMode indicates that the binary operator
|
||||
// modifies values in-place, not using assignment
|
||||
template<class Container, class BinaryOp, bool InplaceMode=false>
|
||||
static void mapGather
|
||||
(
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
Container& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Forwards to Pstream::mapGather with an \em in-place \c cop
|
||||
template<class Container, class CombineOp>
|
||||
static void mapCombineGather
|
||||
(
|
||||
//! [in,out] the result is only reliable on rank=0
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -272,15 +310,26 @@ public:
|
||||
);
|
||||
|
||||
//- Reduce inplace (cf. MPI Allreduce)
|
||||
//- applying \c cop to inplace combine map \c values
|
||||
//- applying \c bop to combine map \c values
|
||||
//- from different processors.
|
||||
//- After completion all processors have the same data.
|
||||
// Uses the specified communication schedule.
|
||||
//
|
||||
// Wraps mapCombineGather/broadcast (may change in the future).
|
||||
//- After completion all processors have the same data.
|
||||
template<class Container, class BinaryOp, bool InplaceMode=false>
|
||||
static void mapGatherReduce
|
||||
(
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
Container& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag = UPstream::msgType(),
|
||||
const label comm = UPstream::worldComm
|
||||
);
|
||||
|
||||
//- Forwards to Pstream::mapGatherReduce with an \em in-place \c cop
|
||||
template<class Container, class CombineOp>
|
||||
static void mapCombineReduce
|
||||
(
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -291,6 +340,7 @@ public:
|
||||
template<class Container, class CombineOp>
|
||||
static void mapCombineAllGather
|
||||
(
|
||||
//! [in,out] the result is consistent on all ranks
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag = UPstream::msgType(),
|
||||
@ -343,7 +393,7 @@ public:
|
||||
);
|
||||
|
||||
//- Gather data, but keep individual values separate.
|
||||
//- Uses MPI_Allgather or manual linear/tree communication.
|
||||
//- Uses MPI_Allgather or manual communication.
|
||||
// After completion all processors have the same data.
|
||||
// Wraps gatherList/scatterList (may change in the future).
|
||||
template<class T>
|
||||
@ -612,7 +662,6 @@ public:
|
||||
#ifdef NoRepository
|
||||
#include "PstreamBroadcast.C"
|
||||
#include "PstreamGather.C"
|
||||
#include "PstreamCombineGather.C"
|
||||
#include "PstreamGatherList.C"
|
||||
#include "PstreamExchange.C"
|
||||
#endif
|
||||
|
||||
@ -1,365 +0,0 @@
|
||||
/*---------------------------------------------------------------------------*\
|
||||
========= |
|
||||
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
|
||||
\\ / O peration |
|
||||
\\ / A nd | www.openfoam.com
|
||||
\\/ M anipulation |
|
||||
-------------------------------------------------------------------------------
|
||||
Copyright (C) 2011-2017 OpenFOAM Foundation
|
||||
Copyright (C) 2019-2025 OpenCFD Ltd.
|
||||
-------------------------------------------------------------------------------
|
||||
License
|
||||
This file is part of OpenFOAM.
|
||||
|
||||
OpenFOAM is free software: you can redistribute it and/or modify it
|
||||
under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
Description
|
||||
Variant of gather.
|
||||
Normal gather uses:
|
||||
- default construct and read (>>) from Istream
|
||||
- binary operator and assignment operator to combine values
|
||||
|
||||
combineGather uses:
|
||||
- construct from Istream
|
||||
- modify operator which modifies its lhs
|
||||
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "IPstream.H"
|
||||
#include "OPstream.H"
|
||||
#include "IOstreams.H"
|
||||
#include "contiguous.H"
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::combineGather
|
||||
(
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(comm);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto belowID : myComm.below())
|
||||
{
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
T received;
|
||||
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
reinterpret_cast<char*>(&received),
|
||||
sizeof(T),
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< belowID << " data:" << received << endl;
|
||||
}
|
||||
|
||||
cop(value, received);
|
||||
}
|
||||
else
|
||||
{
|
||||
IPstream fromBelow
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
0, // bufsize
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
T received(fromBelow);
|
||||
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< belowID << " data:" << received << endl;
|
||||
}
|
||||
|
||||
cop(value, received);
|
||||
}
|
||||
}
|
||||
|
||||
// Send up value
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << value << endl;
|
||||
}
|
||||
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
UOPstream::write
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
myComm.above(),
|
||||
reinterpret_cast<const char*>(&value),
|
||||
sizeof(T),
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
OPstream::send(value, myComm.above(), tag, comm);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::combineReduce
|
||||
(
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
Pstream::combineGather(value, cop, tag, comm);
|
||||
Pstream::broadcast(value, comm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::listCombineGather
|
||||
(
|
||||
UList<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(comm);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto belowID : myComm.below())
|
||||
{
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
List<T> received(values.size());
|
||||
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
received,
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< belowID << " data:" << received << endl;
|
||||
}
|
||||
|
||||
forAll(values, i)
|
||||
{
|
||||
cop(values[i], received[i]);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
IPstream fromBelow
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
0, // bufsize
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
List<T> received(fromBelow);
|
||||
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< belowID << " data:" << received << endl;
|
||||
}
|
||||
|
||||
forAll(values, i)
|
||||
{
|
||||
cop(values[i], received[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send up values
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << values << endl;
|
||||
}
|
||||
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
UOPstream::write
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
myComm.above(),
|
||||
values,
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
OPstream::send(values, myComm.above(), tag, comm);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::listCombineReduce
|
||||
(
|
||||
List<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
Pstream::listCombineGather(values, cop, tag, comm);
|
||||
Pstream::broadcast(values, comm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
template<class Container, class CombineOp>
|
||||
void Foam::Pstream::mapCombineGather
|
||||
(
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(comm);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto belowID : myComm.below())
|
||||
{
|
||||
// Map/HashTable: non-contiguous
|
||||
|
||||
IPstream fromBelow
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
0, // bufsize
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
Container received(fromBelow);
|
||||
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< belowID << " data:" << received << endl;
|
||||
}
|
||||
|
||||
for
|
||||
(
|
||||
auto recvIter = received.cbegin();
|
||||
recvIter != received.cend();
|
||||
++recvIter
|
||||
)
|
||||
{
|
||||
auto masterIter = values.find(recvIter.key());
|
||||
|
||||
if (masterIter.good())
|
||||
{
|
||||
// Combine with existing
|
||||
cop(masterIter.val(), recvIter.val());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Insert new key/value
|
||||
values.insert(recvIter.key(), recvIter.val());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send up values
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << values << endl;
|
||||
}
|
||||
|
||||
OPstream::send(values, myComm.above(), tag, comm);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class Container, class CombineOp>
|
||||
void Foam::Pstream::mapCombineReduce
|
||||
(
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
Pstream::mapCombineGather(values, cop, tag, comm);
|
||||
Pstream::broadcast(values, comm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ************************************************************************* //
|
||||
@ -30,15 +30,25 @@ Description
|
||||
The gathered data will be a single value constructed from the values
|
||||
on individual processors using a user-specified operator.
|
||||
|
||||
Note
|
||||
Normal gather uses:
|
||||
- binary operator that returns a value.
|
||||
So assignment that return value to yield the new value
|
||||
|
||||
Combine gather uses:
|
||||
- binary operator modifies its first parameter in-place
|
||||
|
||||
\*---------------------------------------------------------------------------*/
|
||||
|
||||
#include "contiguous.H"
|
||||
#include "IPstream.H"
|
||||
#include "OPstream.H"
|
||||
#include "contiguous.H"
|
||||
|
||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||
|
||||
template<class T, class BinaryOp>
|
||||
// Single value variants
|
||||
|
||||
template<class T, class BinaryOp, bool InplaceMode>
|
||||
void Foam::Pstream::gather
|
||||
(
|
||||
T& value,
|
||||
@ -47,15 +57,21 @@ void Foam::Pstream::gather
|
||||
const label communicator
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(communicator))
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
// Nothing to do
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(communicator);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||
const auto& below = myComm.below();
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto belowID : myComm.below())
|
||||
for (const auto proci : below)
|
||||
{
|
||||
T received;
|
||||
|
||||
@ -64,7 +80,7 @@ void Foam::Pstream::gather
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
belowID,
|
||||
proci,
|
||||
reinterpret_cast<char*>(&received),
|
||||
sizeof(T),
|
||||
tag,
|
||||
@ -73,15 +89,42 @@ void Foam::Pstream::gather
|
||||
}
|
||||
else
|
||||
{
|
||||
IPstream::recv(received, belowID, tag, communicator);
|
||||
IPstream::recv(received, proci, tag, communicator);
|
||||
}
|
||||
|
||||
value = bop(value, received);
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< proci << " data:" << received << endl;
|
||||
}
|
||||
}
|
||||
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
// In-place binary operation
|
||||
bop(value, received);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Assign result of binary operation
|
||||
value = bop(value, received);
|
||||
}
|
||||
}
|
||||
|
||||
// Send up value
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << value << endl;
|
||||
}
|
||||
}
|
||||
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
UOPstream::write
|
||||
@ -103,54 +146,391 @@ void Foam::Pstream::gather
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::combineGather
|
||||
(
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::gather<T, CombineOp, true>(value, cop, tag, comm);
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::combineReduce
|
||||
(
|
||||
T& value,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::gather<T, CombineOp, true>(value, cop, tag, comm);
|
||||
Pstream::broadcast(value, comm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
// List variants
|
||||
|
||||
template<class T, class BinaryOp, bool InplaceMode>
|
||||
void Foam::Pstream::listGather
|
||||
(
|
||||
UList<T>& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag,
|
||||
const label communicator
|
||||
)
|
||||
{
|
||||
if (!UPstream::is_parallel(communicator) || values.empty())
|
||||
{
|
||||
// Nothing to do
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(communicator);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||
const auto& below = myComm.below();
|
||||
|
||||
// Same length on all ranks
|
||||
const label listLen = values.size();
|
||||
|
||||
List<T> received;
|
||||
|
||||
if (!below.empty())
|
||||
{
|
||||
// Pre-size for contiguous reading
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
received.resize_nocopy(listLen);
|
||||
}
|
||||
}
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto proci : below)
|
||||
{
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
UIPstream::read
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
proci,
|
||||
received,
|
||||
tag,
|
||||
communicator
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
received.clear(); // extra safety?
|
||||
IPstream::recv(received, proci, tag, communicator);
|
||||
}
|
||||
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< proci << " data:" << received << endl;
|
||||
}
|
||||
}
|
||||
|
||||
for (label i = 0; i < listLen; ++i)
|
||||
{
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
// In-place binary operation
|
||||
bop(values[i], received[i]);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Assign result of binary operation
|
||||
values[i] = bop(values[i], received[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send up values
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << values << endl;
|
||||
}
|
||||
}
|
||||
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
UOPstream::write
|
||||
(
|
||||
UPstream::commsTypes::scheduled,
|
||||
myComm.above(),
|
||||
values,
|
||||
tag,
|
||||
communicator
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
OPstream::send(values, myComm.above(), tag, communicator);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class T, class BinaryOp, bool InplaceMode>
|
||||
void Foam::Pstream::listGatherReduce
|
||||
(
|
||||
List<T>& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
Pstream::listGather<T, BinaryOp, InplaceMode>(values, bop, tag, comm);
|
||||
if (!values.empty())
|
||||
{
|
||||
Pstream::broadcast(values, comm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::listCombineGather
|
||||
(
|
||||
UList<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::listGather<T, CombineOp, true>(values, cop, tag, comm);
|
||||
}
|
||||
|
||||
|
||||
template<class T, class CombineOp>
|
||||
void Foam::Pstream::listCombineReduce
|
||||
(
|
||||
List<T>& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::listGatherReduce<T, CombineOp, true>(values, cop, tag, comm);
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
// Map variants
|
||||
|
||||
template<class Container, class BinaryOp, bool InplaceMode>
|
||||
void Foam::Pstream::mapGather
|
||||
(
|
||||
Container& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag,
|
||||
const label communicator
|
||||
)
|
||||
{
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
// Nothing to do
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Communication order
|
||||
const auto& comms = UPstream::whichCommunication(communicator);
|
||||
// if (comms.empty()) return; // extra safety?
|
||||
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||
const auto& below = myComm.below();
|
||||
|
||||
// Receive from my downstairs neighbours
|
||||
for (const auto proci : below)
|
||||
{
|
||||
// Map/HashTable: non-contiguous
|
||||
Container received;
|
||||
IPstream::recv(received, proci, tag, communicator);
|
||||
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " received from "
|
||||
<< proci << " data:" << received << endl;
|
||||
}
|
||||
}
|
||||
|
||||
const auto last = received.end();
|
||||
|
||||
for (auto iter = received.begin(); iter != last; ++iter)
|
||||
{
|
||||
auto slot = values.find(iter.key());
|
||||
|
||||
if (slot.good())
|
||||
{
|
||||
// Combine with existing entry
|
||||
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
// In-place binary operation
|
||||
bop(slot.val(), iter.val());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Assign result of binary operation
|
||||
slot.val() = bop(slot.val(), iter.val());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create a new entry
|
||||
values.emplace(iter.key(), std::move(iter.val()));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Send up values
|
||||
if (myComm.above() >= 0)
|
||||
{
|
||||
if constexpr (InplaceMode)
|
||||
{
|
||||
if (debug & 2)
|
||||
{
|
||||
Perr<< " sending to " << myComm.above()
|
||||
<< " data:" << values << endl;
|
||||
}
|
||||
}
|
||||
|
||||
OPstream::send(values, myComm.above(), tag, communicator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template<class Container, class BinaryOp, bool InplaceMode>
|
||||
void Foam::Pstream::mapGatherReduce
|
||||
(
|
||||
Container& values,
|
||||
const BinaryOp& bop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
Pstream::mapGather<Container, BinaryOp, InplaceMode>
|
||||
(
|
||||
values, bop, tag, comm
|
||||
);
|
||||
Pstream::broadcast(values, comm);
|
||||
}
|
||||
|
||||
|
||||
template<class Container, class CombineOp>
|
||||
void Foam::Pstream::mapCombineGather
|
||||
(
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::mapGather<Container, CombineOp, true>
|
||||
(
|
||||
values, cop, tag, comm
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
template<class Container, class CombineOp>
|
||||
void Foam::Pstream::mapCombineReduce
|
||||
(
|
||||
Container& values,
|
||||
const CombineOp& cop,
|
||||
const int tag,
|
||||
const label comm
|
||||
)
|
||||
{
|
||||
// In-place binary operation
|
||||
Pstream::mapGatherReduce<Container, CombineOp, true>
|
||||
(
|
||||
values, cop, tag, comm
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||
|
||||
// Single values to/from a list
|
||||
|
||||
template<class T>
|
||||
Foam::List<T> Foam::Pstream::listGatherValues
|
||||
(
|
||||
const T& localValue,
|
||||
const label comm,
|
||||
const label communicator,
|
||||
[[maybe_unused]] const int tag
|
||||
)
|
||||
{
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
// non-parallel: return own value
|
||||
// TBD: only when UPstream::is_rank(communicator) as well?
|
||||
List<T> allValues(1);
|
||||
allValues[0] = localValue;
|
||||
return allValues;
|
||||
}
|
||||
else if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
// UPstream version is contiguous only
|
||||
return UPstream::listGatherValues(localValue, comm);
|
||||
return UPstream::listGatherValues(localValue, communicator);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Standard gather (all to one)
|
||||
|
||||
// The data are non-contiguous!
|
||||
//
|
||||
// Non-trivial to manage non-blocking gather without a
|
||||
// PEX/NBX approach (eg, PstreamBuffers).
|
||||
// Leave with simple exchange for now
|
||||
|
||||
List<T> allValues;
|
||||
|
||||
if (UPstream::is_parallel(comm))
|
||||
if (UPstream::master(communicator))
|
||||
{
|
||||
const label numProc = UPstream::nProcs(comm);
|
||||
allValues.resize(UPstream::nProcs(communicator));
|
||||
|
||||
if (UPstream::master(comm))
|
||||
for (const int proci : UPstream::subProcs(communicator))
|
||||
{
|
||||
allValues.resize(numProc);
|
||||
|
||||
// Non-trivial to manage non-blocking gather without a
|
||||
// PEX/NBX approach (eg, PstreamBuffers).
|
||||
// Leave with simple exchange for now
|
||||
|
||||
allValues[0] = localValue;
|
||||
|
||||
for (int proci = 1; proci < numProc; ++proci)
|
||||
{
|
||||
IPstream::recv(allValues[proci], proci, tag, comm);
|
||||
}
|
||||
IPstream::recv(allValues[proci], proci, tag, communicator);
|
||||
}
|
||||
else if (UPstream::is_rank(comm))
|
||||
{
|
||||
OPstream::send(localValue, UPstream::masterNo(), tag, comm);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// non-parallel: return own value
|
||||
// TBD: only when UPstream::is_rank(comm) as well?
|
||||
allValues.resize(1);
|
||||
|
||||
allValues[0] = localValue;
|
||||
}
|
||||
else if (UPstream::is_rank(communicator))
|
||||
{
|
||||
OPstream::send(localValue, UPstream::masterNo(), tag, communicator);
|
||||
}
|
||||
|
||||
return allValues;
|
||||
}
|
||||
@ -161,73 +541,74 @@ template<class T>
|
||||
T Foam::Pstream::listScatterValues
|
||||
(
|
||||
const UList<T>& allValues,
|
||||
const label comm,
|
||||
const label communicator,
|
||||
[[maybe_unused]] const int tag
|
||||
)
|
||||
{
|
||||
if constexpr (is_contiguous_v<T>)
|
||||
if (!UPstream::is_parallel(communicator))
|
||||
{
|
||||
// non-parallel: return first value
|
||||
// TBD: only when UPstream::is_rank(communicator) as well?
|
||||
|
||||
if (!allValues.empty())
|
||||
{
|
||||
return allValues[0];
|
||||
}
|
||||
|
||||
return T{}; // Fallback value
|
||||
}
|
||||
else if constexpr (is_contiguous_v<T>)
|
||||
{
|
||||
// UPstream version is contiguous only
|
||||
return UPstream::listScatterValues(allValues, comm);
|
||||
return UPstream::listScatterValues(allValues, communicator);
|
||||
}
|
||||
else
|
||||
{
|
||||
T localValue{};
|
||||
// Standard scatter (one to all)
|
||||
|
||||
if (UPstream::is_parallel(comm))
|
||||
{
|
||||
const label numProc = UPstream::nProcs(comm);
|
||||
T localValue{};
|
||||
|
||||
if (UPstream::master(comm) && allValues.size() < numProc)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Attempting to send " << allValues.size()
|
||||
<< " values to " << numProc << " processors" << endl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
if (UPstream::master(communicator))
|
||||
{
|
||||
const label numProc = UPstream::nProcs(communicator);
|
||||
|
||||
if (UPstream::master(comm))
|
||||
{
|
||||
const label startOfRequests = UPstream::nRequests();
|
||||
if (allValues.size() < numProc)
|
||||
{
|
||||
FatalErrorInFunction
|
||||
<< "Attempting to send " << allValues.size()
|
||||
<< " values to " << numProc << " processors" << endl
|
||||
<< Foam::abort(FatalError);
|
||||
}
|
||||
|
||||
List<DynamicList<char>> sendBuffers(numProc);
|
||||
const label startOfRequests = UPstream::nRequests();
|
||||
|
||||
for (int proci = 1; proci < numProc; ++proci)
|
||||
{
|
||||
UOPstream toProc
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
proci,
|
||||
sendBuffers[proci],
|
||||
tag,
|
||||
comm
|
||||
);
|
||||
toProc << allValues[proci];
|
||||
}
|
||||
List<DynamicList<char>> sendBuffers(numProc);
|
||||
|
||||
// Wait for outstanding requests
|
||||
UPstream::waitRequests(startOfRequests);
|
||||
for (const int proci : UPstream::subProcs(communicator))
|
||||
{
|
||||
UOPstream toProc
|
||||
(
|
||||
UPstream::commsTypes::nonBlocking,
|
||||
proci,
|
||||
sendBuffers[proci],
|
||||
tag,
|
||||
communicator
|
||||
);
|
||||
toProc << allValues[proci];
|
||||
}
|
||||
|
||||
return allValues[0];
|
||||
}
|
||||
else if (UPstream::is_rank(comm))
|
||||
{
|
||||
IPstream::recv(localValue, UPstream::masterNo(), tag, comm);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// non-parallel: return first value
|
||||
// TBD: only when UPstream::is_rank(comm) as well?
|
||||
// Wait for outstanding requests
|
||||
UPstream::waitRequests(startOfRequests);
|
||||
|
||||
if (!allValues.empty())
|
||||
{
|
||||
return allValues[0];
|
||||
}
|
||||
}
|
||||
return allValues[0];
|
||||
}
|
||||
else if (UPstream::is_rank(communicator))
|
||||
{
|
||||
IPstream::recv(localValue, UPstream::masterNo(), tag, communicator);
|
||||
}
|
||||
|
||||
return localValue;
|
||||
}
|
||||
return localValue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1 @@
|
||||
#warning File removed - left for old dependency check only
|
||||
@ -20,6 +20,8 @@ Description
|
||||
#ifndef FoamCompat_PstreamCombineReduceOps_H
|
||||
#define FoamCompat_PstreamCombineReduceOps_H
|
||||
|
||||
#warning Deprecated header
|
||||
|
||||
#include "Pstream.H"
|
||||
#include "ops.H"
|
||||
|
||||
@ -32,6 +34,7 @@ namespace Foam
|
||||
|
||||
//- Compatibility wrapper for Pstream::combineReduce
|
||||
template<class T, class CombineOp>
|
||||
FOAM_DEPRECATED_FOR(2022-08, "Pstream::combineReduce()")
|
||||
void combineReduce
|
||||
(
|
||||
T& value,
|
||||
@ -455,7 +455,7 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos
|
||||
const labelList& elems
|
||||
) const
|
||||
{
|
||||
labelList n(size, 0);
|
||||
labelList n(size, Zero);
|
||||
for (const label elem : elems)
|
||||
{
|
||||
if (elem >= 0)
|
||||
@ -463,8 +463,8 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos
|
||||
n[elem]++;
|
||||
}
|
||||
}
|
||||
Pstream::listCombineGather(n, plusEqOp<label>());
|
||||
Pstream::broadcast(n);
|
||||
|
||||
Pstream::listCombineReduce(n, plusEqOp<label>());
|
||||
return n;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user