ENH: extend/improve broadcast handling

- split off a Pstream::genericBroadcast() which uses UOPBstream during
  serialization and UOPBstream during de-serialization.
  This function will not normally be used directly by callers, but
  provides a base layer for higher-level broadcast calls.

- low-level UPstream broadcast of string content.
  Since std::string has length and contiguous content, it is possible
  to handle directly by the following:
     1. broadcast size
     2. resize
     3. broadcast content when size != 0

  Although this is a similar amount of communication as the generic
  streaming version (min 1, max 2 broadcasts) it is more efficient
  by avoiding serialization/de-serialization overhead.

- handle broadcast of List content distinctly.
  Allows an optimized path for contiguous data, similar to how
  std::string is handled (broadcast size, resize container, broadcast
  content when size != 0), but can revert to genericBroadcast (streamed)
  for non-contiguous data.

- make various scatter variants simple aliases for broadcast, since
  that is what they are doing behind the scenes anyhow:

    * scatter()
    * combineScatter()
    * listCombineScatter()
    * mapCombineScatter()

  Except scatterList() which remains somewhat different.
  Beyond the additional (size == nProcs) check, the only difference to
  using broadcast(List<T>&) or a regular scatter(List<T>&) is that
  processor-local data is skipped. So leave this variant as-is.

STYLE: rename/prefix implementation code with 'Pstream'

- better association with its purpose and provides a unique name
This commit is contained in:
Mark Olesen
2022-02-25 17:42:57 +01:00
committed by Andrew Heather
parent f3674eee36
commit c086f22298
16 changed files with 660 additions and 441 deletions

View File

@ -42,45 +42,38 @@ Description
using namespace Foam; using namespace Foam;
// This is what our new scatter will look like inside template<class T>
template<class Type> void printPre(const T& value)
void testBroadcastStream
(
Type& value,
const label comm = UPstream::worldComm
)
{ {
Info<< nl << "is_contiguous:" << is_contiguous<Type>::value << endl; Info<< nl << "is_contiguous:" << is_contiguous<T>::value << endl;
Pout<< "pre-broadcast: " << value << endl; Pout<< "pre-broadcast: " << value << endl;
}
if (is_contiguous<Type>::value) template<class T>
{ void printPost(const T& value)
UPstream::broadcast {
(
reinterpret_cast<char*>(&value),
sizeof(Type),
comm,
UPstream::masterNo()
);
}
else
{
if (UPstream::master())
{
OPBstream toAll(UPstream::masterNo(), comm);
toAll << value;
}
else
{
IPBstream fromMaster(UPstream::masterNo(), comm);
fromMaster >> value;
}
}
Pout<< "post-broadcast: " << value << endl; Pout<< "post-broadcast: " << value << endl;
} }
template<class T>
void testBroadcast(T& value)
{
printPre(value);
Pstream::broadcast(value);
printPost(value);
}
template<class T>
void testBroadcast(List<T>& values)
{
Info<< nl << "is_contiguous:" << is_contiguous<T>::value << endl;
Pout<< "pre-broadcast: " << flatOutput(values) << endl;
Pstream::broadcast(values);
Pout<< "post-broadcast: " << flatOutput(values) << endl;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[]) int main(int argc, char *argv[])
@ -95,7 +88,7 @@ int main(int argc, char *argv[])
{ {
value = UPstream::nProcs(); value = UPstream::nProcs();
} }
testBroadcastStream(value); testBroadcast(value);
} }
{ {
@ -104,7 +97,18 @@ int main(int argc, char *argv[])
{ {
values = identity(UPstream::nProcs()); values = identity(UPstream::nProcs());
} }
testBroadcastStream(values); testBroadcast(values);
}
{
word value;
if (Pstream::master())
{
value = args.executable();
}
printPre(value);
UPstream::broadcast(value); // Low-level UPstream broadcast
printPost(value);
} }
{ {
@ -117,7 +121,7 @@ int main(int argc, char *argv[])
values[i] = "value_" + Foam::name(i); values[i] = "value_" + Foam::name(i);
} }
} }
testBroadcastStream(values); testBroadcast(values);
} }
{ {
@ -126,7 +130,7 @@ int main(int argc, char *argv[])
{ {
values = vector(1,2,3); values = vector(1,2,3);
} }
testBroadcastStream(values); testBroadcast(values);
} }
{ {
@ -134,8 +138,15 @@ int main(int argc, char *argv[])
if (Pstream::master()) if (Pstream::master())
{ {
values = vector(1,2,3); values = vector(1,2,3);
scalar mult = 1;
for (auto& v : values)
{
v *= mult;
mult += 1;
}
} }
testBroadcastStream(values); testBroadcast(values);
} }
Info<< "End\n" << endl; Info<< "End\n" << endl;

View File

