ENH: extend Pstream gather templates to support combine or assign operation
- eliminates nearly identical code between 'gather' and 'combineGather'
* Normal gather updates by assigning the result of the binary operation.
* Combine gather updates by using a binary operator that modifies
its first parameter in-place
By-product of this refactoring are these new variants:
listGather(), listGatherReduce()
mapGather(), mapGatherReduce()
that mirror the previously existing ones
listCombineGather(), listCombineReduce()
mapCombineGather(), mapCombineReduce()
except that they use the 'regular' binary operator
This commit is contained in:
@ -1,3 +1,3 @@
|
|||||||
Test-Tuple2.C
|
Test-Tuple2.cxx
|
||||||
|
|
||||||
EXE = $(FOAM_USER_APPBIN)/Test-Tuple2
|
EXE = $(FOAM_USER_APPBIN)/Test-Tuple2
|
||||||
|
|||||||
@ -6,7 +6,7 @@
|
|||||||
\\/ M anipulation |
|
\\/ M anipulation |
|
||||||
-------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------
|
||||||
Copyright (C) 2011 OpenFOAM Foundation
|
Copyright (C) 2011 OpenFOAM Foundation
|
||||||
Copyright (C) 2019-2020 OpenCFD Ltd.
|
Copyright (C) 2019-2025 OpenCFD Ltd.
|
||||||
-------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------
|
||||||
License
|
License
|
||||||
This file is part of OpenFOAM.
|
This file is part of OpenFOAM.
|
||||||
@ -32,6 +32,7 @@ Description
|
|||||||
|
|
||||||
\*---------------------------------------------------------------------------*/
|
\*---------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
#include "argList.H"
|
||||||
#include "labelPair.H"
|
#include "labelPair.H"
|
||||||
#include "Tuple2.H"
|
#include "Tuple2.H"
|
||||||
#include "label.H"
|
#include "label.H"
|
||||||
@ -102,8 +103,12 @@ void printTuple2(const Pair<word>& t)
|
|||||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||||
// Main program:
|
// Main program:
|
||||||
|
|
||||||
int main()
|
int main(int argc, char *argv[])
|
||||||
{
|
{
|
||||||
|
argList::noCheckProcessorDirectories();
|
||||||
|
|
||||||
|
#include "setRootCase.H"
|
||||||
|
|
||||||
typedef Tuple2<label, scalar> indexedScalar;
|
typedef Tuple2<label, scalar> indexedScalar;
|
||||||
|
|
||||||
Info<< "Default constructed Tuple: " << indexedScalar() << nl;
|
Info<< "Default constructed Tuple: " << indexedScalar() << nl;
|
||||||
@ -34,7 +34,6 @@ SourceFiles
|
|||||||
Pstream.C
|
Pstream.C
|
||||||
PstreamBroadcast.C
|
PstreamBroadcast.C
|
||||||
PstreamGather.C
|
PstreamGather.C
|
||||||
PstreamCombineGather.C
|
|
||||||
PstreamGatherList.C
|
PstreamGatherList.C
|
||||||
PstreamExchangeConsensus.C
|
PstreamExchangeConsensus.C
|
||||||
PstreamExchange.C
|
PstreamExchange.C
|
||||||
@ -125,14 +124,18 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
// Gather
|
// Gather/scatter : single value
|
||||||
|
|
||||||
//- Gather (reduce) data, applying \c bop to combine \c value
|
//- Gather (reduce) data, applying \c bop to combine \c value
|
||||||
//- from different processors. The basis for Foam::reduce().
|
//- from different processors. The basis for Foam::reduce().
|
||||||
// Uses linear/tree communication (with parallel guard).
|
// A no-op for non-parallel.
|
||||||
template<class T, class BinaryOp>
|
//
|
||||||
|
// \tparam InplaceMode indicates that the binary operator
|
||||||
|
// modifies values in-place, not using assignment
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||||
static void gather
|
static void gather
|
||||||
(
|
(
|
||||||
|
//! [in,out] the result is only reliable on rank=0
|
||||||
T& value,
|
T& value,
|
||||||
const BinaryOp& bop,
|
const BinaryOp& bop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -168,17 +171,13 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
// Gather/combine data
|
// Inplace combine (gather) : single value
|
||||||
// Inplace combine values from processors.
|
|
||||||
// (Uses construct from Istream instead of \c << operator)
|
|
||||||
|
|
||||||
//- Gather data, applying \c cop to inplace combine \c value
|
//- Forwards to Pstream::gather with an \em in-place \c cop
|
||||||
//- from different processors.
|
|
||||||
// Uses linear/tree communication (with parallel guard).
|
|
||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void combineGather
|
static void combineGather
|
||||||
(
|
(
|
||||||
//! [in,out]
|
//! [in,out] the result is only reliable on rank=0
|
||||||
T& value,
|
T& value,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -188,13 +187,10 @@ public:
|
|||||||
//- Reduce inplace (cf. MPI Allreduce)
|
//- Reduce inplace (cf. MPI Allreduce)
|
||||||
//- applying \c cop to inplace combine \c value
|
//- applying \c cop to inplace combine \c value
|
||||||
//- from different processors.
|
//- from different processors.
|
||||||
//- After completion all processors have the same data.
|
|
||||||
// Uses linear/tree communication.
|
|
||||||
// Wraps combineGather/broadcast (may change in the future).
|
|
||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void combineReduce
|
static void combineReduce
|
||||||
(
|
(
|
||||||
//! [in,out]
|
//! [in,out] the result is consistent on all ranks
|
||||||
T& value,
|
T& value,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -205,37 +201,65 @@ public:
|
|||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void combineAllGather
|
static void combineAllGather
|
||||||
(
|
(
|
||||||
|
//! [in,out] the result is consistent on all ranks
|
||||||
T& value,
|
T& value,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
const label comm = UPstream::worldComm
|
const label comm = UPstream::worldComm
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
Pstream::combineReduce(value, cop, tag, comm);
|
Pstream::listCombineReduce(value, cop, tag, comm);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Combine variants working on whole List at a time.
|
// Gather/combine variants working on entire List
|
||||||
|
|
||||||
//- Combines List elements.
|
//- Gather (reduce) list elements,
|
||||||
// Uses linear/tree communication (with parallel guard).
|
//- applying \c bop to each list element
|
||||||
|
//
|
||||||
|
// \tparam InplaceMode indicates that the binary operator
|
||||||
|
// modifies values in-place, not using assignment
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||||
|
static void listGather
|
||||||
|
(
|
||||||
|
//! [in,out] the result is only reliable on rank=0
|
||||||
|
UList<T>& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag = UPstream::msgType(),
|
||||||
|
const label comm = UPstream::worldComm
|
||||||
|
);
|
||||||
|
|
||||||
|
//- Forwards to Pstream::listGather with an \em in-place \c cop
|
||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void listCombineGather
|
static void listCombineGather
|
||||||
(
|
(
|
||||||
//! [in,out]
|
//! [in,out] the result is only reliable on rank=0
|
||||||
UList<T>& values,
|
UList<T>& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
const label comm = UPstream::worldComm
|
const label comm = UPstream::worldComm
|
||||||
);
|
);
|
||||||
|
|
||||||
//- Combines List elements.
|
//- Gather (reduce) list elements,
|
||||||
//- After completion all processors have the same data.
|
//- applying \c bop to combine each list element.
|
||||||
// Uses linear/tree communication (with parallel guard).
|
//
|
||||||
|
// \tparam InplaceMode indicates that the binary operator
|
||||||
|
// modifies values in-place, not using assignment
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode=false>
|
||||||
|
static void listGatherReduce
|
||||||
|
(
|
||||||
|
//! [in,out] the result is consistent on all ranks
|
||||||
|
List<T>& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag = UPstream::msgType(),
|
||||||
|
const label comm = UPstream::worldComm
|
||||||
|
);
|
||||||
|
|
||||||
|
//- Forwards to Pstream::listGatherReduce with an \em in-place \c cop
|
||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void listCombineReduce
|
static void listCombineReduce
|
||||||
(
|
(
|
||||||
//! [in,out] - List (not UList) due to broadcast()
|
//! [in,out] the result is consistent on all ranks
|
||||||
List<T>& values,
|
List<T>& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -246,7 +270,7 @@ public:
|
|||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
static void listCombineAllGather
|
static void listCombineAllGather
|
||||||
(
|
(
|
||||||
//! [in,out] - List (not UList) due to broadcast()
|
//! [in,out] the result is consistent on all ranks
|
||||||
List<T>& values,
|
List<T>& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -257,14 +281,28 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Combine variants working on whole map at a time.
|
// Gather/combine variants working on Map/HashTable containers
|
||||||
// Container needs iterators, find() and insert methods defined.
|
|
||||||
|
|
||||||
//- Combine Map elements.
|
//- Gather (reduce) Map/HashTable containers,
|
||||||
// Uses linear/tree communication (with parallel guard).
|
//- applying \c bop to combine entries from different processors.
|
||||||
|
//
|
||||||
|
// \tparam InplaceMode indicates that the binary operator
|
||||||
|
// modifies values in-place, not using assignment
|
||||||
|
template<class Container, class BinaryOp, bool InplaceMode=false>
|
||||||
|
static void mapGather
|
||||||
|
(
|
||||||
|
//! [in,out] the result is only reliable on rank=0
|
||||||
|
Container& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag = UPstream::msgType(),
|
||||||
|
const label comm = UPstream::worldComm
|
||||||
|
);
|
||||||
|
|
||||||
|
//- Forwards to Pstream::mapGather with an \em in-place \c cop
|
||||||
template<class Container, class CombineOp>
|
template<class Container, class CombineOp>
|
||||||
static void mapCombineGather
|
static void mapCombineGather
|
||||||
(
|
(
|
||||||
|
//! [in,out] the result is only reliable on rank=0
|
||||||
Container& values,
|
Container& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -272,15 +310,26 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
//- Reduce inplace (cf. MPI Allreduce)
|
//- Reduce inplace (cf. MPI Allreduce)
|
||||||
//- applying \c cop to inplace combine map \c values
|
//- applying \c bop to combine map \c values
|
||||||
//- from different processors.
|
//- from different processors.
|
||||||
//- After completion all processors have the same data.
|
//- After completion all processors have the same data.
|
||||||
// Uses the specified communication schedule.
|
//
|
||||||
// Wraps mapCombineGather/broadcast (may change in the future).
|
// Wraps mapCombineGather/broadcast (may change in the future).
|
||||||
//- After completion all processors have the same data.
|
template<class Container, class BinaryOp, bool InplaceMode=false>
|
||||||
|
static void mapGatherReduce
|
||||||
|
(
|
||||||
|
//! [in,out] the result is consistent on all ranks
|
||||||
|
Container& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag = UPstream::msgType(),
|
||||||
|
const label comm = UPstream::worldComm
|
||||||
|
);
|
||||||
|
|
||||||
|
//- Forwards to Pstream::mapGatherReduce with an \em in-place \c cop
|
||||||
template<class Container, class CombineOp>
|
template<class Container, class CombineOp>
|
||||||
static void mapCombineReduce
|
static void mapCombineReduce
|
||||||
(
|
(
|
||||||
|
//! [in,out] the result is consistent on all ranks
|
||||||
Container& values,
|
Container& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -291,6 +340,7 @@ public:
|
|||||||
template<class Container, class CombineOp>
|
template<class Container, class CombineOp>
|
||||||
static void mapCombineAllGather
|
static void mapCombineAllGather
|
||||||
(
|
(
|
||||||
|
//! [in,out] the result is consistent on all ranks
|
||||||
Container& values,
|
Container& values,
|
||||||
const CombineOp& cop,
|
const CombineOp& cop,
|
||||||
const int tag = UPstream::msgType(),
|
const int tag = UPstream::msgType(),
|
||||||
@ -343,7 +393,7 @@ public:
|
|||||||
);
|
);
|
||||||
|
|
||||||
//- Gather data, but keep individual values separate.
|
//- Gather data, but keep individual values separate.
|
||||||
//- Uses MPI_Allgather or manual linear/tree communication.
|
//- Uses MPI_Allgather or manual communication.
|
||||||
// After completion all processors have the same data.
|
// After completion all processors have the same data.
|
||||||
// Wraps gatherList/scatterList (may change in the future).
|
// Wraps gatherList/scatterList (may change in the future).
|
||||||
template<class T>
|
template<class T>
|
||||||
@ -612,7 +662,6 @@ public:
|
|||||||
#ifdef NoRepository
|
#ifdef NoRepository
|
||||||
#include "PstreamBroadcast.C"
|
#include "PstreamBroadcast.C"
|
||||||
#include "PstreamGather.C"
|
#include "PstreamGather.C"
|
||||||
#include "PstreamCombineGather.C"
|
|
||||||
#include "PstreamGatherList.C"
|
#include "PstreamGatherList.C"
|
||||||
#include "PstreamExchange.C"
|
#include "PstreamExchange.C"
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -1,365 +0,0 @@
|
|||||||
/*---------------------------------------------------------------------------*\
|
|
||||||
========= |
|
|
||||||
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
|
|
||||||
\\ / O peration |
|
|
||||||
\\ / A nd | www.openfoam.com
|
|
||||||
\\/ M anipulation |
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
Copyright (C) 2011-2017 OpenFOAM Foundation
|
|
||||||
Copyright (C) 2019-2025 OpenCFD Ltd.
|
|
||||||
-------------------------------------------------------------------------------
|
|
||||||
License
|
|
||||||
This file is part of OpenFOAM.
|
|
||||||
|
|
||||||
OpenFOAM is free software: you can redistribute it and/or modify it
|
|
||||||
under the terms of the GNU General Public License as published by
|
|
||||||
the Free Software Foundation, either version 3 of the License, or
|
|
||||||
(at your option) any later version.
|
|
||||||
|
|
||||||
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
||||||
for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU General Public License
|
|
||||||
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
Description
|
|
||||||
Variant of gather.
|
|
||||||
Normal gather uses:
|
|
||||||
- default construct and read (>>) from Istream
|
|
||||||
- binary operator and assignment operator to combine values
|
|
||||||
|
|
||||||
combineGather uses:
|
|
||||||
- construct from Istream
|
|
||||||
- modify operator which modifies its lhs
|
|
||||||
|
|
||||||
\*---------------------------------------------------------------------------*/
|
|
||||||
|
|
||||||
#include "IPstream.H"
|
|
||||||
#include "OPstream.H"
|
|
||||||
#include "IOstreams.H"
|
|
||||||
#include "contiguous.H"
|
|
||||||
|
|
||||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
|
||||||
|
|
||||||
template<class T, class CombineOp>
|
|
||||||
void Foam::Pstream::combineGather
|
|
||||||
(
|
|
||||||
T& value,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
// Communication order
|
|
||||||
const auto& comms = UPstream::whichCommunication(comm);
|
|
||||||
// if (comms.empty()) return; // extra safety?
|
|
||||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
|
||||||
|
|
||||||
// Receive from my downstairs neighbours
|
|
||||||
for (const auto belowID : myComm.below())
|
|
||||||
{
|
|
||||||
if constexpr (is_contiguous_v<T>)
|
|
||||||
{
|
|
||||||
T received;
|
|
||||||
|
|
||||||
UIPstream::read
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
belowID,
|
|
||||||
reinterpret_cast<char*>(&received),
|
|
||||||
sizeof(T),
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " received from "
|
|
||||||
<< belowID << " data:" << received << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
cop(value, received);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
IPstream fromBelow
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
belowID,
|
|
||||||
0, // bufsize
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
T received(fromBelow);
|
|
||||||
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " received from "
|
|
||||||
<< belowID << " data:" << received << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
cop(value, received);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send up value
|
|
||||||
if (myComm.above() >= 0)
|
|
||||||
{
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " sending to " << myComm.above()
|
|
||||||
<< " data:" << value << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
if constexpr (is_contiguous_v<T>)
|
|
||||||
{
|
|
||||||
UOPstream::write
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
myComm.above(),
|
|
||||||
reinterpret_cast<const char*>(&value),
|
|
||||||
sizeof(T),
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
OPstream::send(value, myComm.above(), tag, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
template<class T, class CombineOp>
|
|
||||||
void Foam::Pstream::combineReduce
|
|
||||||
(
|
|
||||||
T& value,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
Pstream::combineGather(value, cop, tag, comm);
|
|
||||||
Pstream::broadcast(value, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
|
||||||
|
|
||||||
template<class T, class CombineOp>
|
|
||||||
void Foam::Pstream::listCombineGather
|
|
||||||
(
|
|
||||||
UList<T>& values,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
// Communication order
|
|
||||||
const auto& comms = UPstream::whichCommunication(comm);
|
|
||||||
// if (comms.empty()) return; // extra safety?
|
|
||||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
|
||||||
|
|
||||||
// Receive from my downstairs neighbours
|
|
||||||
for (const auto belowID : myComm.below())
|
|
||||||
{
|
|
||||||
if constexpr (is_contiguous_v<T>)
|
|
||||||
{
|
|
||||||
List<T> received(values.size());
|
|
||||||
|
|
||||||
UIPstream::read
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
belowID,
|
|
||||||
received,
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " received from "
|
|
||||||
<< belowID << " data:" << received << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
forAll(values, i)
|
|
||||||
{
|
|
||||||
cop(values[i], received[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
IPstream fromBelow
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
belowID,
|
|
||||||
0, // bufsize
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
List<T> received(fromBelow);
|
|
||||||
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " received from "
|
|
||||||
<< belowID << " data:" << received << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
forAll(values, i)
|
|
||||||
{
|
|
||||||
cop(values[i], received[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send up values
|
|
||||||
if (myComm.above() >= 0)
|
|
||||||
{
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " sending to " << myComm.above()
|
|
||||||
<< " data:" << values << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
if constexpr (is_contiguous_v<T>)
|
|
||||||
{
|
|
||||||
UOPstream::write
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
myComm.above(),
|
|
||||||
values,
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
OPstream::send(values, myComm.above(), tag, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
template<class T, class CombineOp>
|
|
||||||
void Foam::Pstream::listCombineReduce
|
|
||||||
(
|
|
||||||
List<T>& values,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
Pstream::listCombineGather(values, cop, tag, comm);
|
|
||||||
Pstream::broadcast(values, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
|
||||||
|
|
||||||
template<class Container, class CombineOp>
|
|
||||||
void Foam::Pstream::mapCombineGather
|
|
||||||
(
|
|
||||||
Container& values,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
// Communication order
|
|
||||||
const auto& comms = UPstream::whichCommunication(comm);
|
|
||||||
// if (comms.empty()) return; // extra safety?
|
|
||||||
const auto& myComm = comms[UPstream::myProcNo(comm)];
|
|
||||||
|
|
||||||
// Receive from my downstairs neighbours
|
|
||||||
for (const auto belowID : myComm.below())
|
|
||||||
{
|
|
||||||
// Map/HashTable: non-contiguous
|
|
||||||
|
|
||||||
IPstream fromBelow
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::scheduled,
|
|
||||||
belowID,
|
|
||||||
0, // bufsize
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
Container received(fromBelow);
|
|
||||||
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " received from "
|
|
||||||
<< belowID << " data:" << received << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
for
|
|
||||||
(
|
|
||||||
auto recvIter = received.cbegin();
|
|
||||||
recvIter != received.cend();
|
|
||||||
++recvIter
|
|
||||||
)
|
|
||||||
{
|
|
||||||
auto masterIter = values.find(recvIter.key());
|
|
||||||
|
|
||||||
if (masterIter.good())
|
|
||||||
{
|
|
||||||
// Combine with existing
|
|
||||||
cop(masterIter.val(), recvIter.val());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Insert new key/value
|
|
||||||
values.insert(recvIter.key(), recvIter.val());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send up values
|
|
||||||
if (myComm.above() >= 0)
|
|
||||||
{
|
|
||||||
if (debug & 2)
|
|
||||||
{
|
|
||||||
Perr<< " sending to " << myComm.above()
|
|
||||||
<< " data:" << values << endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
OPstream::send(values, myComm.above(), tag, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
template<class Container, class CombineOp>
|
|
||||||
void Foam::Pstream::mapCombineReduce
|
|
||||||
(
|
|
||||||
Container& values,
|
|
||||||
const CombineOp& cop,
|
|
||||||
const int tag,
|
|
||||||
const label comm
|
|
||||||
)
|
|
||||||
{
|
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
|
||||||
Pstream::mapCombineGather(values, cop, tag, comm);
|
|
||||||
Pstream::broadcast(values, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ************************************************************************* //
|
|
||||||
@ -30,15 +30,25 @@ Description
|
|||||||
The gathered data will be a single value constructed from the values
|
The gathered data will be a single value constructed from the values
|
||||||
on individual processors using a user-specified operator.
|
on individual processors using a user-specified operator.
|
||||||
|
|
||||||
|
Note
|
||||||
|
Normal gather uses:
|
||||||
|
- binary operator that returns a value.
|
||||||
|
So assignment that return value to yield the new value
|
||||||
|
|
||||||
|
Combine gather uses:
|
||||||
|
- binary operator modifies its first parameter in-place
|
||||||
|
|
||||||
\*---------------------------------------------------------------------------*/
|
\*---------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
#include "contiguous.H"
|
||||||
#include "IPstream.H"
|
#include "IPstream.H"
|
||||||
#include "OPstream.H"
|
#include "OPstream.H"
|
||||||
#include "contiguous.H"
|
|
||||||
|
|
||||||
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
|
||||||
|
|
||||||
template<class T, class BinaryOp>
|
// Single value variants
|
||||||
|
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode>
|
||||||
void Foam::Pstream::gather
|
void Foam::Pstream::gather
|
||||||
(
|
(
|
||||||
T& value,
|
T& value,
|
||||||
@ -47,15 +57,21 @@ void Foam::Pstream::gather
|
|||||||
const label communicator
|
const label communicator
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if (UPstream::is_parallel(communicator))
|
if (!UPstream::is_parallel(communicator))
|
||||||
|
{
|
||||||
|
// Nothing to do
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
// Communication order
|
// Communication order
|
||||||
const auto& comms = UPstream::whichCommunication(communicator);
|
const auto& comms = UPstream::whichCommunication(communicator);
|
||||||
// if (comms.empty()) return; // extra safety?
|
// if (comms.empty()) return; // extra safety?
|
||||||
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||||
|
const auto& below = myComm.below();
|
||||||
|
|
||||||
// Receive from my downstairs neighbours
|
// Receive from my downstairs neighbours
|
||||||
for (const auto belowID : myComm.below())
|
for (const auto proci : below)
|
||||||
{
|
{
|
||||||
T received;
|
T received;
|
||||||
|
|
||||||
@ -64,7 +80,7 @@ void Foam::Pstream::gather
|
|||||||
UIPstream::read
|
UIPstream::read
|
||||||
(
|
(
|
||||||
UPstream::commsTypes::scheduled,
|
UPstream::commsTypes::scheduled,
|
||||||
belowID,
|
proci,
|
||||||
reinterpret_cast<char*>(&received),
|
reinterpret_cast<char*>(&received),
|
||||||
sizeof(T),
|
sizeof(T),
|
||||||
tag,
|
tag,
|
||||||
@ -73,15 +89,42 @@ void Foam::Pstream::gather
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
IPstream::recv(received, belowID, tag, communicator);
|
IPstream::recv(received, proci, tag, communicator);
|
||||||
}
|
}
|
||||||
|
|
||||||
value = bop(value, received);
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " received from "
|
||||||
|
<< proci << " data:" << received << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
bop(value, received);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Assign result of binary operation
|
||||||
|
value = bop(value, received);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send up value
|
// Send up value
|
||||||
if (myComm.above() >= 0)
|
if (myComm.above() >= 0)
|
||||||
{
|
{
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " sending to " << myComm.above()
|
||||||
|
<< " data:" << value << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if constexpr (is_contiguous_v<T>)
|
if constexpr (is_contiguous_v<T>)
|
||||||
{
|
{
|
||||||
UOPstream::write
|
UOPstream::write
|
||||||
@ -103,54 +146,391 @@ void Foam::Pstream::gather
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class T, class CombineOp>
|
||||||
|
void Foam::Pstream::combineGather
|
||||||
|
(
|
||||||
|
T& value,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::gather<T, CombineOp, true>(value, cop, tag, comm);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class T, class CombineOp>
|
||||||
|
void Foam::Pstream::combineReduce
|
||||||
|
(
|
||||||
|
T& value,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (UPstream::is_parallel(comm))
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::gather<T, CombineOp, true>(value, cop, tag, comm);
|
||||||
|
Pstream::broadcast(value, comm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||||
|
|
||||||
|
// List variants
|
||||||
|
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode>
|
||||||
|
void Foam::Pstream::listGather
|
||||||
|
(
|
||||||
|
UList<T>& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag,
|
||||||
|
const label communicator
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (!UPstream::is_parallel(communicator) || values.empty())
|
||||||
|
{
|
||||||
|
// Nothing to do
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Communication order
|
||||||
|
const auto& comms = UPstream::whichCommunication(communicator);
|
||||||
|
// if (comms.empty()) return; // extra safety?
|
||||||
|
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||||
|
const auto& below = myComm.below();
|
||||||
|
|
||||||
|
// Same length on all ranks
|
||||||
|
const label listLen = values.size();
|
||||||
|
|
||||||
|
List<T> received;
|
||||||
|
|
||||||
|
if (!below.empty())
|
||||||
|
{
|
||||||
|
// Pre-size for contiguous reading
|
||||||
|
if constexpr (is_contiguous_v<T>)
|
||||||
|
{
|
||||||
|
received.resize_nocopy(listLen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Receive from my downstairs neighbours
|
||||||
|
for (const auto proci : below)
|
||||||
|
{
|
||||||
|
if constexpr (is_contiguous_v<T>)
|
||||||
|
{
|
||||||
|
UIPstream::read
|
||||||
|
(
|
||||||
|
UPstream::commsTypes::scheduled,
|
||||||
|
proci,
|
||||||
|
received,
|
||||||
|
tag,
|
||||||
|
communicator
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
received.clear(); // extra safety?
|
||||||
|
IPstream::recv(received, proci, tag, communicator);
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " received from "
|
||||||
|
<< proci << " data:" << received << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (label i = 0; i < listLen; ++i)
|
||||||
|
{
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
bop(values[i], received[i]);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Assign result of binary operation
|
||||||
|
values[i] = bop(values[i], received[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send up values
|
||||||
|
if (myComm.above() >= 0)
|
||||||
|
{
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " sending to " << myComm.above()
|
||||||
|
<< " data:" << values << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (is_contiguous_v<T>)
|
||||||
|
{
|
||||||
|
UOPstream::write
|
||||||
|
(
|
||||||
|
UPstream::commsTypes::scheduled,
|
||||||
|
myComm.above(),
|
||||||
|
values,
|
||||||
|
tag,
|
||||||
|
communicator
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
OPstream::send(values, myComm.above(), tag, communicator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class T, class BinaryOp, bool InplaceMode>
|
||||||
|
void Foam::Pstream::listGatherReduce
|
||||||
|
(
|
||||||
|
List<T>& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
Pstream::listGather<T, BinaryOp, InplaceMode>(values, bop, tag, comm);
|
||||||
|
if (!values.empty())
|
||||||
|
{
|
||||||
|
Pstream::broadcast(values, comm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class T, class CombineOp>
|
||||||
|
void Foam::Pstream::listCombineGather
|
||||||
|
(
|
||||||
|
UList<T>& values,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::listGather<T, CombineOp, true>(values, cop, tag, comm);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class T, class CombineOp>
|
||||||
|
void Foam::Pstream::listCombineReduce
|
||||||
|
(
|
||||||
|
List<T>& values,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::listGatherReduce<T, CombineOp, true>(values, cop, tag, comm);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||||
|
|
||||||
|
// Map variants
|
||||||
|
|
||||||
|
template<class Container, class BinaryOp, bool InplaceMode>
|
||||||
|
void Foam::Pstream::mapGather
|
||||||
|
(
|
||||||
|
Container& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag,
|
||||||
|
const label communicator
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (!UPstream::is_parallel(communicator))
|
||||||
|
{
|
||||||
|
// Nothing to do
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Communication order
|
||||||
|
const auto& comms = UPstream::whichCommunication(communicator);
|
||||||
|
// if (comms.empty()) return; // extra safety?
|
||||||
|
const auto& myComm = comms[UPstream::myProcNo(communicator)];
|
||||||
|
const auto& below = myComm.below();
|
||||||
|
|
||||||
|
// Receive from my downstairs neighbours
|
||||||
|
for (const auto proci : below)
|
||||||
|
{
|
||||||
|
// Map/HashTable: non-contiguous
|
||||||
|
Container received;
|
||||||
|
IPstream::recv(received, proci, tag, communicator);
|
||||||
|
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " received from "
|
||||||
|
<< proci << " data:" << received << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto last = received.end();
|
||||||
|
|
||||||
|
for (auto iter = received.begin(); iter != last; ++iter)
|
||||||
|
{
|
||||||
|
auto slot = values.find(iter.key());
|
||||||
|
|
||||||
|
if (slot.good())
|
||||||
|
{
|
||||||
|
// Combine with existing entry
|
||||||
|
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
bop(slot.val(), iter.val());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Assign result of binary operation
|
||||||
|
slot.val() = bop(slot.val(), iter.val());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Create a new entry
|
||||||
|
values.emplace(iter.key(), std::move(iter.val()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send up values
|
||||||
|
if (myComm.above() >= 0)
|
||||||
|
{
|
||||||
|
if constexpr (InplaceMode)
|
||||||
|
{
|
||||||
|
if (debug & 2)
|
||||||
|
{
|
||||||
|
Perr<< " sending to " << myComm.above()
|
||||||
|
<< " data:" << values << endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
OPstream::send(values, myComm.above(), tag, communicator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class Container, class BinaryOp, bool InplaceMode>
|
||||||
|
void Foam::Pstream::mapGatherReduce
|
||||||
|
(
|
||||||
|
Container& values,
|
||||||
|
const BinaryOp& bop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
Pstream::mapGather<Container, BinaryOp, InplaceMode>
|
||||||
|
(
|
||||||
|
values, bop, tag, comm
|
||||||
|
);
|
||||||
|
Pstream::broadcast(values, comm);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class Container, class CombineOp>
|
||||||
|
void Foam::Pstream::mapCombineGather
|
||||||
|
(
|
||||||
|
Container& values,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::mapGather<Container, CombineOp, true>
|
||||||
|
(
|
||||||
|
values, cop, tag, comm
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<class Container, class CombineOp>
|
||||||
|
void Foam::Pstream::mapCombineReduce
|
||||||
|
(
|
||||||
|
Container& values,
|
||||||
|
const CombineOp& cop,
|
||||||
|
const int tag,
|
||||||
|
const label comm
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// In-place binary operation
|
||||||
|
Pstream::mapGatherReduce<Container, CombineOp, true>
|
||||||
|
(
|
||||||
|
values, cop, tag, comm
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
||||||
|
|
||||||
|
// Single values to/from a list
|
||||||
|
|
||||||
template<class T>
|
template<class T>
|
||||||
Foam::List<T> Foam::Pstream::listGatherValues
|
Foam::List<T> Foam::Pstream::listGatherValues
|
||||||
(
|
(
|
||||||
const T& localValue,
|
const T& localValue,
|
||||||
const label comm,
|
const label communicator,
|
||||||
[[maybe_unused]] const int tag
|
[[maybe_unused]] const int tag
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if constexpr (is_contiguous_v<T>)
|
if (!UPstream::is_parallel(communicator))
|
||||||
|
{
|
||||||
|
// non-parallel: return own value
|
||||||
|
// TBD: only when UPstream::is_rank(communicator) as well?
|
||||||
|
List<T> allValues(1);
|
||||||
|
allValues[0] = localValue;
|
||||||
|
return allValues;
|
||||||
|
}
|
||||||
|
else if constexpr (is_contiguous_v<T>)
|
||||||
{
|
{
|
||||||
// UPstream version is contiguous only
|
// UPstream version is contiguous only
|
||||||
return UPstream::listGatherValues(localValue, comm);
|
return UPstream::listGatherValues(localValue, communicator);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
// Standard gather (all to one)
|
||||||
|
|
||||||
|
// The data are non-contiguous!
|
||||||
|
//
|
||||||
|
// Non-trivial to manage non-blocking gather without a
|
||||||
|
// PEX/NBX approach (eg, PstreamBuffers).
|
||||||
|
// Leave with simple exchange for now
|
||||||
|
|
||||||
List<T> allValues;
|
List<T> allValues;
|
||||||
|
if (UPstream::master(communicator))
|
||||||
if (UPstream::is_parallel(comm))
|
|
||||||
{
|
{
|
||||||
const label numProc = UPstream::nProcs(comm);
|
allValues.resize(UPstream::nProcs(communicator));
|
||||||
|
|
||||||
if (UPstream::master(comm))
|
for (const int proci : UPstream::subProcs(communicator))
|
||||||
{
|
{
|
||||||
allValues.resize(numProc);
|
IPstream::recv(allValues[proci], proci, tag, communicator);
|
||||||
|
|
||||||
// Non-trivial to manage non-blocking gather without a
|
|
||||||
// PEX/NBX approach (eg, PstreamBuffers).
|
|
||||||
// Leave with simple exchange for now
|
|
||||||
|
|
||||||
allValues[0] = localValue;
|
|
||||||
|
|
||||||
for (int proci = 1; proci < numProc; ++proci)
|
|
||||||
{
|
|
||||||
IPstream::recv(allValues[proci], proci, tag, comm);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else if (UPstream::is_rank(comm))
|
|
||||||
{
|
|
||||||
OPstream::send(localValue, UPstream::masterNo(), tag, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// non-parallel: return own value
|
|
||||||
// TBD: only when UPstream::is_rank(comm) as well?
|
|
||||||
allValues.resize(1);
|
|
||||||
allValues[0] = localValue;
|
allValues[0] = localValue;
|
||||||
}
|
}
|
||||||
|
else if (UPstream::is_rank(communicator))
|
||||||
|
{
|
||||||
|
OPstream::send(localValue, UPstream::masterNo(), tag, communicator);
|
||||||
|
}
|
||||||
|
|
||||||
return allValues;
|
return allValues;
|
||||||
}
|
}
|
||||||
@ -161,73 +541,74 @@ template<class T>
|
|||||||
T Foam::Pstream::listScatterValues
|
T Foam::Pstream::listScatterValues
|
||||||
(
|
(
|
||||||
const UList<T>& allValues,
|
const UList<T>& allValues,
|
||||||
const label comm,
|
const label communicator,
|
||||||
[[maybe_unused]] const int tag
|
[[maybe_unused]] const int tag
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if constexpr (is_contiguous_v<T>)
|
if (!UPstream::is_parallel(communicator))
|
||||||
|
{
|
||||||
|
// non-parallel: return first value
|
||||||
|
// TBD: only when UPstream::is_rank(communicator) as well?
|
||||||
|
|
||||||
|
if (!allValues.empty())
|
||||||
|
{
|
||||||
|
return allValues[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
return T{}; // Fallback value
|
||||||
|
}
|
||||||
|
else if constexpr (is_contiguous_v<T>)
|
||||||
{
|
{
|
||||||
// UPstream version is contiguous only
|
// UPstream version is contiguous only
|
||||||
return UPstream::listScatterValues(allValues, comm);
|
return UPstream::listScatterValues(allValues, communicator);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
T localValue{};
|
// Standard scatter (one to all)
|
||||||
|
|
||||||
if (UPstream::is_parallel(comm))
|
T localValue{};
|
||||||
{
|
|
||||||
const label numProc = UPstream::nProcs(comm);
|
|
||||||
|
|
||||||
if (UPstream::master(comm) && allValues.size() < numProc)
|
if (UPstream::master(communicator))
|
||||||
{
|
{
|
||||||
FatalErrorInFunction
|
const label numProc = UPstream::nProcs(communicator);
|
||||||
<< "Attempting to send " << allValues.size()
|
|
||||||
<< " values to " << numProc << " processors" << endl
|
|
||||||
<< Foam::abort(FatalError);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (UPstream::master(comm))
|
if (allValues.size() < numProc)
|
||||||
{
|
{
|
||||||
const label startOfRequests = UPstream::nRequests();
|
FatalErrorInFunction
|
||||||
|
<< "Attempting to send " << allValues.size()
|
||||||
|
<< " values to " << numProc << " processors" << endl
|
||||||
|
<< Foam::abort(FatalError);
|
||||||
|
}
|
||||||
|
|
||||||
List<DynamicList<char>> sendBuffers(numProc);
|
const label startOfRequests = UPstream::nRequests();
|
||||||
|
|
||||||
for (int proci = 1; proci < numProc; ++proci)
|
List<DynamicList<char>> sendBuffers(numProc);
|
||||||
{
|
|
||||||
UOPstream toProc
|
|
||||||
(
|
|
||||||
UPstream::commsTypes::nonBlocking,
|
|
||||||
proci,
|
|
||||||
sendBuffers[proci],
|
|
||||||
tag,
|
|
||||||
comm
|
|
||||||
);
|
|
||||||
toProc << allValues[proci];
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for outstanding requests
|
for (const int proci : UPstream::subProcs(communicator))
|
||||||
UPstream::waitRequests(startOfRequests);
|
{
|
||||||
|
UOPstream toProc
|
||||||
|
(
|
||||||
|
UPstream::commsTypes::nonBlocking,
|
||||||
|
proci,
|
||||||
|
sendBuffers[proci],
|
||||||
|
tag,
|
||||||
|
communicator
|
||||||
|
);
|
||||||
|
toProc << allValues[proci];
|
||||||
|
}
|
||||||
|
|
||||||
return allValues[0];
|
// Wait for outstanding requests
|
||||||
}
|
UPstream::waitRequests(startOfRequests);
|
||||||
else if (UPstream::is_rank(comm))
|
|
||||||
{
|
|
||||||
IPstream::recv(localValue, UPstream::masterNo(), tag, comm);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// non-parallel: return first value
|
|
||||||
// TBD: only when UPstream::is_rank(comm) as well?
|
|
||||||
|
|
||||||
if (!allValues.empty())
|
return allValues[0];
|
||||||
{
|
}
|
||||||
return allValues[0];
|
else if (UPstream::is_rank(communicator))
|
||||||
}
|
{
|
||||||
}
|
IPstream::recv(localValue, UPstream::masterNo(), tag, communicator);
|
||||||
|
}
|
||||||
|
|
||||||
return localValue;
|
return localValue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1 @@
|
|||||||
|
#warning File removed - left for old dependency check only
|
||||||
@ -20,6 +20,8 @@ Description
|
|||||||
#ifndef FoamCompat_PstreamCombineReduceOps_H
|
#ifndef FoamCompat_PstreamCombineReduceOps_H
|
||||||
#define FoamCompat_PstreamCombineReduceOps_H
|
#define FoamCompat_PstreamCombineReduceOps_H
|
||||||
|
|
||||||
|
#warning Deprecated header
|
||||||
|
|
||||||
#include "Pstream.H"
|
#include "Pstream.H"
|
||||||
#include "ops.H"
|
#include "ops.H"
|
||||||
|
|
||||||
@ -32,6 +34,7 @@ namespace Foam
|
|||||||
|
|
||||||
//- Compatibility wrapper for Pstream::combineReduce
|
//- Compatibility wrapper for Pstream::combineReduce
|
||||||
template<class T, class CombineOp>
|
template<class T, class CombineOp>
|
||||||
|
FOAM_DEPRECATED_FOR(2022-08, "Pstream::combineReduce()")
|
||||||
void combineReduce
|
void combineReduce
|
||||||
(
|
(
|
||||||
T& value,
|
T& value,
|
||||||
@ -455,7 +455,7 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos
|
|||||||
const labelList& elems
|
const labelList& elems
|
||||||
) const
|
) const
|
||||||
{
|
{
|
||||||
labelList n(size, 0);
|
labelList n(size, Zero);
|
||||||
for (const label elem : elems)
|
for (const label elem : elems)
|
||||||
{
|
{
|
||||||
if (elem >= 0)
|
if (elem >= 0)
|
||||||
@ -463,8 +463,8 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos
|
|||||||
n[elem]++;
|
n[elem]++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Pstream::listCombineGather(n, plusEqOp<label>());
|
|
||||||
Pstream::broadcast(n);
|
Pstream::listCombineReduce(n, plusEqOp<label>());
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user