ENH: update globalIndex::mpiGather to use MPI intrinsic/user types

- add provisional support for selecting MPI_Gatherv
  when merging fields in the surface writers.
  Uses the 'gatherv' keyword [experimental]

BUG: gatherv/scatterv wrappers used the incorrect data type

- incorrectly checked against UPstream_basic_dataType trait instead of
  UPstream_dataType, which resulted in a count mismatch for
  user-defined types (eg, vector, tensor,...).

  This was not visibly active bug, since previous internal uses of
  gatherv were restricted to primitive types.

COMP: make UPstream dataType traits size(...) constexpr

- allows static asserts and/or compile-time selection
  of code based on size(1)
This commit is contained in:
Mark Olesen
2025-10-09 12:03:33 +02:00
parent 55c81bce1b
commit 4b92bb6533
14 changed files with 351 additions and 202 deletions

View File

@ -200,7 +200,7 @@ void printTypeName()
template<class Type, bool UseTypeName = true>
void printPstreamTraits(const std::string_view name = std::string_view())
void printPstreamTraits(std::string_view name = std::string_view())
{
Info<< "========" << nl;
Info<< "type: ";
@ -299,6 +299,9 @@ void printPstreamTraits(const std::string_view name = std::string_view())
// Use element or component type (or byte-wise) for data type
using base = typename UPstream_dataType<Type>::base;
// The sizing factor is constexpr
constexpr std::streamsize count = UPstream_dataType<Type>::size(1);
Info<< " : ";
if constexpr (UseTypeName)
{
@ -311,8 +314,7 @@ void printPstreamTraits(const std::string_view name = std::string_view())
Info<< " cmpt-type=";
printDataTypeId(UPstream_dataType<Type>::datatype_id);
Info<< " count=" << UPstream_dataType<Type>::size(1);
Info<< nl;
Info<< " count=" << count << nl;
}
}
@ -362,6 +364,24 @@ void print_data_opType(BinaryOp bop, std::string_view name)
}
template<class Type>
int check_simple(std::string_view name = std::string_view())
{
// The sizing factor is constexpr
constexpr std::streamsize count = UPstream_dataType<Type>::size(1);
static_assert
(
(count == 1),
"Code does not (yet) work with aggregate types"
);
Info<< "check_simple: " << name << ": " << count << nl;
return count;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Main program:
@ -389,6 +409,8 @@ int main()
printPstreamTraits<const float>();
printPstreamTraits<floatVector>();
check_simple<floatVector>("vector<float>");
printPstreamTraits<scalar>();
printPstreamTraits<double>();
printPstreamTraits<doubleVector>();

View File

@ -1,4 +1,4 @@
mydebugSurfaceWriter.C
Test-surface-sampling.C
mydebugSurfaceWriter.cxx
Test-surface-sampling.cxx
EXE = $(FOAM_USER_APPBIN)/Test-surface-sampling

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022-2023 OpenCFD Ltd.
Copyright (C) 2022-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -83,12 +83,10 @@ template<> struct narrowType<SymmTensor<double>>
typedef SymmTensor<float> type;
};
// FIXME: Not sure why this one seems to be broken...
//
// template<> struct narrowType<Tensor<double>>
// {
// typedef Tensor<float> type;
// };
template<> struct narrowType<Tensor<double>>
{
typedef Tensor<float> type;
};
} // End namespace Foam
@ -104,12 +102,18 @@ Foam::surfaceWriters::mydebugWriter::mergeField
{
addProfiling(merge, "debugWriter::merge-field");
// This is largely identical to surfaceWriter::mergeField()
// Identical to surfaceWriter::mergeField()
// but with narrowing for communication
if (narrowTransfer_ && parallel_ && UPstream::parRun())
if constexpr (std::is_same_v<Tensor<double>, Type>)
{
// Cannot narrow tensor. Does not compile since MatrixSpace
// does not (yet) allow assigments from different Cmpt types.
}
else if (narrowTransfer_ && parallel_ && UPstream::parRun())
{
// The narrowed type
typedef typename narrowType<Type>::type narrowedType;
using narrowedType = typename narrowType<Type>::type;
// Ensure geometry is also merged
merge();
@ -130,14 +134,29 @@ Foam::surfaceWriters::mydebugWriter::mergeField
ConstPrecisionAdaptor<narrowedType, Type> input(fld);
PrecisionAdaptor<narrowedType, Type> output(allFld);
globIndex.gather
(
input.cref(), // fld,
output.ref(), // allFld,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
if (gatherv_)
{
globIndex.mpiGather
(
input.cref(), // fld
output.ref(), // allFld
UPstream::worldComm,
// For fallback:
commType_,
UPstream::msgType()
);
}
else
{
globIndex.gather
(
input.cref(), // fld
output.ref(), // allFld
UPstream::msgType(),
commType_,
UPstream::worldComm
);
}
// Commit adapted content changes
input.commit();
@ -193,8 +212,19 @@ Foam::surfaceWriters::mydebugWriter::mydebugWriter
{
Info<< "Using debug surface writer ("
<< (this->isPointData() ? "point" : "face") << " data):"
<< " commsType=" << UPstream::commsTypeNames[commType_]
<< " merge=" << Switch::name(enableMerge_)
<< " commsType=";
if (UPstream::parRun())
{
if (gatherv_) Info<< "gatherv+";
Info<< UPstream::commsTypeNames[commType_];
}
else
{
Info<< "serial";
}
Info<< " merge=" << Switch::name(enableMerge_)
<< " write=" << Switch::name(enableWrite_)
<< " narrow=" << Switch::name(narrowTransfer_)
<< endl;

View File

@ -343,11 +343,22 @@ void Foam::UPstream::mpiGatherv
}
// Nothing further to do
}
else if constexpr (UPstream_basic_dataType<Type>::value)
else if constexpr (UPstream_dataType<Type>::value)
{
// Restrict to basic (or aliased) MPI types to avoid recalculating
// the list of counts/offsets.
// The sizing factor (constexpr) must be 1 otherwise
// [recvCounts,recvOffsets] are likely incorrect
constexpr std::streamsize count = UPstream_dataType<Type>::size(1);
static_assert
(
(count == 1),
"Code does not (yet) work with aggregate types"
);
UPstream::mpi_gatherv
(
sendData,
@ -356,7 +367,7 @@ void Foam::UPstream::mpiGatherv
recvCounts,
recvOffsets,
UPstream_basic_dataType<Type>::datatype_id,
UPstream_dataType<Type>::datatype_id,
communicator
);
}
@ -364,7 +375,8 @@ void Foam::UPstream::mpiGatherv
{
static_assert
(
stdFoam::dependent_false_v<Type>, "Only basic MPI data types"
stdFoam::dependent_false_v<Type>,
"Only basic and user data types"
);
}
}
@ -392,11 +404,22 @@ void Foam::UPstream::mpiScatterv
}
// Nothing further to do
}
else if constexpr (UPstream_basic_dataType<Type>::value)
else if constexpr (UPstream_dataType<Type>::value)
{
// Restrict to basic (or aliased) MPI types to avoid recalculating
// the list of counts/offsets.
// The sizing factor (constexpr) must be 1 otherwise
// [sendCounts,sendOffsets] are likely incorrect
constexpr std::streamsize count = UPstream_dataType<Type>::size(1);
static_assert
(
(count == 1),
"Code does not (yet) work with aggregate types"
);
UPstream::mpi_scatterv
(
sendData,
@ -405,7 +428,7 @@ void Foam::UPstream::mpiScatterv
recvData,
recvCount,
UPstream_basic_dataType<Type>::datatype_id,
UPstream_dataType<Type>::datatype_id,
communicator
);
}
@ -413,7 +436,8 @@ void Foam::UPstream::mpiScatterv
{
static_assert
(
stdFoam::dependent_false_v<Type>, "Only basic MPI data types"
stdFoam::dependent_false_v<Type>,
"Only basic and user data types"
);
}
}