@ -32,10 +32,11 @@ Description
SourceFiles SourceFiles
Pstream.C Pstream.C
combineGatherScatter.C PstreamBroadcast.C
gatherScatter.C PstreamGather.C
gatherScatterList.C PstreamCombineGather.C
exchange.C PstreamGatherList.C
PstreamExchange.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
@ -70,11 +71,11 @@ class Pstream
static void exchangeContainer static void exchangeContainer
( (
const UList<Container>& sendBufs, const UList<Container>& sendBufs,
const labelUList& recvSizes, const labelUList& recvSizes, //!< Num of recv elements (not bytes)
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool block //!< Wait for all to finish
); );
//- Exchange contiguous data. Sends sendData, receives into //- Exchange contiguous data. Sends sendData, receives into
@ -83,13 +84,13 @@ class Pstream
template<class T> template<class T>
static void exchangeBuf static void exchangeBuf
( (
const labelUList& sendSizes, // number of T, not number of char const labelUList& sendSizes, //!< Num of send elements (not bytes)
const UList<const char*>& sendBufs, const UList<const char*>& sendBufs,
const labelUList& recvSizes, // number of T, not number of char const labelUList& recvSizes, //!< Num of recv elements (not bytes)
List<char*>& recvBufs, List<char*>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool block const bool block //!< Wait for all to finish
); );
@ -109,10 +110,10 @@ public:
// Constructors // Constructors
//- Construct given optional buffer size //- Construct for given commsTypes, with optional buffer size
explicit Pstream explicit Pstream
( (
const commsTypes commsType, const UPstream::commsTypes commsType,
const label bufSize = 0 const label bufSize = 0
) )
: :
@ -125,15 +126,58 @@ public:
} }
// Gather and scatter // Static Functions
// Broadcast
//- Broadcast buffer or string content to all processes in communicator
using UPstream::broadcast;
//- Generic broadcast using streams to serialize/de-serialize
template<class T>
static void genericBroadcast
(
T& value,
const label comm = UPstream::worldComm
);
//- Broadcast value (contiguous or non-contiguous)
//- to all processes in communicator.
template<class T>
static void broadcast
(
T& value,
const label comm = UPstream::worldComm
);
//- Broadcast multiple values (contiguous or non-contiguous)
//- to all processes in communicator.
template<class T>
static void broadcast
(
List<T>& values,
const label comm = UPstream::worldComm
);
//- Broadcast multiple values (contiguous or non-contiguous)
//- to all processes in communicator.
template<class T, int SizeMin>
static void broadcast
(
DynamicList<T, SizeMin>& values,
const label comm = UPstream::worldComm
);
// Gather
//- Gather data. //- Gather data.
//- Apply bop to combine Value from different processors //- Apply \c bop to combine \c value from different processors
template<class T, class BinaryOp> template<class T, class BinaryOp>
static void gather static void gather
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
T& Value, T& value,
const BinaryOp& bop, const BinaryOp& bop,
const int tag, const int tag,
const label comm const label comm
@ -143,40 +187,22 @@ public:
template<class T, class BinaryOp> template<class T, class BinaryOp>
static void gather static void gather
( (
T& Value, T& value,
const BinaryOp& bop, const BinaryOp& bop,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = Pstream::worldComm const label comm = UPstream::worldComm
);
//- Scatter data. Distribute without modification. Reverse of gather
template<class T>
static void scatter
(
const List<commsStruct>& comms,
T& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template<class T>
static void scatter
(
T& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
); );
// Combine variants. Inplace combine values from processors. // Gather/combine data
// Inplace combine values from processors.
// (Uses construct from Istream instead of <<) // (Uses construct from Istream instead of <<)
template<class T, class CombineOp> template<class T, class CombineOp>
static void combineGather static void combineGather
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -186,29 +212,10 @@ public:
template<class T, class CombineOp> template<class T, class CombineOp>
static void combineGather static void combineGather
( (
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = Pstream::worldComm const label comm = UPstream::worldComm
);
//- Scatter data. Reverse of combineGather
template<class T>
static void combineScatter
(
const List<commsStruct>& comms,
T& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template<class T>
static void combineScatter
(
T& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
); );
@ -218,7 +225,7 @@ public:
static void listCombineGather static void listCombineGather
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
List<T>& Value, List<T>& value,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -228,40 +235,21 @@ public:
template<class T, class CombineOp> template<class T, class CombineOp>
static void listCombineGather static void listCombineGather
( (
List<T>& Value, List<T>& values,
const CombineOp& cop, const CombineOp& cop,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = Pstream::worldComm const label comm = UPstream::worldComm
);
//- Scatter data. Reverse of combineGather
template<class T>
static void listCombineScatter
(
const List<commsStruct>& comms,
List<T>& Value,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template<class T>
static void listCombineScatter
(
List<T>& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
); );
// Combine variants working on whole map at a time. Container needs to // Combine variants working on whole map at a time.
// have iterators and find() defined. // Container needs iterators, find() and insert methods defined.
template<class Container, class CombineOp> template<class Container, class CombineOp>
static void mapCombineGather static void mapCombineGather
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
Container& Values, Container& values,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -271,43 +259,23 @@ public:
template<class Container, class CombineOp> template<class Container, class CombineOp>
static void mapCombineGather static void mapCombineGather
( (
Container& Values, Container& values,
const CombineOp& cop, const CombineOp& cop,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm const label comm = UPstream::worldComm
); );
//- Scatter data. Reverse of combineGather
template<class Container>
static void mapCombineScatter
(
const List<commsStruct>& comms,
Container& Values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template<class Container>
static void mapCombineScatter
(
Container& Values,
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
// Gather/scatter keeping the individual processor data separate. // Gather/scatter keeping the individual processor data separate.
// Values is a List of size UPstream::nProcs() where // The values is a List of size UPstream::nProcs() where
// Values[UPstream::myProcNo()] is the data for the current processor. // values[UPstream::myProcNo()] is the data for the current processor.
//- Gather data but keep individual values separate //- Gather data but keep individual values separate
template<class T> template<class T>
static void gatherList static void gatherList
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
); );
@ -316,17 +284,109 @@ public:
template<class T> template<class T>
static void gatherList static void gatherList
( (
List<T>& Values, List<T>& values,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm const label comm = UPstream::worldComm
); );
// Scatter
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void scatter
(
const List<commsStruct>& comms,
T& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void scatter
(
T& value,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void combineScatter
(
const List<commsStruct>& comms,
T& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note tag parameter only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void combineScatter
(
T& value,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void listCombineScatter
(
const List<commsStruct>& comms,
List<T>& value,
const int tag,
const label comm
);
//- Broadcast data: Distribute without modification.
// \note comms and tag parameters only used when
// Foam_Pstream_scatter_nobroadcast is defined
template<class T>
static void listCombineScatter
(
List<T>& value,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Broadcast data: Distribute without modification.
template<class Container>
static void mapCombineScatter
(
const List<commsStruct>& comms,
Container& values,
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template<class Container>
static void mapCombineScatter
(
Container& values,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Scatter data. Reverse of gatherList //- Scatter data. Reverse of gatherList
template<class T> template<class T>
static void scatterList static void scatterList
( (
const List<commsStruct>& comms, const List<commsStruct>& comms,
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
); );
@ -335,8 +395,8 @@ public:
template<class T> template<class T>
static void scatterList static void scatterList
( (
List<T>& Values, List<T>& values,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm const label comm = UPstream::worldComm
); );
@ -403,10 +463,11 @@ public:
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository #ifdef NoRepository
#include "gatherScatter.C" #include "PstreamBroadcast.C"
#include "combineGatherScatter.C" #include "PstreamGather.C"
#include "gatherScatterList.C" #include "PstreamCombineGather.C"
#include "exchange.C" #include "PstreamGatherList.C"
#include "PstreamExchange.C"
#endif #endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -0,0 +1,142 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "OPstream.H"
#include "IPstream.H"
#include "contiguous.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class T>
void Foam::Pstream::genericBroadcast(T& value, const label comm)
{
// Generic: use stream interface
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
if (UPstream::master(comm))
{
OPBstream toAll(UPstream::masterNo(), comm);
toAll << value;
}
else
{
IPBstream fromMaster(UPstream::masterNo(), comm);
fromMaster >> value;
}
}
}
template<class T>
void Foam::Pstream::broadcast(T& value, const label comm)
{
if (!is_contiguous<T>::value)
{
Pstream::genericBroadcast(value, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
UPstream::broadcast
(
reinterpret_cast<char*>(&value),
sizeof(T),
comm,
UPstream::masterNo()
);
}
}
template<class T>
void Foam::Pstream::broadcast(List<T>& values, const label comm)
{
if (!is_contiguous<T>::value)
{
Pstream::genericBroadcast(values, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size of the list
label len(values.size());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(label),
comm,
UPstream::masterNo()
);
values.resize_nocopy(len); // A no-op on master
if (len)
{
UPstream::broadcast
(
values.data_bytes(),
values.size_bytes(),
comm,
UPstream::masterNo()
);
}
}
}
template<class T, int SizeMin>
void Foam::Pstream::broadcast(DynamicList<T, SizeMin>& values, const label comm)
{
if (!is_contiguous<T>::value)
{
Pstream::genericBroadcast(values, comm);
}
else if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the size of the list
label len(values.size());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(label),
comm,
UPstream::masterNo()
);
values.resize_nocopy(len); // A no-op on master
if (len)
{
UPstream::broadcast
(
values.data_bytes(),
values.size_bytes(),
comm,
UPstream::masterNo()
);
}
}
}
// ************************************************************************* //

View File

@ -27,7 +27,7 @@ License
Description Description
Variant of gather, scatter. Variant of gather, scatter.
Normal gather uses: Normal gather uses:
- construct null and read (>>) from Istream - default construct and read (>>) from Istream
- binary operator and assignment operator to combine values - binary operator and assignment operator to combine values
combineGather uses: combineGather uses:
@ -47,7 +47,7 @@ template<class T, class CombineOp>
void Foam::Pstream::combineGather void Foam::Pstream::combineGather
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -55,22 +55,21 @@ void Foam::Pstream::combineGather
{ {
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
forAll(myComm.below(), belowI) for (const label belowID : myComm.below())
{ {
label belowID = myComm.below()[belowI];
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
T value; T received;
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
reinterpret_cast<char*>(&value), reinterpret_cast<char*>(&received),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -79,10 +78,10 @@ void Foam::Pstream::combineGather
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< belowID << " data:" << value << endl; << belowID << " data:" << received << endl;
} }
cop(Value, value); cop(value, received);
} }
else else
{ {
@ -94,25 +93,25 @@ void Foam::Pstream::combineGather
tag, tag,
comm comm
); );
T value(fromBelow); T received(fromBelow);
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< belowID << " data:" << value << endl; << belowID << " data:" << received << endl;
} }
cop(Value, value); cop(value, received);
} }
} }
// Send up Value // Send up value
if (myComm.above() != -1) if (myComm.above() != -1)
{ {
if (debug & 2) if (debug & 2)
{ {
Pout<< " sending to " << myComm.above() Pout<< " sending to " << myComm.above()
<< " data:" << Value << endl; << " data:" << value << endl;
} }
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
@ -121,7 +120,7 @@ void Foam::Pstream::combineGather
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
reinterpret_cast<const char*>(&Value), reinterpret_cast<const char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -137,7 +136,7 @@ void Foam::Pstream::combineGather
tag, tag,
comm comm
); );
toAbove << Value; toAbove << value;
} }
} }
} }
@ -147,7 +146,7 @@ void Foam::Pstream::combineGather
template<class T, class CombineOp> template<class T, class CombineOp>
void Foam::Pstream::combineGather void Foam::Pstream::combineGather
( (
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -156,7 +155,7 @@ void Foam::Pstream::combineGather
combineGather combineGather
( (
UPstream::whichCommunication(comm), UPstream::whichCommunication(comm),
Value, value,
cop, cop,
tag, tag,
comm comm
@ -168,14 +167,17 @@ template<class T>
void Foam::Pstream::combineScatter void Foam::Pstream::combineScatter
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
T& Value, T& value,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up // Receive from up
@ -187,7 +189,7 @@ void Foam::Pstream::combineScatter
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
reinterpret_cast<char*>(&Value), reinterpret_cast<char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -203,25 +205,14 @@ void Foam::Pstream::combineScatter
tag, tag,
comm comm
); );
Value = T(fromAbove); value = T(fromAbove);
}
if (debug & 2)
{
Pout<< " received from "
<< myComm.above() << " data:" << Value << endl;
} }
} }
// Send to my downstairs neighbours // Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
label belowID = myComm.below()[belowI]; const label belowID = myComm.below()[belowI];
if (debug & 2)
{
Pout<< " sending to " << belowID << " data:" << Value << endl;
}
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
@ -229,7 +220,7 @@ void Foam::Pstream::combineScatter
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
reinterpret_cast<const char*>(&Value), reinterpret_cast<const char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -245,22 +236,27 @@ void Foam::Pstream::combineScatter
tag, tag,
comm comm
); );
toBelow << Value; toBelow << value;
} }
} }
} }
#endif
} }
template<class T> template<class T>
void Foam::Pstream::combineScatter void Foam::Pstream::combineScatter
( (
T& Value, T& value,
const int tag, const int tag,
const label comm const label comm
) )
{ {
combineScatter(UPstream::whichCommunication(comm), Value, tag, comm); #ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
combineScatter(UPstream::whichCommunication(comm), value, tag, comm);
#endif
} }
@ -268,7 +264,7 @@ template<class T, class CombineOp>
void Foam::Pstream::listCombineGather void Foam::Pstream::listCombineGather
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
List<T>& Values, List<T>& values,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -276,24 +272,22 @@ void Foam::Pstream::listCombineGather
{ {
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
forAll(myComm.below(), belowI) for (const label belowID : myComm.below())
{ {
label belowID = myComm.below()[belowI];
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
List<T> receivedValues(Values.size()); List<T> received(values.size());
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
receivedValues.data_bytes(), received.data_bytes(),
receivedValues.size_bytes(), received.size_bytes(),
tag, tag,
comm comm
); );
@ -301,12 +295,12 @@ void Foam::Pstream::listCombineGather
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< belowID << " data:" << receivedValues << endl; << belowID << " data:" << received << endl;
} }
forAll(Values, i) forAll(values, i)
{ {
cop(Values[i], receivedValues[i]); cop(values[i], received[i]);
} }
} }
else else
@ -319,28 +313,28 @@ void Foam::Pstream::listCombineGather
tag, tag,
comm comm
); );
List<T> receivedValues(fromBelow); List<T> received(fromBelow);
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< belowID << " data:" << receivedValues << endl; << belowID << " data:" << received << endl;
} }
forAll(Values, i) forAll(values, i)
{ {
cop(Values[i], receivedValues[i]); cop(values[i], received[i]);
} }
} }
} }
// Send up Value // Send up values
if (myComm.above() != -1) if (myComm.above() != -1)
{ {
if (debug & 2) if (debug & 2)
{ {
Pout<< " sending to " << myComm.above() Pout<< " sending to " << myComm.above()
<< " data:" << Values << endl; << " data:" << values << endl;
} }
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
@ -349,8 +343,8 @@ void Foam::Pstream::listCombineGather
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
Values.cdata_bytes(), values.cdata_bytes(),
Values.size_bytes(), values.size_bytes(),
tag, tag,
comm comm
); );
@ -365,7 +359,7 @@ void Foam::Pstream::listCombineGather
tag, tag,
comm comm
); );
toAbove << Values; toAbove << values;
} }
} }
} }
@ -375,7 +369,7 @@ void Foam::Pstream::listCombineGather
template<class T, class CombineOp> template<class T, class CombineOp>
void Foam::Pstream::listCombineGather void Foam::Pstream::listCombineGather
( (
List<T>& Values, List<T>& values,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -384,7 +378,7 @@ void Foam::Pstream::listCombineGather
listCombineGather listCombineGather
( (
UPstream::whichCommunication(comm), UPstream::whichCommunication(comm),
Values, values,
cop, cop,
tag, tag,
comm comm
@ -396,14 +390,17 @@ template<class T>
void Foam::Pstream::listCombineScatter void Foam::Pstream::listCombineScatter
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up // Receive from up
@ -415,8 +412,8 @@ void Foam::Pstream::listCombineScatter
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
Values.data_bytes(), values.data_bytes(),
Values.size_bytes(), values.size_bytes(),
tag, tag,
comm comm
); );
@ -431,25 +428,14 @@ void Foam::Pstream::listCombineScatter
tag, tag,
comm comm
); );
fromAbove >> Values; fromAbove >> values;
}
if (debug & 2)
{
Pout<< " received from "
<< myComm.above() << " data:" << Values << endl;
} }
} }
// Send to my downstairs neighbours // Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
label belowID = myComm.below()[belowI]; const label belowID = myComm.below()[belowI];
if (debug & 2)
{
Pout<< " sending to " << belowID << " data:" << Values << endl;
}
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
@ -457,8 +443,8 @@ void Foam::Pstream::listCombineScatter
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
Values.cdata_bytes(), values.cdata_bytes(),
Values.size_bytes(), values.size_bytes(),
tag, tag,
comm comm
); );
@ -473,28 +459,33 @@ void Foam::Pstream::listCombineScatter
tag, tag,
comm comm
); );
toBelow << Values; toBelow << values;
} }
} }
} }
#endif
} }
template<class T> template<class T>
void Foam::Pstream::listCombineScatter void Foam::Pstream::listCombineScatter
( (
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
listCombineScatter listCombineScatter
( (
UPstream::whichCommunication(comm), UPstream::whichCommunication(comm),
Values, values,
tag, tag,
comm comm
); );
#endif
} }
@ -502,7 +493,7 @@ template<class Container, class CombineOp>
void Foam::Pstream::mapCombineGather void Foam::Pstream::mapCombineGather
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
Container& Values, Container& values,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -510,13 +501,13 @@ void Foam::Pstream::mapCombineGather
{ {
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
forAll(myComm.below(), belowI) for (const label belowID : myComm.below())
{ {
label belowID = myComm.below()[belowI]; // Map/HashTable: non-contiguous
IPstream fromBelow IPstream fromBelow
( (
@ -526,43 +517,43 @@ void Foam::Pstream::mapCombineGather
tag, tag,
comm comm
); );
Container receivedValues(fromBelow); Container received(fromBelow);
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< belowID << " data:" << receivedValues << endl; << belowID << " data:" << received << endl;
} }
for for
( (
typename Container::const_iterator slaveIter = auto recvIter = received.cbegin();
receivedValues.begin(); recvIter != received.cend();
slaveIter != receivedValues.end(); ++recvIter
++slaveIter
) )
{ {
typename Container::iterator auto masterIter = values.find(recvIter.key());
masterIter = Values.find(slaveIter.key());
if (masterIter != Values.end()) if (masterIter != values.end()) // == found()
{ {
cop(masterIter(), slaveIter()); // Combine with existing
cop(masterIter.val(), recvIter.val());
} }
else else
{ {
Values.insert(slaveIter.key(), slaveIter()); // Insert new key/value
values.insert(recvIter.key(), recvIter.val());
} }
} }
} }
// Send up Value // Send up values
if (myComm.above() != -1) if (myComm.above() != -1)
{ {
if (debug & 2) if (debug & 2)
{ {
Pout<< " sending to " << myComm.above() Pout<< " sending to " << myComm.above()
<< " data:" << Values << endl; << " data:" << values << endl;
} }
OPstream toAbove OPstream toAbove
@ -573,7 +564,7 @@ void Foam::Pstream::mapCombineGather
tag, tag,
comm comm
); );
toAbove << Values; toAbove << values;
} }
} }
} }
@ -582,7 +573,7 @@ void Foam::Pstream::mapCombineGather
template<class Container, class CombineOp> template<class Container, class CombineOp>
void Foam::Pstream::mapCombineGather void Foam::Pstream::mapCombineGather
( (
Container& Values, Container& values,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
@ -591,7 +582,7 @@ void Foam::Pstream::mapCombineGather
mapCombineGather mapCombineGather
( (
UPstream::whichCommunication(comm), UPstream::whichCommunication(comm),
Values, values,
cop, cop,
tag, tag,
comm comm
@ -603,14 +594,17 @@ template<class Container>
void Foam::Pstream::mapCombineScatter void Foam::Pstream::mapCombineScatter
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
Container& Values, Container& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up // Receive from up
@ -624,23 +618,23 @@ void Foam::Pstream::mapCombineScatter
tag, tag,
comm comm
); );
fromAbove >> Values; fromAbove >> values;
if (debug & 2) if (debug & 2)
{ {
Pout<< " received from " Pout<< " received from "
<< myComm.above() << " data:" << Values << endl; << myComm.above() << " data:" << values << endl;
} }
} }
// Send to my downstairs neighbours // Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
label belowID = myComm.below()[belowI]; const label belowID = myComm.below()[belowI];
if (debug & 2) if (debug & 2)
{ {
Pout<< " sending to " << belowID << " data:" << Values << endl; Pout<< " sending to " << belowID << " data:" << values << endl;
} }
OPstream toBelow OPstream toBelow
@ -651,27 +645,32 @@ void Foam::Pstream::mapCombineScatter
tag, tag,
comm comm
); );
toBelow << Values; toBelow << values;
} }
} }
#endif
} }
template<class Container> template<class Container>
void Foam::Pstream::mapCombineScatter void Foam::Pstream::mapCombineScatter
( (
Container& Values, Container& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
mapCombineScatter mapCombineScatter
( (
UPstream::whichCommunication(comm), UPstream::whichCommunication(comm),
Values, values,
tag, tag,
comm comm
); );
#endif
} }

