ENH: generalize MPI broadcast to basic and user-defined MPI types

- simplify and rationalize some of the broadcast methods for more code
  reuse.

  The bottom level UPstream::broadcast is now always to/from "root=0".
  This was previously passed as a default parameter, but never used
  anything other than '0' in the code. Fixing it as '0' makes it
  consistent with the 'top-down' logical for node-based broadcast.
This commit is contained in:
Mark Olesen
2025-02-28 10:41:10 +01:00
parent 151f4df546
commit f0b844eb47
8 changed files with 115 additions and 65 deletions

View File

@ -345,8 +345,8 @@ static void reportOffsets(const globalIndex& gi)
UPstream::broadcast UPstream::broadcast
( (
allOffsets.data_bytes(), allOffsets.data(),
allOffsets.size_bytes(), allOffsets.size(),
interNodeComm interNodeComm
); );
} }

View File

@ -109,6 +109,15 @@ public:
const int communicator = UPstream::worldComm const int communicator = UPstream::worldComm
); );
//- Broadcast fixed-list content (contiguous or non-contiguous) to all
//- communicator ranks. Does nothing in \b non-parallel.
template<class Type, unsigned N>
static void broadcast
(
FixedList<Type, N>& list,
const int communicator = UPstream::worldComm
);
//- Broadcast multiple items to all communicator ranks. //- Broadcast multiple items to all communicator ranks.
//- Does nothing in \b non-parallel. //- Does nothing in \b non-parallel.
template<class Type, class... Args> template<class Type, class... Args>

View File

@ -43,7 +43,6 @@ void Foam::Pstream::broadcast
} }
else if constexpr (is_contiguous_v<Type>) else if constexpr (is_contiguous_v<Type>)
{ {
// Note: contains parallel guard internally
UPstream::broadcast UPstream::broadcast
( (
reinterpret_cast<char*>(&value), reinterpret_cast<char*>(&value),
@ -65,6 +64,37 @@ void Foam::Pstream::broadcast
} }
template<class Type, unsigned N>
void Foam::Pstream::broadcast
(
FixedList<Type, N>& list,
const int communicator
)
{
if (!UPstream::is_parallel(communicator))
{
return;
}
else if constexpr (is_contiguous_v<Type>)
{
// Size is known and identical on all ranks
UPstream::broadcast(list.data(), list.size(), communicator);
}
else
{
// Non-contiguous content - serialize it
if (UPstream::master(communicator))
{
OPBstream::send(list, communicator);
}
else
{
IPBstream::recv(list, communicator);
}
}
}
template<class Type, class... Args> template<class Type, class... Args>
void Foam::Pstream::broadcasts void Foam::Pstream::broadcasts
( (
@ -77,8 +107,15 @@ void Foam::Pstream::broadcasts
{ {
return; return;
} }
else if constexpr (!sizeof...(values) && is_contiguous_v<Type>)
{
// A single-value and contiguous
UPstream::broadcast(&value, 1, communicator);
}
else else
{ {
// Non-contiguous data, or multiple data - needs serialization
if (UPstream::master(communicator)) if (UPstream::master(communicator))
{ {
OPBstream::sends OPBstream::sends
@ -129,19 +166,10 @@ void Foam::Pstream::broadcastList
communicator communicator
); );
if (UPstream::is_subrank(communicator))
{
list.resize_nocopy(len);
}
if (len) if (len)
{ {
UPstream::broadcast // Only broadcast non-empty content
( UPstream::broadcast(list.data(), list.size(), communicator);
list.data_bytes(),
list.size_bytes(),
communicator
);
} }
} }
else else
@ -150,13 +178,28 @@ void Foam::Pstream::broadcastList
if (UPstream::master(communicator)) if (UPstream::master(communicator))
{ {
OPBstream os(communicator); if (list.empty())
os << list; {
// Do not serialize if empty.
// Just broadcast zero-size in a form that IPBstream can expect
OPBstream::send(Foam::zero{}, communicator);
}
else
{
OPBstream::send(list, communicator);
}
} }
else else
{ {
IPBstream is(communicator); IPBstream is(communicator);
is >> list; if (is.remaining() > 0) // Received a non-empty buffer
{
is >> list;
}
else
{
list.clear();
}
} }
} }
} }

View File

@ -1720,16 +1720,16 @@ public:
// Broadcast Functions // Broadcast Functions
//- Broadcast buffer contents to all processes in given communicator. //- Broadcast buffer contents (contiguous types), from rank=0
//- The sizes must match on all processes. //- to all ranks. The sizes must match on all processes.
// For \b non-parallel : do nothing. // For \b non-parallel : do nothing.
// \return True on success // \return True on success
static bool broadcast template<class Type>
inline static bool broadcast
( (
char* buf, Type* buffer,
const std::streamsize bufSize, std::streamsize count,
const label communicator, const int communicator
const int rootProcNo = masterNo()
); );

View File

@ -27,6 +27,44 @@ License
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Type>
bool Foam::UPstream::broadcast
(
Type* buffer,
std::streamsize count,
const int communicator
)
{
// Likely no reason to check for nullptr
if (!UPstream::is_parallel(communicator))
{
// Nothing to do - ignore
return true;
}
else if constexpr (!is_contiguous_v<Type>)
{
// Also report parameters to silence compiler warnings about unused
FatalErrorInFunction
<< "Invalid for non-contiguous data types."
<< " buffer:" << (buffer != nullptr)
<< " count:" << count
<< Foam::abort(FatalError);
return false;
}
else
{
// Use element or component type (or byte-wise) for data type
return UPstream::mpi_broadcast
(
buffer, // The data or cmpt pointer
UPstream_dataType<Type>::size(count),
UPstream_dataType<Type>::datatype_id,
communicator
);
}
}
template<class T> template<class T>
Foam::List<T> Foam::UPstream::allGatherValues Foam::List<T> Foam::UPstream::allGatherValues
( (

View File

@ -319,12 +319,7 @@ bool Foam::globalIndex::splitNodeOffsets
allOffsets.resize_nocopy(numProc+1); allOffsets.resize_nocopy(numProc+1);
} }
UPstream::broadcast UPstream::broadcast(allOffsets.data(), allOffsets.size(), interNodeComm);
(
allOffsets.data_bytes(),
allOffsets.size_bytes(),
interNodeComm
);
if (FOAM_UNLIKELY(allOffsets.empty())) if (FOAM_UNLIKELY(allOffsets.empty()))

View File

@ -42,19 +42,4 @@ bool Foam::UPstream::mpi_broadcast
} }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::broadcast
(
char* buf,
const std::streamsize bufSize,
const label comm,
const int rootProcNo
)
{
// Nothing to do - ignore
return true;
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -92,24 +92,4 @@ bool Foam::UPstream::mpi_broadcast
} }
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::broadcast
(
char* buf,
const std::streamsize count,
const label communicator,
const int rootProcNo
)
{
return UPstream::mpi_broadcast
(
buf,
count,
UPstream::dataTypes::type_byte,
communicator
);
}
// ************************************************************************* // // ************************************************************************* //