ENH: provide MPI native minOp, maxOp reduce multiple values

- consistent with sumOp

ENH: globalIndex with gatherNonLocal tag, and use leading dispatch tags

- useful for gather/write where the master data can be written
 separately.  Leading vs trailing dispatch tags for more similarity to
 other C++ conventions.
This commit is contained in:
Mark Olesen
2022-11-11 18:19:22 +01:00
parent beea22b2f0
commit 25e874a4f0
19 changed files with 363 additions and 144 deletions

View File

@ -428,7 +428,7 @@ void Foam::mergeAndWrite
// Renumber local ids -> global ids
globalIndex(mesh.nPoints()).inplaceToGlobal(mergedIDs);
globalIndex gatherer(mergedIDs.size(), globalIndex::gatherOnly{});
globalIndex gatherer(globalIndex::gatherOnly{}, mergedIDs.size());
gatherer.gatherInplace(mergedIDs);
gatherer.gatherInplace(mergedPts);
}

View File

@ -144,7 +144,7 @@ public:
void resetCloud(const label localCloudSize)
{
cloudGather_.reset(localCloudSize, globalIndex::gatherOnly{});
cloudGather_.reset(globalIndex::gatherOnly{}, localCloudSize);
origParcelIds_.resize_nocopy(localCloudSize);
origProcIds_.resize_nocopy(localCloudSize);
}

View File

@ -194,10 +194,15 @@ triSurface triangulate
// CGAL : every processor has whole surface
globalIndex globalFaceIdx(surface.size(), globalIndex::gatherOnly());
globalIndex globalPointIdx
const globalIndex globalFaceIdx
(
surface.points().size(), globalIndex::gatherOnly()
globalIndex::gatherOnly{},
surface.size()
);
const globalIndex globalPointIdx
(
globalIndex::gatherOnly{},
surface.points().size()
);
List<labelledTri> globalSurfaceTris(globalFaceIdx.gather(surface));

View File