View File

@ -6,6 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -32,7 +33,6 @@ Description
combined using the given combination function and the result is combined using the given combination function and the result is
broadcast to all nodes broadcast to all nodes
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#ifndef Foam_PstreamCombineReduceOps_H #ifndef Foam_PstreamCombineReduceOps_H
@ -53,31 +53,33 @@ template<class T, class CombineOp>
void combineReduce void combineReduce
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag, const int tag,
const label comm const label comm
) )
{ {
Pstream::combineGather(comms, Value, cop, tag, comm); Pstream::combineGather(comms, value, cop, tag, comm);
Pstream::combineScatter(comms, Value, tag, comm); Pstream::broadcast(value, comm);
} }
template<class T, class CombineOp> template<class T, class CombineOp>
void combineReduce void combineReduce
( (
T& Value, T& value,
const CombineOp& cop, const CombineOp& cop,
const int tag = Pstream::msgType(), const int tag = UPstream::msgType(),
const label comm = Pstream::worldComm const label comm = UPstream::worldComm
) )
{ {
const List<UPstream::commsStruct>& comms = if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
UPstream::whichCommunication(comm); {
const auto& comms = UPstream::whichCommunication(comm);
Pstream::combineGather(comms, Value, cop, tag, comm); Pstream::combineGather(comms, value, cop, tag, comm);
Pstream::combineScatter(comms, Value, tag, comm); Pstream::broadcast(value, comm);
}
} }