View File

@ -354,7 +354,7 @@ struct UPstream_basic_dataType
UPstream_alias_dataType<base>::datatype_id;
//- The size in terms of the number of underlying data elements
static std::streamsize size(std::streamsize n) noexcept
static constexpr std::streamsize size(std::streamsize n) noexcept
{
if constexpr (UPstream_alias_dataType<T>::value)
{
@ -373,7 +373,10 @@ struct UPstream_basic_dataType
template<> struct UPstream_basic_dataType<void> : UPstream_mpi_dataType<void>
{
using base = void;
static std::streamsize size(std::streamsize n) noexcept { return n; }
static constexpr std::streamsize size(std::streamsize n) noexcept
{
return n;
}
};
@ -410,7 +413,7 @@ struct UPstream_dataType
UPstream_any_dataType<base>::datatype_id;
//- The size in terms of the number of base data elements
static std::streamsize size(std::streamsize n) noexcept
static constexpr std::streamsize size(std::streamsize n) noexcept
{
if constexpr (UPstream_any_dataType<T>::value)
{

View File

@ -27,6 +27,7 @@ License
\*---------------------------------------------------------------------------*/
#include "globalIndex.H"
#include <functional>
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
@ -696,159 +697,94 @@ void Foam::globalIndex::mpiGather
const UList<Type>& sendData,
OutputContainer& allData,
const label comm,
UPstream::commsTypes commsType,
const int tag
[[maybe_unused]] UPstream::commsTypes commsType,
[[maybe_unused]] const int tag
) const
{
if (!UPstream::parRun())
if (!UPstream::is_parallel(comm))
{
// Serial: direct copy
allData = sendData;
return;
}
// MPI_Gatherv requires contiguous data, but a byte-wise transfer can
// quickly exceed the 'int' limits used for MPI sizes/offsets.
// Thus gather label/scalar components when possible to increase the
// effective size limit.
//
// Note: cannot rely on pTraits (cmptType, nComponents) since this method
// needs to compile (and work) even with things like strings etc.
// Single char ad hoc "enum":
// - b(yte): gather bytes
// - f(loat): gather scalars components
// - i(nt): gather label components
// - 0: gather with Pstream read/write etc.
List<int> recvCounts;
List<int> recvOffsets;
char dataMode(0);
int nCmpts(0);
if constexpr (is_contiguous_v<Type>)
if (UPstream::master(comm))
{
if constexpr (is_contiguous_scalar<Type>::value)
{
dataMode = 'f';
nCmpts = static_cast<int>(sizeof(Type)/sizeof(scalar));
}
else if constexpr (is_contiguous_label<Type>::value)
{
dataMode = 'i';
nCmpts = static_cast<int>(sizeof(Type)/sizeof(label));
}
else
{
dataMode = 'b';
nCmpts = static_cast<int>(sizeof(Type));
}
allData.resize_nocopy(offsets_.back()); // == totalSize()
}
else
{
allData.clear(); // zero-size on non-master
}
if constexpr (UPstream_dataType<Type>::value)
{
// Restrict to basic (or aliased) MPI types
// - simplifies calculating counts/offsets
// and can call UPstream::mpiGatherv directly
// The sizing factor is constexpr
constexpr std::streamsize count = UPstream_dataType<Type>::size(1);
static_assert
(
(count == 1),
"Code does not (yet) work with aggregate types"
);
List<int> recvCounts;
List<int> recvOffsets;
// Offsets must fit into int
if (UPstream::master(comm))
{
const globalIndex& globalAddr = *this;
// Must be same as Pstream::nProcs(comm), at least on master!
// if (UPstream::nProcs(comm) != this->nProcs()) ...
if (globalAddr.totalSize() > (INT_MAX/nCmpts))
{
// Offsets do not fit into int - revert to manual.
dataMode = 0;
}
else
{
// Must be same as Pstream::nProcs(comm), at least on master!
const label nproc = globalAddr.nProcs();
recvCounts.resize(offsets_.size()-1);
recvOffsets.resize(offsets_.size());
allData.resize_nocopy(globalAddr.totalSize());
// Copy offsets
std::copy(offsets_.begin(), offsets_.end(), recvOffsets.begin());
recvCounts.resize(nproc);
recvOffsets.resize(nproc+1);
// Calculate sizes. Currently without std::minus <functional>
std::transform
(
offsets_.begin()+1, offsets_.end(),
offsets_.begin(), recvCounts.begin(),
std::minus<>{}
);
for (label proci = 0; proci < nproc; ++proci)
{
recvCounts[proci] = globalAddr.localSize(proci)*nCmpts;
recvOffsets[proci] = globalAddr.localStart(proci)*nCmpts;
}
recvOffsets[nproc] = globalAddr.totalSize()*nCmpts;
// Assign local data directly
recvCounts[0] = 0; // ie, ignore for MPI_Gatherv
SubList<Type>(allData, globalAddr.range(0)) =
SubList<Type>(sendData, globalAddr.range(0));
}
// FUTURE .. fix sizes and offsets by the element factor...
// if constexpr (UPstream_basic_dataType<Type>::size(1) > 1)
}
// Consistent information for everyone
UPstream::broadcast(&dataMode, 1, comm);
int sendSize = static_cast<int>(sendData.size());
// Note we let MPI_Gatherv copy back the local data as well...
UPstream::mpiGatherv
(
sendData.cdata(),
sendSize,
allData.data(),
recvCounts,
recvOffsets,
comm
);
}
// Dispatch
switch (dataMode)
else
{
case 'b': // Byte-wise
{
UPstream::mpiGatherv
(
sendData.cdata_bytes(),
sendData.size_bytes(),
allData.data_bytes(),
recvCounts,
recvOffsets,
comm
);
break;
}
case 'f': // Float (scalar) components
{
typedef scalar cmptType;
UPstream::mpiGatherv
(
reinterpret_cast<const cmptType*>(sendData.cdata()),
(sendData.size()*nCmpts),
reinterpret_cast<cmptType*>(allData.data()),
recvCounts,
recvOffsets,
comm
);
break;
}
case 'i': // Int (label) components
{
typedef label cmptType;
UPstream::mpiGatherv
(
reinterpret_cast<const cmptType*>(sendData.cdata()),
(sendData.size()*nCmpts),
reinterpret_cast<cmptType*>(allData.data()),
recvCounts,
recvOffsets,
comm
);
break;
}
default: // Regular (manual) gathering
{
globalIndex::gather
(
offsets_, // needed on master only
comm,
UPstream::allProcs(comm), // All communicator ranks
sendData,
allData,
tag,
commsType
);
break;
}
}
if (!UPstream::master(comm))
{
allData.clear(); // safety: zero-size on non-master
// Regular (manual) gathering
globalIndex::gather
(
offsets_, // needed on master only
comm,
UPstream::allProcs(comm), // All communicator ranks
sendData,
allData,
tag,
commsType
);
}
}

View File

@ -28,33 +28,45 @@ License
#include "Pstream.H"
#include "PstreamGlobals.H"
#include "UPstreamWrapping.H"
#include "vector.H" // for debugging
#undef STRINGIFY
#undef STRING_QUOTE
#define STRINGIFY(content) #content
#define STRING_QUOTE(input) STRINGIFY(input)
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
static inline bool is_basic_dataType(Foam::UPstream::dataTypes id) noexcept
namespace
{
inline bool is_nonAggregate(Foam::UPstream::dataTypes id) noexcept
{
return
(
int(id) >= int(Foam::UPstream::dataTypes::Basic_begin)
&& int(id) < int(Foam::UPstream::dataTypes::Basic_end)
)
||
(
int(id) >= int(Foam::UPstream::dataTypes::User_begin)
&& int(id) < int(Foam::UPstream::dataTypes::User_end)
);
}
namespace
{
using namespace Foam;
// Local function to print some error information
inline void printErrorNonIntrinsic
(
const char* context,
UPstream::dataTypes dataTypeId
Foam::UPstream::dataTypes dataTypeId
)
{
using namespace Foam;
FatalError
<< "Bad input for " << context << ": likely a programming problem\n"
<< " Non-intrinsic data (" << int(dataTypeId) << ")\n"
<< " Non-intrinsic/non-user data (type:" << int(dataTypeId) << ")\n"
<< Foam::endl;
}
@ -214,19 +226,37 @@ void Foam::UPstream::mpi_gatherv
{
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
if
(
FOAM_UNLIKELY
(
!is_basic_dataType(dataTypeId)
)
)
// Runtime assert that we are not using aggregated data types
if (FOAM_UNLIKELY(!is_nonAggregate(dataTypeId)))
{
FatalErrorInFunction;
printErrorNonIntrinsic("MPI_Gatherv()", dataTypeId);
FatalError << Foam::abort(FatalError);
}
const label np = UPstream::nProcs(communicator);
// For total-size calculation,
// don't rely on recvOffsets being (np+1)
const int totalSize =
(
(UPstream::master(communicator) && np > 1)
? (recvOffsets[np-1] + recvCounts[np-1])
: 0
);
if (FOAM_UNLIKELY(UPstream::debug))
{
Perr<< "[mpi_gatherv] :"
<< " type:" << int(dataTypeId)
<< " count:" << sendCount
<< " total:" << totalSize
<< " comm:" << communicator
<< " recvCounts:" << flatOutput(recvCounts)
<< " recvOffsets:" << flatOutput(recvOffsets)
<< Foam::endl;
}
{
PstreamDetail::gatherv
(
@ -235,6 +265,48 @@ void Foam::UPstream::mpi_gatherv
datatype, communicator
);
}
// Extended debugging. Limit to master:
#if 0
if (FOAM_UNLIKELY(UPstream::debug))
{
if (UPstream::master(communicator))
{
switch (dataTypeId)
{
#undef dataPrinter
#define dataPrinter(enumType, nativeType) \
case UPstream::dataTypes::enumType : \
{ \
UList<nativeType> combined \
( \
static_cast<nativeType*>(recvData), \
totalSize \
); \
\
Info<< "[mpi_gatherv] => " \
"List<" STRING_QUOTE(nativeType) "> "; \
combined.writeList(Info) << Foam::endl; \
\
break; \
}
// Some common types
dataPrinter(type_int32, int32_t);
dataPrinter(type_int64, int64_t);
dataPrinter(type_float, float);
dataPrinter(type_double, double);
dataPrinter(type_3float, floatVector);
dataPrinter(type_3double, doubleVector);
// Some other type
default: break;
#undef dataPrinter
}
}
}
#endif
}
@ -255,13 +327,8 @@ void Foam::UPstream::mpi_scatterv
{
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
if
(
FOAM_UNLIKELY
(
!is_basic_dataType(dataTypeId)
)
)
// Runtime assert that we are not using aggregated data types
if (FOAM_UNLIKELY(!is_nonAggregate(dataTypeId)))
{
FatalErrorInFunction;
printErrorNonIntrinsic("MPI_Scatterv()", dataTypeId);

View File

@ -201,6 +201,7 @@ Foam::surfaceWriter::surfaceWriter()
isPointData_(false),
verbose_(false),
commType_(UPstream::commsTypes::scheduled),
gatherv_(false),
nFields_(0),
currTime_(),
outputPath_(),
@ -218,6 +219,8 @@ Foam::surfaceWriter::surfaceWriter(const dictionary& options)
options.readIfPresent("verbose", verbose_);
UPstream::commsTypeNames.readIfPresent("commsType", options, commType_);
gatherv_ = false;
options.readIfPresent("gatherv", gatherv_);
geometryScale_ = 1;
geometryCentre_ = Zero;
@ -244,7 +247,19 @@ Foam::surfaceWriter::surfaceWriter(const dictionary& options)
{
Info<< "Create surfaceWriter ("
<< (this->isPointData() ? "point" : "face") << " data):"
<< " commsType=" << UPstream::commsTypeNames[commType_] << endl;
<< " commsType=";
if (UPstream::parRun())
{
if (gatherv_) Info<< "gatherv+";
Info<< UPstream::commsTypeNames[commType_];
}
else
{
Info<< "serial";
}
Info<< endl;
}
}
@ -605,14 +620,29 @@ Foam::tmp<Foam::Field<Type>> Foam::surfaceWriter::mergeFieldTemplate
: mergedSurf_.faceGlobalIndex()
);
globIndex.gather
(
fld,
allFld,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
if (gatherv_)
{
globIndex.mpiGather
(
fld,
allFld,
UPstream::worldComm,
// For fallback:
commType_,
UPstream::msgType()
);
}
else
{
globIndex.gather
(
fld,
allFld,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
}
// Renumber (point data) to correspond to merged points
if
@ -626,6 +656,15 @@ Foam::tmp<Foam::Field<Type>> Foam::surfaceWriter::mergeFieldTemplate
allFld.resize(mergedSurf_.points().size());
}
// Extended debugging. Limit to master:
#if 0
if (UPstream::master())
{
Info<< "merged List<" << pTraits<Type>::typeName << "> : ";
allFld.writeList(Info) << endl;
}
#endif
return tfield;
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2012 OpenFOAM Foundation
Copyright (C) 2015-2024 OpenCFD Ltd.
Copyright (C) 2015-2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -82,6 +82,7 @@ Description
Property | Description | Reqd | Default
verbose | Additional output verbosity | no | no
commsType | Communication type | no | scheduled
gatherv | Use MPI gatherv [experimental] | no | false
scale | Output geometry scaling | no | 1
transform | Output coordinate transform | no |
fieldLevel | Subtract field level before scaling | no | empty dict
@ -97,6 +98,9 @@ Note
it is the responsibility of the implementation (not the caller)
to ensure that this occurs.
Using MPI gatherv [experimental] is not well tested and may change
or be removed in the future!
SourceFiles
surfaceWriter.C
surfaceWriterI.H
@ -191,6 +195,9 @@ protected:
//- Communication type (for field merging)
UPstream::commsTypes commType_;
//- Prefer MPI gatherv intrinsic (for field merging) [experimental]
bool gatherv_;
//- The number of fields
label nFields_;

View File

@ -88,8 +88,11 @@ Foam::surfaceWriters::debugWriter::debugWriter
streamOpt_(IOstreamOption::BINARY)
{
Info<< "Using debug surface writer ("
<< (this->isPointData() ? "point" : "face") << " data):"
<< " commsType=" << UPstream::commsTypeNames[commType_]
<< (this->isPointData() ? "point" : "face") << " data):";
if (gatherv_) Info<< " <gatherv>";
Info<< " commsType=" << UPstream::commsTypeNames[commType_]
<< " merge=" << Switch::name(enableMerge_)
<< " write=" << Switch::name(enableWrite_) << endl;
}

View File

@ -0,0 +1,11 @@
#!/bin/sh
cd "${0%/*}" || exit # Run from this directory
. ${WM_PROJECT_DIR:?}/bin/tools/CleanFunctions # Tutorial clean functions
#------------------------------------------------------------------------------
cleanCase0
# Remove surface and features
rm -rf constant/triSurface
#------------------------------------------------------------------------------

View File

@ -49,7 +49,8 @@ __surfaceFieldValue
{
default
{
verbose true;
verbose true;
//gatherv true;
}
}

View File

@ -19,6 +19,12 @@ debug
formatOptions
{
default
{
verbose true;
//gatherv true;
}
ensight
{
collateTimes true;