@ -164,6 +164,16 @@ void reduce
#undef Pstream_CommonReductions
#define Pstream_CommonReductions(Native) \
\
/*! \brief Reduce (min) multiple Native values (same size all procs!) */ \
void reduce \
( \
Native values[], \
const int size, \
const minOp<Native>&, \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
/*! \brief Reduce (min) single Native value */ \
void reduce \
( \
@ -173,6 +183,29 @@ void reduce \
const label comm = UPstream::worldComm \
); \
\
/*! \brief Reduce (min) multiple Native values */ \
template<unsigned N> \
inline void reduce \
( \
FixedList<Native, N>& values, \
const minOp<Native>&, \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
) \
{ \
reduce(values.data(), int(values.size()), minOp<Native>(), tag, comm); \
} \
\
/*! \brief Reduce (max) multiple Native values (same size all procs!) */ \
void reduce \
( \
Native values[], \
const int size, \
const maxOp<Native>&, \
const int tag, /*!< (ignored) */ \
const label comm \
); \
\
/*! \brief Reduce (max) single Native value */ \
void reduce \
( \
@ -182,14 +215,18 @@ void reduce \
const label comm = UPstream::worldComm \
); \
\
/*! \brief Reduce (sum) single Native value */ \
void reduce \
/*! \brief Reduce (max) multiple Native values */ \
template<unsigned N> \
inline void reduce \
( \
Native& value, \
const sumOp<Native>&, \
FixedList<Native, N>& values, \
const maxOp<Native>&, \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
) \
{ \
reduce(values.data(), int(values.size()), maxOp<Native>(), tag, comm); \
} \
\
/*! \brief Reduce (sum) multiple Native values (same size all procs!) */ \
void reduce \
@ -201,6 +238,15 @@ void reduce \
const label comm \
); \
\
/*! \brief Reduce (sum) single Native value */ \
void reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag = UPstream::msgType(), /*!< (ignored) */ \
const label comm = UPstream::worldComm \
); \
\
/*! \brief Reduce (sum) multiple Native values */ \
template<unsigned N> \
inline void reduce \
@ -224,21 +270,21 @@ inline void reduce \
\
Pstream_CommonReductions(Native); \
\
/*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \
/*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \
void reduce \
( \
Native& value, \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, /*!< (ignored) */ \
const label comm, \
label& requestID \
); \
\
/*! \brief Non-blocking reduce (sum) multiple Native values. Sets request */ \
/*! \brief Non-blocking reduce (sum) single Native value. Sets request */ \
void reduce \
( \
Native values[], \
const int size, \
Native& value, \
const sumOp<Native>&, \
const int tag, /*!< (ignored) */ \
const label comm, \

View File

@ -1011,13 +1011,9 @@ bool Foam::polyBoundaryMesh::checkParallelSync(const bool report) const
localTypes.resize(nonProci);
// Check and report error(s) on master
// - don't need indexing on master itself
const globalIndex procAddr
(
// Don't need to collect master itself
(Pstream::master() ? 0 : nonProci),
globalIndex::gatherOnly{}
);
const globalIndex procAddr(globalIndex::gatherNonLocal{}, nonProci);
const wordList allNames(procAddr.gather(localNames));
const wordList allTypes(procAddr.gather(localTypes));

View File

@ -767,12 +767,12 @@ bool Foam::ZoneMesh<ZoneType, MeshType>::checkParallelSync
const wordList localTypes(this->types());
// Check and report error(s) on master
// - don't need indexing on master itself
const globalIndex procAddr
(
// Don't need to collect master itself
(Pstream::master() ? 0 : localNames.size()),
globalIndex::gatherOnly{}
globalIndex::gatherNonLocal{},
localNames.size()
);
const wordList allNames(procAddr.gather(localNames));

View File

@ -55,13 +55,13 @@ void Foam::PatchTools::gatherAndMerge
typedef typename PrimitivePatch<FaceList, PointField>::face_type FaceType;
// Faces from all ranks
faceAddr = globalIndex(pp.size(), globalIndex::gatherOnly{});
faceAddr.reset(globalIndex::gatherOnly{}, pp.size());
// Points from all ranks
pointAddr = globalIndex
pointAddr.reset
(
(useLocal ? pp.localPoints().size() : pp.points().size()),
globalIndex::gatherOnly{}
globalIndex::gatherOnly{},
(useLocal ? pp.localPoints().size() : pp.points().size())
);
if (useLocal)
@ -96,14 +96,14 @@ void Foam::PatchTools::gatherAndMerge
{
const globalIndex localPointAddr
(
pp.localPoints().size(),
globalIndex::gatherOnly{}
globalIndex::gatherOnly{},
pp.localPoints().size()
);
const globalIndex bndPointAddr
(
pp.boundaryPoints().size(),
globalIndex::gatherOnly{}
globalIndex::gatherOnly{},
pp.boundaryPoints().size()
);
bndPointAddr.gather(pp.boundaryPoints(), boundaryPoints);

View File

@ -191,18 +191,6 @@ Foam::globalIndex::bin
}
void Foam::globalIndex::reset
(
const label localSize,
const globalIndex::gatherOnly,
const label comm
)
{
// Gather sizes (one-sided)
reset(UPstream::listGatherValues(localSize, comm));
}
void Foam::globalIndex::reset
(
const label localSize,

View File

@ -99,11 +99,19 @@ public:
//- Disambiguation tag (list construction dispatch)
enum accessType : char { OFFSETS, SIZES };
//- Dispatch tag
//- Dispatch tag.
//- Construct with a single (local size) entry, no communication.
struct gatherNone{};
//- Dispatch tag.
//- Construct 'one-sided' from local sizes,
//- using gather but no broadcast.
struct gatherOnly{};
//- Dispatch tag
struct gatherNone{};
//- Dispatch tag.
//- Construct 'one-sided' from the non-master local sizes
//- using gather but no broadcast.
struct gatherNonLocal{};
// Constructors
@ -136,24 +144,37 @@ public:
const bool parallel = UPstream::parRun() //!< use parallel comms
);
//- Construct by gathering local sizes without rescattering.
//- This 'one-sided' globalIndex will be empty on non-master processes.
//- Construct with a single (local size) entry, no communication
inline globalIndex
(
const globalIndex::gatherNone,
const label localSize,
const label comm = -1 //!< no communicator needed
);
//- Construct 'one-sided' from local sizes.
//- Uses UPstream::listGatherValues, but no broadcast.
//- Will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
inline globalIndex
(
const label localSize,
const globalIndex::gatherOnly,
const label localSize,
const label comm = UPstream::worldComm //!< communicator
);
//- Construct with a single size entry.
//- No communication required
//- Construct 'one-sided' from the non-master local sizes
//- (ie, master size is treated as 0).
//- Uses UPstream::listGatherValues, but no broadcast.
//- Will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
inline globalIndex
(
const globalIndex::gatherNonLocal,
const label localSize,
const globalIndex::gatherNone,
const label comm = -1 //!< no communicator needed
const label comm = UPstream::worldComm //!< communicator
);
//- Construct from Istream.
@ -220,17 +241,6 @@ public:
const bool parallel = UPstream::parRun() //!< use parallel comms
);
//- Reset by gathering local sizes without rescattering.
//- This 'one-sided' globalIndex will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
void reset
(
const label localSize,
const globalIndex::gatherOnly,
const label comm = UPstream::worldComm //!< communicator
);
//- Reset from list of local sizes,
//- with optional check for label overflow.
//- No communication required
@ -240,6 +250,39 @@ public:
const bool checkOverflow = false
);
//- Reset to a single (local size) entry, no communication
inline void reset
(
const globalIndex::gatherNone,
const label localSize,
const label comm = -1 //!< no communicator needed
);
//- Reset as 'one-sided' from local sizes.
//- Uses UPstream::listGatherValues, but no broadcast.
//- Will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
inline void reset
(
const globalIndex::gatherOnly,
const label localSize,
const label comm = UPstream::worldComm //!< communicator
);
//- Reset as 'one-sided' from the non-master local sizes
//- (ie, master size is treated as 0).
//- Uses UPstream::listGatherValues, but no broadcast.
//- Will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
inline void reset
(
const globalIndex::gatherNonLocal,
const label localSize,
const label comm = UPstream::worldComm //!< communicator
);
//- Alter local size for given processor
void setLocalSize(const label proci, const label len);
@ -853,6 +896,28 @@ public:
// Housekeeping
//- Construct with a single (local size) entry, no communication
globalIndex
(
const label localSize,
const globalIndex::gatherNone,
const label comm = -1 //!< no communicator needed
)
:
globalIndex(gatherNone{}, localSize, comm)
{}
//- Construct 'one-sided' from local sizes.
globalIndex
(
const label localSize,
const globalIndex::gatherOnly,
const label comm = UPstream::worldComm
)
:
globalIndex(gatherOnly{}, localSize, comm)
{}
//- Construct from local size, using gather/broadcast
//- with default/specified communicator if parallel.
FOAM_DEPRECATED_FOR(2022-03, "construct without message tag")
@ -884,6 +949,20 @@ public:
//- Prefer localStart() to avoid confusing with offsets()
FOAM_DEPRECATED_FOR(2022-02, "use localStart()")
label offset(const label proci) const { return localStart(proci); }
//- Reset as 'one-sided' from local sizes [gather, but no broadcast]
//- Will be empty on non-master processes.
//
// \note can be used when Pstream::parRun() is false.
void reset
(
const label localSize,
const globalIndex::gatherOnly,
const label comm = UPstream::worldComm //!< communicator
)
{
reset(globalIndex::gatherOnly{}, localSize, comm);
}
};