View File

@ -91,7 +91,7 @@ void Foam::Pstream::exchangeContainer
FatalErrorInFunction FatalErrorInFunction
<< "Cannot send outgoing message. " << "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:" << "to:" << proci << " nBytes:"
<< label(sendBufs[proci].size()*sizeof(T)) << label(sendBufs[proci].size_bytes())
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
} }
@ -103,7 +103,7 @@ void Foam::Pstream::exchangeContainer
if (block) if (block)
{ {
Pstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
} }
} }
@ -177,7 +177,7 @@ void Foam::Pstream::exchangeBuf
if (block) if (block)
{ {
Pstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
} }
} }

View File

@ -36,18 +36,13 @@ Description
#include "IPstream.H" #include "IPstream.H"
#include "contiguous.H" #include "contiguous.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class T, class BinaryOp> template<class T, class BinaryOp>
void Pstream::gather void Foam::Pstream::gather
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
T& Value, T& value,
const BinaryOp& bop, const BinaryOp& bop,
const int tag, const int tag,
const label comm const label comm
@ -55,21 +50,21 @@ void Pstream::gather
{ {
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
forAll(myComm.below(), belowI) for (const label belowID : myComm.below())
{ {
T value; T received;
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.below()[belowI], belowID,
reinterpret_cast<char*>(&value), reinterpret_cast<char*>(&received),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -80,18 +75,18 @@ void Pstream::gather
IPstream fromBelow IPstream fromBelow
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.below()[belowI], belowID,
0, 0,
tag, tag,
comm comm
); );
fromBelow >> value; fromBelow >> received;
} }
Value = bop(Value, value); value = bop(value, received);
} }
// Send up Value // Send up value
if (myComm.above() != -1) if (myComm.above() != -1)
{ {
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
@ -100,7 +95,7 @@ void Pstream::gather
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
reinterpret_cast<const char*>(&Value), reinterpret_cast<const char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -116,7 +111,7 @@ void Pstream::gather
tag, tag,
comm comm
); );
toAbove << Value; toAbove << value;
} }
} }
} }
@ -124,58 +119,33 @@ void Pstream::gather
template<class T, class BinaryOp> template<class T, class BinaryOp>
void Pstream::gather void Foam::Pstream::gather
( (
T& Value, T& value,
const BinaryOp& bop, const BinaryOp& bop,
const int tag, const int tag,
const label comm const label comm
) )
{ {
gather(UPstream::whichCommunication(comm), Value, bop, tag, comm); gather(UPstream::whichCommunication(comm), value, bop, tag, comm);
} }
template<class T> template<class T>
void Pstream::scatter void Foam::Pstream::scatter
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
T& Value, T& value,
const int tag, const int tag,
const label comm const label comm
) )
{ {
#ifndef Foam_Pstream_scatter_nobroadcast #ifndef Foam_Pstream_scatter_nobroadcast
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) Pstream::broadcast(value, comm);
{
if (is_contiguous<T>::value)
{
UPstream::broadcast
(
reinterpret_cast<char*>(&Value),
sizeof(T),
comm,
UPstream::masterNo()
);
}
else
{
if (UPstream::master(comm))
{
OPBstream toAll(UPstream::masterNo(), comm);
toAll << Value;
}
else
{
IPBstream fromMaster(UPstream::masterNo(), comm);
fromMaster >> Value;
}
}
}
#else #else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up // Receive from up
@ -187,7 +157,7 @@ void Pstream::scatter
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
reinterpret_cast<char*>(&Value), reinterpret_cast<char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -203,7 +173,7 @@ void Pstream::scatter
tag, tag,
comm comm
); );
fromAbove >> Value; fromAbove >> value;
} }
} }
@ -212,13 +182,15 @@ void Pstream::scatter
// (only when using a tree schedule!) first. // (only when using a tree schedule!) first.
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
const label belowID = myComm.below()[belowI];
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
UOPstream::write UOPstream::write
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.below()[belowI], belowID,
reinterpret_cast<const char*>(&Value), reinterpret_cast<const char*>(&value),
sizeof(T), sizeof(T),
tag, tag,
comm comm
@ -229,12 +201,12 @@ void Pstream::scatter
OPstream toBelow OPstream toBelow
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.below()[belowI], belowID,
0, 0,
tag, tag,
comm comm
); );
toBelow << Value; toBelow << value;
} }
} }
} }
@ -243,14 +215,14 @@ void Pstream::scatter
template<class T> template<class T>
void Pstream::scatter(T& Value, const int tag, const label comm) void Foam::Pstream::scatter(T& value, const int tag, const label comm)
{ {
scatter(UPstream::whichCommunication(comm), Value, tag, comm); #ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
scatter(UPstream::whichCommunication(comm), value, tag, comm);
#endif
} }
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* // // ************************************************************************* //

