Merge branch 'cleanup-Pstream' into 'develop'

Remove obsolete Pstream functions

See merge request Development/openfoam!664
This commit is contained in:
Mattijs Janssens
2024-02-01 13:52:49 +00:00
11 changed files with 389 additions and 730 deletions

View File

@ -1,3 +1,3 @@
Test-gatherValues1.C
Test-gatherValues1.cxx
EXE = $(FOAM_USER_APPBIN)/Test-gatherValues1

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2021-2023 OpenCFD Ltd.
Copyright (C) 2021-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -52,7 +52,7 @@ int main(int argc, char *argv[])
const labelList localValues
(
identity(2 *(Pstream::myProcNo()+1), -5*Pstream::myProcNo())
identity(2 *(UPstream::myProcNo()+1), -5*UPstream::myProcNo())
);
// Test resize
@ -76,8 +76,8 @@ int main(int argc, char *argv[])
// One-sided sizing! master only
const globalIndex allProcAddr
(
sendData.size(),
globalIndex::gatherOnly{}
globalIndex::gatherOnly{},
sendData.size()
);
Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl;
@ -98,8 +98,8 @@ int main(int argc, char *argv[])
// One-sided sizing! master only
const globalIndex allProcAddr
(
sendData.size(),
globalIndex::gatherOnly{}
globalIndex::gatherOnly{},
sendData.size()
);
Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl;
@ -116,7 +116,7 @@ int main(int argc, char *argv[])
{
const labelList::subList& sendData =
(
Pstream::master()
UPstream::master()
? SubList<label>(localValues, 0) // exclude
: SubList<label>(localValues)
);
@ -147,11 +147,11 @@ int main(int argc, char *argv[])
<< UPstream::listScatterValues(subProcAddr.offsets()) << nl;
Pout<< endl << "local list [" << Pstream::myProcNo() << "] "
Pout<< endl << "local list [" << UPstream::myProcNo() << "] "
<< flatOutput(localValues) << nl;
Pout<< endl << "local send [" << Pstream::myProcNo() << "] "
Pout<< endl << "local send [" << UPstream::myProcNo() << "] "
<< sendSize << nl;
@ -163,7 +163,7 @@ int main(int argc, char *argv[])
Pout<< "off-proc: " << allValues << endl;
if (Pstream::master())
if (UPstream::master())
{
Info<< "master: " << flatOutput(localValues) << nl;
@ -196,7 +196,7 @@ int main(int argc, char *argv[])
{
globalIndex glob
(
globalIndex:gatherNone{},
globalIndex::gatherNone{},
labelList(Foam::one{}, 0)
);
Info<< "single:" << nl;
@ -208,35 +208,37 @@ int main(int argc, char *argv[])
}
}
// This will likely fail - not declared as is_contiguous
// Cannot even catch since it triggers an abort()
#if 0
// Non-contiguous gather - use Pstream, not UPstream!
{
std::pair<label,vector> sendData(Pstream::myProcNo(), vector::one);
typedef std::pair<label,vector> valueType;
const bool oldThrowingError = FatalError.throwing(true);
valueType sendData(UPstream::myProcNo(), vector::one);
try
{
List<std::pair<label,vector>> countValues
(
UPstream::listGatherValues<std::pair<label, vector>>
(
sendData
)
);
List<valueType> countValues
(
Pstream::listGatherValues(sendData)
);
Pout<< "listGather: " << flatOutput(countValues) << nl;
}
catch (const Foam::error& err)
{
Info<< err.message().c_str() << nl;
}
FatalError.throwing(oldThrowingError);
Pout<< "listGather: " << flatOutput(countValues) << nl;
}
// Non-contiguous scatter - use Pstream, not UPstream!
{
List<fileName> allValues;
if (UPstream::master())
{
allValues.resize(UPstream::nProcs());
forAll(allValues, proci)
{
allValues[proci] = "processor" + Foam::name(proci);
}
}
fileName procName = Pstream::listScatterValues(allValues);
Pout<< "listScatter: " << procName << nl;
}
#endif
Info<< "\nEnd\n" << endl;

View File

@ -47,9 +47,6 @@ SourceFiles
#include "UPstream.H"
#include "DynamicList.H"
// Legacy
// #define Foam_Pstream_scatter_nobroadcast
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
@ -130,22 +127,9 @@ public:
// Gather
//- Gather (reduce) data, appyling \c bop to combine \c value
//- from different processors. The basis for Foam::reduce().
// Uses the specified communication schedule.
template<class T, class BinaryOp>
static void gather
(
const List<commsStruct>& comms,
T& value,
const BinaryOp& bop,
const int tag,
const label comm
);
//- Gather (reduce) data, applying \c bop to combine \c value
//- from different processors. The basis for Foam::reduce().
// Uses linear/tree communication.
// Uses linear/tree communication (with parallel guard).
template<class T, class BinaryOp>
static void gather
(
@ -155,6 +139,34 @@ public:
const label comm = UPstream::worldComm
);
//- Gather individual values into list locations.
// On master list length == nProcs, otherwise zero length.
// \n
// For \b non-parallel :
// the returned list length is 1 with localValue.
template<class T>
static List<T> listGatherValues
(
const T& localValue,
const label comm = UPstream::worldComm,
//! Only used for non-contiguous types
const int tag = UPstream::msgType()
);
//- Scatter individual values from list locations.
// On master input list length == nProcs, ignored on other procs.
// \n
// For \b non-parallel :
// returns the first list element (or default initialized).
template<class T>
static T listScatterValues
(
const UList<T>& allValues,
const label comm = UPstream::worldComm,
//! Only used for non-contiguous types
const int tag = UPstream::msgType()
);
// Gather/combine data
// Inplace combine values from processors.
@ -162,39 +174,10 @@ public:
//- Gather data, applying \c cop to inplace combine \c value
//- from different processors.
// Uses the specified communication schedule.
// Uses linear/tree communication (with parallel guard).
template<class T, class CombineOp>
static void combineGather
(
const List<commsStruct>& comms,
T& value,
const CombineOp& cop,
const int tag,
const label comm
);
//- Gather data, applying \c cop to inplace combine \c value
//- from different processors.
// Uses linear/tree communication.
template<class T, class CombineOp>
static void combineGather
(
T& value,
const CombineOp& cop,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Reduce inplace (cf. MPI Allreduce)
//- applying \c cop to inplace combine \c value
//- from different processors.
//- After completion all processors have the same data.
// Uses the specified communication schedule.
// Wraps combineGather/broadcast (may change in the future).
template<class T, class CombineOp>
static void combineReduce
(
const List<commsStruct>& comms,
T& value,
const CombineOp& cop,
const int tag = UPstream::msgType(),
@ -232,17 +215,8 @@ public:
// Combine variants working on whole List at a time.
template<class T, class CombineOp>
static void listCombineGather
(
const List<commsStruct>& comms,
List<T>& values,
const CombineOp& cop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
//- Combines List elements.
// Uses linear/tree communication (with parallel guard).
template<class T, class CombineOp>
static void listCombineGather
(
@ -252,7 +226,9 @@ public:
const label comm = UPstream::worldComm
);
//- Combines List elements.
//- After completion all processors have the same data.
// Uses linear/tree communication (with parallel guard).
template<class T, class CombineOp>
static void listCombineReduce
(
@ -279,17 +255,8 @@ public:
// Combine variants working on whole map at a time.
// Container needs iterators, find() and insert methods defined.
template<class Container, class CombineOp>
static void mapCombineGather
(
const List<commsStruct>& comms,
Container& values,
const CombineOp& cop,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
//- Combine Map elements.
// Uses linear/tree communication (with parallel guard).
template<class Container, class CombineOp>
static void mapCombineGather
(
@ -355,7 +322,7 @@ public:
);
//- Gather data, but keep individual values separate.
//- Uses linear/tree communication.
//- Uses MPI_Allgather or manual linear/tree communication.
// After completion all processors have the same data.
// Wraps gatherList/scatterList (may change in the future).
template<class T>
@ -369,91 +336,43 @@ public:
// Scatter
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void scatter
(
const List<commsStruct>& comms,
T& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
//- Broadcast data
template<class T>
FOAM_DEPRECATED_FOR(2024-01, "broadcast()")
static void scatter
(
T& value,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void combineScatter
(
const List<commsStruct>& comms,
T& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
//- Broadcast data
template<class T>
FOAM_DEPRECATED_FOR(2024-01, "broadcast()")
static void combineScatter
(
T& value,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void listCombineScatter
(
const List<commsStruct>& comms,
List<T>& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
//- Broadcast data
template<class T>
FOAM_DEPRECATED_FOR(2024-01, "broadcast()")
static void listCombineScatter
(
List<T>& value,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
template<class Container>
static void mapCombineScatter
(
const List<commsStruct>& comms,
Container& values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
//- Broadcast data
template<class Container>
FOAM_DEPRECATED_FOR(2024-01, "broadcast()")
static void mapCombineScatter
(
Container& values,
const int tag = UPstream::msgType(),
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022-2023 OpenCFD Ltd.
Copyright (C) 2022-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -137,4 +137,28 @@ void Foam::Pstream::broadcastList(ListType& list, const label comm)
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Convenience wrappers - defined after all specialisations are known
namespace Foam
{
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
//- Return a broadcasted value (uses a copy internally)
template<class Type>
Type returnBroadcast(const Type& value, const label comm)
{
Type work(value);
Pstream::broadcast(work, comm);
return work;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2023 OpenCFD Ltd.
Copyright (C) 2019-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -25,7 +25,7 @@ License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Variant of gather, scatter.
Variant of gather.
Normal gather uses:
- default construct and read (>>) from Istream
- binary operator and assignment operator to combine values
@ -46,7 +46,6 @@ Description
template<class T, class CombineOp>
void Foam::Pstream::combineGather
(
const List<UPstream::commsStruct>& comms,
T& value,
const CombineOp& cop,
const int tag,
@ -55,8 +54,10 @@ void Foam::Pstream::combineGather
{
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Communication order
const auto& comms = UPstream::whichCommunication(comm);
// if (comms.empty()) return; // extra safety?
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below())
@ -89,7 +90,7 @@ void Foam::Pstream::combineGather
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);
@ -132,7 +133,7 @@ void Foam::Pstream::combineGather
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -143,144 +144,6 @@ void Foam::Pstream::combineGather
}
template<class T>
void Foam::Pstream::combineScatter
(
const List<UPstream::commsStruct>& comms,
T& value,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
{
if (is_contiguous<T>::value)
{
UIPstream::read
(
UPstream::commsTypes::scheduled,
myComm.above(),
reinterpret_cast<char*>(&value),
sizeof(T),
tag,
comm
);
}
else
{
IPstream fromAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
tag,
comm
);
value = T(fromAbove);
}
}
// Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI)
{
const label belowID = myComm.below()[belowI];
if (is_contiguous<T>::value)
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
belowID,
reinterpret_cast<const char*>(&value),
sizeof(T),
tag,
comm
);
}
else
{
OPstream toBelow
(
UPstream::commsTypes::scheduled,
belowID,
0,
tag,
comm
);
toBelow << value;
}
}
}
#endif
}
template<class T, class CombineOp>
void Foam::Pstream::combineGather
(
T& value,
const CombineOp& cop,
const int tag,
const label comm
)
{
Pstream::combineGather
(
UPstream::whichCommunication(comm),
value,
cop,
tag,
comm
);
}
template<class T>
void Foam::Pstream::combineScatter
(
T& value,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
Pstream::combineScatter
(
UPstream::whichCommunication(comm),
value,
tag,
comm
);
#endif
}
template<class T, class CombineOp>
void Foam::Pstream::combineReduce
(
const List<UPstream::commsStruct>& comms,
T& value,
const CombineOp& cop,
const int tag,
const label comm
)
{
Pstream::combineGather(comms, value, cop, tag, comm);
Pstream::broadcast(value, comm);
}
template<class T, class CombineOp>
void Foam::Pstream::combineReduce
(
@ -292,9 +155,7 @@ void Foam::Pstream::combineReduce
{
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);
Pstream::combineGather(comms, value, cop, tag, comm);
Pstream::combineGather(value, cop, tag, comm);
Pstream::broadcast(value, comm);
}
}
@ -305,7 +166,6 @@ void Foam::Pstream::combineReduce
template<class T, class CombineOp>
void Foam::Pstream::listCombineGather
(
const List<UPstream::commsStruct>& comms,
List<T>& values,
const CombineOp& cop,
const int tag,
@ -314,8 +174,10 @@ void Foam::Pstream::listCombineGather
{
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Communication order
const auto& comms = UPstream::whichCommunication(comm);
// if (comms.empty()) return; // extra safety?
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below())
@ -351,7 +213,7 @@ void Foam::Pstream::listCombineGather
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);
@ -397,7 +259,7 @@ void Foam::Pstream::listCombineGather
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -408,129 +270,6 @@ void Foam::Pstream::listCombineGather
}
template<class T>
void Foam::Pstream::listCombineScatter
(
const List<UPstream::commsStruct>& comms,
List<T>& values,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
{
if (is_contiguous<T>::value)
{
UIPstream::read
(
UPstream::commsTypes::scheduled,
myComm.above(),
values.data_bytes(),
values.size_bytes(),
tag,
comm
);
}
else
{
IPstream fromAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> values;
}
}
// Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI)
{
const label belowID = myComm.below()[belowI];
if (is_contiguous<T>::value)
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
belowID,
values.cdata_bytes(),
values.size_bytes(),
tag,
comm
);
}
else
{
OPstream toBelow
(
UPstream::commsTypes::scheduled,
belowID,
0,
tag,
comm
);
toBelow << values;
}
}
}
#endif
}
template<class T, class CombineOp>
void Foam::Pstream::listCombineGather
(
List<T>& values,
const CombineOp& cop,
const int tag,
const label comm
)
{
Pstream::listCombineGather
(
UPstream::whichCommunication(comm),
values,
cop,
tag,
comm
);
}
template<class T>
void Foam::Pstream::listCombineScatter
(
List<T>& values,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
Pstream::listCombineScatter
(
UPstream::whichCommunication(comm),
values,
tag,
comm
);
#endif
}
template<class T, class CombineOp>
void Foam::Pstream::listCombineReduce
(
@ -542,9 +281,7 @@ void Foam::Pstream::listCombineReduce
{
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);
Pstream::listCombineGather(comms, values, cop, tag, comm);
Pstream::listCombineGather(values, cop, tag, comm);
Pstream::broadcast(values, comm);
}
}
@ -555,7 +292,6 @@ void Foam::Pstream::listCombineReduce
template<class Container, class CombineOp>
void Foam::Pstream::mapCombineGather
(
const List<UPstream::commsStruct>& comms,
Container& values,
const CombineOp& cop,
const int tag,
@ -564,8 +300,10 @@ void Foam::Pstream::mapCombineGather
{
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Communication order
const auto& comms = UPstream::whichCommunication(comm);
// if (comms.empty()) return; // extra safety?
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below())
@ -576,7 +314,7 @@ void Foam::Pstream::mapCombineGather
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);
@ -623,7 +361,7 @@ void Foam::Pstream::mapCombineGather
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -633,110 +371,6 @@ void Foam::Pstream::mapCombineGather
}
template<class Container>
void Foam::Pstream::mapCombineScatter
(
const List<UPstream::commsStruct>& comms,
Container& values,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
{
IPstream fromAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> values;
if (debug & 2)
{
Pout<< " received from "
<< myComm.above() << " data:" << values << endl;
}
}
// Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI)
{
const label belowID = myComm.below()[belowI];
if (debug & 2)
{
Pout<< " sending to " << belowID << " data:" << values << endl;
}
OPstream toBelow
(
UPstream::commsTypes::scheduled,
belowID,
0,
tag,
comm
);
toBelow << values;
}
}
#endif
}
template<class Container, class CombineOp>
void Foam::Pstream::mapCombineGather
(
Container& values,
const CombineOp& cop,
const int tag,
const label comm
)
{
Pstream::mapCombineGather
(
UPstream::whichCommunication(comm),
values,
cop,
tag,
comm
);
}
template<class Container>
void Foam::Pstream::mapCombineScatter
(
Container& values,
const int tag,
const label comm
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
Pstream::mapCombineScatter
(
UPstream::whichCommunication(comm),
values,
tag,
comm
);
#endif
}
template<class Container, class CombineOp>
void Foam::Pstream::mapCombineReduce
(
@ -748,9 +382,7 @@ void Foam::Pstream::mapCombineReduce
{
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);
Pstream::mapCombineGather(comms, values, cop, tag, comm);
Pstream::mapCombineGather(values, cop, tag, comm);
Pstream::broadcast(values, comm);
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -41,7 +41,6 @@ Description
template<class T, class BinaryOp>
void Foam::Pstream::gather
(
const List<UPstream::commsStruct>& comms,
T& value,
const BinaryOp& bop,
const int tag,
@ -50,8 +49,10 @@ void Foam::Pstream::gather
{
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Communication order
const auto& comms = UPstream::whichCommunication(comm);
// if (comms.empty()) return; // extra safety?
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below())
@ -76,7 +77,7 @@ void Foam::Pstream::gather
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);
@ -107,7 +108,7 @@ void Foam::Pstream::gather
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -119,110 +120,181 @@ void Foam::Pstream::gather
template<class T>
void Foam::Pstream::scatter
Foam::List<T> Foam::Pstream::listGatherValues
(
const List<UPstream::commsStruct>& comms,
T& value,
const int tag,
const label comm
const T& localValue,
const label comm,
const int tag
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
// OR
// if (is_contiguous<T>::value)
// {
// return UPstream::listGatherValues(localValue, comm);
// }
List<T> allValues;
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
const label numProc = UPstream::nProcs(comm);
// Receive from up
if (myComm.above() != -1)
if (UPstream::master(comm))
{
if (is_contiguous<T>::value)
{
UIPstream::read
(
UPstream::commsTypes::scheduled,
myComm.above(),
reinterpret_cast<char*>(&value),
sizeof(T),
tag,
comm
);
}
else
{
IPstream fromAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> value;
}
allValues.resize(numProc);
}
// Send to my downstairs neighbours. Note reverse order (compared to
// receiving). This is to make sure to send to the critical path
// (only when using a tree schedule!) first.
forAllReverse(myComm.below(), belowI)
if (is_contiguous<T>::value)
{
const label belowID = myComm.below()[belowI];
UPstream::mpiGather
(
reinterpret_cast<const char*>(&localValue),
allValues.data_bytes(),
sizeof(T), // The send/recv size per rank
comm
);
}
else
{
if (UPstream::master(comm))
{
// Non-trivial to manage non-blocking gather without a
// PEX/NBX approach (eg, PstreamBuffers) but leave with
// with simple exchange for now
if (is_contiguous<T>::value)
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
belowID,
reinterpret_cast<const char*>(&value),
sizeof(T),
tag,
comm
);
allValues[0] = localValue;
for (int proci = 1; proci < numProc; ++proci)
{
IPstream fromProc
(
UPstream::commsTypes::scheduled,
proci,
0, // bufsize
tag,
comm
);
fromProc >> allValues[proci];
}
}
else
else if (UPstream::is_rank(comm))
{
OPstream toBelow
OPstream toProc
(
UPstream::commsTypes::scheduled,
belowID,
0,
UPstream::masterNo(),
0, // bufsize
tag,
comm
);
toBelow << value;
toProc << localValue;
}
}
}
#endif
}
else
{
// non-parallel: return own value
// TBD: only when UPstream::is_rank(comm) as well?
allValues.resize(1);
allValues[0] = localValue;
}
template<class T, class BinaryOp>
void Foam::Pstream::gather
(
T& value,
const BinaryOp& bop,
const int tag,
const label comm
)
{
Pstream::gather(UPstream::whichCommunication(comm), value, bop, tag, comm);
return allValues;
}
template<class T>
void Foam::Pstream::scatter(T& value, const int tag, const label comm)
T Foam::Pstream::listScatterValues
(
const UList<T>& allValues,
const label comm,
const int tag
)
{
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
Pstream::scatter(UPstream::whichCommunication(comm), value, tag, comm);
#endif
// OR
// if (is_contiguous<T>::value)
// {
// return UPstream::listScatterValues(allValues, comm);
// }
T localValue{};
if (UPstream::is_parallel(comm))
{
const label numProc = UPstream::nProcs(comm);
if (UPstream::master(comm) && allValues.size() < numProc)
{
FatalErrorInFunction
<< "Attempting to send " << allValues.size()
<< " values to " << numProc << " processors" << endl
<< Foam::abort(FatalError);
}
if (is_contiguous<T>::value)
{
UPstream::mpiScatter
(
allValues.cdata_bytes(),
reinterpret_cast<char*>(&localValue),
sizeof(T), // The send/recv size per rank
comm
);
}
else
{
if (UPstream::master(comm))
{
const label startOfRequests = UPstream::nRequests();
List<DynamicList<char>> sendBuffers(numProc);
for (int proci = 1; proci < numProc; ++proci)
{
UOPstream toProc
(
UPstream::commsTypes::nonBlocking,
proci,
sendBuffers[proci],
tag,
comm
);
toProc << allValues[proci];
}
// Wait for outstanding requests
UPstream::waitRequests(startOfRequests);
return allValues[0];
}
else if (UPstream::is_rank(comm))
{
IPstream fromProc
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0, // bufsize
tag,
comm
);
fromProc >> localValue;
}
}
}
else
{
// non-parallel: return first value
// TBD: only when UPstream::is_rank(comm) as well?
if (!allValues.empty())
{
return allValues[0];
}
}
return localValue;
}
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -51,7 +51,7 @@ void Foam::Pstream::gatherList
const label comm
)
{
if (UPstream::is_parallel(comm))
if (!comms.empty() && UPstream::is_parallel(comm))
{
if (values.size() < UPstream::nProcs(comm))
{
@ -62,7 +62,7 @@ void Foam::Pstream::gatherList
}
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
for (const label belowID : myComm.below())
@ -199,7 +199,7 @@ void Foam::Pstream::scatterList
// between scatterList() and using broadcast(List<T>&) or a regular
// scatter(List<T>&) is that processor-local data is skipped.
if (UPstream::is_parallel(comm))
if (!comms.empty() && UPstream::is_parallel(comm))
{
if (values.size() < UPstream::nProcs(comm))
{
@ -210,7 +210,7 @@ void Foam::Pstream::scatterList
}
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
const auto& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -323,7 +323,13 @@ void Foam::Pstream::gatherList
const label comm
)
{
Pstream::gatherList(UPstream::whichCommunication(comm), values, tag, comm);
Pstream::gatherList
(
UPstream::whichCommunication(comm),
values,
tag,
comm
);
}
@ -336,7 +342,13 @@ void Foam::Pstream::scatterList
const label comm
)
{
Pstream::scatterList(UPstream::whichCommunication(comm), values, tag, comm);
Pstream::scatterList
(
UPstream::whichCommunication(comm),
values,
tag,
comm
);
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2016-2023 OpenCFD Ltd.
Copyright (C) 2016-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -46,28 +46,6 @@ namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
//- Reduce inplace (cf. MPI Allreduce)
//- using specified communication schedule.
template<class T, class BinaryOp>
void reduce
(
const List<UPstream::commsStruct>& comms,
T& value,
const BinaryOp& bop,
const int tag,
const label comm
)
{
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** reducing:" << value << " with comm:" << comm << endl;
error::printStack(Pout);
}
Pstream::gather(comms, value, bop, tag, comm);
Pstream::broadcast(value, comm);
}
//- Reduce inplace (cf. MPI Allreduce)
//- using linear/tree communication schedule
template<class T, class BinaryOp>
@ -81,7 +59,13 @@ void reduce
{
if (UPstream::is_parallel(comm))
{
Foam::reduce(UPstream::whichCommunication(comm), value, bop, tag, comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** reducing:" << value << " with comm:" << comm << endl;
error::printStack(Pout);
}
Pstream::gather(value, bop, tag, comm);
Pstream::broadcast(value, comm);
}
}
@ -436,8 +420,7 @@ Pstream_SumReduce(double);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Convenience wrappers for some reduction operations
// - defined after all specialisations are known
// Convenience wrappers - defined after all specialisations are known
//- Perform reduction on a copy, using specified binary operation
// \return the resulting value

View File

@ -52,6 +52,7 @@ Foam::UPstream::commsTypeNames
({
{ commsTypes::blocking, "blocking" },
{ commsTypes::scheduled, "scheduled" },
// { commsTypes::nonBlocking, "non-blocking" },
{ commsTypes::nonBlocking, "nonBlocking" },
});

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -895,21 +895,32 @@ public:
);
//- Communication schedule for tree all-to-master (proc 0)
static const List<commsStruct>& treeCommunication
static const List<commsStruct>&
treeCommunication
(
const label communicator = worldComm
);
//- Communication schedule for linear/tree all-to-master (proc 0).
//- Chooses based on the value of UPstream::nProcsSimpleSum
//- Communication schedule for all-to-master (proc 0) as
//- linear/tree/none with switching based on UPstream::nProcsSimpleSum
//- and the is_parallel() state
static const List<commsStruct>& whichCommunication
(
const label communicator = worldComm
)
{
const label np
(
parRun_ && is_rank(communicator) // cf. is_parallel()
? nProcs(communicator)
: 0
);
return
(
nProcs(communicator) < nProcsSimpleSum
np <= 1
? List<commsStruct>::null()
: np < nProcsSimpleSum
? linearCommunication(communicator)
: treeCommunication(communicator)
);
@ -1138,7 +1149,7 @@ public:
// On master input list length == nProcs, ignored on other procs.
// \n
// For \b non-parallel :
// returns the first list element (or zero).
// returns the first list element (or default initialized).
template<class T>
static T listScatterValues
(

View File

@ -34,14 +34,6 @@ Foam::List<T> Foam::UPstream::allGatherValues
const label comm
)
{
if (!is_contiguous<T>::value)
{
FatalErrorInFunction
<< "Cannot all-gather values for non-contiguous types" << endl
<< Foam::abort(FatalError);
}
List<T> allValues;
if (UPstream::is_parallel(comm))
@ -49,7 +41,17 @@ Foam::List<T> Foam::UPstream::allGatherValues
allValues.resize(UPstream::nProcs(comm));
allValues[UPstream::myProcNo(comm)] = localValue;
UPstream::mpiAllGather(allValues.data_bytes(), sizeof(T), comm);
if (is_contiguous<T>::value)
{
UPstream::mpiAllGather(allValues.data_bytes(), sizeof(T), comm);
}
else
{
FatalErrorInFunction
<< "Cannot all-gather values for non-contiguous types"
" - consider Pstream variant instead" << endl
<< Foam::abort(FatalError);
}
}
else
{
@ -70,14 +72,6 @@ Foam::List<T> Foam::UPstream::listGatherValues
const label comm
)
{
if (!is_contiguous<T>::value)
{
FatalErrorInFunction
<< "Cannot gather values for non-contiguous types" << endl
<< Foam::abort(FatalError);
}
List<T> allValues;
if (UPstream::is_parallel(comm))
@ -87,13 +81,23 @@ Foam::List<T> Foam::UPstream::listGatherValues
allValues.resize(UPstream::nProcs(comm));
}
UPstream::mpiGather
(
reinterpret_cast<const char*>(&localValue),
allValues.data_bytes(),
sizeof(T), // The send/recv size per rank
comm
);
if (is_contiguous<T>::value)
{
UPstream::mpiGather
(
reinterpret_cast<const char*>(&localValue),
allValues.data_bytes(),
sizeof(T), // The send/recv size per rank
comm
);
}
else
{
FatalErrorInFunction
<< "Cannot gather values for non-contiguous types"
" - consider Pstream variant instead" << endl
<< Foam::abort(FatalError);
}
}
else
{
@ -114,47 +118,46 @@ T Foam::UPstream::listScatterValues
const label comm
)
{
if (!is_contiguous<T>::value)
{
FatalErrorInFunction
<< "Cannot scatter values for non-contiguous types" << endl
<< Foam::abort(FatalError);
}
T localValue;
T localValue{};
if (UPstream::is_parallel(comm))
{
const label nproc = UPstream::nProcs(comm);
const label numProc = UPstream::nProcs(comm);
if (UPstream::master(comm) && allValues.size() < nproc)
if (UPstream::master(comm) && allValues.size() < numProc)
{
FatalErrorInFunction
<< "Attempting to send " << allValues.size()
<< " values to " << nproc << " processors" << endl
<< " values to " << numProc << " processors" << endl
<< Foam::abort(FatalError);
}
UPstream::mpiScatter
(
allValues.cdata_bytes(),
reinterpret_cast<char*>(&localValue),
sizeof(T), // The send/recv size per rank
comm
);
}
else
{
// non-parallel: return local value
if (UPstream::is_rank(comm) && !allValues.empty())
if (is_contiguous<T>::value)
{
localValue = allValues[0];
UPstream::mpiScatter
(
allValues.cdata_bytes(),
reinterpret_cast<char*>(&localValue),
sizeof(T), // The send/recv size per rank
comm
);
}
else
{
localValue = Zero;
FatalErrorInFunction
<< "Cannot scatter values for non-contiguous types"
" - consider Pstream variant instead" << endl
<< Foam::abort(FatalError);
}
}
else
{
// non-parallel: return first value
// TBD: only when UPstream::is_rank(comm) as well?
if (!allValues.empty())
{
return allValues[0];
}
}