View File

@ -90,20 +90,8 @@ inline Foam::globalIndex::globalIndex
inline Foam::globalIndex::globalIndex
(
const label localSize,
const globalIndex::gatherOnly,
const label comm
)
{
// Gather sizes (one-sided)
reset(UPstream::listGatherValues(localSize, comm));
}
inline Foam::globalIndex::globalIndex
(
const label localSize,
const globalIndex::gatherNone,
const label localSize,
const label /* comm (ignored) */
)
:
@ -114,6 +102,37 @@ inline Foam::globalIndex::globalIndex
}
inline Foam::globalIndex::globalIndex
(
const globalIndex::gatherOnly,
const label localSize,
const label comm
)
{
// one-sided
reset(UPstream::listGatherValues(localSize, comm));
}
inline Foam::globalIndex::globalIndex
(
const globalIndex::gatherNonLocal,
const label localSize,
const label comm
)
{
// one-sided: non-master only
reset
(
UPstream::listGatherValues
(
(UPstream::master() ? label(0) : localSize),
comm
)
);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
inline bool Foam::globalIndex::empty() const
@ -350,6 +369,50 @@ inline Foam::label Foam::globalIndex::whichProcID(const label i) const
}
inline void Foam::globalIndex::reset
(
const globalIndex::gatherNone,
const label localSize,
const label /* comm (ignored) */
)
{
offsets_.resize_nocopy(2);
offsets_[0] = 0;
offsets_[1] = localSize;
}
inline void Foam::globalIndex::reset
(
const globalIndex::gatherOnly,
const label localSize,
const label comm
)
{
// one-sided
reset(UPstream::listGatherValues(localSize, comm));
}
inline void Foam::globalIndex::reset
(
const globalIndex::gatherNonLocal,
const label localSize,
const label comm
)
{
// one-sided: non-master only
reset
(
UPstream::listGatherValues
(
(UPstream::master() ? label(0) : localSize),
comm
)
);
}
// * * * * * * * * * * * * * * * * Iterators * * * * * * * * * * * * * * * * //
inline Foam::globalIndex::const_iterator::