View File

@ -29,7 +29,7 @@ Description
communication schedule (usually linear-to-master or tree-to-master). communication schedule (usually linear-to-master or tree-to-master).
The gathered data will be a list with element procID the data from processor The gathered data will be a list with element procID the data from processor
procID. Before calling every processor should insert its value into procID. Before calling every processor should insert its value into
Values[UPstream::myProcNo(comm)]. values[UPstream::myProcNo(comm)].
Note: after gather every processor only knows its own data and that of the Note: after gather every processor only knows its own data and that of the
processors below it. Only the 'master' of the communication schedule holds processors below it. Only the 'master' of the communication schedule holds
a fully filled List. Use scatter to distribute the data. a fully filled List. Use scatter to distribute the data.
@ -40,61 +40,55 @@ Description
#include "OPstream.H" #include "OPstream.H"
#include "contiguous.H" #include "contiguous.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class T> template<class T>
void Pstream::gatherList void Foam::Pstream::gatherList
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
if (Values.size() != UPstream::nProcs(comm)) if (values.size() != UPstream::nProcs(comm))
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Size of list:" << Values.size() << "Size of list:" << values.size()
<< " does not equal the number of processors:" << " does not equal the number of processors:"
<< UPstream::nProcs(comm) << UPstream::nProcs(comm)
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours // Receive from my downstairs neighbours
forAll(myComm.below(), belowI) for (const label belowID : myComm.below())
{ {
label belowID = myComm.below()[belowI];
const labelList& belowLeaves = comms[belowID].allBelow(); const labelList& belowLeaves = comms[belowID].allBelow();
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
List<T> receivedValues(belowLeaves.size() + 1); List<T> received(belowLeaves.size() + 1);
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
receivedValues.data_bytes(), received.data_bytes(),
receivedValues.size_bytes(), received.size_bytes(),
tag, tag,
comm comm
); );
Values[belowID] = receivedValues[0]; values[belowID] = received[0];
forAll(belowLeaves, leafI) forAll(belowLeaves, leafI)
{ {
Values[belowLeaves[leafI]] = receivedValues[leafI + 1]; values[belowLeaves[leafI]] = received[leafI + 1];
} }
} }
else else
@ -107,32 +101,31 @@ void Pstream::gatherList
tag, tag,
comm comm
); );
fromBelow >> Values[belowID]; fromBelow >> values[belowID];
if (debug & 2) if (debug & 2)
{ {
Pout<< " received through " Pout<< " received through "
<< belowID << " data from:" << belowID << belowID << " data from:" << belowID
<< " data:" << Values[belowID] << endl; << " data:" << values[belowID] << endl;
} }
// Receive from all other processors below belowID // Receive from all other processors below belowID
forAll(belowLeaves, leafI) for (const label leafID : belowLeaves)
{ {
label leafID = belowLeaves[leafI]; fromBelow >> values[leafID];
fromBelow >> Values[leafID];
if (debug & 2) if (debug & 2)
{ {
Pout<< " received through " Pout<< " received through "
<< belowID << " data from:" << leafID << belowID << " data from:" << leafID
<< " data:" << Values[leafID] << endl; << " data:" << values[leafID] << endl;
} }
} }
} }
} }
// Send up from Values: // Send up from values:
// - my own value first // - my own value first
// - all belowLeaves next // - all belowLeaves next
if (myComm.above() != -1) if (myComm.above() != -1)
@ -143,25 +136,25 @@ void Pstream::gatherList
{ {
Pout<< " sending to " << myComm.above() Pout<< " sending to " << myComm.above()
<< " data from me:" << UPstream::myProcNo(comm) << " data from me:" << UPstream::myProcNo(comm)
<< " data:" << Values[UPstream::myProcNo(comm)] << endl; << " data:" << values[UPstream::myProcNo(comm)] << endl;
} }
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
List<T> sendingValues(belowLeaves.size() + 1); List<T> sending(belowLeaves.size() + 1);
sendingValues[0] = Values[UPstream::myProcNo(comm)]; sending[0] = values[UPstream::myProcNo(comm)];
forAll(belowLeaves, leafI) forAll(belowLeaves, leafI)
{ {
sendingValues[leafI + 1] = Values[belowLeaves[leafI]]; sending[leafI + 1] = values[belowLeaves[leafI]];
} }
OPstream::write UOPstream::write
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
sendingValues.cdata_bytes(), sending.cdata_bytes(),
sendingValues.size_bytes(), sending.size_bytes(),
tag, tag,
comm comm
); );
@ -176,19 +169,17 @@ void Pstream::gatherList
tag, tag,
comm comm
); );
toAbove << Values[UPstream::myProcNo(comm)]; toAbove << values[UPstream::myProcNo(comm)];
forAll(belowLeaves, leafI) for (const label leafID : belowLeaves)
{ {
label leafID = belowLeaves[leafI];
if (debug & 2) if (debug & 2)
{ {
Pout<< " sending to " Pout<< " sending to "
<< myComm.above() << " data from:" << leafID << myComm.above() << " data from:" << leafID
<< " data:" << Values[leafID] << endl; << " data:" << values[leafID] << endl;
} }
toAbove << Values[leafID]; toAbove << values[leafID];
} }
} }
} }
@ -197,33 +188,37 @@ void Pstream::gatherList
template<class T> template<class T>
void Pstream::gatherList(List<T>& Values, const int tag, const label comm) void Foam::Pstream::gatherList(List<T>& values, const int tag, const label comm)
{ {
gatherList(UPstream::whichCommunication(comm), Values, tag, comm); gatherList(UPstream::whichCommunication(comm), values, tag, comm);
} }
template<class T> template<class T>
void Pstream::scatterList void Foam::Pstream::scatterList
( (
const List<UPstream::commsStruct>& comms, const List<UPstream::commsStruct>& comms,
List<T>& Values, List<T>& values,
const int tag, const int tag,
const label comm const label comm
) )
{ {
// Apart from the additional (size == nProcs) check, the only difference
// between scatterList() and using broadcast(List<T>&) or a regular
// scatter(List<T>&) is that processor-local data is skipped.
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{ {
if (Values.size() != UPstream::nProcs(comm)) if (values.size() != UPstream::nProcs(comm))
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Size of list:" << Values.size() << "Size of list:" << values.size()
<< " does not equal the number of processors:" << " does not equal the number of processors:"
<< UPstream::nProcs(comm) << UPstream::nProcs(comm)
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
// Get my communication order // My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up // Receive from up
@ -233,21 +228,21 @@ void Pstream::scatterList
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
List<T> receivedValues(notBelowLeaves.size()); List<T> received(notBelowLeaves.size());
UIPstream::read UIPstream::read
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
myComm.above(), myComm.above(),
receivedValues.data_bytes(), received.data_bytes(),
receivedValues.size_bytes(), received.size_bytes(),
tag, tag,
comm comm
); );
forAll(notBelowLeaves, leafI) forAll(notBelowLeaves, leafI)
{ {
Values[notBelowLeaves[leafI]] = receivedValues[leafI]; values[notBelowLeaves[leafI]] = received[leafI];
} }
} }
else else
@ -261,16 +256,15 @@ void Pstream::scatterList
comm comm
); );
forAll(notBelowLeaves, leafI) for (const label leafID : notBelowLeaves)
{ {
label leafID = notBelowLeaves[leafI]; fromAbove >> values[leafID];
fromAbove >> Values[leafID];
if (debug) if (debug)
{ {
Pout<< " received through " Pout<< " received through "
<< myComm.above() << " data for:" << leafID << myComm.above() << " data for:" << leafID
<< " data:" << Values[leafID] << endl; << " data:" << values[leafID] << endl;
} }
} }
} }
@ -279,24 +273,24 @@ void Pstream::scatterList
// Send to my downstairs neighbours // Send to my downstairs neighbours
forAllReverse(myComm.below(), belowI) forAllReverse(myComm.below(), belowI)
{ {
label belowID = myComm.below()[belowI]; const label belowID = myComm.below()[belowI];
const labelList& notBelowLeaves = comms[belowID].allNotBelow(); const labelList& notBelowLeaves = comms[belowID].allNotBelow();
if (is_contiguous<T>::value) if (is_contiguous<T>::value)
{ {
List<T> sendingValues(notBelowLeaves.size()); List<T> sending(notBelowLeaves.size());
forAll(notBelowLeaves, leafI) forAll(notBelowLeaves, leafI)
{ {
sendingValues[leafI] = Values[notBelowLeaves[leafI]]; sending[leafI] = values[notBelowLeaves[leafI]];
} }
OPstream::write UOPstream::write
( (
UPstream::commsTypes::scheduled, UPstream::commsTypes::scheduled,
belowID, belowID,
sendingValues.cdata_bytes(), sending.cdata_bytes(),
sendingValues.size_bytes(), sending.size_bytes(),
tag, tag,
comm comm
); );
@ -313,16 +307,15 @@ void Pstream::scatterList
); );
// Send data destined for all other processors below belowID // Send data destined for all other processors below belowID
forAll(notBelowLeaves, leafI) for (const label leafID : notBelowLeaves)
{ {
label leafID = notBelowLeaves[leafI]; toBelow << values[leafID];
toBelow << Values[leafID];
if (debug) if (debug)
{ {
Pout<< " sent through " Pout<< " sent through "
<< belowID << " data for:" << leafID << belowID << " data for:" << leafID
<< " data:" << Values[leafID] << endl; << " data:" << values[leafID] << endl;
} }
} }
} }
@ -332,14 +325,15 @@ void Pstream::scatterList
template<class T> template<class T>
void Pstream::scatterList(List<T>& Values, const int tag, const label comm) void Foam::Pstream::scatterList
(
List<T>& values,
const int tag,
const label comm
)
{ {
scatterList(UPstream::whichCommunication(comm), Values, tag, comm); scatterList(UPstream::whichCommunication(comm), values, tag, comm);
} }
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* // // ************************************************************************* //

