ENH: globalIndex gather ops with reduced communication (#2332)

- for contiguous data, added mpiGatherOp() to complement the
  gatherOp() static method

- the gather ops (static methods) populate the globalIndex on the
  master only (not needed on other procs) for reduced communication

- rename inplace gather methods to include 'inplace' in their name.
  Regular gather methods return the gathered data directly, which
  allows the following:

      const scalarField mergedWeights(globalFaces().gather(wghtSum));

  vs.
      scalarField mergedWeights;
      globalFaces().gather(wghtSum, mergedWeights());

  or even:

      scalarField mergedWeights;
      List<scalarField> allWeights(Pstream::nProcs());
      allWeights[Pstream::myProcNo()] = wghtSum;
      Pstream::gatherList(allWeights);
      if (Pstream::master())
      {
          mergedWeights =
              ListListOps::combine<scalarField>
              (
                  allWeights, accessOp<scalarField>()
              );
       }

- add parRun guards on various globalIndex gather methods
  (simple copies or no-ops in serial) to simplify the effort for callers.
This commit is contained in:
Mark Olesen
2022-01-19 10:33:19 +01:00
parent 6b99fea4e7
commit e1f06bf38e
5 changed files with 656 additions and 360 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -198,15 +198,15 @@ int main(int argc, char *argv[])
// Do the brute-force method as well : collect all cell centres on all // Do the brute-force method as well : collect all cell centres on all
// processors // processors
pointField allCcs(globalNumbering.size());
globalNumbering.gather Info<< "Gathered/scattered cell centres:" << endl;
(
Pstream::worldComm, labelPair inOut;
Pstream::procID(Pstream::worldComm), pointField allCcs(globalNumbering.gather(mesh.cellCentres()));
mesh.cellCentres(), inOut[0] = allCcs.size();
allCcs
);
Pstream::scatter(allCcs); Pstream::scatter(allCcs);
inOut[1] = allCcs.size();
Pout<< " " << inOut << endl;
// Compare // Compare
forAll(ccs, i) forAll(ccs, i)
@ -239,10 +239,13 @@ int main(int argc, char *argv[])
Info<< "local-sizes: " << globalPointsPtr().sizes() << nl; Info<< "local-sizes: " << globalPointsPtr().sizes() << nl;
UIndirectList<point> procPoints(mesh.points(), uniqueMeshPointLabels); pointField patchPoints
pointField patchPoints; (
globalPointsPtr().gather
globalPointsPtr().gather(procPoints, patchPoints); (
UIndirectList<point>(mesh.points(), uniqueMeshPointLabels)
)
);
Info<< "gathered point field = " << patchPoints.size() << " points\n"; Info<< "gathered point field = " << patchPoints.size() << " points\n";
} }

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2018-2021 OpenCFD Ltd. Copyright (C) 2018-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -176,6 +176,12 @@ void Foam::globalIndex::bin
} }
void Foam::globalIndex::reset(const label localSize)
{
reset(localSize, Pstream::msgType(), UPstream::worldComm, true);
}
void Foam::globalIndex::reset void Foam::globalIndex::reset
( (
const label localSize, const label localSize,
@ -189,6 +195,8 @@ void Foam::globalIndex::reset
if (len) if (len)
{ {
// Seed with localSize, zero elsewhere (for non-parallel branch) // Seed with localSize, zero elsewhere (for non-parallel branch)
// NB: can consider UPstream::listGatherValues
labelList localSizes(len, Zero); labelList localSizes(len, Zero);
localSizes[Pstream::myProcNo(comm)] = localSize; localSizes[Pstream::myProcNo(comm)] = localSize;
@ -202,6 +210,7 @@ void Foam::globalIndex::reset
} }
else else
{ {
// Nothing to do
offsets_.clear(); offsets_.clear();
} }
} }

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2018-2021 OpenCFD Ltd. Copyright (C) 2018-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -35,14 +35,14 @@ Description
label globalFacei = globalFaces.toGlobal(facei); label globalFacei = globalFaces.toGlobal(facei);
SourceFiles SourceFiles
globalIndexI.H
globalIndex.C globalIndex.C
globalIndexI.H
globalIndexTemplates.C globalIndexTemplates.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#ifndef globalIndex_H #ifndef Foam_globalIndex_H
#define globalIndex_H #define Foam_globalIndex_H
#include "Pstream.H" #include "Pstream.H"
#include "CompactListList.H" #include "CompactListList.H"
@ -175,7 +175,7 @@ public:
//- Reset from local size. //- Reset from local size.
// Does communication with default communicator and message tag. // Does communication with default communicator and message tag.
inline void reset(const label localSize); void reset(const label localSize);
//- Reset from local size. //- Reset from local size.
// Does communication with given communicator and message tag, // Does communication with given communicator and message tag,
@ -377,10 +377,10 @@ public:
// Other // Other
// Gather // Gather (helpers)
//- Collect data in processor order on master (== procIDs[0]). //- Collect data in processor order on master (== procIDs[0]).
// Offsets needed on master only. // \note offsets needed on master only.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
static void gather static void gather
( (
@ -394,7 +394,7 @@ public:
); );
//- Collect indirect data in processor order on master //- Collect indirect data in processor order on master
// Offsets needed on master only. // \note offsets needed on master only.
template<class Type, class Addr> template<class Type, class Addr>
static void gather static void gather
( (
@ -407,8 +407,21 @@ public:
const Pstream::commsTypes = Pstream::commsTypes::scheduled const Pstream::commsTypes = Pstream::commsTypes::scheduled
); );
//- Inplace collect in processor order on master (== procIDs[0]).
// \note offsets needed on master only.
template<class ProcIDsContainer, class Type>
static void gather
(
const labelUList& offsets,
const label comm, //!< communicator
const ProcIDsContainer& procIDs,
List<Type>& fld,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking
);
//- Collect data in processor order on master (== procIDs[0]). //- Collect data in processor order on master (== procIDs[0]).
// Offsets needed on master only. // \note the globalIndex offsets needed on master only.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void gather void gather
( (
@ -424,45 +437,8 @@ public:
gather(offsets_, comm, procIDs, fld, allFld, tag, commsType); gather(offsets_, comm, procIDs, fld, allFld, tag, commsType);
} }
//- Collect data in processor order on master.
// Does communication with default communicator and message tag.
template<class Type>
void gather
(
const UList<Type>& fld,
List<Type>& allFld,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Collect data indirectly in processor order on master.
// Does communication with default communicator and message tag.
template<class Type, class Addr>
void gather
(
const IndirectListBase<Type, Addr>& fld,
List<Type>& allFld,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Inplace collect in processor order on master (== procIDs[0]). //- Inplace collect in processor order on master (== procIDs[0]).
//- Needs offsets only on master. // \note the globalIndex offsets needed on master only.
template<class ProcIDsContainer, class Type>
static void gather
(
const labelUList& offsets,
const label comm, //!< communicator
const ProcIDsContainer& procIDs,
List<Type>& fld,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking
);
//- Inplace collect in processor order on master (== procIDs[0]).
//- Needs offsets only on master.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void gather void gather
( (
@ -477,11 +453,68 @@ public:
gather(offsets_, comm, procIDs, fld, tag, commsType); gather(offsets_, comm, procIDs, fld, tag, commsType);
} }
//- Inplace collect data in processor order on master
// Does communication with default communicator and message tag. // Gather
// After the gather, the field is zero-sized on the slaves.
//- Collect data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
template<class Type> template<class Type>
void gather void gather
(
const UList<Type>& sendData,
List<Type>& allData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Collect data indirectly in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
template<class Type, class Addr>
void gather
(
const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Collect data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
//
// \return output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>>
OutputContainer gather
(
const UList<Type>& sendData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Collect data indirectly in processor order on master.
// Communication with default/specified communicator, message tag.
//
// \return output (master), zero-sized on non-master
template<class Type, class Addr, class OutputContainer = List<Type>>
OutputContainer gather
(
const IndirectListBase<Type, Addr>& sendData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Inplace collect data in processor order on master
//- (in serial: a no-op).
// Communication with default/specified communicator, message tag.
// After the gather, the field is zero-sized on the slaves.
template<class Type>
void gatherInplace
( (
List<Type>& fld, List<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
@ -490,19 +523,27 @@ public:
) const; ) const;
//- Collect \em contiguous data using a MPI_Gatherv call //- Collect \em contiguous data using a MPI_Gatherv call
//- (in serial: performs a simple copy).
// Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator // \attention The nProcs for globalIndex and communicator
// must match!! // must match!!
//
// The allData is output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>> template<class Type, class OutputContainer = List<Type>>
void mpiGather void mpiGather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allValues, OutputContainer& allData,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
//- Collect \em contiguous data using a MPI_Gatherv call //- Collect \em contiguous data using a MPI_Gatherv call
//- (in serial: performs a simple copy).
// Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator // \attention The nProcs for globalIndex and communicator
// must match!! // must match!!
//
// \return output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>> template<class Type, class OutputContainer = List<Type>>
OutputContainer mpiGather OutputContainer mpiGather
( (
@ -510,33 +551,139 @@ public:
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
//- Inplace collect \em contiguous data using a MPI_Gatherv call
//- (in serial: a no-op).
// Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator
// must match!!
//
// After the gather, the field is zero-sized on non-master.
template<class Type>
void mpiGatherInplace
(
List<Type>& fld,
const label comm = UPstream::worldComm //!< communicator
) const;
//- Collect data in processor order on master.
// Does communication with default communicator and message tag. // Gather Operations
//- Collect \em contiguous data using a MPI_Gatherv call
//- (in serial: performs a simple copy).
// Communication with default/specified communicator.
//
// The allData is output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>>
static void mpiGatherOp
(
const UList<Type>& sendData,
OutputContainer& allData,
const label comm = UPstream::worldComm //!< communicator
);
//- Collect \em contiguous data using a MPI_Gatherv call
//- (in serial: performs a simple copy).
// Communication with default/specified communicator.
//
// \return output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>>
static OutputContainer mpiGatherOp
(
const UList<Type>& sendData,
const label comm = UPstream::worldComm //!< communicator
);
//- Inplace collect \em contiguous data using a MPI_Gatherv call
//- (in serial: a no-op).
// Communication with default/specified communicator.
//
// After the gather, the field is zero-sized on non-master.
template<class Type>
static void mpiGatherInplaceOp
(
List<Type>& fld,
const label comm = UPstream::worldComm //!< communicator
);
//- Collect data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
//
// The allFld is output (master), zero-sized on non-master
template<class Type> template<class Type>
static void gatherOp static void gatherOp
( (
const UList<Type>& fld, const UList<Type>& sendData,
List<Type>& allFld, List<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
);
//- Collect data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
//
// The allFld is output (master), zero-sized on non-master
template<class Type, class Addr>
static void gatherOp
(
const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
);
//- Collect and return data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
//
// \return output (master), zero-sized on non-master
template<class Type, class OutputContainer = List<Type>>
static OutputContainer gatherOp
(
const UList<Type>& sendData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
);
//- Collect and return data in processor order on master
//- (in serial: performs a simple copy).
// Communication with default/specified communicator, message tag.
//
// \return output (master), zero-sized on non-master
template<class Type, class Addr, class OutputContainer = List<Type>>
static OutputContainer gatherOp
(
const IndirectListBase<Type, Addr>& sendData,
const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
); );
//- Inplace collect data in processor order on master //- Inplace collect data in processor order on master
// Does communication with default communicator and message tag. //- (in serial: a no-op).
// Communication with default/specified communicator, message tag.
//
// After the gather, the field is zero-sized on the slaves. // After the gather, the field is zero-sized on the slaves.
template<class Type> template<class Type>
static void gatherOp static void gatherInplaceOp
( (
List<Type>& fld, List<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const Pstream::commsTypes = Pstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator
); );
// Scatter // Scatter
//- Distribute data in processor order. Requires fld to be sized! //- Distribute data in processor order.
// Requires fld to be correctly sized!
// Communication with default/specified communicator, message tag.
// \note offsets needed on master only.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
static void scatter static void scatter
( (
@ -549,7 +696,10 @@ public:
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const Pstream::commsTypes = Pstream::commsTypes::nonBlocking
); );
//- Distribute data in processor order. Requires fld to be sized! //- Distribute data in processor order.
// Requires fld to be correctly sized!
// Communication with default/specified communicator, message tag.
// \note the globalIndex offsets needed on master only.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void scatter void scatter
( (
@ -565,8 +715,10 @@ public:
scatter(offsets_, comm, procIDs, allFld, fld, tag, commsType); scatter(offsets_, comm, procIDs, allFld, fld, tag, commsType);
} }
//- Distribute data in processor order. Requires fld to be sized! //- Distribute data in processor order.
// Does communication with default communicator and message tag. // Requires fld to be correctly sized!
// Communication with default/specified communicator, message tag.
// \note the globalIndex offsets needed on master only.
template<class Type> template<class Type>
void scatter void scatter
( (
@ -582,6 +734,7 @@ public:
//- Get (potentially remote) data. //- Get (potentially remote) data.
//- Elements required given as global indices //- Elements required given as global indices
// Communication with default/specified communicator, message tag.
template<class Type, class CombineOp> template<class Type, class CombineOp>
void get void get
( (

View File

@ -154,12 +154,6 @@ inline Foam::label Foam::globalIndex::size() const
} }
inline void Foam::globalIndex::reset(const label localSize)
{
reset(localSize, Pstream::msgType(), UPstream::worldComm, true);
}
inline Foam::label Foam::globalIndex::offset(const label proci) const inline Foam::label Foam::globalIndex::offset(const label proci) const
{ {
return offsets_[proci]; return offsets_[proci];

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2013-2017 OpenFOAM Foundation Copyright (C) 2013-2017 OpenFOAM Foundation
Copyright (C) 2019-2021 OpenCFD Ltd. Copyright (C) 2019-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -68,12 +68,10 @@ Foam::globalIndex::calcListOffsets
} }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void Foam::globalIndex::gather void Foam::globalIndex::gather
( (
const labelUList& off, const labelUList& off, // needed on master only
const label comm, const label comm,
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
const UList<Type>& fld, const UList<Type>& fld,
@ -82,6 +80,8 @@ void Foam::globalIndex::gather
const Pstream::commsTypes commsType const Pstream::commsTypes commsType
) )
{ {
// low-level: no parRun guard
if if
( (
!is_contiguous<Type>::value !is_contiguous<Type>::value
@ -94,6 +94,8 @@ void Foam::globalIndex::gather
// Could also warn and change to scheduled etc... // Could also warn and change to scheduled etc...
} }
const label startOfRequests = UPstream::nRequests();
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
{ {
allFld.resize_nocopy(off.last()); allFld.resize_nocopy(off.last());
@ -106,53 +108,12 @@ void Foam::globalIndex::gather
SubList<Type>(allFld, off[1]-off[0], off[0]) = SubList<Type>(allFld, off[1]-off[0], off[0]) =
SubList<Type>(fld, off[1]-off[0]); SubList<Type>(fld, off[1]-off[0]);
if for (label i = 1; i < procIDs.size(); ++i)
(
commsType == Pstream::commsTypes::scheduled
|| commsType == Pstream::commsTypes::blocking
)
{ {
for (label i = 1; i < procIDs.size(); ++i) SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
if (is_contiguous<Type>::value)
{ {
SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
if (is_contiguous<Type>::value)
{
IPstream::read
(
commsType,
procIDs[i],
procSlot.data_bytes(),
procSlot.size_bytes(),
tag,
comm
);
}
else
{
IPstream fromProc
(
commsType,
procIDs[i],
0,
tag,
comm
);
fromProc >> procSlot;
}
}
}
else
{
// nonBlocking && is_contiguous == true (already checked)
const label startOfRequests = Pstream::nRequests();
// Set up reads
for (label i = 1; i < procIDs.size(); ++i)
{
SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
IPstream::read IPstream::read
( (
commsType, commsType,
@ -163,51 +124,24 @@ void Foam::globalIndex::gather
comm comm
); );
} }
// Wait for all to finish
Pstream::waitRequests(startOfRequests);
}
}
else
{
if
(
commsType == Pstream::commsTypes::scheduled
|| commsType == Pstream::commsTypes::blocking
)
{
if (is_contiguous<Type>::value)
{
OPstream::write
(
commsType,
procIDs[0],
fld.cdata_bytes(),
fld.size_bytes(),
tag,
comm
);
}
else else
{ {
OPstream toMaster IPstream fromProc
( (
commsType, commsType,
procIDs[0], procIDs[i],
0, 0,
tag, tag,
comm comm
); );
toMaster << fld; fromProc >> procSlot;
} }
} }
else }
else
{
if (is_contiguous<Type>::value)
{ {
// nonBlocking && is_contiguous == true (already checked)
const label startOfRequests = Pstream::nRequests();
// Set up write
OPstream::write OPstream::write
( (
commsType, commsType,
@ -217,10 +151,25 @@ void Foam::globalIndex::gather
tag, tag,
comm comm
); );
// Wait for all to finish
Pstream::waitRequests(startOfRequests);
} }
else
{
OPstream toMaster
(
commsType,
procIDs[0],
0,
tag,
comm
);
toMaster << fld;
}
}
if (commsType == Pstream::commsTypes::nonBlocking)
{
// Wait for all to finish
UPstream::waitRequests(startOfRequests);
} }
} }
@ -228,7 +177,7 @@ void Foam::globalIndex::gather
template<class Type, class Addr> template<class Type, class Addr>
void Foam::globalIndex::gather void Foam::globalIndex::gather
( (
const labelUList& off, const labelUList& off, // needed on master only
const label comm, const label comm,
const UList<int>& procIDs, const UList<int>& procIDs,
const IndirectListBase<Type, Addr>& fld, const IndirectListBase<Type, Addr>& fld,
@ -237,6 +186,8 @@ void Foam::globalIndex::gather
const Pstream::commsTypes commsType const Pstream::commsTypes commsType
) )
{ {
// low-level: no parRun guard
if (commsType == Pstream::commsTypes::nonBlocking) if (commsType == Pstream::commsTypes::nonBlocking)
{ {
WarningInFunction WarningInFunction
@ -289,55 +240,10 @@ void Foam::globalIndex::gather
} }
template<class Type>
void Foam::globalIndex::gather
(
const UList<Type>& fld,
List<Type>& allFld,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
gather
(
comm,
UPstream::procID(comm),
fld,
allFld,
tag,
commsType
);
}
template<class Type, class Addr>
void Foam::globalIndex::gather
(
const IndirectListBase<Type, Addr>& fld,
List<Type>& allFld,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
gather
(
offsets_,
comm,
UPstream::procID(comm),
fld,
allFld,
tag,
commsType
);
}
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void Foam::globalIndex::gather void Foam::globalIndex::gather
( (
const labelUList& off, const labelUList& off, // needed on master only
const label comm, const label comm,
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
List<Type>& fld, List<Type>& fld,
@ -345,19 +251,123 @@ void Foam::globalIndex::gather
const Pstream::commsTypes commsType const Pstream::commsTypes commsType
) )
{ {
List<Type> allFld; // low-level: no parRun guard
gather(off, comm, procIDs, fld, allFld, tag, commsType); List<Type> allData;
gather(off, comm, procIDs, fld, allData, tag, commsType);
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
{ {
fld.transfer(allFld); fld.transfer(allData);
} }
} }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Type> template<class Type>
void Foam::globalIndex::gather void Foam::globalIndex::gather
(
const UList<Type>& sendData,
List<Type>& allData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
if (UPstream::parRun())
{
gather
(
offsets_, // needed on master only
comm,
UPstream::procID(comm),
sendData,
allData,
tag,
commsType
);
if (!UPstream::master(comm))
{
allData.clear(); // safety: zero-size on non-master
}
}
else
{
// Serial: direct copy
allData = sendData;
}
}
template<class Type, class Addr>
void Foam::globalIndex::gather
(
const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
if (UPstream::parRun())
{
gather
(
offsets_, // needed on master only
comm,
UPstream::procID(comm),
sendData,
allData,
tag,
commsType
);
if (!UPstream::master(comm))
{
allData.clear(); // safety: zero-size on non-master
}
}
else
{
// Serial: direct copy
allData = List<Type>(sendData);
}
}
template<class Type, class OutputContainer>
OutputContainer Foam::globalIndex::gather
(
const UList<Type>& sendData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
OutputContainer allData;
gather(sendData, allData, tag, commsType, comm);
return allData;
}
template<class Type, class Addr, class OutputContainer>
OutputContainer Foam::globalIndex::gather
(
const IndirectListBase<Type, Addr>& sendData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
) const
{
OutputContainer allData;
gather(sendData, allData, tag, commsType, comm);
return allData;
}
template<class Type>
void Foam::globalIndex::gatherInplace
( (
List<Type>& fld, List<Type>& fld,
const int tag, const int tag,
@ -365,26 +375,30 @@ void Foam::globalIndex::gather
const label comm const label comm
) const ) const
{ {
List<Type> allFld; if (UPstream::parRun())
gather
(
comm,
UPstream::procID(comm),
fld,
allFld,
tag,
commsType
);
if (Pstream::master(comm))
{ {
fld.transfer(allFld); List<Type> allData;
}
else gather
{ (
fld.clear(); comm,
UPstream::procID(comm),
fld,
allData,
tag,
commsType
);
if (UPstream::master(comm))
{
fld.transfer(allData);
}
else
{
fld.clear(); // zero-size on non-master
}
} }
// Serial: (no-op)
} }
@ -392,10 +406,16 @@ template<class Type, class OutputContainer>
void Foam::globalIndex::mpiGather void Foam::globalIndex::mpiGather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allValues, OutputContainer& allData,
const label comm const label comm
) const ) const
{ {
if (!UPstream::parRun())
{
// Serial: direct copy
allData = sendData;
return;
}
if (!is_contiguous<Type>::value) if (!is_contiguous<Type>::value)
{ {
FatalErrorInFunction FatalErrorInFunction
@ -403,29 +423,25 @@ void Foam::globalIndex::mpiGather
<< abort(FatalError); << abort(FatalError);
} }
const label proci = Pstream::myProcNo(comm);
const globalIndex& globalAddr = *this;
// Must be the same as Pstream::nProcs(comm), at least on master!!
const label nproc = globalAddr.nProcs();
auto nSendBytes = sendData.size_bytes(); auto nSendBytes = sendData.size_bytes();
// Respect local size information so that we can request
// 0 entries to be sent on master
if (proci < nproc && !globalAddr.localSize(proci))
{
nSendBytes = 0;
}
List<int> recvSizes; List<int> recvSizes;
List<int> recvOffsets; List<int> recvOffsets;
if (Pstream::master(comm)) if (UPstream::master(comm))
{ {
allValues.resize_nocopy(globalAddr.size()); const globalIndex& globalAddr = *this;
// Must be the same as Pstream::nProcs(comm), at least on master!!
const label nproc = globalAddr.nProcs();
// Allow request of 0 entries to be sent on master
if (!globalAddr.localSize(0))
{
nSendBytes = 0;
}
allData.resize_nocopy(globalAddr.size());
recvSizes.resize(nproc); recvSizes.resize(nproc);
recvOffsets.resize(nproc+1); recvOffsets.resize(nproc+1);
@ -439,14 +455,14 @@ void Foam::globalIndex::mpiGather
} }
else else
{ {
allValues.clear(); allData.clear(); // safety: zero-size on non-master
} }
UPstream::gather UPstream::gather
( (
sendData.cdata_bytes(), sendData.cdata_bytes(),
nSendBytes, nSendBytes,
allValues.data_bytes(), allData.data_bytes(),
recvSizes, recvSizes,
recvOffsets, recvOffsets,
comm comm
@ -461,41 +477,210 @@ OutputContainer Foam::globalIndex::mpiGather
const label comm const label comm
) const ) const
{ {
OutputContainer allValues; OutputContainer allData;
mpiGather<Type, OutputContainer>(sendData, allValues, comm); mpiGather(sendData, allData, comm);
return allValues; return allData;
}
template<class Type>
void Foam::globalIndex::mpiGatherInplace
(
List<Type>& fld,
const label comm
) const
{
if (UPstream::parRun())
{
List<Type> allData;
mpiGather(fld, allData, comm);
if (UPstream::master(comm))
{
fld.transfer(allData);
}
else
{
fld.clear(); // zero-size on non-master
}
}
// Serial: (no-op)
}
template<class Type, class OutputContainer>
void Foam::globalIndex::mpiGatherOp
(
const UList<Type>& sendData,
OutputContainer& allData,
const label comm
)
{
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex
(
UPstream::listGatherValues(sendData.size(), comm),
accessType::SIZES
).mpiGather(sendData, allData, comm);
}
else
{
// Serial: direct copy
allData = sendData;
}
}
template<class Type, class OutputContainer>
OutputContainer Foam::globalIndex::mpiGatherOp
(
const UList<Type>& sendData,
const label comm
)
{
OutputContainer allData;
mpiGatherOp(sendData, allData, comm);
return allData;
}
template<class Type>
void Foam::globalIndex::mpiGatherInplaceOp
(
List<Type>& fld,
const label comm
)
{
if (UPstream::parRun())
{
List<Type> allData;
mpiGatherOp(fld, allData, comm);
if (UPstream::master(comm))
{
fld.transfer(allData);
}
else
{
fld.clear(); // zero-size on non-master
}
}
// Serial: (no-op)
} }
template<class Type> template<class Type>
void Foam::globalIndex::gatherOp void Foam::globalIndex::gatherOp
( (
const UList<Type>& fld, const UList<Type>& sendData,
List<Type>& allFld, List<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType const Pstream::commsTypes commsType,
const label comm
) )
{ {
globalIndex(fld.size()).gather(fld, allFld, tag, commsType); if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex
(
UPstream::listGatherValues(sendData.size(), comm),
accessType::SIZES
).gather(sendData, allData, tag, commsType, comm);
}
else
{
// Serial: direct copy
allData = sendData;
}
}
template<class Type, class Addr>
void Foam::globalIndex::gatherOp
(
const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
)
{
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex
(
UPstream::listGatherValues(sendData.size(), comm),
accessType::SIZES
).gather(sendData, allData, tag, commsType, comm);
}
else
{
// Serial: direct copy
allData = List<Type>(sendData);
}
}
template<class Type, class OutputContainer>
OutputContainer Foam::globalIndex::gatherOp
(
const UList<Type>& sendData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
)
{
OutputContainer allData;
gatherOp(sendData, allData, tag, commsType, comm);
return allData;
}
template<class Type, class Addr, class OutputContainer>
OutputContainer Foam::globalIndex::gatherOp
(
const IndirectListBase<Type, Addr>& sendData,
const int tag,
const Pstream::commsTypes commsType,
const label comm
)
{
OutputContainer allData;
gatherOp(sendData, allData, tag, commsType, comm);
return allData;
} }
template<class Type> template<class Type>
void Foam::globalIndex::gatherOp void Foam::globalIndex::gatherInplaceOp
( (
List<Type>& fld, List<Type>& fld,
const int tag, const int tag,
const Pstream::commsTypes commsType const Pstream::commsTypes commsType,
const label comm
) )
{ {
globalIndex(fld.size()).gather(fld, tag, commsType); if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex
(
UPstream::listGatherValues(fld.size(), comm),
accessType::SIZES
).gather(fld, tag, commsType, comm);
}
// Serial: (no-op)
} }
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void Foam::globalIndex::scatter void Foam::globalIndex::scatter
( (
const labelUList& off, const labelUList& off, // needed on master only
const label comm, const label comm,
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
const UList<Type>& allFld, const UList<Type>& allFld,
@ -516,6 +701,8 @@ void Foam::globalIndex::scatter
// Could also warn and change to scheduled etc... // Could also warn and change to scheduled etc...
} }
const label startOfRequests = UPstream::nRequests();
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
{ {
const SubList<Type> localSlot(allFld, off[1]-off[0], off[0]); const SubList<Type> localSlot(allFld, off[1]-off[0], off[0]);
@ -525,53 +712,12 @@ void Foam::globalIndex::scatter
fld.deepCopy(localSlot); fld.deepCopy(localSlot);
} }
if for (label i = 1; i < procIDs.size(); ++i)
(
commsType == Pstream::commsTypes::scheduled
|| commsType == Pstream::commsTypes::blocking
)
{ {
for (label i = 1; i < procIDs.size(); ++i) const SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
if (is_contiguous<Type>::value)
{ {
const SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
if (is_contiguous<Type>::value)
{
OPstream::write
(
commsType,
procIDs[i],
procSlot.cdata_bytes(),
procSlot.size_bytes(),
tag,
comm
);
}
else
{
OPstream toProc
(
commsType,
procIDs[i],
0,
tag,
comm
);
toProc << procSlot;
}
}
}
else
{
// nonBlocking && is_contiguous == true (already checked)
const label startOfRequests = Pstream::nRequests();
// Set up writes
for (label i = 1; i < procIDs.size(); ++i)
{
const SubList<Type> procSlot(allFld, off[i+1]-off[i], off[i]);
OPstream::write OPstream::write
( (
commsType, commsType,
@ -582,51 +728,24 @@ void Foam::globalIndex::scatter
comm comm
); );
} }
// Wait for all to finish
Pstream::waitRequests(startOfRequests);
}
}
else
{
if
(
commsType == Pstream::commsTypes::scheduled
|| commsType == Pstream::commsTypes::blocking
)
{
if (is_contiguous<Type>::value)
{
IPstream::read
(
commsType,
procIDs[0],
fld.data_bytes(),
fld.size_bytes(),
tag,
comm
);
}
else else
{ {
IPstream fromMaster OPstream toProc
( (
commsType, commsType,
procIDs[0], procIDs[i],
0, 0,
tag, tag,
comm comm
); );
fromMaster >> fld; toProc << procSlot;
} }
} }
else }
else
{
if (is_contiguous<Type>::value)
{ {
// nonBlocking && is_contiguous == true (already checked)
const label startOfRequests = Pstream::nRequests();
// Set up read
IPstream::read IPstream::read
( (
commsType, commsType,
@ -636,10 +755,25 @@ void Foam::globalIndex::scatter
tag, tag,
comm comm
); );
// Wait for all to finish
Pstream::waitRequests(startOfRequests);
} }
else
{
IPstream fromMaster
(
commsType,
procIDs[0],
0,
tag,
comm
);
fromMaster >> fld;
}
}
if (commsType == Pstream::commsTypes::nonBlocking)
{
// Wait for all to finish
UPstream::waitRequests(startOfRequests);
} }
} }
@ -654,16 +788,19 @@ void Foam::globalIndex::scatter
const label comm const label comm
) const ) const
{ {
scatter // TBD: protection and special handling for serial?
( {
offsets_, scatter
comm, (
UPstream::procID(comm), offsets_, // needed on master only
allFld, comm,
fld, UPstream::procID(comm),
tag, allFld,
commsType fld,
); tag,
commsType
);
}
} }