View File

@ -697,7 +697,7 @@ void Foam::globalIndex::mpiGatherOp
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex(sendData.size(), globalIndex::gatherOnly{}, comm)
globalIndex(globalIndex::gatherOnly{}, sendData.size(), comm)
.mpiGather(sendData, allData, comm, commsType, tag);
}
else
@ -765,7 +765,7 @@ void Foam::globalIndex::gatherOp
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex(sendData.size(), globalIndex::gatherOnly{}, comm)
globalIndex(globalIndex::gatherOnly{}, sendData.size(), comm)
.gather(sendData, allData, tag, commsType, comm);
}
else
@ -789,7 +789,7 @@ void Foam::globalIndex::gatherOp
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex(sendData.size(), globalIndex::gatherOnly{}, comm)
globalIndex(globalIndex::gatherOnly{}, sendData.size(), comm)
.gather(sendData, allData, tag, commsType, comm);
}
else
@ -842,7 +842,7 @@ void Foam::globalIndex::gatherInplaceOp
if (UPstream::parRun())
{
// Gather sizes - only needed on master
globalIndex(fld.size(), globalIndex::gatherOnly{}, comm)
globalIndex(globalIndex::gatherOnly{}, fld.size(), comm)
.gather(fld, tag, commsType, comm);
}
// Serial: (no-op)

View File