View File

@ -58,12 +58,11 @@ void reduce
{ {
if (UPstream::warnComm != -1 && comm != UPstream::warnComm) if (UPstream::warnComm != -1 && comm != UPstream::warnComm)
{ {
Pout<< "** reducing:" << value << " with comm:" << comm Pout<< "** reducing:" << value << " with comm:" << comm << endl;
<< endl;
error::printStack(Pout); error::printStack(Pout);
} }
Pstream::gather(comms, value, bop, tag, comm); Pstream::gather(comms, value, bop, tag, comm);
Pstream::scatter(comms, value, tag, comm); Pstream::broadcast(value, comm);
} }

View File

@ -51,6 +51,43 @@ Foam::UPstream::commsTypeNames
}); });
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
void Foam::UPstream::broadcast
(
std::string& str,
const label comm,
const int rootProcNo
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Broadcast the string length
std::size_t len(str.length());
UPstream::broadcast
(
reinterpret_cast<char*>(&len),
sizeof(std::size_t),
comm,
rootProcNo
);
if (!UPstream::master(comm))
{
// Do not touch string on the master even although it would
// be a no-op. We are truly paranoid.
str.resize(len);
}
if (len)
{
UPstream::broadcast(&str[0], len, comm, rootProcNo);
}
}
}
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads) void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)

