ENH: extend globalIndex mpiGather to use scalar/label components

- MPI_Gatherv requires contiguous data, but a byte-wise transfer can
  quickly exceed the 'int' limits used for MPI sizes/offsets. Thus
  gather label/scalar components when possible to increase the
  effective size limit.

  For non-contiguous types (or large contiguous data) it now also
  reverts to manual handling.

ENH: handle contiguous data in GAMGAgglomeration gather values

- delegate to globalIndex::gatherValues static method (new)
This commit is contained in:
Mark Olesen
2022-03-28 14:03:05 +02:00
parent 87e3b196b0
commit 6fa23bd7a6
6 changed files with 731 additions and 616 deletions

View File

@ -68,6 +68,29 @@ int main(int argc, char *argv[])
} }
// This now compiles (and works)
// - reverts to normal gather for non-contiguous
{
const wordList sendData({"hello", "world"});
// One-sided sizing! master only
const globalIndex allProcAddr
(
sendData.size(),
globalIndex::gatherOnly{}
);
Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl;
// Collect all values
wordList allValues
(
allProcAddr.mpiGather(sendData)
);
Pout<< "all-data: " << allValues << endl;
}
// Gather all values // Gather all values
{ {
const auto& sendData = localValues; const auto& sendData = localValues;
@ -75,8 +98,8 @@ int main(int argc, char *argv[])
// One-sided sizing! master only // One-sided sizing! master only
const globalIndex allProcAddr const globalIndex allProcAddr
( (
UPstream::listGatherValues<label>(sendData.size()), sendData.size(),
globalIndex::SIZES globalIndex::gatherOnly{}
); );
Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl; Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl;

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2021 OpenCFD Ltd. Copyright (C) 2019-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -206,9 +206,6 @@ void Foam::GAMGAgglomeration::agglomerateLduAddressing
boolList& faceFlipMap = faceFlipMap_[fineLevelIndex]; boolList& faceFlipMap = faceFlipMap_[fineLevelIndex];
label nFlipped = 0;
label nDissapear = 0;
forAll(faceRestrictAddr, fineFacei) forAll(faceRestrictAddr, fineFacei)
{ {
label coarseFacei = faceRestrictAddr[fineFacei]; label coarseFacei = faceRestrictAddr[fineFacei];
@ -225,7 +222,6 @@ void Foam::GAMGAgglomeration::agglomerateLduAddressing
if (cOwn == rmUpperAddr && cNei == rmLowerAddr) if (cOwn == rmUpperAddr && cNei == rmLowerAddr)
{ {
faceFlipMap[fineFacei] = true; faceFlipMap[fineFacei] = true;
nFlipped++;
} }
else if (cOwn == rmLowerAddr && cNei == rmUpperAddr) else if (cOwn == rmLowerAddr && cNei == rmUpperAddr)
{ {
@ -244,10 +240,6 @@ void Foam::GAMGAgglomeration::agglomerateLduAddressing
<< exit(FatalError); << exit(FatalError);
} }
} }
else
{
nDissapear++;
}
} }
@ -513,39 +505,37 @@ void Foam::GAMGAgglomeration::procAgglomerateRestrictAddressing
{ {
// Collect number of cells // Collect number of cells
labelList nFineCells; labelList nFineCells;
gatherList globalIndex::gatherValues
( (
comm, comm,
procIDs, procIDs,
restrictAddressing_[levelIndex].size(), restrictAddressing_[levelIndex].size(),
nFineCells nFineCells,
);
labelList offsets(nFineCells.size()+1); UPstream::msgType(),
{ UPstream::commsTypes::scheduled
offsets[0] = 0; );
forAll(nFineCells, i) labelList fineOffsets(globalIndex::calcOffsets(nFineCells));
{
offsets[i+1] = offsets[i] + nFineCells[i];
}
}
// Combine and renumber nCoarseCells // Combine and renumber nCoarseCells
labelList nCoarseCells; labelList nCoarseCells;
gatherList globalIndex::gatherValues
( (
comm, comm,
procIDs, procIDs,
nCells_[levelIndex], nCells_[levelIndex],
nCoarseCells nCoarseCells,
UPstream::msgType(),
UPstream::commsTypes::scheduled
); );
labelList coarseOffsets(globalIndex::calcOffsets(nCoarseCells));
// (cell)restrictAddressing // (cell)restrictAddressing
const globalIndex cellOffsetter(offsets);
labelList procRestrictAddressing; labelList procRestrictAddressing;
cellOffsetter.gather globalIndex::gather
( (
fineOffsets,
comm, comm,
procIDs, procIDs,
restrictAddressing_[levelIndex], restrictAddressing_[levelIndex],
@ -558,29 +548,22 @@ void Foam::GAMGAgglomeration::procAgglomerateRestrictAddressing
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
{ {
labelList coarseCellOffsets(procIDs.size()+1); nCells_[levelIndex] = coarseOffsets.last(); // ie, totalSize()
{
coarseCellOffsets[0] = 0;
forAll(procIDs, i)
{
coarseCellOffsets[i+1] = coarseCellOffsets[i]+nCoarseCells[i];
}
}
nCells_[levelIndex] = coarseCellOffsets.last();
// Renumber consecutively // Renumber consecutively
for (label proci = 1; proci < procIDs.size(); proci++) for (label proci = 1; proci < procIDs.size(); ++proci)
{ {
SubList<label> procSlot SubList<label> procSlot
( (
procRestrictAddressing, procRestrictAddressing,
offsets[proci+1]-offsets[proci], fineOffsets[proci+1]-fineOffsets[proci],
offsets[proci] fineOffsets[proci]
); );
// procSlot += coarseOffsets[proci];
forAll(procSlot, i) forAll(procSlot, i)
{ {
procSlot[i] += coarseCellOffsets[proci]; procSlot[i] += coarseOffsets[proci];
} }
} }
@ -688,51 +671,6 @@ void Foam::GAMGAgglomeration::combineLevels(const label curLevel)
} }
//void Foam::GAMGAgglomeration::gatherList
//(
// const label comm,
// const labelList& procIDs,
//
// const label myVal,
// labelList& vals,
// const int tag
//)
//{
// vals.setSize(procIDs.size());
//
// if (Pstream::myProcNo(comm) == procIDs[0])
// {
// vals[0] = myVal;
//
// for (label i=1; i<procIDs.size(); i++)
// {
// label& slaveVal = vals[i];
// IPstream::read
// (
// Pstream::commsTypes::scheduled,
// procIDs[i],
// reinterpret_cast<char*>(&slaveVal),
// sizeof(slaveVal),
// tag,
// comm
// );
// }
// }
// else
// {
// OPstream::write
// (
// Pstream::commsTypes::scheduled,
// procIDs[0],
// reinterpret_cast<const char*>(&myVal),
// sizeof(myVal),
// tag,
// comm
// );
// }
//}
void Foam::GAMGAgglomeration::calculateRegionMaster void Foam::GAMGAgglomeration::calculateRegionMaster
( (
const label comm, const label comm,

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2016 OpenFOAM Foundation Copyright (C) 2011-2016 OpenFOAM Foundation
Copyright (C) 2019-2020 OpenCFD Ltd. Copyright (C) 2019-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -37,8 +37,8 @@ SourceFiles
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#ifndef GAMGAgglomeration_H #ifndef Foam_GAMGAgglomeration_H
#define GAMGAgglomeration_H #define Foam_GAMGAgglomeration_H
#include "MeshObject.H" #include "MeshObject.H"
#include "lduPrimitiveMesh.H" #include "lduPrimitiveMesh.H"
@ -161,18 +161,6 @@ protected:
const label nCoarseCells const label nCoarseCells
) const; ) const;
//- Gather value from all procIDs onto procIDs[0]
template<class Type>
static void gatherList
(
const label comm,
const labelList& procIDs,
const Type& myVal,
List<Type>& allVals,
const int tag = Pstream::msgType()
);
void clearLevel(const label leveli); void clearLevel(const label leveli);

View File

@ -31,50 +31,6 @@ License
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
template<class Type>
void Foam::GAMGAgglomeration::gatherList
(
const label comm,
const labelList& procIDs,
const Type& myVal,
List<Type>& allVals,
const int tag
)
{
if (Pstream::myProcNo(comm) == procIDs[0])
{
allVals.setSize(procIDs.size());
allVals[0] = myVal;
for (label i=1; i<procIDs.size(); i++)
{
IPstream fromSlave
(
Pstream::commsTypes::scheduled,
procIDs[i],
0,
tag,
comm
);
fromSlave >> allVals[i];
}
}
else
{
OPstream toMaster
(
Pstream::commsTypes::scheduled,
procIDs[0],
0,
tag,
comm
);
toMaster << myVal;
}
}
template<class Type> template<class Type>
void Foam::GAMGAgglomeration::restrictField void Foam::GAMGAgglomeration::restrictField
( (
@ -114,11 +70,12 @@ void Foam::GAMGAgglomeration::restrictField
restrictField(cf, ff, fineToCoarse); restrictField(cf, ff, fineToCoarse);
label coarseLevelIndex = fineLevelIndex+1; const label coarseLevelIndex = fineLevelIndex+1;
if (procAgglom && hasProcMesh(coarseLevelIndex)) if (procAgglom && hasProcMesh(coarseLevelIndex))
{ {
label fineComm = UPstream::parent(procCommunicator_[coarseLevelIndex]); const label coarseComm =
UPstream::parent(procCommunicator_[coarseLevelIndex]);
const List<label>& procIDs = agglomProcIDs(coarseLevelIndex); const List<label>& procIDs = agglomProcIDs(coarseLevelIndex);
const labelList& offsets = cellOffsets(coarseLevelIndex); const labelList& offsets = cellOffsets(coarseLevelIndex);
@ -126,7 +83,7 @@ void Foam::GAMGAgglomeration::restrictField
globalIndex::gather globalIndex::gather
( (
offsets, offsets,
fineComm, coarseComm,
procIDs, procIDs,
cf, cf,
UPstream::msgType(), UPstream::msgType(),
@ -180,19 +137,17 @@ void Foam::GAMGAgglomeration::prolongField
{ {
const labelList& fineToCoarse = restrictAddressing_[levelIndex]; const labelList& fineToCoarse = restrictAddressing_[levelIndex];
label coarseLevelIndex = levelIndex+1; const label coarseLevelIndex = levelIndex+1;
if (procAgglom && hasProcMesh(coarseLevelIndex)) if (procAgglom && hasProcMesh(coarseLevelIndex))
{ {
label coarseComm = UPstream::parent const label coarseComm =
( UPstream::parent(procCommunicator_[coarseLevelIndex]);
procCommunicator_[coarseLevelIndex]
);
const List<label>& procIDs = agglomProcIDs(coarseLevelIndex); const List<label>& procIDs = agglomProcIDs(coarseLevelIndex);
const labelList& offsets = cellOffsets(coarseLevelIndex); const labelList& offsets = cellOffsets(coarseLevelIndex);
label localSize = nCells_[levelIndex]; const label localSize = nCells_[levelIndex];
Field<Type> allCf(localSize); Field<Type> allCf(localSize);
globalIndex::scatter globalIndex::scatter

View File

@ -408,51 +408,70 @@ public:
const bool checkOverflow = false const bool checkOverflow = false
); );
//- Collect single values in processor order on master (== procIDs[0]).
// Other // Handles contiguous/non-contiguous data.
template<class ProcIDsContainer, class Type>
// Gather (helpers) static void gatherValues
(
const label comm, //!< communicator
const ProcIDsContainer& procIDs,
const Type& localValue,
List<Type>& allValues, //! output field (master only)
const int tag = UPstream::msgType(),
const UPstream::commsTypes = UPstream::commsTypes::scheduled
);
//- Collect data in processor order on master (== procIDs[0]). //- Collect data in processor order on master (== procIDs[0]).
// \note offsets needed on master only. // Handles contiguous/non-contiguous data, skips empty fields.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
static void gather static void gather
( (
const labelUList& offsets, const labelUList& offsets, //!< offsets (master only)
const label comm, //!< communicator const label comm, //!< communicator
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
const UList<Type>& fld, const UList<Type>& fld,
List<Type>& allFld, List<Type>& allFld, //! output field (master only)
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const UPstream::commsTypes = UPstream::commsTypes::nonBlocking
); );
//- Collect indirect data in processor order on master //- Collect indirect data in processor order on master
// \note offsets needed on master only. // Handles contiguous/non-contiguous data, skips empty fields.
template<class Type, class Addr> template<class Type, class Addr>
static void gather static void gather
( (
const labelUList& offsets, const labelUList& offsets, //!< offsets (master only)
const label comm, //!< communicator const label comm, //!< communicator
const UList<int>& procIDs, const UList<int>& procIDs,
const IndirectListBase<Type, Addr>& fld, const IndirectListBase<Type, Addr>& fld,
List<Type>& allFld, List<Type>& allFld, //! output field (master only)
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled const UPstream::commsTypes = UPstream::commsTypes::scheduled
); );
// Misc low-level gather routines
//- Inplace collect in processor order on master (== procIDs[0]). //- Inplace collect in processor order on master (== procIDs[0]).
// \note offsets needed on master only. // Note: adjust naming?
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
static void gather static void gather
( (
const labelUList& offsets, const labelUList& offsets, //!< offsets (master only)
const label comm, //!< communicator const label comm, //!< communicator
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
List<Type>& fld, List<Type>& fld, //!< in/out field
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const UPstream::commsTypes ct = UPstream::commsTypes::nonBlocking
); )
{
List<Type> allData;
gather(offsets, comm, procIDs, fld, allData, tag, ct);
if (Pstream::myProcNo(comm) == procIDs[0])
{
fld.transfer(allData);
}
}
//- Collect data in processor order on master (== procIDs[0]). //- Collect data in processor order on master (== procIDs[0]).
// \note the globalIndex offsets needed on master only. // \note the globalIndex offsets needed on master only.
@ -461,30 +480,29 @@ public:
( (
const label comm, //!< communicator const label comm, //!< communicator
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
const UList<Type>& fld, const UList<Type>& fld, //!< input field
List<Type>& allFld, List<Type>& allFld, //! output field (master only)
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes commsType = const UPstream::commsTypes ct = UPstream::commsTypes::nonBlocking
Pstream::commsTypes::nonBlocking
) const ) const
{ {
gather(offsets_, comm, procIDs, fld, allFld, tag, commsType); gather(offsets_, comm, procIDs, fld, allFld, tag, ct);
} }
//- Inplace collect in processor order on master (== procIDs[0]). //- Inplace collect in processor order on master (== procIDs[0]).
// \note the globalIndex offsets needed on master only. // \note the globalIndex offsets needed on master only.
// Note: adjust naming?
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void gather void gather
( (
const label comm, //!< communicator const label comm, //!< communicator
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
List<Type>& fld, List<Type>& fld, //!< in/out field
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes commsType = const UPstream::commsTypes ct = UPstream::commsTypes::nonBlocking
Pstream::commsTypes::nonBlocking
) const ) const
{ {
gather(offsets_, comm, procIDs, fld, tag, commsType); gather(offsets_, comm, procIDs, fld, tag, ct);
} }
@ -499,7 +517,7 @@ public:
const UList<Type>& sendData, const UList<Type>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
@ -512,7 +530,7 @@ public:
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
@ -526,7 +544,7 @@ public:
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
@ -539,7 +557,7 @@ public:
( (
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::scheduled, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
@ -552,11 +570,11 @@ public:
( (
List<Type>& fld, List<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
//- Collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call for contiguous data when possible
//- (in serial: performs a simple copy). //- (in serial: performs a simple copy).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator // \attention The nProcs for globalIndex and communicator
@ -568,10 +586,14 @@ public:
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allData, OutputContainer& allData,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
) const; ) const;
//- Collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call for contiguous data when possible
//- (in serial: performs a simple copy). //- (in serial: performs a simple copy).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator // \attention The nProcs for globalIndex and communicator
@ -582,10 +604,15 @@ public:
OutputContainer mpiGather OutputContainer mpiGather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
) const; ) const;
//- Inplace collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call to inplace collect contiguous data
//- when possible.
//- (in serial: a no-op). //- (in serial: a no-op).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// \attention The nProcs for globalIndex and communicator // \attention The nProcs for globalIndex and communicator
@ -596,13 +623,17 @@ public:
void mpiGatherInplace void mpiGatherInplace
( (
List<Type>& fld, List<Type>& fld,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
) const; ) const;
// Gather Operations // Gather Operations
//- Collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call to collect contiguous data when possible
//- (in serial: performs a simple copy). //- (in serial: performs a simple copy).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// //
@ -612,10 +643,14 @@ public:
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allData, OutputContainer& allData,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
); );
//- Collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call to collect contiguous data when possible
//- (in serial: performs a simple copy). //- (in serial: performs a simple copy).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// //
@ -624,10 +659,15 @@ public:
static OutputContainer mpiGatherOp static OutputContainer mpiGatherOp
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
); );
//- Inplace collect \em contiguous data using a MPI_Gatherv call //- Use MPI_Gatherv call to inplace collect contiguous data
//- when possible.
//- (in serial: a no-op). //- (in serial: a no-op).
// Communication with default/specified communicator. // Communication with default/specified communicator.
// //
@ -636,7 +676,11 @@ public:
static void mpiGatherInplaceOp static void mpiGatherInplaceOp
( (
List<Type>& fld, List<Type>& fld,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm, //!< communicator
// For fallback routines:
const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const int tag = UPstream::msgType()
); );
//- Collect data in processor order on master //- Collect data in processor order on master
@ -650,7 +694,7 @@ public:
const UList<Type>& sendData, const UList<Type>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
@ -665,7 +709,7 @@ public:
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
@ -679,7 +723,7 @@ public:
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
@ -693,7 +737,7 @@ public:
( (
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
@ -707,7 +751,7 @@ public:
( (
List<Type>& fld, List<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
@ -717,17 +761,16 @@ public:
//- Distribute data in processor order. //- Distribute data in processor order.
// Requires fld to be correctly sized! // Requires fld to be correctly sized!
// Communication with default/specified communicator, message tag. // Communication with default/specified communicator, message tag.
// \note offsets needed on master only.
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
static void scatter static void scatter
( (
const labelUList& offsets, const labelUList& offsets, //!< offsets (master only)
const label comm, //!< communicator const label comm, //!< communicator
const ProcIDsContainer& procIDs, const ProcIDsContainer& procIDs,
const UList<Type>& allFld, const UList<Type>& allFld,
UList<Type>& fld, UList<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking const UPstream::commsTypes = UPstream::commsTypes::nonBlocking
); );
//- Distribute data in processor order. //- Distribute data in processor order.
@ -742,11 +785,11 @@ public:
const UList<Type>& allFld, const UList<Type>& allFld,
UList<Type>& fld, UList<Type>& fld,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes commsType = const UPstream::commsTypes ct =
Pstream::commsTypes::nonBlocking UPstream::commsTypes::nonBlocking
) const ) const
{ {
scatter(offsets_, comm, procIDs, allFld, fld, tag, commsType); scatter(offsets_, comm, procIDs, allFld, fld, tag, ct);
} }
//- Distribute data in processor order. //- Distribute data in processor order.
@ -759,7 +802,7 @@ public:
const UList<Type>& allData, const UList<Type>& allData,
UList<Type>& localData, UList<Type>& localData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;
@ -772,7 +815,7 @@ public:
( (
const UList<Type>& allData, const UList<Type>& allData,
const int tag = UPstream::msgType(), const int tag = UPstream::msgType(),
const Pstream::commsTypes = Pstream::commsTypes::nonBlocking, const UPstream::commsTypes = UPstream::commsTypes::nonBlocking,
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
) const; ) const;

View File

@ -64,6 +64,90 @@ Foam::globalIndex::calcListOffsets
} }
template<class ProcIDsContainer, class Type>
void Foam::globalIndex::gatherValues
(
const label comm,
const ProcIDsContainer& procIDs,
const Type& localValue,
List<Type>& allValues,
const int tag,
const UPstream::commsTypes preferredCommsType
)
{
// low-level: no parRun guard
// Automatically change from nonBlocking to scheduled for
// non-contiguous data.
const UPstream::commsTypes commsType =
(
(
!is_contiguous<Type>::value
&& UPstream::commsTypes::nonBlocking == preferredCommsType
)
? UPstream::commsTypes::scheduled
: preferredCommsType
);
const label startOfRequests = UPstream::nRequests();
if (UPstream::myProcNo(comm) == procIDs[0])
{
allValues.resize_nocopy(procIDs.size());
allValues[0] = localValue;
for (label i = 1; i < procIDs.size(); ++i)
{
if (is_contiguous<Type>::value)
{
IPstream::read
(
commsType,
procIDs[i],
reinterpret_cast<char*>(&allValues[i]),
sizeof(Type),
tag,
comm
);
}
else
{
IPstream fromProc(commsType, procIDs[i], 0, tag, comm);
fromProc >> allValues[i];
}
}
}
else
{
allValues.clear(); // safety: zero-size on non-master
if (is_contiguous<Type>::value)
{
OPstream::write
(
commsType,
procIDs[0],
reinterpret_cast<const char*>(&localValue),
sizeof(Type),
tag,
comm
);
}
else
{
OPstream toMaster(commsType, procIDs[0], 0, tag, comm);
toMaster << localValue;
}
}
if (commsType == UPstream::commsTypes::nonBlocking)
{
// Wait for all to finish
UPstream::waitRequests(startOfRequests);
}
}
template<class ProcIDsContainer, class Type> template<class ProcIDsContainer, class Type>
void Foam::globalIndex::gather void Foam::globalIndex::gather
( (
@ -73,7 +157,7 @@ void Foam::globalIndex::gather
const UList<Type>& fld, const UList<Type>& fld,
List<Type>& allFld, List<Type>& allFld,
const int tag, const int tag,
const Pstream::commsTypes preferredCommsType const UPstream::commsTypes preferredCommsType
) )
{ {
// low-level: no parRun guard // low-level: no parRun guard
@ -156,7 +240,7 @@ void Foam::globalIndex::gather
} }
} }
if (commsType == Pstream::commsTypes::nonBlocking) if (commsType == UPstream::commsTypes::nonBlocking)
{ {
// Wait for all to finish // Wait for all to finish
UPstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
@ -173,7 +257,7 @@ void Foam::globalIndex::gather
const IndirectListBase<Type, Addr>& fld, const IndirectListBase<Type, Addr>& fld,
List<Type>& allFld, List<Type>& allFld,
const int tag, const int tag,
const Pstream::commsTypes preferredCommsType const UPstream::commsTypes preferredCommsType
) )
{ {
// low-level: no parRun guard // low-level: no parRun guard
@ -190,6 +274,8 @@ void Foam::globalIndex::gather
: preferredCommsType : preferredCommsType
); );
const label startOfRequests = UPstream::nRequests();
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
{ {
allFld.resize_nocopy(off.last()); // == totalSize() allFld.resize_nocopy(off.last()); // == totalSize()
@ -231,29 +317,11 @@ void Foam::globalIndex::gather
toMaster << fld; toMaster << fld;
} }
} }
}
if (commsType == UPstream::commsTypes::nonBlocking)
template<class ProcIDsContainer, class Type>
void Foam::globalIndex::gather
(
const labelUList& off, // needed on master only
const label comm,
const ProcIDsContainer& procIDs,
List<Type>& fld,
const int tag,
const Pstream::commsTypes commsType
)
{ {
// low-level: no parRun guard // Wait for all to finish
UPstream::waitRequests(startOfRequests);
List<Type> allData;
gather(off, comm, procIDs, fld, allData, tag, commsType);
if (Pstream::myProcNo(comm) == procIDs[0])
{
fld.transfer(allData);
} }
} }
@ -266,13 +334,19 @@ void Foam::globalIndex::gather
const UList<Type>& sendData, const UList<Type>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
if (UPstream::parRun()) if (!UPstream::parRun())
{ {
gather // Serial: direct copy
allData = sendData;
return;
}
{
globalIndex::gather
( (
offsets_, // needed on master only offsets_, // needed on master only
comm, comm,
@ -287,11 +361,6 @@ void Foam::globalIndex::gather
allData.clear(); // safety: zero-size on non-master allData.clear(); // safety: zero-size on non-master
} }
} }
else
{
// Serial: direct copy
allData = sendData;
}
} }
@ -301,13 +370,19 @@ void Foam::globalIndex::gather
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
if (UPstream::parRun()) if (!UPstream::parRun())
{ {
gather // Serial: direct copy
allData = sendData;
return;
}
{
globalIndex::gather
( (
offsets_, // needed on master only offsets_, // needed on master only
comm, comm,
@ -322,11 +397,6 @@ void Foam::globalIndex::gather
allData.clear(); // safety: zero-size on non-master allData.clear(); // safety: zero-size on non-master
} }
} }
else
{
// Serial: direct copy
allData = List<Type>(sendData);
}
} }
@ -335,7 +405,7 @@ OutputContainer Foam::globalIndex::gather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
@ -350,7 +420,7 @@ OutputContainer Foam::globalIndex::gather
( (
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
@ -365,23 +435,14 @@ void Foam::globalIndex::gatherInplace
( (
List<Type>& fld, List<Type>& fld,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
if (UPstream::parRun()) if (UPstream::parRun())
{ {
List<Type> allData; List<Type> allData;
gather(fld, allData, tag, commsType, comm);
gather
(
comm,
UPstream::procID(comm),
fld,
allData,
tag,
commsType
);
if (UPstream::master(comm)) if (UPstream::master(comm))
{ {
@ -401,7 +462,10 @@ void Foam::globalIndex::mpiGather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allData, OutputContainer& allData,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) const ) const
{ {
if (!UPstream::parRun()) if (!UPstream::parRun())
@ -410,57 +474,150 @@ void Foam::globalIndex::mpiGather
allData = sendData; allData = sendData;
return; return;
} }
if (!is_contiguous<Type>::value)
{
FatalErrorInFunction
<< "Cannot be called for non-contiguous data" << nl
<< abort(FatalError);
}
auto nSendBytes = sendData.size_bytes(); // MPI_Gatherv requires contiguous data, but a byte-wise transfer can
// quickly exceed the 'int' limits used for MPI sizes/offsets.
// Thus gather label/scalar components when possible to increase the
// effective size limit.
//
// Note: cannot rely on pTraits (cmptType, nComponents) since this method
// needs to compile (and work) even with things like strings etc.
List<int> recvSizes; // Single char ad hoc "enum":
// - b(yte): gather bytes
// - f(loat): gather scalars components
// - i(nt): gather label components
// - 0: gather with Pstream read/write etc.
List<int> recvCounts;
List<int> recvOffsets; List<int> recvOffsets;
char dataMode(0);
int nCmpts(0);
if (is_contiguous<Type>::value)
{
if (is_contiguous_scalar<Type>::value)
{
dataMode = 'f';
nCmpts = static_cast<int>(sizeof(Type)/sizeof(scalar));
}
else if (is_contiguous_label<Type>::value)
{
dataMode = 'i';
nCmpts = static_cast<int>(sizeof(Type)/sizeof(label));
}
else
{
dataMode = 'b';
nCmpts = static_cast<int>(sizeof(Type));
}
// Offsets must fit into int
if (UPstream::master(comm)) if (UPstream::master(comm))
{ {
const globalIndex& globalAddr = *this; const globalIndex& globalAddr = *this;
// Must be the same as Pstream::nProcs(comm), at least on master!! if (globalAddr.totalSize() > (INT_MAX/nCmpts))
const label nproc = globalAddr.nProcs();
// Allow request of 0 entries to be sent on master
if (!globalAddr.localSize(0))
{ {
nSendBytes = 0; // Offsets do not fit into int - revert to manual.
dataMode = 0;
} }
else
{
// Must be same as Pstream::nProcs(comm), at least on master!
const label nproc = globalAddr.nProcs();
allData.resize_nocopy(globalAddr.totalSize()); allData.resize_nocopy(globalAddr.totalSize());
recvSizes.resize(nproc); recvCounts.resize(nproc);
recvOffsets.resize(nproc+1); recvOffsets.resize(nproc+1);
for (label proci = 0; proci < nproc; ++proci) for (label proci = 0; proci < nproc; ++proci)
{ {
recvSizes[proci] = globalAddr.localSize(proci) * sizeof(Type); recvCounts[proci] = globalAddr.localSize(proci)*nCmpts;
recvOffsets[proci] = globalAddr.localStart(proci) * sizeof(Type); recvOffsets[proci] = globalAddr.localStart(proci)*nCmpts;
} }
recvOffsets[nproc] = globalAddr.totalSize() * sizeof(Type); recvOffsets[nproc] = globalAddr.totalSize()*nCmpts;
// Assign local data directly
recvCounts[0] = 0; // ie, ignore for MPI_Gatherv
SubList<Type>(allData, globalAddr.range(0)) =
SubList<Type>(sendData, globalAddr.range(0));
} }
else
{
allData.clear(); // safety: zero-size on non-master
} }
// Consistent information for everyone
UPstream::broadcast(&dataMode, 1, comm);
}
// Dispatch
switch (dataMode)
{
case 'b': // Byte-wise
{
UPstream::gather UPstream::gather
( (
sendData.cdata_bytes(), sendData.cdata_bytes(),
nSendBytes, sendData.size_bytes(),
allData.data_bytes(), allData.data_bytes(),
recvSizes, recvCounts,
recvOffsets, recvOffsets,
comm comm
); );
break;
}
case 'f': // Float (scalar) components
{
typedef scalar cmptType;
UPstream::gather
(
reinterpret_cast<const cmptType*>(sendData.cdata()),
(sendData.size()*nCmpts),
reinterpret_cast<cmptType*>(allData.data()),
recvCounts,
recvOffsets,
comm
);
break;
}
case 'i': // Int (label) components
{
typedef label cmptType;
UPstream::gather
(
reinterpret_cast<const cmptType*>(sendData.cdata()),
(sendData.size()*nCmpts),
reinterpret_cast<cmptType*>(allData.data()),
recvCounts,
recvOffsets,
comm
);
break;
}
default: // Regular (manual) gathering
{
globalIndex::gather
(
offsets_, // needed on master only
comm,
UPstream::procID(comm),
sendData,
allData,
tag,
commsType
);
break;
}
}
if (!UPstream::master(comm))
{
allData.clear(); // safety: zero-size on non-master
}
} }
@ -468,11 +625,14 @@ template<class Type, class OutputContainer>
OutputContainer Foam::globalIndex::mpiGather OutputContainer Foam::globalIndex::mpiGather
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) const ) const
{ {
OutputContainer allData; OutputContainer allData;
mpiGather(sendData, allData, comm); mpiGather(sendData, allData, comm, commsType, tag);
return allData; return allData;
} }
@ -481,13 +641,16 @@ template<class Type>
void Foam::globalIndex::mpiGatherInplace void Foam::globalIndex::mpiGatherInplace
( (
List<Type>& fld, List<Type>& fld,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) const ) const
{ {
if (UPstream::parRun()) if (UPstream::parRun())
{ {
List<Type> allData; List<Type> allData;
mpiGather(fld, allData, comm); mpiGather(fld, allData, comm, commsType, tag);
if (UPstream::master(comm)) if (UPstream::master(comm))
{ {
@ -507,14 +670,17 @@ void Foam::globalIndex::mpiGatherOp
( (
const UList<Type>& sendData, const UList<Type>& sendData,
OutputContainer& allData, OutputContainer& allData,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) )
{ {
if (UPstream::parRun()) if (UPstream::parRun())
{ {
// Gather sizes - only needed on master // Gather sizes - only needed on master
globalIndex(sendData.size(), globalIndex::gatherOnly{}, comm) globalIndex(sendData.size(), globalIndex::gatherOnly{}, comm)
.mpiGather(sendData, allData, comm); .mpiGather(sendData, allData, comm, commsType, tag);
} }
else else
{ {
@ -528,11 +694,14 @@ template<class Type, class OutputContainer>
OutputContainer Foam::globalIndex::mpiGatherOp OutputContainer Foam::globalIndex::mpiGatherOp
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) )
{ {
OutputContainer allData; OutputContainer allData;
mpiGatherOp(sendData, allData, comm); mpiGatherOp(sendData, allData, comm, commsType, tag);
return allData; return allData;
} }
@ -541,13 +710,16 @@ template<class Type>
void Foam::globalIndex::mpiGatherInplaceOp void Foam::globalIndex::mpiGatherInplaceOp
( (
List<Type>& fld, List<Type>& fld,
const label comm const label comm,
const UPstream::commsTypes commsType,
const int tag
) )
{ {
if (UPstream::parRun()) if (UPstream::parRun())
{ {
List<Type> allData; List<Type> allData;
mpiGatherOp(fld, allData, comm); mpiGatherOp(fld, allData, comm, commsType, tag);
if (UPstream::master(comm)) if (UPstream::master(comm))
{ {
@ -568,7 +740,7 @@ void Foam::globalIndex::gatherOp
const UList<Type>& sendData, const UList<Type>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) )
{ {
@ -592,7 +764,7 @@ void Foam::globalIndex::gatherOp
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
List<Type>& allData, List<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) )
{ {
@ -615,7 +787,7 @@ OutputContainer Foam::globalIndex::gatherOp
( (
const UList<Type>& sendData, const UList<Type>& sendData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) )
{ {
@ -630,7 +802,7 @@ OutputContainer Foam::globalIndex::gatherOp
( (
const IndirectListBase<Type, Addr>& sendData, const IndirectListBase<Type, Addr>& sendData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) )
{ {
@ -645,7 +817,7 @@ void Foam::globalIndex::gatherInplaceOp
( (
List<Type>& fld, List<Type>& fld,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) )
{ {
@ -668,7 +840,7 @@ void Foam::globalIndex::scatter
const UList<Type>& allFld, const UList<Type>& allFld,
UList<Type>& fld, UList<Type>& fld,
const int tag, const int tag,
const Pstream::commsTypes preferredCommsType const UPstream::commsTypes preferredCommsType
) )
{ {
// low-level: no parRun guard // low-level: no parRun guard
@ -685,10 +857,6 @@ void Foam::globalIndex::scatter
: preferredCommsType : preferredCommsType
); );
// FUTURE:
// could decide which procs will receive data and use mpiScatter
// to distribute. Could then skip send/receive for those procs...
const label startOfRequests = UPstream::nRequests(); const label startOfRequests = UPstream::nRequests();
if (Pstream::myProcNo(comm) == procIDs[0]) if (Pstream::myProcNo(comm) == procIDs[0])
@ -757,7 +925,7 @@ void Foam::globalIndex::scatter
} }
} }
if (commsType == Pstream::commsTypes::nonBlocking) if (commsType == UPstream::commsTypes::nonBlocking)
{ {
// Wait for all to finish // Wait for all to finish
UPstream::waitRequests(startOfRequests); UPstream::waitRequests(startOfRequests);
@ -771,7 +939,7 @@ void Foam::globalIndex::scatter
const UList<Type>& allData, const UList<Type>& allData,
UList<Type>& localData, UList<Type>& localData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
@ -802,7 +970,7 @@ OutputContainer Foam::globalIndex::scatter
( (
const UList<Type>& allData, const UList<Type>& allData,
const int tag, const int tag,
const Pstream::commsTypes commsType, const UPstream::commsTypes commsType,
const label comm const label comm
) const ) const
{ {
@ -853,7 +1021,7 @@ void Foam::globalIndex::get
); );
// Send local indices to individual processors as local index // Send local indices to individual processors as local index
PstreamBuffers sendBufs(Pstream::commsTypes::nonBlocking, tag, comm); PstreamBuffers sendBufs(UPstream::commsTypes::nonBlocking, tag, comm);
for (const auto proci : validBins) for (const auto proci : validBins)
{ {
@ -870,7 +1038,7 @@ void Foam::globalIndex::get
sendBufs.finishedSends(); sendBufs.finishedSends();
PstreamBuffers returnBufs(Pstream::commsTypes::nonBlocking, tag, comm); PstreamBuffers returnBufs(UPstream::commsTypes::nonBlocking, tag, comm);
for (const int proci : sendBufs.allProcs()) for (const int proci : sendBufs.allProcs())
{ {