ENH: add guards for partially populated communicators

- these are primarily needed when encountering sparse (e.g., inter-host)
  communicators. Additional UPstream convenience methods:

  is_rank(comm)
    => True if the process corresponds to a rank in the communicator.
       Can be a master rank or a sub-rank.

  is_parallel(comm)
    => True if a parallel algorithm or exchange is used on the process.
       Same as

           (parRun() && (nProcs(comm) > 1) && is_rank(comm))
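
As a usage illustration, here is a minimal sketch of how the two guards
compose (not part of this commit; the reportSum() function name is an
assumption, while the UPstream and reduce calls are those appearing in
the diffs below):

    #include "UPstream.H"
    #include "PstreamReduceOps.H"

    void reportSum(const Foam::label comm)
    {
        if (!Foam::UPstream::is_rank(comm))
        {
            return;  // Process not in communicator - nothing to do
        }

        // Safe: myProcNo(comm) is non-negative for any rank
        Foam::label value = Foam::UPstream::myProcNo(comm);

        if (Foam::UPstream::is_parallel(comm))
        {
            // Only taken when:
            // parRun() && (nProcs(comm) > 1) && is_rank(comm)
            Foam::reduce
            (
                value,
                Foam::sumOp<Foam::label>(),
                Foam::UPstream::msgType(),
                comm
            );
        }
    }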
Commit b277b913cf (parent 74db18a9c3)
Author: Mark Olesen
Date:   2023-04-14 15:00:03 +02:00

15 changed files with 257 additions and 152 deletions

View File

@@ -52,8 +52,8 @@ void rankInfo(const label comm)
<< "(parent:" << UPstream::parent(comm) << ')'
<< " rank:" << ranki
<< "(sub:" << UPstream::is_subrank(comm)
<< ") nProcs:" << UPstream::nProcs(comm)
<< " baseProcNo:" << UPstream::baseProcNo(comm, ranki);
<< ") nProcs:" << UPstream::nProcs(comm);
// << " baseProcNo:" << UPstream::baseProcNo(comm, ranki);
}
@@ -82,8 +82,10 @@ int main(int argc, char *argv[])
#include "setRootCase.H"
Info<< nl
<< "parallel:" << UPstream::parRun()
<< "nProcs = " << UPstream::nProcs()
<< " with " << UPstream::nComms() << " predefined comm(s)" << nl;
<< " with " << UPstream::nComms() << " predefined comm(s)."
<< " proc:" << UPstream::myProcNo() << nl;
Info<< nl;

View File

@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -34,31 +34,28 @@ License
template<class Type>
void Foam::Pstream::broadcast(Type& value, const label comm)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (is_contiguous<Type>::value)
{
if (is_contiguous<Type>::value)
// Note: contains parallel guard internally
UPstream::broadcast
(
reinterpret_cast<char*>(&value),
sizeof(Type),
comm,
UPstream::masterNo()
);
}
else if (UPstream::is_parallel(comm))
{
if (UPstream::master(comm))
{
// Note: contains parallel guard internally as well
UPstream::broadcast
(
reinterpret_cast<char*>(&value),
sizeof(Type),
comm,
UPstream::masterNo()
);
OPBstream os(UPstream::masterNo(), comm);
os << value;
}
else
else // UPstream::is_subrank(comm)
{
if (UPstream::master(comm))
{
OPBstream os(UPstream::masterNo(), comm);
os << value;
}
else
{
IPBstream is(UPstream::masterNo(), comm);
is >> value;
}
IPBstream is(UPstream::masterNo(), comm);
is >> value;
}
}
}
@@ -67,14 +64,14 @@ void Foam::Pstream::broadcast(Type& value, const label comm)
template<class Type, class... Args>
void Foam::Pstream::broadcasts(const label comm, Type& arg1, Args&&... args)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
if (UPstream::master(comm))
{
OPBstream os(UPstream::masterNo(), comm);
Detail::outputLoop(os, arg1, std::forward<Args>(args)...);
}
else
else // UPstream::is_subrank(comm)
{
IPBstream is(UPstream::masterNo(), comm);
Detail::inputLoop(is, arg1, std::forward<Args>(args)...);

View File

@@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2019-2022 OpenCFD Ltd.
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -53,7 +53,7 @@ void Foam::Pstream::combineGather
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -155,7 +155,7 @@ void Foam::Pstream::combineScatter
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -290,7 +290,7 @@ void Foam::Pstream::combineReduce
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);
@@ -312,7 +312,7 @@ void Foam::Pstream::listCombineGather
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -420,7 +420,7 @@ void Foam::Pstream::listCombineScatter
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -540,7 +540,7 @@ void Foam::Pstream::listCombineReduce
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);
@@ -562,7 +562,7 @@ void Foam::Pstream::mapCombineGather
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -645,7 +645,7 @@ void Foam::Pstream::mapCombineScatter
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(values, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -746,7 +746,7 @@ void Foam::Pstream::mapCombineReduce
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
const auto& comms = UPstream::whichCommunication(comm);

View File

@@ -469,6 +469,11 @@ void Foam::Pstream::exchange
{
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
if (!UPstream::is_rank(comm))
{
return; // Process not in communicator
}
const label myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
@@ -482,7 +487,7 @@ void Foam::Pstream::exchange
recvBufs.resize_nocopy(numProcs);
if (UPstream::parRun() && numProcs > 1)
if (UPstream::is_parallel(comm))
{
// Presize all receive buffers
forAll(recvSizes, proci)
@@ -567,7 +572,7 @@ void Foam::Pstream::exchange
}
}
// Do myself
// Do myself. Already checked if in communicator
recvBufs[myProci] = sendBufs[myProci];
}
@@ -586,7 +591,6 @@ void Foam::Pstream::exchange
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const int myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
// Initial: clear out receive 'slots'
// Preferable to clear out the map entries instead of the map itself
@@ -598,7 +602,7 @@ void Foam::Pstream::exchange
iter.val().clear();
}
if (UPstream::parRun() && numProcs > 1)
if (UPstream::is_parallel(comm))
{
// Presize all receive buffers
forAllIters(recvSizes, iter)
@@ -708,8 +712,8 @@ void Foam::Pstream::exchange
}
}
// Do myself
// Do myself (if actually in the communicator)
if (UPstream::is_rank(comm))
{
const auto iter = sendBufs.find(myProci);
@@ -746,6 +750,12 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
if (!UPstream::is_rank(comm))
{
recvSizes.clear();
return; // Process not in communicator
}
const label myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
@@ -817,6 +827,12 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
if (!UPstream::is_rank(comm))
{
recvSizes.clear();
return; // Process not in communicator
}
Pstream::exchangeSizes<Container>
(
neighProcs, // send
@@ -842,6 +858,11 @@ void Foam::Pstream::exchangeSizes
Map<label> sendSizes(2*sendBufs.size());
recvSizes.clear(); // Done in allToAllConsensus too, but be explicit here
if (!UPstream::is_rank(comm))
{
return; // Process not in communicator
}
forAllConstIters(sendBufs, iter)
{
const label proci = iter.key();
@@ -871,6 +892,12 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
if (!UPstream::is_rank(comm))
{
recvSizes.clear();
return; // Process not in communicator
}
const label numProcs = UPstream::nProcs(comm);
if (sendBufs.size() != numProcs)

View File

@@ -80,6 +80,18 @@ void exchangeConsensus
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
// Initial: clear all receive information
for (auto& buf : recvBufs)
{
buf.clear();
}
recvSizes = Zero;
if (!UPstream::is_rank(comm))
{
return; // Process not in communicator
}
// #ifdef FULLDEBUG
if (sendBufs.size() > numProc)
{
@@ -95,14 +107,7 @@ void exchangeConsensus
}
// #endif
// Initial: clear all receive information
for (auto& buf : recvBufs)
{
buf.clear();
}
recvSizes = Zero;
if (!UPstream::parRun() || numProc < 2)
if (!UPstream::is_parallel(comm))
{
// Do myself
recvBufs[myProci] = sendBufs[myProci];
@@ -252,7 +257,6 @@ void exchangeConsensus
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
// Initial: clear out receive 'slots'
// Preferable to clear out the map entries instead of the map itself
@@ -264,7 +268,7 @@ void exchangeConsensus
iter.val().clear();
}
if (!UPstream::parRun() || numProc < 2)
if (!UPstream::is_parallel(comm))
{
// Do myself
const auto iter = sendBufs.find(myProci);
@@ -293,10 +297,11 @@ void exchangeConsensus
const auto& sendData = iter.val();
#ifdef FULLDEBUG
if (proci >= numProc)
if (proci >= UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Send buffer:" << proci << " >= numProcs:" << numProc
<< "Send buffer:" << proci << " >= numProcs:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
#endif

View File

@@ -48,7 +48,7 @@ void Foam::Pstream::gather
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
@@ -130,7 +130,7 @@ void Foam::Pstream::scatter
#ifndef Foam_Pstream_scatter_nobroadcast
Pstream::broadcast(value, comm);
#else
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// My communication order
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];

View File

@@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2022 OpenCFD Ltd.
Copyright (C) 2015-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -51,13 +51,12 @@ void Foam::Pstream::gatherList
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
if (values.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of list:" << values.size()
<< " does not equal the number of processors:"
<< "Size of list:" << values.size() << " != numProcs:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
@@ -200,7 +199,7 @@ void Foam::Pstream::scatterList
// between scatterList() and using broadcast(List<T>&) or a regular
// scatter(List<T>&) is that processor-local data is skipped.
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
if (values.size() != UPstream::nProcs(comm))
{
@@ -349,8 +348,23 @@ void Foam::Pstream::allGatherList
const label comm
)
{
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
if (UPstream::is_parallel(comm))
{
// TBD
// if (std::is_arithmetic<T>::value) // or is_contiguous ?
// {
// if (values.size() != UPstream::nProcs(comm))
// {
// FatalErrorInFunction
// << "Size of list:" << values.size()
// << " != number of processors:"
// << UPstream::nProcs(comm)
// << Foam::abort(FatalError);
// }
// UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm);
// return;
// }
const auto& comms = UPstream::whichCommunication(comm);
Pstream::gatherList(comms, values, tag, comm);

View File

@@ -79,7 +79,7 @@ void reduce
const label comm = UPstream::worldComm
)
{
if (UPstream::parRun())
if (UPstream::is_parallel(comm))
{
Foam::reduce(UPstream::whichCommunication(comm), value, bop, tag, comm);
}
@@ -375,7 +375,7 @@ void sumReduce
const label comm = UPstream::worldComm
)
{
if (UPstream::parRun())
if (UPstream::is_parallel(comm))
{
Foam::reduce(value, sumOp<T>(), tag, comm);
Foam::reduce(count, sumOp<label>(), tag, comm);

View File

@@ -142,12 +142,7 @@ void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
}
Foam::label Foam::UPstream::allocateCommunicator
(
const label parentIndex,
const labelUList& subRanks,
const bool doPstream
)
Foam::label Foam::UPstream::getAvailableCommIndex(const label parentIndex)
{
label index;
if (!freeComms_.empty())
@@ -177,16 +172,29 @@ Foam::label Foam::UPstream::allocateCommunicator
treeCommunication_.emplace_back();
}
return index;
}
Foam::label Foam::UPstream::allocateCommunicator
(
const label parentIndex,
const labelUList& subRanks,
const bool doPstream
)
{
const label index = getAvailableCommIndex(parentIndex);
if (debug)
{
Pout<< "Communicators : Allocating communicator " << index << endl
<< " parent : " << parentIndex << endl
<< " procs : " << subRanks << endl
Pout<< "Allocating communicator " << index << nl
<< " parent : " << parentIndex << nl
<< " procs : " << subRanks << nl
<< endl;
}
// Initialise; overwritten by allocatePstreamCommunicator
myProcNo_[index] = 0;
// Initially treat as master, overwritten by allocatePstreamCommunicator
myProcNo_[index] = UPstream::masterNo();
// The selected sub-ranks.
// - transcribe from label to int. Treat negative values as 'ignore'
@@ -222,10 +230,6 @@ Foam::label Foam::UPstream::allocateCommunicator
procIds.resize(numSubRanks);
// Size but do not fill structure - this is done on-the-fly
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
if (doPstream && parRun())
{
allocatePstreamCommunicator(parentIndex, index);
@@ -243,16 +247,15 @@ Foam::label Foam::UPstream::allocateCommunicator
/// myProcNo_[index] = -(myProcNo_[parentIndex]+1);
/// }
/// }
// Did communicator allocation adjust procIDs_ as well?
if (numSubRanks != procIDs_[index].size())
{
numSubRanks = procIDs_[index].size();
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
}
}
// In case communicator allocation adjusted procIDs_
numSubRanks = procIDs_[index].size();
// Size but do not fill structure - this is done on-the-fly
linearCommunication_[index] = List<commsStruct>(numSubRanks);
treeCommunication_[index] = List<commsStruct>(numSubRanks);
return index;
}

View File

@@ -90,10 +90,8 @@ public:
// Public Classes
// Forward Declarations
//- Wrapper for MPI_Request
class Request;
class Request; // Forward Declaration
//- Structure for communicating between processors
class commsStruct
@@ -237,6 +235,9 @@ private:
//- Set data for parallel running
static void setParRun(const label nProcs, const bool haveThreads);
//- Initialise entries for new communicator. Return the index
static label getAvailableCommIndex(const label parentIndex);
//- Allocate a communicator with index
static void allocatePstreamCommunicator
(
@@ -621,46 +622,57 @@ public:
//- Test if this a parallel run
// Modify access is deprecated
static bool& parRun() noexcept
{
return parRun_;
}
static bool& parRun() noexcept { return parRun_; }
//- Have support for threads
static bool haveThreads() noexcept
{
return haveThreads_;
}
static bool haveThreads() noexcept { return haveThreads_; }
//- Number of ranks in parallel run (for given communicator)
//- is 1 for serial run
//- Relative rank for the master process - is always 0.
static constexpr int masterNo() noexcept { return 0; }
//- Number of ranks in parallel run (for given communicator).
//- It is 1 for serial run
static label nProcs(const label communicator = worldComm)
{
return procIDs_[communicator].size();
}
//- Process index of the master (always 0)
static constexpr int masterNo() noexcept
//- Rank of this process in the communicator (starting from masterNo()).
//- Can be negative if the process is not a rank in the communicator
static int myProcNo(const label communicator = worldComm)
{
return 0;
return myProcNo_[communicator];
}
//- Am I the master rank
//- True if process corresponds to the master rank in the communicator
static bool master(const label communicator = worldComm)
{
return myProcNo_[communicator] == masterNo();
}
//- Is this process a sub-rank on the communicator
//- True if process corresponds to any rank (master or sub-rank)
//- in the given communicator
static bool is_rank(const label communicator = worldComm)
{
return myProcNo_[communicator] >= 0;
}
//- True if process corresponds to a sub-rank in the given communicator
static bool is_subrank(const label communicator = worldComm)
{
return myProcNo_[communicator] > masterNo();
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo(const label communicator = worldComm)
//- True if parallel algorithm or exchange is required.
// This is when parRun() == true, the process corresponds to a rank
// in the communicator and there is more than one rank in the
// communicator
static bool is_parallel(const label communicator = worldComm)
{
return myProcNo_[communicator];
return
(
parRun_ && is_rank(communicator) && nProcs(communicator) > 1
);
}
//- The parent communicator
@@ -669,7 +681,7 @@ public:
return parentComm_(communicator);
}
//- Process IDs within a given communicator
//- The list of ranks within a given communicator
static List<int>& procID(const label communicator)
{
return procIDs_[communicator];

View File

@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2021 OpenCFD Ltd.
Copyright (C) 2021-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -44,7 +44,10 @@ Foam::List<T> Foam::UPstream::listGatherValues
List<T> allValues;
const label nproc = (UPstream::parRun() ? UPstream::nProcs(comm) : 1);
const label nproc =
(
UPstream::is_parallel(comm) ? UPstream::nProcs(comm) : 1
);
if (nproc > 1)
{
@@ -88,7 +91,10 @@ T Foam::UPstream::listScatterValues
}
const label nproc = (UPstream::parRun() ? UPstream::nProcs(comm) : 1);
const label nproc =
(
UPstream::is_parallel(comm) ? UPstream::nProcs(comm) : 1
);
T localValue;
@@ -115,13 +121,13 @@ T Foam::UPstream::listScatterValues
{
// non-parallel: return local value
if (allValues.empty()) // Extra safety
if (UPstream::is_rank(comm) && !allValues.empty())
{
localValue = Zero;
localValue = allValues[0];
}
else
{
localValue = allValues[0];
localValue = Zero;
}
}

View File

@@ -696,9 +696,10 @@ void Foam::UPstream::freePstreamCommunicator(const label index)
void Foam::UPstream::barrier(const label communicator, UPstream::Request* req)
{
// No-op for non-parallel
if (!UPstream::parRun())
// No-op for non-parallel or not on communicator
if (!UPstream::parRun() || !UPstream::is_rank(communicator))
{
PstreamGlobals::reset_request(req);
return;
}
@@ -753,7 +754,8 @@ Foam::UPstream::probeMessage
{
std::pair<int,int> result(-1, 0);
if (!UPstream::parRun())
// No-op for non-parallel or not on communicator
if (!UPstream::parRun() || !UPstream::is_rank(communicator))
{
return result;
}

View File

@@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
Copyright (C) 2022-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@@ -39,7 +39,7 @@ bool Foam::UPstream::broadcast
const int rootProcNo
)
{
if (!UPstream::parRun() || UPstream::nProcs(comm) < 2)
if (!UPstream::is_parallel(comm))
{
// Nothing to do - ignore
return true;

View File

@@ -247,7 +247,7 @@ void Foam::sumReduce \
const label comm \
) \
{ \
if (UPstream::parRun() && UPstream::nProcs(comm) > 1) \
if (UPstream::is_parallel(comm)) \
{ \
Native values[2]; \
values[0] = static_cast<Native>(count); \

View File

@@ -42,7 +42,7 @@ void Foam::PstreamDetail::broadcast0
const label comm
)
{
if (!UPstream::parRun())
if (!UPstream::is_parallel(comm))
{
return;
}
@@ -73,7 +73,7 @@ void Foam::PstreamDetail::reduce0
const label comm
)
{
if (!UPstream::parRun())
if (!UPstream::is_parallel(comm))
{
return;
}
@@ -127,13 +127,13 @@ void Foam::PstreamDetail::allReduce
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
const bool immediate = (req || requestID);
if (!UPstream::is_parallel(comm))
{
return;
}
const bool immediate = (req || requestID);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
if (immediate)
@@ -238,6 +238,11 @@ void Foam::PstreamDetail::allToAll
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
const label numProc = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
@@ -258,7 +263,10 @@ void Foam::PstreamDetail::allToAll
error::printStack(Pout);
}
if (sendData.size() != numProc || recvData.size() != numProc)
if
(
(sendData.size() != numProc || recvData.size() != numProc)
)
{
FatalErrorInFunction
<< "Have " << numProc << " ranks, but size of sendData:"
@@ -267,7 +275,7 @@ void Foam::PstreamDetail::allToAll
<< Foam::abort(FatalError);
}
if (!UPstream::parRun() || numProc < 2)
if (!UPstream::is_parallel(comm))
{
recvData.deepCopy(sendData);
return;
@@ -365,6 +373,11 @@ void Foam::PstreamDetail::allToAllv
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
@@ -400,7 +413,7 @@ void Foam::PstreamDetail::allToAllv
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
if (!UPstream::is_parallel(comm))
{
if (recvCounts[0] != sendCounts[0])
{
@@ -502,6 +515,11 @@ void Foam::PstreamDetail::allToAllConsensus
const label comm
)
{
if (!UPstream::is_rank(comm))
{
return;
}
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
@@ -529,7 +547,7 @@ void Foam::PstreamDetail::allToAllConsensus
const Type zeroValue = pTraits<Type>::zero;
recvData = zeroValue;
if (!UPstream::parRun() || numProc < 2)
if (!UPstream::is_parallel(comm))
{
// deep copy
recvData.deepCopy(sendData);
@@ -691,6 +709,11 @@ void Foam::PstreamDetail::allToAllConsensus
const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm);
if (!UPstream::is_rank(comm))
{
return;
}
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
Pout<< "** non-blocking consensus Alltoall (map):";
@@ -706,7 +729,7 @@ void Foam::PstreamDetail::allToAllConsensus
const Type zeroValue = pTraits<Type>::zero;
recvBufs.clear();
if (!UPstream::parRun() || numProc < 2)
if (!UPstream::is_parallel(comm))
{
// Do myself
const auto iter = sendBufs.find(myProci);
@@ -873,15 +896,19 @@ void Foam::PstreamDetail::gather
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
if (!UPstream::is_parallel(comm))
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
const label numProc = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
@@ -893,7 +920,7 @@ void Foam::PstreamDetail::gather
{
Pout<< "** MPI_Gather (blocking):";
}
Pout<< " np:" << np
Pout<< " numProc:" << numProc
<< " recvCount:" << recvCount
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
@@ -989,15 +1016,19 @@ void Foam::PstreamDetail::scatter
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
if (!UPstream::is_parallel(comm))
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
const label numProc = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
@@ -1009,7 +1040,7 @@ void Foam::PstreamDetail::scatter
{
Pout<< "** MPI_Scatter (blocking):";
}
Pout<< " np:" << np
Pout<< " numProc:" << numProc
<< " recvCount:" << recvCount
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
@@ -1106,15 +1137,19 @@ void Foam::PstreamDetail::gatherv
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
if (!UPstream::is_parallel(comm))
{
// recvCounts[0] may be invalid - use sendCount instead
std::memmove(recvData, sendData, sendCount*sizeof(Type));
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
@ -1249,14 +1284,18 @@ void Foam::PstreamDetail::scatterv
{
PstreamGlobals::reset_request(req, requestID);
if (!UPstream::parRun())
const bool immediate = (req || requestID);
if (!UPstream::is_rank(comm))
{
return;
}
if (!UPstream::is_parallel(comm))
{
std::memmove(recvData, sendData, recvCount*sizeof(Type));
return;
}
const bool immediate = (req || requestID);
const label np = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
@ -1383,14 +1422,12 @@ void Foam::PstreamDetail::allGather
const bool immediate = (req || requestID);
if (!UPstream::parRun() || UPstream::nProcs(comm) < 2)
if (!UPstream::is_parallel(comm))
{
// Nothing to do - ignore
return;
}
const label numProc = UPstream::nProcs(comm);
if (UPstream::warnComm >= 0 && comm != UPstream::warnComm)
{
if (immediate)
@@ -1401,7 +1438,7 @@ void Foam::PstreamDetail::allGather
{
Pout<< "** MPI_Allgather (blocking):";
}
Pout<< " numProc:" << numProc
Pout<< " numProc:" << UPstream::nProcs(comm)
<< " with comm:" << comm
<< " warnComm:" << UPstream::warnComm
<< endl;