View File

@ -34,9 +34,6 @@ SourceFiles
UPstream.C UPstream.C
UPstreamCommsStruct.C UPstreamCommsStruct.C
UPstreamTemplates.C UPstreamTemplates.C
combineGatherScatter.C
gatherScatter.C
gatherScatterList.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
@ -104,7 +101,7 @@ public:
// Constructors // Constructors
//- Default construct. Above == -1 //- Default construct. Above == -1
commsStruct(); commsStruct() noexcept;
//- Construct from components //- Construct from components
commsStruct commsStruct
@ -684,10 +681,10 @@ public:
); );
// Broadcast Functions // Broadcast Functions
//- Broadcast buffer contents, sizes must match on processors //- Broadcast buffer contents to all processes in communicator.
//- The sizes must match on all processes.
// \return True on success // \return True on success
static bool broadcast static bool broadcast
( (
@ -697,6 +694,14 @@ public:
const int rootProcNo = masterNo() const int rootProcNo = masterNo()
); );
//- Broadcast string content to all processes in communicator.
static void broadcast
(
std::string& str,
const label communicator = worldComm,
const int rootProcNo = masterNo()
);
// Housekeeping // Housekeeping
@ -721,11 +726,11 @@ Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
// Template specialisation for access of commsStruct // Template specialisation for access of commsStruct
template<> template<>
UPstream::commsStruct& UPstream::commsStruct&
UList<UPstream::commsStruct>::operator[](const label); UList<UPstream::commsStruct>::operator[](const label procID);
template<> template<>
const UPstream::commsStruct& const UPstream::commsStruct&
UList<UPstream::commsStruct>::operator[](const label) const; UList<UPstream::commsStruct>::operator[](const label procID) const;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -30,7 +30,7 @@ License
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::UPstream::commsStruct::commsStruct() Foam::UPstream::commsStruct::commsStruct() noexcept
: :
above_(-1), above_(-1),
below_(), below_(),