@ -66,6 +66,36 @@ void Foam::reduce
#define Pstream_CommonReductions(Native) \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const minOp<Native>&, \
const int tag, \
const label comm \
) \
{} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const maxOp<Native>&, \
const int tag, \
const label comm \
) \
{} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const label comm \
) \
{} \
\
void Foam::reduce \
( \
Native& value, \
const minOp<Native>&, \
@ -90,16 +120,6 @@ void Foam::reduce \
const int tag, \
const label comm \
) \
{} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const label comm \
) \
{}
@ -114,7 +134,8 @@ Pstream_CommonReductions(Native); \
\
void Foam::reduce \
( \
Native& value, \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, \
const label comm, \
@ -124,8 +145,7 @@ void Foam::reduce \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
Native& value, \
const sumOp<Native>&, \
const int tag, \
const label comm, \

View File

@ -80,6 +80,51 @@ void Foam::reduce
#define Pstream_CommonReductions(Native, TaggedType) \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const minOp<Native>&, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_MIN, comm \
); \
} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const maxOp<Native>&, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_MAX, comm \
); \
} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, comm \
); \
} \
\
void Foam::reduce \
( \
Native& value, \
const minOp<Native>&, \
@ -119,21 +164,6 @@ void Foam::reduce \
( \
&value, 1, TaggedType, MPI_SUM, comm \
); \
} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
values, size, TaggedType, MPI_SUM, comm \
); \
}
@ -147,21 +177,6 @@ void Foam::reduce \
Pstream_CommonReductions(Native, TaggedType); \
\
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm, \
label& requestID \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, comm, &requestID \
); \
} \
\
void Foam::reduce \
( \
Native values[], \
const int size, \
@ -177,6 +192,21 @@ void Foam::reduce \
); \
} \
\
void Foam::reduce \
( \
Native& value, \
const sumOp<Native>&, \
const int tag, /* (unused) */ \
const label comm, \
label& requestID \
) \
{ \
PstreamDetail::allReduce<Native> \
( \
&value, 1, TaggedType, MPI_SUM, comm, &requestID \
); \
} \
\
void Foam::sumReduce \
( \
Native& value, \

View File

@ -398,13 +398,9 @@ void Foam::fvMeshDistribute::getFieldNames
if (syncPar && Pstream::parRun())
{
// Check and report error(s) on master
// - don't need indexing on master itself
const globalIndex procAddr
(
// Don't need to collect master itself
(Pstream::master() ? 0 : list.size()),
globalIndex::gatherOnly{}
);
const globalIndex procAddr(globalIndex::gatherNonLocal{}, list.size());
const wordList allNames(procAddr.gather(list));

View File

@ -93,8 +93,8 @@ void Foam::ensightOutput::Detail::writeFieldContent
const globalIndex procAddr
(
parallel
? globalIndex(fld.size(), globalIndex::gatherOnly{})
: globalIndex(fld.size(), globalIndex::gatherNone{})
? globalIndex(globalIndex::gatherOnly{}, fld.size())
: globalIndex(globalIndex::gatherNone{}, fld.size())
);

View File

@ -567,8 +567,8 @@ bool Foam::vtk::fileWriter::writeProcIDs(const label nValues)
const globalIndex procAddr
(
parallel_
? globalIndex(nValues, globalIndex::gatherOnly{})
: globalIndex(nValues, globalIndex::gatherNone{})
? globalIndex(globalIndex::gatherOnly{}, nValues)
: globalIndex(globalIndex::gatherNone{}, nValues)
);
const label totalCount = procAddr.totalSize();

View File

@ -689,13 +689,9 @@ bool Foam::faBoundaryMesh::checkParallelSync(const bool report) const
localTypes.resize(nonProci);
// Check and report error(s) on master
// - don't need indexing on master itself
const globalIndex procAddr
(
// Don't need to collect master itself
(Pstream::master() ? 0 : nonProci),
globalIndex::gatherOnly{}
);
const globalIndex procAddr(globalIndex::gatherNonLocal{}, nonProci);
const wordList allNames(procAddr.gather(localNames));
const wordList allTypes(procAddr.gather(localTypes));

View File

@ -108,7 +108,7 @@ void Foam::functionObjects::externalCoupled::readColumns
) const
{
// Get sizes for all processors
const globalIndex globalFaces(nRows, globalIndex::gatherOnly{});
const globalIndex globalFaces(globalIndex::gatherOnly{}, nRows);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
@ -179,7 +179,7 @@ void Foam::functionObjects::externalCoupled::readLines
) const
{
// Get sizes for all processors
const globalIndex globalFaces(nRows, globalIndex::gatherOnly{});
const globalIndex globalFaces(globalIndex::gatherOnly{}, nRows);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);

View File

@ -164,7 +164,7 @@ void Foam::sampledSets::gatherAllSets()
{
const coordSet& coords = localSets[seti];
globalIndices_[seti].reset(coords.size(), globalIndex::gatherOnly{});
globalIndices_[seti].reset(globalIndex::gatherOnly{}, coords.size());
gatheredSets_.set(seti, coords.gatherSort(gatheredSorting_[seti]));
}
}