ENH: components to support one-sided communication

This commit is contained in:
Mark Olesen
2025-03-07 16:04:37 +01:00
parent ab7cfdcf49
commit 36ae93d017
7 changed files with 2410 additions and 0 deletions

View File

@ -0,0 +1,3 @@
Test-one-sided1.cxx
EXE = $(FOAM_USER_APPBIN)/Test-one-sided1

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -0,0 +1,354 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-one-sided1
Description
Simple test of one-sided communication
\*---------------------------------------------------------------------------*/
#include "argList.H"
#include "Time.H"
#include "IPstream.H"
#include "OPstream.H"
#include "SubField.H"
#include "vector.H"
#include "IOstreams.H"
using namespace Foam;
template<class T>
Ostream& printSpanInfo(Ostream& os, const UList<T>& span)
{
os << "addr=" << Foam::name(span.cdata())
<< " size= " << span.size();
return os;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
argList::noCheckProcessorDirectories();
argList::addVerboseOption();
argList::addBoolOption("no-shared", "disable shared memory tests");
argList::addBoolOption("no-sleep", "disable sleep for async test");
#include "setRootCase.H"
const bool with_shared = !args.found("no-shared");
const bool with_sleep = !args.found("no-sleep");
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
Info<< nl
<< "nProcs = " << UPstream::nProcs()
<< " with " << UPstream::nComms() << " predefined comm(s)" << nl;
if (!UPstream::parRun())
{
Info<< "###############" << nl
<< "Not running in parallel. Stopping now" << nl
<< "###############" << endl;
return 1;
}
const auto myProci = UPstream::myProcNo();
const auto numProc = UPstream::nProcs();
// Make some windows
Field<label> buffer(10 + myProci);
buffer = myProci;
Pout<< "input: " << flatOutput(buffer) << endl;
UPstream::Window win;
win.create(buffer, UPstream::worldComm);
// Pass 1
// - grab things from sub-ranks
if (UPstream::master())
{
win.lock_all(true);
win.get
(
buffer.slice(4, 2),
1, // target_rank
2 // target_disp
);
win.unlock_all();
}
Pout<< "output: " << flatOutput(buffer) << endl;
// Pass 2:
// accumulate into master
if (UPstream::is_subrank())
{
win.lock(0);
win.put
(
UPstream::opCodes::op_sum,
buffer.slice(2, 4),
UPstream::masterNo(),
2 // target_disp
);
win.unlock(0);
}
Pout<< "updated: " << flatOutput(buffer) << endl;
// Pass 3:
// Update some values - something very asynchronous
if (UPstream::is_subrank())
{
if (with_sleep)
{
if (UPstream::myProcNo() % 3)
{
Foam::sleep(3);
}
else
{
Foam::sleep(1);
}
}
buffer *= 10;
forAll(buffer, i)
{
buffer[i] *= 1 + (i % 3);
}
}
// Needs a process sync, otherwise master fetches old values
UPstream::barrier(UPstream::worldComm);
label lastValue(-1);
if (UPstream::master())
{
win.lock_all(true);
for (const auto proci : UPstream::subProcs())
{
win.fetch_and_op
(
UPstream::opCodes::op_sum,
buffer[0],
lastValue,
proci,
2 // target_disp
);
}
// Force changes to occur
win.flush_all();
win.unlock_all();
}
Pout<< "last-value : " << lastValue << nl
<< "final : " << flatOutput(buffer) << endl;
labelList allUpdates;
if (UPstream::master())
{
allUpdates.resize(UPstream::nProcs(), -10);
win.lock_all(true);
for (const auto proci : UPstream::subProcs())
{
win.get_value
(
allUpdates[proci],
proci,
2 // target_disp
);
}
win.flush_all();
win.unlock_all();
}
Info<< "gets: " << flatOutput(allUpdates) << endl;
// This should fail (runtime)
#if 0
if (UPstream::master())
{
labelPair value1(-1, -1);
win.lock_all(true);
for (const auto proci : UPstream::subProcs())
{
win.fetch_and_op
(
UPstream::opCodes::op_sum,
value1,
lastValue,
proci,
8 // target_disp
);
}
win.unlock_all();
}
#endif
// Last thing before closing out
// replace values. Not very efficient...
// Persistent data to move onto target:
const label newValue(333);
const label multiplier(-3);
if (UPstream::master())
{
win.lock_all(true);
for (const auto proci : UPstream::subProcs())
{
win.fetch_and_op
(
UPstream::opCodes::op_replace,
newValue,
lastValue,
proci, // target_rank
3 // target_disp
);
win.put_value
(
UPstream::opCodes::op_prod,
multiplier,
proci, // target_rank
5 // target_disp
);
}
win.unlock_all();
}
win.close(); // process collective
Pout<< "modified: " << flatOutput(buffer) << endl;
if (with_shared)
{
// Make some shared window
UList<label> newBuffer;
{
label localLen(0);
if
(
(myProci == 3)
|| (myProci == numProc-2)
)
{
localLen = 0;
}
else
{
localLen = (10 + UPstream::myProcNo());
}
// Just to prove that we can shallow copy the view...
newBuffer =
win.allocate_shared<label>(localLen, UPstream::worldComm);
}
newBuffer = UPstream::myProcNo();
Pout<< "Shared: " << flatOutput(newBuffer) << endl;
{
UList<label> local = win.view<label>();
Pout<< "local win: ";
printSpanInfo(Pout, local) << endl;
}
Pout<< "Query rank1" << endl;
{
// UPtrList<UList<label>> totalList(UPstream::nProcs());
//
// totalList.set(0, &newBuffer);
const label* ptr0 = nullptr;
{
UList<label> buf = win.view_shared<label>(0);
ptr0 = buf.cdata();
Pout<< "addr 0 = " << Foam::name(ptr0)
<< " diff = " << label(0)
<< " + " << buf.size() << endl;
}
// UList<label> other = win.global_view<label>();
for (const auto proci : UPstream::subProcs())
{
UList<label> other = win.view_shared<label>(proci);
const label* ptr = other.cdata();
Pout<< "addr " << proci << " = "
<< Foam::name(ptr)
<< " diff = " << label(ptr - ptr0)
<< " + " << other.size() << endl;
// totalList.set(proci, &other);
}
}
win.close();
}
// Since close() is ignored on a null window,
// it can be called an arbitrary number of times
win.close();
win.close();
win.close();
Info<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //

View File

@ -30,11 +30,16 @@ Description
An opaque wrapper for MPI_Win with a vendor-independent
representation and without any \c <mpi.h> header dependency.
The handling for window declaration is very generous - it does not
distinguish between readonly and read/write windows. This may become
more restrictive in the future.
Note
The MPI standard states that MPI_Win is always an opaque object.
Generally it is either an integer (eg, mpich) or a pointer (eg, openmpi).
SourceFiles
UPstreamWindow.txx
\*---------------------------------------------------------------------------*/
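For orientation, a minimal usage sketch of the Window wrapper (illustrative only, condensed from the Test-one-sided1 application above; the buffer size, ranks and displacements are arbitrary):

    // Within a parallel application's main(), as in Test-one-sided1 above
    // Expose an existing buffer as an RMA window (collective call)
    Field<label> buffer(10, Zero);
    UPstream::Window win;
    win.create(buffer, UPstream::worldComm);

    // One-sided read within a passive-target epoch
    if (UPstream::master())
    {
        win.lock_all();
        win.get(buffer.slice(4, 2), 1, 2);  // from rank 1, target_disp 2
        win.unlock_all();
    }

    win.close();  // collective call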
@ -69,6 +74,193 @@ private:
//- The MPI_Win (as wrapped value)
value_type value_;
protected:
// Protected Method Functions
//- Sizing helper for disp_unit
template<class Type>
static constexpr auto element_width() noexcept
{
if constexpr (std::is_void_v<Type>) { return 1; }
else { return sizeof(Type); }
}
//- Allocate a local or shared memory window.
//- Uses MPI_Win_allocate() or MPI_Win_allocate_shared(), respectively.
// A no-op if not running in parallel.
// This is a \em collective call.
// \returns (address, size) tuple
std::pair<void*, int64_t> mpi_win_allocate
(
//! Number of elements in the window (non-negative integer)
std::streamsize num_elements,
//! Local unit size for displacements (positive integer)
//! == sizeof(Type)
int disp_unit,
//! Communicator (wrapped)
UPstream::Communicator communicator,
//! Create shared memory
const bool shared = false
);
//- Allocate a local or shared memory window.
//- Uses MPI_Win_allocate() or MPI_Win_allocate_shared(), respectively.
// A no-op if not running in parallel.
// This is a \em collective call.
// \returns (address, size) tuple
std::pair<void*, int64_t> mpi_win_allocate
(
//! Number of elements in the window (non-negative integer)
std::streamsize num_elements,
//! Local unit size for displacements (positive integer)
//! == sizeof(Type)
int disp_unit,
//! Communicator (internally indexed)
int communicator,
//! Create shared memory
const bool shared = false
);
//- Create window onto existing memory with MPI_Win_create().
// A no-op if not running in parallel.
// This is a \em collective call.
bool mpi_win_create
(
//! Address of window
void *baseptr,
//! Number of elements in the window (non-negative integer)
std::streamsize num_elements,
//! Local unit size for displacements (positive integer)
//! == sizeof(Type)
int disp_unit,
//! Communicator (wrapped)
UPstream::Communicator communicator
);
//- Create window onto existing memory with MPI_Win_create().
// A no-op if not running in parallel.
// This is a \em collective call.
bool mpi_win_create
(
//! Address of window
void *baseptr,
//! Number of elements in the window (non-negative integer)
std::streamsize num_elements,
//! Local unit size for displacements (positive integer)
//! == sizeof(Type)
int disp_unit,
//! Communicator (internally indexed)
int communicator
);
//- Retrieve window sizing information as address/count tuple.
//- The expected sizeof(Type) is supplied as an assertion parameter.
//
// \returns (nullptr, 0) if not running in parallel or if either
// the address or count are zero
static std::pair<void*, int64_t> mpi_win_query
(
UPstream::Window window,
//! Expected element size (fatal for mismatch with queried value)
const int expected_disp_unit
);
//- Retrieve shared window information as address/count tuple.
//- The expected sizeof(Type) is supplied as an assertion parameter.
//
// \returns (nullptr, 0) if not running in parallel or if either
// the address or count are zero
static std::pair<void*, int64_t> mpi_win_query_shared
(
UPstream::Window window,
//! The rank to query
int target_rank,
//! Expected element size (fatal for mismatch with queried value)
const int expected_disp_unit
);
// Protected Member Functions
//- Get buffer contents from given rank.
// A no-op if not running in parallel or for an empty or null buffer.
//
// \note The method uses a \c void pointer and the required data type
// (as per MPI). This means it should almost never be called directly
// but always via a compile-time checked caller.
// \return True on success
bool get_data
(
//! Destination buffer
void* origin,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put buffer contents to given rank.
// A no-op if not running in parallel or for an empty or null buffer.
bool put_data
(
//! Source buffer
const void* origin,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put accumulate buffer contents to given rank.
// A no-op if not running in parallel or for an empty or null buffer.
bool put_data
(
//! The op-code for accumulate
const UPstream::opCodes opCodeId,
//! Source buffer
const void* origin,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp = 0 //!< Window displacement on target
) const;
//- Retrieve the remote content (a single value) and then combine
//- in new content.
// A no-op if not running in parallel or for an empty or null buffer.
bool mpi_fetch_and_op
(
//! The op-code for accumulate
const UPstream::opCodes opCodeId,
//! [in] the content to combine with target
const void* origin,
//! [out] target content before the operation
void* result,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp = 0 //!< Window displacement on target
) const;
//- Entry point to MPI_Win_flush(), MPI_Win_flush_all(),
//- MPI_Win_flush_local(), MPI_Win_flush_local_all().
// Uses rank == -1 to signal 'all'
void mpi_win_flushing(int rank, bool local=false);
//- Entry point to MPI_Win_lock(), MPI_Win_lock_all(),
//- optionally as exclusive lock.
// Uses rank == -1 to signal 'all'
void mpi_win_locking(int rank, bool exclusive=false);
//- Entry point to MPI_Win_unlock(), MPI_Win_unlock_all().
// Uses rank == -1 to signal 'all'
void mpi_win_unlocking(int rank);
public:
// Generated Methods
@ -142,6 +334,387 @@ public:
// The same as querying the original communicator, assuming the
// communicator is available within the current code scope.
int size() const;
// Window creation with allocation
//- Allocate a local memory region and create window onto it.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
inline UList<Type> allocate
(
//! Number of \b elements to allocate
std::streamsize count,
UPstream::Communicator communicator,
//! Create shared memory region
const bool shared = false
);
//- Allocate a local memory region and create window onto it.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
inline UList<Type> allocate
(
//! Number of \b elements to allocate
std::streamsize count,
const int communicator,
//! Create shared memory region
const bool shared = false
);
//- Allocate a shared memory region and create window onto it.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
inline UList<Type> allocate_shared
(
//! Number of \b elements to allocate
std::streamsize count,
UPstream::Communicator communicator
);
//- Create a window by allocating a new shared memory region.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
inline UList<Type> allocate_shared
(
//! Number of \b elements to allocate
std::streamsize count,
const int communicator
);
// Window creation with existing memory
//- A window exposing a zero-sized memory region.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type = void>
bool create(std::nullptr_t, UPstream::Communicator comm)
{
return mpi_win_create(nullptr, 0, element_width<Type>(), comm);
}
//- A window exposing a zero-sized memory region.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type = void>
bool create(std::nullptr_t, const int comm)
{
return mpi_win_create(nullptr, 0, element_width<Type>(), comm);
}
//- A window exposing an existing memory region.
// A no-op if not running in parallel.
// This is a \em collective call.
// \note Ignores constness since we can't specify a priori
// if this should be a read/write or read only buffer.
template<class Type>
inline bool create
(
const Type* buffer,
//! Number of elements in the window
std::streamsize count,
UPstream::Communicator communicator
);
//- A window exposing an existing memory region.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
inline bool create
(
const Type* buffer,
//! Number of elements in the window
std::streamsize count,
const int communicator
);
//- A window exposing the specified buffer contents.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
bool create(const UList<Type>& buffer, UPstream::Communicator comm)
{
return create(buffer.cdata(), buffer.size(), comm);
}
//- A window exposing the specified buffer contents.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
bool create(const UList<Type>& buffer, int communicator)
{
return create(buffer.cdata(), buffer.size(), communicator);
}
//- A window exposing the specified buffer contents.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
bool create(SubList<Type> buffer, UPstream::Communicator communicator)
{
return create(buffer.cdata(), buffer.size(), communicator);
}
//- A window exposing the specified buffer contents.
// A no-op if not running in parallel.
// This is a \em collective call.
template<class Type>
bool create(SubList<Type> buffer, int communicator)
{
return create(buffer.cdata(), buffer.size(), communicator);
}
// Synchronization and resource management
//- MPI_Win_lock() for given target rank (no assertions),
//- optionally as exclusive lock.
void lock(int rank, bool exclusive=false)
{
mpi_win_locking(rank, exclusive);
}
//- MPI_Win_unlock() for given target rank.
void unlock(int rank) { mpi_win_unlocking(rank); }
//- MPI_Win_lock_all(), optionally as exclusive lock.
void lock_all(bool exclusive=false) { mpi_win_locking(-1, exclusive); }
//- MPI_Win_unlock_all()
void unlock_all() { mpi_win_unlocking(-1); }
//- MPI_Win_flush() for given target rank.
// Complete all outstanding RMA operations initiated by the
// calling process to the target rank.
void flush(int rank) { mpi_win_flushing(rank); }
//- MPI_Win_flush_all()
// Complete all outstanding RMA operations at both the origin
// and the target
void flush_all() { mpi_win_flushing(-1); }
//- MPI_Win_flush_local()
// Locally complete at the origin all outstanding RMA
// operations initiated by the calling process to the target
// process specified by rank.
void flush_local(int rank) { mpi_win_flushing(rank, true); }
//- MPI_Win_flush_local_all()
// Locally complete at the origin all outstanding RMA
// operations to all targets.
void flush_local_all() { mpi_win_flushing(-1, true); }
//- MPI_Win_sync() - ignored if the window is not active.
void sync();
//- MPI_Win_free().
//- Closes the window view and frees any associated memory,
// eg, from allocate() or allocate_shared().
// Ignored if the window is not active.
// This is a \em collective call.
void close();
// Queries
//- Test if the window is a shared memory window
bool is_shared(const bool failNonShared = false) const;
//- Return view of the currently exposed window content.
//- No restriction on the type of memory associated with the window.
// A no-op (empty list) if not running in parallel
// or the window is not active.
template<class Type>
inline UList<Type> view() const;
//- Return view of shared memory window content.
// A no-op (empty list) if not running in parallel.
// Undefined behaviour (likely Fatal) if a shared memory window
// has not been allocated.
template<class Type>
inline UList<Type> view_shared(int target_rank) const;
// Window Access
//- Get buffer contents from given rank.
//- A no-op for an empty or null buffer, or if not running in parallel.
template<class Type>
inline bool get
(
//! Destination buffer
Type* buffer,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
int fromProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put buffer contents to given rank.
// A no-op for an empty/null buffer, or if not running in parallel.
template<class Type>
inline bool put
(
//! Source buffer
const Type* buffer,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put accumulate buffer contents to given rank.
// A no-op for an empty/null buffer, or if not running in parallel.
// \note Must correspond to basic data types!
template<class Type>
inline bool put
(
const UPstream::opCodes opCodeId,
//! Source buffer
const Type* buffer,
//! The data count - number of elements. Identical on both sides.
std::streamsize count,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Get a single value from given rank.
// \note Use persistent data (not temporary) for value
template<class Type>
inline bool get_value
(
Type& value,
int fromProcNo,
int target_disp = 0 //!< Window displacement on target
) const
{
return this->get(&value, 1, fromProcNo, target_disp);
}
//- Put a single value to given rank.
// \note Use persistent data (not temporary) for value
template<class Type>
inline bool put_value
(
const Type& value,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const
{
return this->put(&value, 1, toProcNo, target_disp);
}
//- Put and accumulate a single value to given rank
// \note Use persistent data (not temporary) for value
template<class Type>
inline bool put_value
(
const UPstream::opCodes opCodeId,
const Type& value,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const
{
return this->put(opCodeId, &value, 1, toProcNo, target_disp);
}
//- Get into List storage (contiguous data only)
//- from window location on given processor.
//- A no-op for an empty or null buffer, or if not running in parallel.
// \note Only valid for contiguous data types.
template<class Type>
inline bool get
(
UList<Type>& buffer,
int fromProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put from List storage (contiguous data only)
//- to window location on given processor.
// \note Only valid for contiguous data types.
template<class Type>
inline bool put
(
const UList<Type>& buffer,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put and accumulate from List storage (contiguous data only)
//- to window location on given processor.
//- A no-op for an empty or null buffer, or if not running in parallel.
// \note Only valid for contiguous data types.
template<class Type>
inline bool put
(
const UPstream::opCodes opCodeId,
const UList<Type>& buffer,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Get into SubList storage (contiguous data only)
//- from window location on given processor.
//- A no-op for an empty or null buffer, or if not running in parallel.
// \note Only valid for contiguous data types.
template<class Type>
inline bool get
(
SubList<Type> buffer,
int fromProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Combine the value of origin into the target and return
//- the resulting value (MPI_Fetch_and_op).
// \note Use persistent data (not temporary) for origin and result,
// which must also be disjoint locations.
// Only valid for single element (no aggregates).
// \return the old target value (before the operation)
template<class Type>
inline bool fetch_and_op
(
const UPstream::opCodes opCodeId,
//! the content to combine with target
const Type& origin,
//! [out] the old target value before the operation
Type& result,
int target_rank, //!< The target rank
int target_disp = 0 //!< Window displacement on target
) const;
//- Put from SubList storage (contiguous data only)
//- to window location on given processor.
//- A no-op for an empty or null buffer, or if not running in parallel.
// \note Only valid for contiguous data types.
template<class Type>
inline bool put
(
const SubList<Type> buffer,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
//- Put and accumulate from SubList storage (contiguous data only)
//- to window location on given processor.
//- A no-op for an empty or null buffer, or if not running in parallel.
// \note Only valid for contiguous data types.
template<class Type>
inline bool put
(
const UPstream::opCodes opCodeId,
const SubList<Type> buffer,
int toProcNo,
int target_disp = 0 //!< Window displacement on target
) const;
};
@ -150,6 +723,12 @@ public:
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#ifdef NoRepository
#include "UPstreamWindow.txx"
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
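Continuing the sketch above, an illustrative condensation of the accumulate-style access ("Pass 2" and "Pass 3" of the test application): sub-ranks add their values into the master's window, then the master atomically combines a value into a remote location and retrieves the previous content with fetch_and_op.

    // Sub-ranks accumulate into the master's window
    if (UPstream::is_subrank())
    {
        win.lock(0);
        win.put
        (
            UPstream::opCodes::op_sum,
            buffer.slice(2, 4),
            UPstream::masterNo(),
            2  // target_disp
        );
        win.unlock(0);
    }

    // Master adds buffer[0] into rank 1 (disp 2) and reads back the old value
    label previous(-1);
    if (UPstream::master())
    {
        win.lock_all();
        win.fetch_and_op(UPstream::opCodes::op_sum, buffer[0], previous, 1, 2);
        win.flush_all();
        win.unlock_all();
    }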

View File

@ -0,0 +1,433 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2025 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::allocate
(
std::streamsize count,
UPstream::Communicator communicator,
const bool shared
)
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return UList<Type>();
}
else
{
auto [ptr, len] =
mpi_win_allocate(count, sizeof(Type), communicator, shared);
return UList<Type>(reinterpret_cast<Type*>(ptr), len);
}
}
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::allocate
(
std::streamsize count,
const int communicator,
const bool shared
)
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return UList<Type>();
}
else
{
auto [ptr, len] =
mpi_win_allocate(count, sizeof(Type), communicator, shared);
return UList<Type>(reinterpret_cast<Type*>(ptr), len);
}
}
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::allocate_shared
(
std::streamsize count,
UPstream::Communicator communicator
)
{
return allocate<Type>(count, communicator, true);
}
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::allocate_shared
(
std::streamsize count,
const int communicator
)
{
return allocate<Type>(count, communicator, true);
}
template<class Type>
bool Foam::UPstream::Window::create
(
const Type* buffer,
std::streamsize count,
UPstream::Communicator communicator
)
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// const_cast since we cannot specify readonly or read/write
return mpi_win_create
(
const_cast<Type*>(buffer),
count,
sizeof(Type),
communicator
);
}
}
template<class Type>
bool Foam::UPstream::Window::create
(
const Type* buffer,
std::streamsize count,
const int communicator
)
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// const_cast since we cannot specify readonly or read/write
return mpi_win_create
(
const_cast<Type*>(buffer),
count,
sizeof(Type),
communicator
);
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::view() const
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return UList<Type>();
}
else
{
auto [ptr, len] = mpi_win_query(*this, sizeof(Type));
return UList<Type>(reinterpret_cast<Type*>(ptr), len);
}
}
template<class Type>
Foam::UList<Type> Foam::UPstream::Window::view_shared(int target_rank) const
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return UList<Type>();
}
else
{
auto [ptr, count] =
mpi_win_query_shared(*this, target_rank, sizeof(Type));
return UList<Type>(reinterpret_cast<Type*>(ptr), count);
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Type>
bool Foam::UPstream::Window::get
(
Type* buffer,
std::streamsize count,
int fromProcNo,
int target_disp
) const
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// Use element or component type (or byte-wise) for data type
return this->get_data
(
buffer, // The data or cmpt pointer
UPstream_dataType<Type>::size(count),
UPstream_dataType<Type>::datatype_id,
fromProcNo,
target_disp
);
}
}
template<class Type>
bool Foam::UPstream::Window::put
(
const Type* buffer,
std::streamsize count,
int toProcNo,
int target_disp
) const
{
if constexpr (!is_contiguous_v<Type>)
{
FatalErrorInFunction
<< "Only contiguous data can be supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// Use element or component type (or byte-wise) for data type
return this->put_data
(
buffer, // The data or cmpt pointer
UPstream_dataType<Type>::size(count),
UPstream_dataType<Type>::datatype_id,
toProcNo,
target_disp
);
}
}
template<class Type>
bool Foam::UPstream::Window::put
(
const UPstream::opCodes opCodeId,
const Type* buffer,
std::streamsize count,
int toProcNo,
int target_disp
) const
{
if constexpr (!UPstream_basic_dataType<Type>::value)
{
FatalErrorInFunction
<< "Only basic data types are supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// Basic types (no user types) only
return this->put_data
(
opCodeId,
buffer, // The data or cmpt pointer
UPstream_basic_dataType<Type>::size(count),
UPstream_basic_dataType<Type>::datatype_id,
toProcNo,
target_disp
);
}
}
template<class Type>
bool Foam::UPstream::Window::get
(
UList<Type>& buffer,
int fromProcNo,
int target_disp
) const
{
return this->get
(
buffer.data(), buffer.size(),
fromProcNo,
target_disp
);
}
template<class Type>
bool Foam::UPstream::Window::put
(
const UList<Type>& buffer,
int toProcNo,
int target_disp
) const
{
return this->put
(
buffer.cdata(), buffer.size(),
toProcNo,
target_disp
);
}
template<class Type>
bool Foam::UPstream::Window::put
(
const UPstream::opCodes opCodeId,
const UList<Type>& buffer,
int toProcNo,
int target_disp
) const
{
return this->put
(
opCodeId,
buffer.cdata(), buffer.size(),
toProcNo,
target_disp
);
}
template<class Type>
bool Foam::UPstream::Window::get
(
SubList<Type> buffer,
int fromProcNo,
int target_disp
) const
{
return this->get
(
buffer.data(), buffer.size(),
fromProcNo,
target_disp
);
}
template<class Type>
bool Foam::UPstream::Window::fetch_and_op
(
const UPstream::opCodes opCodeId,
const Type& origin,
Type& result,
int target_rank,
int target_disp
) const
{
if constexpr (!UPstream_basic_dataType<Type>::value)
{
FatalErrorInFunction
<< "Only basic data types are supported!"
<< Foam::abort(FatalError);
return false;
}
else
{
// Basic types (no user types) and a single element only!
return this->mpi_fetch_and_op
(
opCodeId,
&origin,
&result,
UPstream_basic_dataType<Type>::datatype_id,
target_rank,
target_disp
);
}
}
template<class Type>
bool Foam::UPstream::Window::put
(
const SubList<Type> buffer,
int toProcNo,
int target_disp
) const
{
return this->put
(
buffer.cdata(), buffer.size(),
toProcNo,
target_disp
);
}
template<class Type>
bool Foam::UPstream::Window::put
(
const UPstream::opCodes opCodeId,
const SubList<Type> buffer,
int toProcNo,
int target_disp
) const
{
return this->put
(
opCodeId,
buffer.cdata(), buffer.size(),
toProcNo,
target_disp
);
}
// ************************************************************************* //

View File

@ -53,4 +53,179 @@ int Foam::UPstream::Window::size() const
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_allocate
(
std::streamsize num_elements,
int disp_unit,
UPstream::Communicator communicator,
const bool shared
)
{
NotImplemented;
return {nullptr, 0};
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_allocate
(
std::streamsize num_elements,
int disp_unit,
int communicator,
const bool shared
)
{
NotImplemented;
return {nullptr, 0};
}
bool Foam::UPstream::Window::mpi_win_create
(
void *baseptr,
std::streamsize num_elements,
int disp_unit,
UPstream::Communicator communicator
)
{
NotImplemented;
return false;
}
bool Foam::UPstream::Window::mpi_win_create
(
void *baseptr,
std::streamsize num_elements,
int disp_unit,
int communicator
)
{
NotImplemented;
return false;
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
void Foam::UPstream::Window::close()
{}
// * * * * * * * * * * * * * * * Synchronization * * * * * * * * * * * * * * //
void Foam::UPstream::Window::mpi_win_flushing(int rank, bool local)
{}
void Foam::UPstream::Window::sync()
{}
void Foam::UPstream::Window::mpi_win_locking(int rank, bool exclusive)
{}
void Foam::UPstream::Window::mpi_win_unlocking(int rank)
{}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Window::get_data
(
void* origin,
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
NotImplemented;
return false;
}
bool Foam::UPstream::Window::put_data
(
const void* origin,
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
NotImplemented;
return false;
}
bool Foam::UPstream::Window::put_data
(
const UPstream::opCodes opCodeId,
const void* origin,
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
NotImplemented;
return false;
}
bool Foam::UPstream::Window::mpi_fetch_and_op
(
const UPstream::opCodes opCodeId,
const void* origin,
void* result,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
NotImplemented;
return false;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Window::is_shared(const bool failNonShared) const
{
return false;
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_query
(
UPstream::Window window,
const int expected_disp_unit
)
{
// No window to query
NotImplemented;
return {nullptr, 0};
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_query_shared
(
UPstream::Window window,
int target_rank,
const int expected_disp_unit
)
{
// No window to query
NotImplemented;
return {nullptr, 0};
}
// ************************************************************************* //

View File

@ -75,4 +75,868 @@ int Foam::UPstream::Window::size() const
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
//
// Allocate a local or shared memory window.
// Uses MPI_Win_allocate() or MPI_Win_allocate_shared(), respectively.
//
static std::pair<void*,int64_t>
call_window_allocate
(
Foam::UPstream::Window* self,
MPI_Comm communicator,
// [in] number of elements (not bytes)
std::streamsize num_elements,
// [in] size of each element == sizeof(Type)
const int disp_unit,
const bool shared
)
{
using namespace Foam;
// No-op for non-parallel
if (!UPstream::parRun())
{
*self = UPstream::Window(MPI_WIN_NULL);
return {nullptr, 0};
}
// if (FOAM_UNLIKELY(MPI_COMM_NULL == communicator))
// {
// FatalErrorInFunction
// << "Attempt to use NULL communicator"
// << Foam::abort(FatalError);
// return false;
// }
MPI_Win win = PstreamUtils::Cast::to_mpi(*self);
// Stringent handling of existing windows
if (FOAM_UNLIKELY(MPI_WIN_NULL != win))
{
FatalErrorInFunction
<< "Window already exists. Use close() first"
<< Foam::abort(FatalError);
return {nullptr, 0};
}
int returnCode(MPI_SUCCESS);
void *baseptr = nullptr;
if (shared)
{
returnCode = MPI_Win_allocate_shared
(
// From num elements -> num of bytes
std::streamsize(num_elements * disp_unit),
disp_unit,
MPI_INFO_NULL,
communicator,
&baseptr,
&win
);
}
else
{
returnCode = MPI_Win_allocate
(
// From num elements -> num of bytes
std::streamsize(num_elements * disp_unit),
disp_unit,
MPI_INFO_NULL,
communicator,
&baseptr,
&win
);
}
if (FOAM_UNLIKELY((MPI_SUCCESS != returnCode) || (MPI_WIN_NULL == win)))
{
if (shared)
{
FatalError("MPI_Win_allocate_shared()")
<< Foam::abort(FatalError);
}
else
{
FatalError("MPI_Win_allocate()")
<< Foam::abort(FatalError);
}
return {nullptr, 0};
}
// Now have a window
*self = UPstream::Window(win);
// The address and the type-specific count
return {baseptr, num_elements};
}
// ------------------------------------------------------------------------- //
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_allocate
(
std::streamsize num_elements,
int disp_unit,
UPstream::Communicator communicator,
const bool shared
)
{
return call_window_allocate
(
this,
PstreamUtils::Cast::to_mpi(communicator),
num_elements,
disp_unit,
shared
);
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_allocate
(
std::streamsize num_elements,
int disp_unit,
int communicator, // Index into MPICommunicators_
const bool shared
)
{
return call_window_allocate
(
this,
PstreamGlobals::MPICommunicators_[communicator],
num_elements,
disp_unit,
shared
);
}
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
// NOTE: Currently no
// - MPI_Win_create_dynamic()
// - MPI_Win_attach()
// - MPI_Win_detach()
// since working with their addresses (and broadcasting them)
// is fairly painful and probably not particularly efficient either
//
// Create a window to existing memory with MPI_Win_create().
//
static bool call_window_create
(
Foam::UPstream::Window* self,
MPI_Comm communicator,
// [in] base address
void *baseptr,
// [in] number of elements (not bytes)
std::streamsize num_elements,
// [in] size of each element == sizeof(Type)
const int disp_unit
)
{
using namespace Foam;
// No-op for non-parallel
if (!UPstream::parRun())
{
*self = UPstream::Window(MPI_WIN_NULL);
return false;
}
// if (FOAM_UNLIKELY(MPI_COMM_NULL == communicator))
// {
// using namespace Foam;
// FatalErrorInFunction
// << "Attempt to use NULL communicator"
// << Foam::abort(FatalError);
// return false;
// }
MPI_Win win = PstreamUtils::Cast::to_mpi(*self);
// Stringent handling of existing windows
if (FOAM_UNLIKELY(MPI_WIN_NULL != win))
{
FatalErrorInFunction
<< "Window already exists. Use close() first"
<< Foam::abort(FatalError);
return false;
}
// Leave nothing to chance
if (!baseptr || !num_elements)
{
baseptr = nullptr;
num_elements = 0;
}
int returnCode = MPI_Win_create
(
baseptr,
// From num elements -> num of bytes
std::streamsize(num_elements * disp_unit),
disp_unit,
MPI_INFO_NULL,
communicator,
&win
);
if (FOAM_UNLIKELY((MPI_SUCCESS != returnCode) || (MPI_WIN_NULL == win)))
{
FatalError("MPI_Win_create()")
<< Foam::abort(FatalError);
return false;
}
// Now have a window
*self = UPstream::Window(win);
return (MPI_SUCCESS == returnCode);
}
bool Foam::UPstream::Window::mpi_win_create
(
void *baseptr,
std::streamsize num_elements,
const int disp_unit,
UPstream::Communicator communicator
)
{
return call_window_create
(
this,
PstreamUtils::Cast::to_mpi(communicator),
baseptr,
num_elements,
disp_unit
);
}
bool Foam::UPstream::Window::mpi_win_create
(
void *baseptr,
std::streamsize num_elements,
const int disp_unit,
int communicator // Index into MPICommunicators_
)
{
return call_window_create
(
this,
PstreamGlobals::MPICommunicators_[communicator],
baseptr,
num_elements,
disp_unit
);
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
void Foam::UPstream::Window::close()
{
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (UPstream::parRun() && (MPI_WIN_NULL != win))
{
MPI_Win_free(&win);
*this = UPstream::Window(MPI_WIN_NULL);
}
}
// * * * * * * * * * * * * * * * Synchronization * * * * * * * * * * * * * * //
void Foam::UPstream::Window::mpi_win_flushing(int rank, bool local)
{
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (UPstream::parRun() && (MPI_WIN_NULL != win))
{
if (rank < 0)
{
if (local) MPI_Win_flush_local_all(win);
else /* */ MPI_Win_flush_all(win);
}
else
{
if (local) MPI_Win_flush_local(rank, win);
else /* */ MPI_Win_flush(rank, win);
}
}
}
void Foam::UPstream::Window::sync()
{
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (UPstream::parRun() && (MPI_WIN_NULL != win))
{
MPI_Win_sync(win);
}
}
void Foam::UPstream::Window::mpi_win_locking(int rank, bool exclusive)
{
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (UPstream::parRun() && (MPI_WIN_NULL != win))
{
if (rank < 0)
{
MPI_Win_lock_all
(
(exclusive ? MPI_MODE_NOCHECK : 0),
win
);
}
else
{
MPI_Win_lock
(
(exclusive ? MPI_LOCK_EXCLUSIVE : MPI_LOCK_SHARED),
rank,
0, // No assertion
win
);
}
}
}
void Foam::UPstream::Window::mpi_win_unlocking(int rank)
{
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (UPstream::parRun() && (MPI_WIN_NULL != win))
{
if (rank < 0)
{
MPI_Win_unlock_all(win);
}
else
{
MPI_Win_unlock(rank, win);
}
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UPstream::Window::get_data
(
void* origin, // Type checking done by caller
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
if (!UPstream::parRun() || !origin || !count)
{
// Nothing to do
return true;
}
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Get()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return false;
}
int returnCode = MPI_Get
(
// origin
origin, count, datatype,
// target
target_rank, target_disp, count, datatype,
// window
win
);
// Error handling
if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
{
FatalError("MPI_Get()")
<< Foam::abort(FatalError);
return false;
}
return (MPI_SUCCESS == returnCode);
}
bool Foam::UPstream::Window::put_data
(
const void* origin, // Type checking done by caller
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
if (!UPstream::parRun() || !origin || !count)
{
// Nothing to do
return true;
}
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Put()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return false;
}
int returnCode = MPI_Put
(
// origin
origin, count, datatype,
// target
target_rank, target_disp, count, datatype,
// window
win
);
// Error handling
if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
{
FatalError("MPI_Put()")
<< Foam::abort(FatalError);
return false;
}
return (MPI_SUCCESS == returnCode);
}
bool Foam::UPstream::Window::put_data
(
const UPstream::opCodes opCodeId,
const void* origin, // Type checking done by caller
std::streamsize count,
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
if (UPstream::opCodes::invalid == opCodeId)
{
// Regular data put - doesn't use/need an op-type!
return this->put_data
(
origin,
count,
dataTypeId,
target_rank,
target_disp
);
}
if (!UPstream::parRun() || !origin || !count)
{
// Nothing to do
return true;
}
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
MPI_Op optype = PstreamGlobals::getOpCode(opCodeId);
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Accumulate()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return false;
}
if (FOAM_UNLIKELY(MPI_OP_NULL == optype))
{
FatalError("MPI_Accumulate()")
<< "Invalid opcode:" << int(opCodeId)
<< " type:" << int(dataTypeId) << " count:" << label(count) << nl
<< Foam::abort(FatalError);
return false;
}
int returnCode = MPI_Accumulate
(
// origin
origin, count, datatype,
// target
target_rank, target_disp, count, datatype,
// operation
optype,
// window
win
);
// Error handling
if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
{
FatalError("MPI_Accumulate()")
<< Foam::abort(FatalError);
return false;
}
return (MPI_SUCCESS == returnCode);
}
bool Foam::UPstream::Window::mpi_fetch_and_op
(
const UPstream::opCodes opCodeId,
const void* origin, // Type checking done by caller
void* result, // Type checking done by caller
const UPstream::dataTypes dataTypeId,
int target_rank,
int target_disp
) const
{
if (!UPstream::parRun())
{
// Fails in non-parallel
return false;
}
MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId);
MPI_Op optype = PstreamGlobals::getOpCode(opCodeId);
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Fetch_and_op()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return false;
}
if (FOAM_UNLIKELY(MPI_OP_NULL == optype))
{
FatalError("MPI_Fetch_and_op()")
<< "Invalid opcode:" << int(opCodeId)
<< " type:" << int(dataTypeId) << nl
<< Foam::abort(FatalError);
return false;
}
int returnCode = MPI_Fetch_and_op
(
origin, result, datatype,
// target
target_rank, target_disp,
// operation
optype,
// window
win
);
// Error handling
if (FOAM_UNLIKELY(returnCode != MPI_SUCCESS))
{
FatalError("MPI_Fetch_and_op()")
<< Foam::abort(FatalError);
return false;
}
return (MPI_SUCCESS == returnCode);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// Check for failure of MPI_Win_get_attr
#undef CheckFail_Win_get_attr
#define CheckFail_Win_get_attr(returnCode, flag, attribute) \
{ \
if (FOAM_UNLIKELY((MPI_SUCCESS != returnCode) || !flag)) \
{ \
FatalError("MPI_Win_get_attr()") \
<< "Failed getting attribute " << attribute << endl \
<< Foam::abort(FatalError); \
} \
}
bool Foam::UPstream::Window::is_shared(const bool failNonShared) const
{
if (!UPstream::parRun())
{
// Nothing to do
return false;
}
MPI_Win win = PstreamUtils::Cast::to_mpi(*this);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
return false;
}
// Error handling flags
int returnCode(MPI_ERR_UNKNOWN);
int flag(1);
int flavour(0);
// MPI_WIN_CREATE_FLAVOR : Type (int *)
{
// const auto key = MPI_WIN_CREATE_FLAVOR;
typedef int value_type;
void* val(nullptr);
returnCode = MPI_Win_get_attr(win, MPI_WIN_CREATE_FLAVOR, &val, &flag);
CheckFail_Win_get_attr(returnCode, flag, "MPI_WIN_CREATE_FLAVOR");
flavour = int
(
*static_cast<value_type*>(val)
);
}
if (failNonShared && (MPI_WIN_FLAVOR_SHARED != flavour))
{
FatalErrorInFunction
<< "Expecting a shared window but had ("
<< flavour << ") flavour instead" << endl
<< Foam::abort(FatalError);
}
return (MPI_WIN_FLAVOR_SHARED == flavour);
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_query
(
UPstream::Window window,
const int expected_disp_unit
)
{
if (!UPstream::parRun())
{
// Nothing to do
return {nullptr, 0};
}
MPI_Win win = PstreamUtils::Cast::to_mpi(window);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Win_get_attr()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return {nullptr, 0};
}
// Error handling flags
int returnCode(MPI_ERR_UNKNOWN);
int flag(1);
// Debugging
// MPI_WIN_CREATE_FLAVOR : Type (int *)
// if (FOAM_UNLIKELY(UPstream::debug & 2))
// {
// // const auto key = MPI_WIN_CREATE_FLAVOR;
// typedef int value_type;
// void* val(nullptr);
//
// returnCode =
// MPI_Win_get_attr(win, MPI_WIN_CREATE_FLAVOR, &val, &flag);
// CheckFail_Win_get_attr(returnCode, flag, "MPI_WIN_CREATE_FLAVOR");
//
// int flavour = *static_cast<value_type*>(val);
// Perr<< "Window created with flavour (" << flavour << ')' << endl;
// }
std::pair<void*,int64_t> result(nullptr, 0);
// The window size
// MPI_WIN_SIZE : Type (MPI_Aint *)
{
// const auto key = MPI_WIN_SIZE;
typedef MPI_Aint value_type;
void* val(nullptr);
returnCode = MPI_Win_get_attr(win, MPI_WIN_SIZE, &val, &flag);
CheckFail_Win_get_attr(returnCode, flag, "MPI_WIN_SIZE");
result.second = *static_cast<value_type*>(val);
}
// Early exit
if (result.second == 0)
{
return {nullptr, 0};
}
// The base address
// MPI_WIN_BASE : Type (void *)
{
// const auto key = MPI_WIN_BASE;
void* value(nullptr);
returnCode = MPI_Win_get_attr(win, MPI_WIN_BASE, &value, &flag);
CheckFail_Win_get_attr(returnCode, flag, "MPI_WIN_BASE");
result.first = value;
}
// Early exit - this probably can never happen
// (ie, nullptr but non-zero size)
if (result.first == nullptr)
{
return {nullptr, 0};
}
// Scale count by the expected displacement unit
if (expected_disp_unit)
{
result.second /= expected_disp_unit;
int disp_unit = 1;
// The displacement units
// MPI_WIN_DISP_UNIT : Type (int *)
{
// const auto key = MPI_WIN_DISP_UNIT;
typedef int value_type;
void* val(nullptr);
returnCode = MPI_Win_get_attr(win, MPI_WIN_DISP_UNIT, &val, &flag);
CheckFail_Win_get_attr(returnCode, flag, "MPI_WIN_DISP_UNIT");
disp_unit = *static_cast<value_type*>(val);
}
// Error if the expected disp_unit is incorrect
// - ignore this check if the window is empty
if (expected_disp_unit != disp_unit)
{
FatalErrorInFunction
<< "Window [size=" << result.second
<< "] created with Type size=" << disp_unit
<< " but expecting Type size=" << expected_disp_unit << endl
<< Foam::abort(FatalError);
}
}
return result;
}
std::pair<void*,int64_t>
Foam::UPstream::Window::mpi_win_query_shared
(
UPstream::Window window,
int target_rank,
const int expected_disp_unit
)
{
if (!UPstream::parRun())
{
// Nothing to do
return {nullptr, 0};
}
MPI_Win win = PstreamUtils::Cast::to_mpi(window);
if (FOAM_UNLIKELY(MPI_WIN_NULL == win))
{
FatalError("MPI_Win_shared_query()")
<< "Called with MPI_WIN_NULL."
<< Foam::abort(FatalError);
return {nullptr, 0};
}
// Fail if window is not shared
const bool shared = window.is_shared(true);
if (!shared)
{
return {nullptr, 0};
}
// Initial values and fallback
MPI_Aint num_bytes = 0;
void *baseptr = nullptr;
int disp_unit = 1;
int returnCode = MPI_Win_shared_query
(
win,
target_rank,
&num_bytes,
&disp_unit,
&baseptr
);
if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode))
{
FatalError("MPI_Win_shared_query()")
<< Foam::abort(FatalError);
return {nullptr, 0};
}
std::pair<void*,int64_t> result(baseptr, num_bytes);
// Scale count by the expected displacement unit
// - probably Fatal not to supply this value
//
// Note that with shared allocation the baseptr will be non-null even
// if the local window has zero bytes. This maintains the contiguous
// addressing across all ranks.
if (result.second && expected_disp_unit)
{
result.second /= expected_disp_unit;
// Error if the expected disp_unit is incorrect
// - ignore this check if the window is empty
if (expected_disp_unit != disp_unit)
{
FatalErrorInFunction
<< "Window on rank(" << target_rank
<< ") [size=" << result.second
<< "] created with Type size=" << disp_unit
<< " but expecting Type size=" << expected_disp_unit << endl
<< Foam::abort(FatalError);
}
}
return result;
}
#undef CheckFail_Win_get_attr
// ************************************************************************* //
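Finally, an illustrative sketch of the shared-memory path (condensed from the shared-memory branch of the test application): each rank allocates its local portion collectively and can then obtain a view of any other rank's portion, since the shared allocation keeps the per-rank segments addressable from every rank.

    // Collective allocation of a shared window (sizes may differ per rank)
    UPstream::Window win;
    UList<label> mine =
        win.allocate_shared<label>(10 + UPstream::myProcNo(), UPstream::worldComm);

    mine = UPstream::myProcNo();  // fill the local segment

    // Ensure all ranks have written before reading remote segments
    UPstream::barrier(UPstream::worldComm);

    // View the other ranks' segments of the same shared window
    for (const auto proci : UPstream::subProcs())
    {
        UList<label> other = win.view_shared<label>(proci);
        Pout<< "rank " << proci << " holds " << flatOutput(other) << endl;
    }

    win.close();  // collective call, frees the shared memory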