View File

@ -2074,12 +2074,9 @@ bool Foam::fileOperations::masterUncollatedFileOperation::read
// Scatter operation for regIOobjects // Scatter operation for regIOobjects
// Get my communication order // My communication order
const List<Pstream::commsStruct>& comms = const auto& comms = Pstream::whichCommunication(Pstream::worldComm);
Pstream::whichCommunication(Pstream::worldComm); const auto& myComm = comms[Pstream::myProcNo(Pstream::worldComm)];
const Pstream::commsStruct& myComm =
comms[Pstream::myProcNo(Pstream::worldComm)];
// Receive from up // Receive from up
if (myComm.above() != -1) if (myComm.above() != -1)

View File

@ -654,9 +654,9 @@ bool Foam::fileOperations::uncollatedFileOperation::read
Pstream::scatter(io.headerClassName()); Pstream::scatter(io.headerClassName());
Pstream::scatter(io.note()); Pstream::scatter(io.note());
// Get my communication order // My communication order
const List<Pstream::commsStruct>& comms = Pstream::whichCommunication(); const auto& comms = Pstream::whichCommunication();
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()]; const auto& myComm = comms[Pstream::myProcNo()];
// Receive from up // Receive from up
if (myComm.above() != -1) if (myComm.above() != -1)

View File

@ -446,7 +446,7 @@ void Foam::globalMeshData::calcSharedEdges() const
} }
// Broadcast: merged edges to all // Broadcast: merged edges to all
Pstream::scatter(globalShared); // == worldComm; Pstream::broadcast(globalShared); // == worldComm;
// Now use the global shared edges list (globalShared) to classify my local // Now use the global shared edges list (globalShared) to classify my local
@ -1914,7 +1914,7 @@ Foam::pointField Foam::globalMeshData::sharedPoints() const
} }
// Broadcast: sharedPoints to all // Broadcast: sharedPoints to all
Pstream::scatter(sharedPoints); // == worldComm Pstream::broadcast(sharedPoints); // == worldComm
return sharedPoints; return sharedPoints;

View File

@ -54,7 +54,7 @@ Foam::List<Foam::labelPair> Foam::mapDistributeBase::schedule
const label nProcs = Pstream::nProcs(comm); const label nProcs = Pstream::nProcs(comm);
// Communications: send and receive processor // Communications: send and receive processor
DynamicList<labelPair> allComms; List<labelPair> allComms;
{ {
labelPairHashSet commsSet(nProcs); labelPairHashSet commsSet(nProcs);
@ -119,7 +119,7 @@ Foam::List<Foam::labelPair> Foam::mapDistributeBase::schedule
} }
// Broadcast: send comms information to all // Broadcast: send comms information to all
Pstream::scatter(allComms, tag, comm); Pstream::broadcast(allComms, comm);
// Determine my schedule. // Determine my schedule.
@ -133,7 +133,7 @@ Foam::List<Foam::labelPair> Foam::mapDistributeBase::schedule
); );
// Processors involved in my schedule // Processors involved in my schedule
return List<labelPair>(UIndirectList<labelPair>(allComms, mySchedule)); return List<labelPair>(allComms, mySchedule);
//if (debug) //if (debug)