Compare commits

..

1 Commit

Author SHA1 Message Date
00e2d3e4ef WIP: mapped PstreamBuffers with NBX
ENH: sparse storage and data exchange for PstreamBuffers

- change the underlying storage from a numProcs list of buffers to a
  Map of buffers. The reduced memory footprint on large systems is
  one aspect, but the primary motivation is to more easily support
  sparse data exchange patterns.
  The Map storage for PstreamBuffers corresponds to a non-blocking
  consensus exchange of sizes that automatically propagates through
  different parts of the code and avoids all-to-all.

CONFIG: enable nonBlockingExchange as default (for testing)

- this changes the Pstream::exchangeSizes to use NBX instead of
  all-to-all, even for List containers.
2023-03-02 12:49:20 +01:00
33 changed files with 958 additions and 541 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2021-2023 OpenCFD Ltd.
Copyright (C) 2021-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM, distributed under GPL-3.0-or-later.
@ -76,9 +76,9 @@ Description
writer.beginCellData(4);
writer.writeProcIDs();
{
// Use primitive patch order
Field<scalar> fld
(
// Use primitive patch order
faMeshTools::flattenEdgeField(aMesh.magLe(), true)
);
writer.write("magLe", fld);
@ -93,17 +93,11 @@ Description
}
{
const Field<vector> edgeCentres
(
// Use primitive patch order
faMeshTools::flattenEdgeField(aMesh.edgeCentres(), true)
);
// finiteArea - edgeCentres
// (no other convenient way to display vectors on the edges)
vtk::lineWriter writer
(
edgeCentres,
aMesh.edgeCentres(),
edgeList::null(),
// vtk::formatType::INLINE_ASCII,
fileName
@ -115,20 +109,19 @@ Description
writer.writeGeometry();
// PointData
writer.beginPointData(3);
writer.writeProcIDs(); // Unfortunately cannot threshold on points
writer.beginPointData(4);
{
// Use primitive patch order
Field<vector> fld
(
// Use primitive patch order
faMeshTools::flattenEdgeField(aMesh.Le(), true)
);
writer.write("Le", fld);
}
{
// Use primitive patch order
Field<vector> fld
(
// Use primitive patch order
faMeshTools::flattenEdgeField(aMesh.edgeAreaNormals(), true)
);
writer.write("normal", fld);

View File

@ -7,7 +7,7 @@
# \\/ M anipulation |
#------------------------------------------------------------------------------
# Copyright (C) 2011-2015 OpenFOAM Foundation
# Copyright (C) 2017-2023 OpenCFD Ltd.
# Copyright (C) 2017-2021 OpenCFD Ltd.
#------------------------------------------------------------------------------
# License
# This file is part of OpenFOAM, distributed under GPL-3.0-or-later.
@ -45,7 +45,6 @@ options:
-local Same as -spawn=1
-remote Same as -spawn=2
-clean Remove old processor*.{log,sh} files, mpirun.schema etc
-decompose-dict=<file> Specific decomposeParDict name
-help Print the usage
Invoke mpirun with separate per-processor log files or running in
@ -112,7 +111,7 @@ Linux)
esac
unset appName appArgs nProcs
unset method spawn optClean optValue
unset method spawn optClean
optConfirm=true
decompDict="system/decomposeParDict"
@ -127,7 +126,7 @@ do
then
knownOption=true # Assume success
case "$1" in
('') ;; # Ignore junk
'') ;; # ignore junk
-clean) optClean=true ;;
-yes) unset optConfirm ;;
@ -174,25 +173,14 @@ do
shift
;;
(-decompose-dict=*)
optValue="${1#*=}"
case "$optValue" in
('' | none | false) ;; ## Ignore
(*)
decompDict="$optValue"
appArgs="${appArgs}${appArgs:+ }-decomposeParDict '$decompDict'"
;;
esac
;;
-decomposeParDict)
# Grab values and add to args immediately
decompDict="$2"
appArgs="${appArgs}${appArgs:+ }$1 '$2'"
shift
appArgs="${appArgs}${appArgs:+ }-decomposeParDict '$decompDict'"
;;
(*)
*)
knownOption=false # Fallthrough to regular processing
;;
esac
@ -204,24 +192,23 @@ do
fi
fi
# Processing application arguments
case "$1" in
(-help* | --help*) usage ;;
('') ;; ## Ignore junk
-help* | --help*) usage ;;
'') ;; # ignore junk
(-np)
-np)
nProcs="$2"
shift
;;
(-decomposeParDict)
-decomposeParDict)
# Grab values and add to args immediately
decompDict="$2"
appArgs="${appArgs}${appArgs:+ }-decomposeParDict '$decompDict'"
appArgs="${appArgs}${appArgs:+ }$1 '$2'"
shift
;;
(*)
*)
if [ -z "$appName" ]
then
appName="$1"

View File

@ -6,7 +6,7 @@
# \\/ M anipulation |
#------------------------------------------------------------------------------
# Copyright (C) 2011-2016 OpenFOAM Foundation
# Copyright (C) 2015-2023 OpenCFD Ltd.
# Copyright (C) 2015-2022 OpenCFD Ltd.
#------------------------------------------------------------------------------
# License
# This file is part of OpenFOAM, distributed under GPL-3.0-or-later.
@ -180,16 +180,6 @@ getNumberOfProcessors()
{
local dict="${1:-system/decomposeParDict}"
case "$dict" in
(system/*) # Already qualified
;;
(*)
# If it does not exist, assume it refers to location in system/
[ -f "$dict" ] || dict="system/$dict"
;;
esac
# Re-use positional parameters for automatic whitespace elimination
set -- $(foamDictionary -entry numberOfSubdomains -value "$dict" 2>/dev/null)
@ -233,7 +223,7 @@ getApplication()
#
runApplication()
{
local appName appRun optValue logFile logMode
local appName appRun logFile logMode
# Any additional parsed arguments (eg, decomposeParDict)
local appArgs
@ -242,39 +232,25 @@ runApplication()
while [ "$#" -gt 0 ] && [ -z "$appRun" ]
do
case "$1" in
('') ;; # Ignore junk
(-a | -append)
logMode=append
;;
(-o | -overwrite)
logMode=overwrite
;;
(-s | -suffix)
logFile=".$2"
shift
;;
(-decompose-dict=*)
optValue="${1#*=}"
case "$optValue" in
('' | none | false) ;; ## Ignore
(*) appArgs="$appArgs -decomposeParDict $optValue" ;;
esac
;;
(-decomposeParDict)
optValue="$2"
shift
case "$optValue" in
('' | none | false) ;; ## Ignore
(*) appArgs="$appArgs -decomposeParDict $optValue" ;;
esac
;;
(*)
appRun="$1"
;;
-a | -append)
logMode=append
;;
-o | -overwrite)
logMode=overwrite
;;
-s | -suffix)
logFile=".$2"
shift
;;
-decomposeParDict)
appArgs="$appArgs $1 $2"
shift
;;
'')
;;
*)
appRun="$1"
;;
esac
shift
done
@ -304,7 +280,7 @@ runApplication()
#
runParallel()
{
local appName appRun optValue logFile logMode nProcs
local appName appRun logFile logMode nProcs
# Any additional parsed arguments (eg, decomposeParDict)
local appArgs="-parallel"
@ -319,46 +295,30 @@ runParallel()
while [ "$#" -gt 0 ] && [ -z "$appRun" ]
do
case "$1" in
('') ;; # Ignore junk
(-a | -append)
logMode=append
;;
(-o | -overwrite)
logMode=overwrite
;;
(-s | -suffix)
logFile=".$2"
shift
;;
(-n | -np)
nProcs="$2"
shift
;;
(-decompose-dict=*)
optValue="${1#*=}"
case "$optValue" in
('' | none | false) ;; ## Ignore
(*)
appArgs="$appArgs -decomposeParDict $optValue"
nProcs="$(getNumberOfProcessors "$optValue")"
-a | -append)
logMode=append
;;
esac
;;
(-decomposeParDict)
optValue="$2"
shift
case "$optValue" in
('' | none | false) ;; ## Ignore
(*)
appArgs="$appArgs -decomposeParDict $optValue"
nProcs="$(getNumberOfProcessors "$optValue")"
-o | -overwrite)
logMode=overwrite
;;
-s | -suffix)
logFile=".$2"
shift
;;
-n | -np)
nProcs="$2"
shift
;;
-decomposeParDict)
appArgs="$appArgs $1 $2"
nProcs=$(getNumberOfProcessors "$2")
shift
;;
'')
;;
*)
appRun="$1"
;;
esac
;;
(*)
appRun="$1"
;;
esac
shift
done

View File

@ -131,8 +131,6 @@ OptimisationSwitches
nProcsSimpleSum 0;
// Min numProc to use non-blocking exchange algorithm (Hoefler: NBX)
nonBlockingExchange 1;
// Use hybrid NBX/PEX for PstreamBuffers
pbufs.algorithm 0;
// MPI buffer size (bytes)
// Can override with the MPI_BUFFER_SIZE env variable.

View File

@ -470,7 +470,8 @@ public:
// Exchange
//- Helper: exchange sizes of sendBufs for specified send/recv ranks
//- Helper: exchange sizes of sendBufs for specified
//- set of send/receive processes.
template<class Container>
static void exchangeSizes
(
@ -482,17 +483,6 @@ public:
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs for specified neighbour ranks
template<class Container>
static void exchangeSizes
(
const labelUList& neighProcs,
const Container& sendBufs,
labelList& sizes,
const label tag = UPstream::msgType(),
const label comm = UPstream::worldComm
);
//- Helper: exchange sizes of sendBufs.
//- The sendBufs is the data per processor (in the communicator).
// Returns sizes of sendBufs on the sending processor.

View File

@ -27,29 +27,40 @@ License
\*---------------------------------------------------------------------------*/
#include "PstreamBuffers.H"
#include "debug.H"
#include "registerSwitch.H"
#ifdef Foam_PstreamBuffers_dense
#include "bitSet.H"
#endif
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
int Foam::PstreamBuffers::algorithm
namespace Foam
{
#ifdef Foam_PstreamBuffers_map_storage
//- Retrieve size of specified buffer, first checking for existence
static inline label getBufferSize
(
// Not really the most creative name...
Foam::debug::optimisationSwitch("pbufs.algorithm", 0)
);
registerOptSwitch
const Map<DynamicList<char>>& buffers,
const label proci
)
{
const auto iter = buffers.cfind(proci);
return (iter.good() ? iter.val().size() : 0);
}
#else
//- Retrieve size of specified buffer, no access checking
static inline label getBufferSize
(
"pbufs.algorithm",
int,
Foam::PstreamBuffers::algorithm
);
const UList<DynamicList<char>>& buffers,
const label proci
)
{
return buffers[proci].size();
}
#endif
// Simple enumerations
// -------------------
static constexpr int algorithm_PEX_allToAll = -1; // OpenFOAM 2212 and earlier
//static constexpr int algorithm_PEX_hybrid = 0; // New default?
static constexpr int algorithm_full_NBX = 1; // Experimental
} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -57,7 +68,11 @@ static constexpr int algorithm_full_NBX = 1; // Experimental
void Foam::PstreamBuffers::finalExchange
(
const bool wait,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
)
{
// Could also check that it is not called twice
@ -66,33 +81,19 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
if
// Use PEX algorithm
#ifdef Foam_PstreamBuffers_map_storage
// PEX stage 1: exchange sizes (non-blocking consensus)
Pstream::exchangeSizes
(
wait
&& (algorithm >= algorithm_full_NBX)
&& (UPstream::maxCommsSize <= 0)
)
{
// NBX algorithm (nonblocking exchange)
// - when requested and waiting, no data chunking etc
PstreamDetail::exchangeConsensus<DynamicList<char>, char>
(
sendBuffers_,
recvBuffers_,
recvSizes,
(tag_ + 271828), // some unique tag?
comm_,
wait
);
return;
}
// PEX algorithm with two different flavours of exchanging sizes
// Assemble the send sizes (cf. Pstream::exchangeSizes)
sendBuffers_,
recvSizes,
tag_,
comm_
);
#else
// Like Pstream::exchangeSizes
labelList sendSizes(nProcs_);
forAll(sendBuffers_, proci)
{
@ -100,24 +101,16 @@ void Foam::PstreamBuffers::finalExchange
}
recvSizes.resize_nocopy(nProcs_);
if (algorithm == algorithm_PEX_allToAll)
{
// PEX stage 1: exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
}
else
{
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
}
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
#endif
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@ -131,9 +124,11 @@ void Foam::PstreamBuffers::finalExchange
}
#ifdef Foam_PstreamBuffers_dense
void Foam::PstreamBuffers::finalExchange
(
const labelUList& neighProcs,
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait,
labelList& recvSizes
)
@ -144,40 +139,16 @@ void Foam::PstreamBuffers::finalExchange
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Preparation. Temporarily abuse recvSizes as logic to clear
// send buffers that are not in the neighbourhood connection
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = 0;
// Preserve self-send, even if not described by neighbourhood
recvSizes[UPstream::myProcNo(comm_)] = 1;
for (const label proci : neighProcs)
{
recvSizes[proci] = 1; // Connected
}
for (label proci=0; proci < nProcs_; ++proci)
{
if (!recvSizes[proci]) // Not connected
{
sendBuffers_[proci].clear();
}
}
}
// PEX stage 1: exchange sizes (limited neighbourhood)
Pstream::exchangeSizes
(
neighProcs,
sendProcs,
recvProcs,
sendBuffers_,
recvSizes,
tag_,
comm_
);
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@ -189,13 +160,19 @@ void Foam::PstreamBuffers::finalExchange
);
}
}
#endif
void Foam::PstreamBuffers::finalGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
)
{
// Could also check that it is not called twice
@ -208,10 +185,20 @@ void Foam::PstreamBuffers::finalGatherScatter
// Only send to master [0]. Master is also allowed to 'send' to itself
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
if (iter.key() != 0)
{
iter.val().clear();
}
}
#else
for (label proci=1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
#endif
}
else
{
@ -227,46 +214,100 @@ void Foam::PstreamBuffers::finalGatherScatter
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Use PEX algorithm
// - for a non-sparse gather/scatter, it is presumed that
// MPI_Gather/MPI_Scatter will be the most efficient way to
// communicate the sizes.
#ifdef Foam_PstreamBuffers_map_storage
labelList recvCount;
#else
labelList& recvCount = recvSizes;
#endif
// PEX stage 1: exchange sizes (gather or scatter)
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
const label nSend = getBufferSize(sendBuffers_, 0);
recvCount = UPstream::listGatherValues(nSend, comm_);
#ifdef Foam_PstreamBuffers_map_storage
// Transcribe recv count from list to map
recvSizes.clear();
if (UPstream::master(comm_))
{
for (label proci=1; proci < recvCount.size(); ++proci)
{
if (recvCount[proci] > 0)
{
recvSizes.insert(proci, recvCount[proci]);
}
}
}
#else
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
#endif
}
else
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
recvCount.resize(nProcs_, Zero);
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(sendBuffers_, iter)
{
recvCount[iter.key()] = iter.val().size();
}
#else
forAll(sendBuffers_, proci)
{
recvCount[proci] = sendBuffers_[proci].size();
}
#endif
}
else
{
// Scattering, so non-master sends nothing
recvCount = Zero;
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
recvSizes.resize_nocopy(nProcs_);
#else
recvSizes = Zero;
#endif
}
const label nRecv(UPstream::listScatterValues(recvCount, comm_));
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
#else
recvSizes = Zero;
#endif
}
else
{
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
recvSizes = Zero;
recvSizes[0] = myRecv;
if (nRecv)
{
recvSizes.insert(0, nRecv);
}
#else
recvSizes = Zero;
recvSizes[0] = nRecv;
#endif
}
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
@ -297,9 +338,18 @@ Foam::PstreamBuffers::PstreamBuffers
tag_(tag),
comm_(communicator),
nProcs_(UPstream::nProcs(comm_)),
#ifdef Foam_PstreamBuffers_map_storage
// Default sizing (128) is probably OK.
// Meshes often have 16-20 neighbours (avg) and 100 neighbours (max)
sendBuffers_(),
recvBuffers_(),
recvPositions_()
#else
sendBuffers_(nProcs_),
recvBuffers_(nProcs_),
recvPositions_(nProcs_, Zero)
#endif
{}
@ -307,7 +357,23 @@ Foam::PstreamBuffers::PstreamBuffers
Foam::PstreamBuffers::~PstreamBuffers()
{
// Check that all data has been consumed.
// Check that all data has been consumed
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len = iter.val().size();
const label pos = recvPositions_.lookup(proci, len);
if (pos < len)
{
FatalErrorInFunction
<< "Message from processor " << proci
<< " Only consumed " << pos << " of " << len << " bytes" << nl
<< Foam::abort(FatalError);
}
}
#else
forAll(recvBuffers_, proci)
{
const label pos = recvPositions_[proci];
@ -321,6 +387,7 @@ Foam::PstreamBuffers::~PstreamBuffers()
<< Foam::abort(FatalError);
}
}
#endif
}
@ -331,7 +398,11 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessSendBuffer
const label proci
)
{
#ifdef Foam_PstreamBuffers_map_storage
return sendBuffers_(proci); // Created on demand if needed
#else
return sendBuffers_[proci];
#endif
}
@ -340,13 +411,21 @@ Foam::DynamicList<char>& Foam::PstreamBuffers::accessRecvBuffer
const label proci
)
{
#ifdef Foam_PstreamBuffers_map_storage
return recvBuffers_(proci); // Created on demand if needed
#else
return recvBuffers_[proci];
#endif
}
Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
return recvPositions_(proci, 0); // Created on demand if needed
#else
return recvPositions_[proci];
#endif
}
@ -354,20 +433,38 @@ Foam::label& Foam::PstreamBuffers::accessRecvPosition(const label proci)
void Foam::PstreamBuffers::clearSends()
{
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
iter.val().clear();
}
#else
for (DynamicList<char>& buf : sendBuffers_)
{
buf.clear();
}
#endif
}
void Foam::PstreamBuffers::clearRecvs()
{
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(recvBuffers_, iter)
{
iter.val().clear();
}
forAllIters(recvPositions_, iter)
{
iter.val() = 0;
}
#else
for (DynamicList<char>& buf : recvBuffers_)
{
buf.clear();
}
recvPositions_ = Zero;
#endif
}
@ -381,14 +478,41 @@ void Foam::PstreamBuffers::clear()
void Foam::PstreamBuffers::clearSend(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
{
auto iter = sendBuffers_.find(proci);
if (iter.good())
{
iter.val().clear();
}
}
#else
sendBuffers_[proci].clear();
#endif
}
void Foam::PstreamBuffers::clearRecv(const label proci)
{
#ifdef Foam_PstreamBuffers_map_storage
{
auto iter = recvBuffers_.find(proci);
if (iter.good())
{
iter.val().clear();
}
}
{
auto iter = recvPositions_.find(proci);
if (iter.good())
{
iter.val() = 0;
}
}
#else
recvBuffers_[proci].clear();
recvPositions_[proci] = 0;
#endif
}
@ -396,6 +520,21 @@ void Foam::PstreamBuffers::clearStorage()
{
// Could also clear out entire sendBuffers_, recvBuffers_ and reallocate.
// Not sure if it makes much difference
#ifdef Foam_PstreamBuffers_map_storage
forAllIters(sendBuffers_, iter)
{
iter.val().clearStorage();
}
forAllIters(recvBuffers_, iter)
{
iter.val().clearStorage();
}
forAllIters(recvPositions_, iter)
{
iter.val() = 0;
}
#else
for (DynamicList<char>& buf : sendBuffers_)
{
buf.clearStorage();
@ -405,6 +544,7 @@ void Foam::PstreamBuffers::clearStorage()
buf.clearStorage();
}
recvPositions_ = Zero;
#endif
finishedSendsCalled_ = false;
}
@ -412,6 +552,15 @@ void Foam::PstreamBuffers::clearStorage()
bool Foam::PstreamBuffers::hasSendData() const
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(sendBuffers_, iter)
{
if (!iter.val().empty())
{
return true;
}
}
#else
for (const DynamicList<char>& buf : sendBuffers_)
{
if (!buf.empty())
@ -419,6 +568,7 @@ bool Foam::PstreamBuffers::hasSendData() const
return true;
}
}
#endif
return false;
}
@ -427,6 +577,18 @@ bool Foam::PstreamBuffers::hasRecvData() const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len = iter.val().size();
if (recvPositions_.lookup(proci, 0) < len)
{
return true;
}
}
#else
forAll(recvBuffers_, proci)
{
if (recvPositions_[proci] < recvBuffers_[proci].size())
@ -434,6 +596,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
return true;
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -449,7 +612,7 @@ bool Foam::PstreamBuffers::hasRecvData() const
Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
{
return sendBuffers_[proci].size();
return getBufferSize(sendBuffers_, proci);
}
@ -457,8 +620,16 @@ Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
#else
const label len
(
getBufferSize(recvBuffers_, proci)
- recvPositions_.lookup(proci, 0)
);
#else
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
#endif
if (len > 0)
{
return len;
@ -482,6 +653,21 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
const label len
(
iter.val().size() - recvPositions_.lookup(proci, 0)
);
if (len > 0)
{
counts[proci] = len;
}
}
#else
forAll(recvBuffers_, proci)
{
const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
@ -491,6 +677,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
counts[proci] = len;
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -513,6 +700,17 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
forAllConstIters(recvBuffers_, iter)
{
const label proci = iter.key();
if (excludeProci != proci)
{
label len(iter.val().size() - recvPositions_.lookup(proci, 0));
maxLen = max(maxLen, len);
}
}
#else
forAll(recvBuffers_, proci)
{
if (excludeProci != proci)
@ -521,6 +719,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
maxLen = max(maxLen, len);
}
}
#endif
}
#ifdef FULLDEBUG
else
@ -552,6 +751,24 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
{
if (finishedSendsCalled_)
{
#ifdef Foam_PstreamBuffers_map_storage
const auto iter = recvBuffers_.cfind(proci);
if (iter.good())
{
const label pos = recvPositions_.lookup(proci, 0);
const label len = iter.val().size();
if (pos < len)
{
return UList<char>
(
const_cast<char*>(iter.val().cbegin(pos)),
(len - pos)
);
}
}
#else
const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size();
@ -563,6 +780,7 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
(len - pos)
);
}
#endif
}
#ifdef FULLDEBUG
else
@ -586,20 +804,25 @@ bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
void Foam::PstreamBuffers::finishedSends(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
#else
labelList recvSizes;
#endif
finalExchange(wait, recvSizes);
}
void Foam::PstreamBuffers::finishedSends
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
// Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size());
finalExchange(wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
@ -616,14 +839,161 @@ void Foam::PstreamBuffers::finishedSends
}
void Foam::PstreamBuffers::finishedNeighbourSends
#ifdef Foam_PstreamBuffers_dense
void Foam::PstreamBuffers::finishedSends
(
const labelUList& neighProcs,
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait
)
{
labelList recvSizes;
finalExchange(sendProcs, recvProcs, wait, recvSizes);
}
void Foam::PstreamBuffers::finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool wait
)
{
finalExchange(neighProcs, wait, recvSizes);
finalExchange(sendProcs, recvProcs, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
FatalErrorInFunction
<< "Obtaining sizes not supported in "
<< UPstream::commsTypeNames[commsType_] << endl
<< " since transfers already in progress. Use non-blocking instead."
<< exit(FatalError);
// Note: maybe possible only if using different tag from write started
// by ~UOPstream. Needs some work.
}
}
bool Foam::PstreamBuffers::finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait
)
{
bool changed = (sendConnections.size() != nProcs());
if (changed)
{
sendConnections.resize(nProcs());
}
// Update send connections
// - reasonable to assume there are no self-sends on UPstream::myProcNo
forAll(sendBuffers_, proci)
{
// ie, sendDataCount(proci) != 0
if (sendConnections.set(proci, !sendBuffers_[proci].empty()))
{
// The state changed
changed = true;
}
}
UPstream::reduceOr(changed, comm_);
if (changed)
{
// Create send/recv topology
// The send ranks
sendProcs.clear();
forAll(sendBuffers_, proci)
{
// ie, sendDataCount(proci) != 0
if (!sendBuffers_[proci].empty())
{
sendProcs.push_back(proci);
}
}
labelList recvSizes;
finishedSends(recvSizes, wait); // All-to-all
// The recv ranks
recvProcs.clear();
forAll(recvSizes, proci)
{
if (recvSizes[proci] > 0)
{
recvProcs.push_back(proci);
}
}
}
else
{
// Use existing send/recv ranks
finishedSends(sendProcs, recvProcs, wait);
}
return changed;
}
#endif
void Foam::PstreamBuffers::finishedNeighbourSends
(
const labelUList& neighProcs,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
#ifdef Foam_PstreamBuffers_map_storage
recvSizes.clear();
for (const label proci : neighProcs)
{
recvSizes.insert(proci, 0);
}
// Prune any send buffers that are not neighbours
forAllIters(sendBuffers_, iter)
{
if (!recvSizes.contains(iter.key()))
{
iter.val().clear();
}
}
finalExchange(wait, recvSizes);
#else
// Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size());
// Prune send buffers that are not neighbours
{
labelHashSet keepProcs(neighProcs);
// Prune send buffers that are not neighbours
forAll(sendBuffers_, proci)
{
if (!keepProcs.contains(proci))
{
sendBuffers_[proci].clear();
}
}
}
finalExchange(wait, recvSizes);
#endif
}
@ -633,32 +1003,65 @@ void Foam::PstreamBuffers::finishedNeighbourSends
const bool wait
)
{
#ifdef Foam_PstreamBuffers_map_storage
finishedSends(neighProcs, neighProcs, wait);
#else
labelList recvSizes;
finalExchange(neighProcs, wait, recvSizes);
// Prune send buffers that are not neighbours
{
labelHashSet keepProcs(neighProcs);
// Prune send buffers that are not neighbours
forAll(sendBuffers_, proci)
{
if (!keepProcs.contains(proci))
{
sendBuffers_[proci].clear();
}
}
}
finalExchange(wait, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedGathers(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
finalGatherScatter(true, wait, false, recvSizes);
#else
labelList recvSizes;
finalGatherScatter(true, wait, recvSizes);
finalGatherScatter(true, wait, false, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedScatters(const bool wait)
{
#ifdef Foam_PstreamBuffers_map_storage
Map<label> recvSizes;
finalGatherScatter(false, wait, false, recvSizes);
#else
labelList recvSizes;
finalGatherScatter(false, wait, recvSizes);
finalGatherScatter(false, wait, false, recvSizes);
#endif
}
void Foam::PstreamBuffers::finishedGathers
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
finalGatherScatter(true, wait, recvSizes);
finalGatherScatter(true, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{
@ -676,11 +1079,15 @@ void Foam::PstreamBuffers::finishedGathers
void Foam::PstreamBuffers::finishedScatters
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait
)
{
finalGatherScatter(false, wait, recvSizes);
finalGatherScatter(false, wait, true, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking)
{

View File

@ -100,14 +100,28 @@ SourceFiles
#define Foam_PstreamBuffers_H
#include "DynamicList.H"
#include "Map.H"
#include "UPstream.H"
#include "IOstream.H"
// Transitional
#define Foam_PstreamBuffers_map_storage
// #define Foam_PstreamBuffers_dense
#ifdef Foam_PstreamBuffers_dense
#undef Foam_PstreamBuffers_map_storage
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// Forward Declarations
#ifdef Foam_PstreamBuffers_dense
class bitSet;
#endif
/*---------------------------------------------------------------------------*\
Class PstreamBuffers Declaration
\*---------------------------------------------------------------------------*/
@ -140,6 +154,19 @@ class PstreamBuffers
// Buffer storage
#ifdef Foam_PstreamBuffers_map_storage
//- Send buffers (sparse)
Map<DynamicList<char>> sendBuffers_;
//- Receive buffers (sparse)
Map<DynamicList<char>> recvBuffers_;
//- Current read positions within recvBuffers_ (sparse)
Map<label> recvPositions_;
#else
//- Send buffers. Size is nProcs()
List<DynamicList<char>> sendBuffers_;
@ -149,6 +176,8 @@ class PstreamBuffers
//- Current read positions within recvBuffers_. Size is nProcs()
labelList recvPositions_;
#endif
// Private Member Functions
@ -157,25 +186,37 @@ class PstreamBuffers
void finalExchange
(
const bool wait,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
);
//- Mark sends as done.
// Only exchange sizes using the neighbour ranks
// Only exchange sizes using the sendProcs/recvProcs subset
// (nonBlocking comms).
#ifdef Foam_PstreamBuffers_dense
void finalExchange
(
const labelUList& neighProcs,
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait,
labelList& recvSizes
);
#endif
//- For all-to-one or one-to-all
void finalGatherScatter
void finalExchangeGatherScatter
(
const bool isGather,
const bool wait,
const bool needSizes, // If recvSizes needed or scratch
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes
#else
labelList& recvSizes
#endif
);
@ -197,12 +238,6 @@ class PstreamBuffers
public:
// Static Data
//- Preferred exchange algorithm (may change or be removed in future)
static int algorithm;
// Constructors
//- Construct given communication type (default: nonBlocking), message
@ -393,11 +428,79 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedSends(labelList& recvSizes, const bool wait = true);
void finishedSends
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
// Functions with restricted neighbours
//- Mark sends as done using subset of send/recv ranks
//- to exchange data on.
//
// Non-blocking mode: populates receive buffers.
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
const bool wait = true
);
#endif
//- Mark sends as done using subset of send/recv ranks
//- to exchange data on. Recovers the sizes (bytes) received.
//
// Non-blocking mode: populates receive buffers.
//
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
void finishedSends
(
const labelUList& sendProcs,
const labelUList& recvProcs,
labelList& recvSizes,
const bool wait = true
);
#endif
//- A caching version that uses a limited send/recv connectivity.
//
// Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed
//
// \warning currently only valid for nonBlocking comms.
#ifdef Foam_PstreamBuffers_dense
bool finishedSends
(
bitSet& sendConnections,
DynamicList<label>& sendProcs,
DynamicList<label>& recvProcs,
const bool wait = true
);
#endif
//- Mark sends as done using subset of send/recv ranks
//- and recover the sizes (bytes) received.
//
@ -427,7 +530,11 @@ public:
void finishedNeighbourSends
(
const labelUList& neighProcs,
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
@ -452,7 +559,15 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedGathers(labelList& recvSizes, const bool wait = true);
void finishedGathers
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
//- Mark all sends to sub-procs as done.
//
@ -472,7 +587,15 @@ public:
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
void finishedScatters(labelList& recvSizes, const bool wait = true);
void finishedScatters
(
#ifdef Foam_PstreamBuffers_map_storage
Map<label>& recvSizes,
#else
labelList& recvSizes,
#endif
const bool wait = true
);
};

View File

@ -746,89 +746,58 @@ void Foam::Pstream::exchangeSizes
const label comm
)
{
const label myProci = UPstream::myProcNo(comm);
const label numProcs = UPstream::nProcs(comm);
//const label myProci = UPstream::myProcNo(comm);
if (sendBufs.size() != numProcs)
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of container " << sendBufs.size()
<< " does not equal the number of processors " << numProcs
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
labelList sendSizes(numProcs);
for (label proci = 0; proci < numProcs; ++proci)
labelList sendSizes(sendProcs.size());
forAll(sendProcs, i)
{
sendSizes[proci] = sendBufs[proci].size();
sendSizes[i] = sendBufs[sendProcs[i]].size();
}
recvSizes.resize_nocopy(numProcs);
recvSizes.resize_nocopy(sendBufs.size());
recvSizes = 0; // Ensure non-received entries are properly zeroed
// Preserve self-send, even if not described by neighbourhood
recvSizes[myProci] = sendSizes[myProci];
const label startOfRequests = UPstream::nRequests();
for (const label proci : recvProcs)
{
if (proci != myProci)
{
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&recvSizes[proci]),
sizeof(label),
tag,
comm
);
}
UIPstream::read
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&recvSizes[proci]),
sizeof(label),
tag,
comm
);
}
for (const label proci : sendProcs)
forAll(sendProcs, i)
{
if (proci != myProci)
{
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
proci,
reinterpret_cast<char*>(&sendSizes[proci]),
sizeof(label),
tag,
comm
);
}
UOPstream::write
(
UPstream::commsTypes::nonBlocking,
sendProcs[i],
reinterpret_cast<char*>(&sendSizes[i]),
sizeof(label),
tag,
comm
);
}
UPstream::waitRequests(startOfRequests);
}
template<class Container>
void Foam::Pstream::exchangeSizes
(
const labelUList& neighProcs,
const Container& sendBufs,
labelList& recvSizes,
const label tag,
const label comm
)
{
Pstream::exchangeSizes<Container>
(
neighProcs, // send
neighProcs, // recv
sendBufs,
recvSizes,
tag,
comm
);
}
// Sparse sending
template<class Container>
void Foam::Pstream::exchangeSizes

View File

@ -1720,8 +1720,6 @@ void Foam::argList::parse
<< " nProcsSimpleSum : " << Pstream::nProcsSimpleSum << nl
<< " nonBlockingExchange: "
<< Pstream::nProcsNonblockingExchange << nl
<< " exchange algorithm : "
<< PstreamBuffers::algorithm << nl
<< " commsType : "
<< Pstream::commsTypeNames[Pstream::defaultCommsType] << nl
<< " polling iterations : " << Pstream::nPollProcInterfaces

View File

@ -214,7 +214,7 @@ void Foam::globalIndex::reset
// Non-parallel branch: use localSize on-proc, zero elsewhere
localLens.resize(len, Zero);
localLens[UPstream::myProcNo(comm)] = localSize;
localLens[Pstream::myProcNo(comm)] = localSize;
}
reset(localLens, true); // checkOverflow = true

View File

@ -519,9 +519,7 @@ public:
{
List<Type> allData;
gather(offsets, comm, procIDs, fld, allData, tag, ct);
const int masterProci = procIDs.size() ? procIDs[0] : 0;
if (UPstream::myProcNo(comm) == masterProci)
if (Pstream::myProcNo(comm) == procIDs[0])
{
fld.transfer(allData);
}

View File

@ -137,14 +137,14 @@ inline Foam::globalIndex::globalIndex
inline bool Foam::globalIndex::empty() const
{
return offsets_.empty() || offsets_.back() == 0;
return offsets_.empty() || offsets_.last() == 0;
}
inline Foam::label Foam::globalIndex::totalSize() const
{
const label len = (offsets_.size() - 1);
return (len < 1) ? 0 : offsets_[len];
return (len < 1) ? static_cast<label>(0) : offsets_[len];
}
@ -163,7 +163,7 @@ inline Foam::labelList Foam::globalIndex::sizes() const
inline Foam::label Foam::globalIndex::nProcs() const noexcept
{
const label len = (offsets_.size() - 1);
return (len < 1) ? 0 : len;
return (len < 1) ? static_cast<label>(0) : len;
}
@ -219,7 +219,7 @@ inline Foam::label Foam::globalIndex::localStart(const label proci) const
inline Foam::label Foam::globalIndex::localStart() const
{
return localStart(UPstream::myProcNo());
return localStart(Pstream::myProcNo());
}
@ -231,7 +231,7 @@ inline Foam::label Foam::globalIndex::localSize(const label proci) const
inline Foam::label Foam::globalIndex::localSize() const
{
return localSize(UPstream::myProcNo());
return localSize(Pstream::myProcNo());
}
@ -244,7 +244,7 @@ inline Foam::label Foam::globalIndex::maxSize() const
inline Foam::label Foam::globalIndex::maxNonLocalSize() const
{
return maxNonLocalSize(UPstream::myProcNo());
return maxNonLocalSize(Pstream::myProcNo());
}
@ -256,7 +256,7 @@ inline Foam::labelRange Foam::globalIndex::range(const label proci) const
inline Foam::labelRange Foam::globalIndex::range() const
{
return range(UPstream::myProcNo());
return range(Pstream::myProcNo());
}
@ -268,7 +268,7 @@ inline bool Foam::globalIndex::isLocal(const label proci, const label i) const
inline bool Foam::globalIndex::isLocal(const label i) const
{
return isLocal(UPstream::myProcNo(), i);
return isLocal(Pstream::myProcNo(), i);
}
@ -284,7 +284,7 @@ inline Foam::label Foam::globalIndex::toGlobal
inline Foam::label Foam::globalIndex::toGlobal(const label i) const
{
return toGlobal(UPstream::myProcNo(), i);
return toGlobal(Pstream::myProcNo(), i);
}
@ -306,7 +306,7 @@ inline Foam::labelList Foam::globalIndex::toGlobal
const labelUList& labels
) const
{
return toGlobal(UPstream::myProcNo(), labels);
return toGlobal(Pstream::myProcNo(), labels);
}
@ -327,7 +327,7 @@ inline void Foam::globalIndex::inplaceToGlobal
inline void Foam::globalIndex::inplaceToGlobal(labelUList& labels) const
{
inplaceToGlobal(UPstream::myProcNo(), labels);
inplaceToGlobal(Pstream::myProcNo(), labels);
}
@ -349,7 +349,7 @@ Foam::globalIndex::toLocal(const label proci, const label i) const
inline Foam::label Foam::globalIndex::toLocal(const label i) const
{
return toLocal(UPstream::myProcNo(), i);
return toLocal(Pstream::myProcNo(), i);
}
@ -363,7 +363,7 @@ inline Foam::label Foam::globalIndex::whichProcID(const label i) const
<< abort(FatalError);
}
const int proci = UPstream::myProcNo();
const label proci(Pstream::myProcNo());
return isLocal(proci, i) ? proci : findLower(offsets_, i+1);
}

View File

@ -91,9 +91,7 @@ void Foam::globalIndex::gatherValues
const label startOfRequests = UPstream::nRequests();
const int masterProci = procIDs.size() ? procIDs[0] : 0;
if (UPstream::myProcNo(comm) == masterProci)
if (UPstream::myProcNo(comm) == procIDs[0])
{
allValues.resize_nocopy(procIDs.size());
allValues[0] = localValue;
@ -128,7 +126,7 @@ void Foam::globalIndex::gatherValues
UOPstream::write
(
commsType,
masterProci,
procIDs[0],
reinterpret_cast<const char*>(&localValue),
sizeof(Type),
tag,
@ -137,7 +135,7 @@ void Foam::globalIndex::gatherValues
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
OPstream toMaster(commsType, procIDs[0], 0, tag, comm);
toMaster << localValue;
}
}
@ -178,11 +176,9 @@ void Foam::globalIndex::gather
const label startOfRequests = UPstream::nRequests();
const int masterProci = procIDs.size() ? procIDs[0] : 0;
if (UPstream::myProcNo(comm) == masterProci)
if (Pstream::myProcNo(comm) == procIDs[0])
{
allFld.resize_nocopy(off.back()); // == totalSize()
allFld.resize_nocopy(off.last()); // == totalSize()
// Assign my local data - respect offset information
// so that we can request 0 entries to be copied.
@ -230,7 +226,7 @@ void Foam::globalIndex::gather
UOPstream::write
(
commsType,
masterProci,
procIDs[0],
fld.cdata_bytes(),
fld.size_bytes(),
tag,
@ -239,7 +235,7 @@ void Foam::globalIndex::gather
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
OPstream toMaster(commsType, procIDs[0], 0, tag, comm);
toMaster << fld;
}
}
@ -298,11 +294,9 @@ void Foam::globalIndex::gather
const label startOfRequests = UPstream::nRequests();
const int masterProci = procIDs.size() ? procIDs[0] : 0;
if (UPstream::myProcNo(comm) == masterProci)
if (Pstream::myProcNo(comm) == procIDs[0])
{
allFld.resize_nocopy(off.back()); // == totalSize()
allFld.resize_nocopy(off.last()); // == totalSize()
// Assign my local data - respect offset information
// so that we can request 0 entries to be copied
@ -337,7 +331,7 @@ void Foam::globalIndex::gather
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
OPstream toMaster(commsType, procIDs[0], 0, tag, comm);
toMaster << fld;
}
}
@ -883,9 +877,7 @@ void Foam::globalIndex::scatter
const label startOfRequests = UPstream::nRequests();
const int masterProci = procIDs.size() ? procIDs[0] : 0;
if (UPstream::myProcNo(comm) == masterProci)
if (Pstream::myProcNo(comm) == procIDs[0])
{
for (label i = 1; i < procIDs.size(); ++i)
{
@ -937,7 +929,7 @@ void Foam::globalIndex::scatter
UIPstream::read
(
commsType,
masterProci,
procIDs[0],
fld.data_bytes(),
fld.size_bytes(),
tag,
@ -946,7 +938,7 @@ void Foam::globalIndex::scatter
}
else
{
IPstream fromMaster(commsType, masterProci, 0, tag, comm);
IPstream fromMaster(commsType, procIDs[0], 0, tag, comm);
fromMaster >> fld;
}
}

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -76,11 +76,11 @@ Foam::Ostream& Foam::vtk::fileWriter::reportBadState
Foam::Ostream& Foam::vtk::fileWriter::reportBadState
(
Ostream& os,
outputState expected1,
outputState expected,
outputState expected2
) const
{
reportBadState(os, expected1)
reportBadState(os, expected)
<< " or (" << stateNames[expected2] << ')';
return os;
}
@ -554,18 +554,10 @@ bool Foam::vtk::fileWriter::writeProcIDs(const label nValues)
{
++nCellData_;
}
else if (isState(outputState::POINT_DATA))
{
++nPointData_;
}
else
{
reportBadState
(
FatalErrorInFunction,
outputState::CELL_DATA,
outputState::POINT_DATA
) << " for procID field" << nl << endl
reportBadState(FatalErrorInFunction, outputState::CELL_DATA)
<< " for procID field" << nl << endl
<< exit(FatalError);
return false;

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -201,8 +201,7 @@ protected:
const UList<Type>& field
);
//- Write nValues of processor ids as CellData or PointData
//- (no-op in serial)
//- Write nValues of processor ids as CellData (no-op in serial)
bool writeProcIDs(const label nValues);
@ -306,16 +305,10 @@ public:
// \return True if the state changed
virtual bool beginPointData(label nFields = 0) = 0;
//- True if output state corresponds to CELL_DATA
inline bool isCellData() const noexcept;
//- True if output state corresponds to POINT_DATA
inline bool isPointData() const noexcept;
//- The number of CellData written for the Piece thus far.
//- Return the number of CellData written for the Piece thus far.
inline label nCellData() const noexcept;
//- The number of PointData written for the Piece thus far.
//- Return the number of PointData written for the Piece thus far.
inline label nPointData() const noexcept;

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -107,16 +107,4 @@ inline Foam::label Foam::vtk::fileWriter::nPointData() const noexcept
}
inline bool Foam::vtk::fileWriter::isCellData() const noexcept
{
return (outputState::CELL_DATA == state_);
}
inline bool Foam::vtk::fileWriter::isPointData() const noexcept
{
return (outputState::POINT_DATA == state_);
}
// ************************************************************************* //

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2021-2023 OpenCFD Ltd.
Copyright (C) 2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -131,10 +131,6 @@ void Foam::vtk::lineWriter::piece
bool Foam::vtk::lineWriter::writeProcIDs()
{
if (this->isPointData())
{
return vtk::fileWriter::writeProcIDs(nLocalPoints_);
}
return vtk::fileWriter::writeProcIDs(nLocalLines_);
}

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2021-2023 OpenCFD Ltd.
Copyright (C) 2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -155,8 +155,8 @@ public:
void piece(const pointField& points, const edgeList& edges);
//- Write processor ids for each line as CellData or for each point
//- as PointData, depending on isPointData() state. No-op in serial.
//- Write processor ids for each line as CellData
//- (no-op in serial)
bool writeProcIDs();
//- Write a uniform field of Cell (Line) or Point values

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -131,10 +131,6 @@ void Foam::vtk::surfaceWriter::piece
bool Foam::vtk::surfaceWriter::writeProcIDs()
{
if (this->isPointData())
{
return vtk::fileWriter::writeProcIDs(nLocalPoints_);
}
return vtk::fileWriter::writeProcIDs(nLocalPolys_);
}

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -155,8 +155,8 @@ public:
void piece(const pointField& points, const faceList& faces);
//- Write processor ids for each poly as CellData or for each point
//- as PointData, depending on isPointData() state. No-op in serial.
//- Write processor ids for each poly as CellData
//- (no-op in serial)
bool writeProcIDs();
//- Write a uniform field of Cell (Poly) or Point values

View File

@ -43,7 +43,12 @@ Foam::zoneDistribute::zoneDistribute(const fvMesh& mesh)
MeshObject<fvMesh, Foam::TopologicalMeshObject, zoneDistribute>(mesh),
stencil_(zoneCPCStencil::New(mesh)),
globalNumbering_(stencil_.globalNumbering()),
#ifdef Foam_PstreamBuffers_dense
send_(UPstream::nProcs()),
#else
// Default map sizing (as per PstreamBuffers)
send_(),
#endif
pBufs_(UPstream::commsTypes::nonBlocking)
{
// Don't clear storage on persistent buffer
@ -90,6 +95,7 @@ void Foam::zoneDistribute::setUpCommforZone
if (UPstream::parRun())
{
#ifdef Foam_PstreamBuffers_dense
List<labelHashSet> needed(UPstream::nProcs());
// Bin according to originating (sending) processor
@ -109,22 +115,22 @@ void Foam::zoneDistribute::setUpCommforZone
}
}
// Stream the send data into PstreamBuffers
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
pBufs_.clear();
for (const int proci : pBufs_.allProcs())
for (const int proci : UPstream::allProcs())
{
const auto& indices = needed[proci];
if (proci != UPstream::myProcNo() && !indices.empty())
if (proci != UPstream::myProcNo() && !needed[proci].empty())
{
// Serialize as List
UOPstream toProc(proci, pBufs_);
toProc << indices.sortedToc();
toProc << needed[proci].sortedToc();
}
}
pBufs_.finishedSends();
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
for (const int proci : pBufs_.allProcs())
{
@ -136,6 +142,61 @@ void Foam::zoneDistribute::setUpCommforZone
fromProc >> send_[proci];
}
}
#else
forAllIters(send_, iter)
{
iter.val().clear();
}
// Bin according to originating (sending) processor
for (const label celli : stencil.needsComm())
{
if (zone[celli])
{
for (const label gblIdx : stencil_[celli])
{
const label proci = globalNumbering_.whichProcID(gblIdx);
if (proci != Pstream::myProcNo())
{
send_(proci).insert(gblIdx);
}
}
}
}
// Stream the send data into PstreamBuffers,
pBufs_.clear();
forAllIters(send_, iter)
{
const label proci = iter.key();
auto& indices = iter.val();
if (proci != UPstream::myProcNo() && !indices.empty())
{
// Serialize as List
UOPstream toProc(proci, pBufs_);
toProc << indices;
}
// Clear out old contents
indices.clear();
}
pBufs_.finishedSends();
for (const int proci : pBufs_.allProcs())
{
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs_);
fromProc >> send_(proci);
}
}
#endif
}
}

View File

@ -90,9 +90,28 @@ class zoneDistribute
//- Global number into index of cells/faces
const globalIndex& globalNumbering_;
#ifdef Foam_PstreamBuffers_dense
//- Global cell/face index to send for processor-to-processor comms
List<labelList> send_;
//- Parallel [cache]: send connectivity (true/false)
bitSet sendConnections_;
//- Parallel [cache]: send data to these ranks
DynamicList<label> sendProcs_;
//- Parallel [cache]: recv data from these ranks
DynamicList<label> recvProcs_;
#else
//- Per proc the global cell/face index to send for
//- processor-to-processor comms
Map<labelHashSet> send_;
#endif
//- Persistent set of exchange buffers
PstreamBuffers pBufs_;

View File

@ -161,11 +161,64 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
if (UPstream::parRun())
{
#ifdef Foam_PstreamBuffers_dense
if (sendConnections_.empty())
{
WarningInFunction
<< "The send/recv connections not initialized - "
<< "likely that setUpCommforZone() was not called"
<< endl;
// But don't exit/abort for now
}
// Stream the send data into PstreamBuffers,
// which we also use to track the current topology.
pBufs_.clear();
for (const int proci : UPstream::allProcs())
{
if (proci != UPstream::myProcNo() && !send_[proci].empty())
{
// Serialize as Map
Map<Type> sendValues(2*send_[proci].size());
for (const label sendIdx : send_[proci])
{
sendValues.insert
(
sendIdx,
getLocalValue(phi, globalNumbering_.toLocal(sendIdx))
);
}
UOPstream toProc(proci, pBufs_);
toProc << sendValues;
}
}
pBufs_.finishedSends(sendConnections_, sendProcs_, recvProcs_);
for (const int proci : pBufs_.allProcs())
{
const auto& indices = send_[proci];
if (proci != UPstream::myProcNo() && pBufs_.recvDataCount(proci))
{
UIPstream fromProc(proci, pBufs_);
Map<Type> tmpValues(fromProc);
neiValues += tmpValues;
}
}
#else
pBufs_.clear();
forAllConstIters(send_, iter)
{
const label proci = iter.key();
const auto& indices = iter.val();
if (proci != UPstream::myProcNo() && !indices.empty())
{
@ -198,6 +251,7 @@ Foam::Map<Type> Foam::zoneDistribute::getDatafromOtherProc
neiValues += tmpValues;
}
}
#endif
}
return neiValues;

View File

@ -201,9 +201,8 @@ public:
void piece(const UPtrList<const pointField>& points);
//- Write processor ids for each line as CellData or for each point
//- as PointData, depending on isPointData() state. No-op in serial.
// Not implemented.
//- Write processor ids for each line as CellData
//- (no-op in serial)
bool writeProcIDs();

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2023 OpenCFD Ltd.
Copyright (C) 2018-2021 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -159,10 +159,6 @@ public:
//- Write processor ids for each poly as CellData
bool writeProcIDs()
{
if (this->isPointData())
{
return vtk::fileWriter::writeProcIDs(nLocalPoints_);
}
return vtk::polyWriter::writeProcIDs(nLocalPolys_);
}
};

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2016-2023 OpenCFD Ltd.
Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -585,10 +585,6 @@ void Foam::vtk::patchMeshWriter::writePatchIDs()
bool Foam::vtk::patchMeshWriter::writeProcIDs()
{
if (this->isPointData())
{
return vtk::fileWriter::writeProcIDs(nLocalPoints_);
}
return vtk::fileWriter::writeProcIDs(nLocalPolys_);
}

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2016-2023 OpenCFD Ltd.
Copyright (C) 2016-2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -209,8 +209,8 @@ public:
// Must be called within the CELL_DATA state.
void writePatchIDs();
//- Write processor ids for each line as CellData or for each point
//- as PointData, depending on isPointData() state. No-op in serial.
//- Write processor ids as CellData. This is no-op in serial.
// Must be called within the CELL_DATA state.
bool writeProcIDs();
//- Write processor neighbour ids as CellData. This is no-op in serial.

View File

@ -3,30 +3,19 @@ cd "${0%/*}" || exit # Run from this directory
. ${WM_PROJECT_DIR:?}/bin/tools/RunFunctions # Tutorial run functions
#------------------------------------------------------------------------------
unset decompDict
# decompDict=system/decomposeParDict-6
# decompDict=system/decomposeParDict-7
runApplication blockMesh
runApplication makeFaMesh
runApplication -decompose-dict="$decompDict" decomposePar
runApplication decomposePar
# For ids and processor partitioning
runParallel -s volume -decompose-dict="$decompDict" \
foamToVTK -name VTK-parallel -time 0 \
-no-finite-area -no-internal -no-lagrangian -no-fields -with-ids \
-patches filmWalls
# For the processor partitioning
runParallel -s finiteVolume foamToVTK -name VTK-parallel -time 0 \
-no-finite-area -no-internal -no-lagrangian -no-fields -with-ids
runParallel -decompose-dict="$decompDict" $(getApplication)
runParallel $(getApplication)
if false
then
# Not usually needed - uses areaWrite
runParallel -s area -decompose-dict="$decompDict" \
foamToVTK -name VTK-parallel \
-no-boundary -no-internal -no-lagrangian -with-ids
fi
runParallel -s finiteArea foamToVTK -name VTK-parallel \
-no-boundary -no-internal -no-lagrangian
#------------------------------------------------------------------------------

View File

@ -7,18 +7,13 @@ runApplication blockMesh
runApplication makeFaMesh
# For ids and processor partitioning
runApplication -s volume.serial foamToVTK -name VTK-serial -time 0 \
-no-finite-area -no-internal -no-lagrangian -no-fields -with-ids \
-patches filmWalls
# For the cell ids etc
runApplication -s finiteVolume.serial foamToVTK -name VTK-serial -time 0 \
-no-finite-area -no-internal -no-lagrangian -no-fields -with-ids
runApplication $(getApplication)
if false
then
# Not usually needed - uses areaWrite
runApplication -s area.serial foamToVTK -name VTK-serial \
-no-boundary -no-internal -no-lagrangian
fi
runApplication -s finiteArea.serial foamToVTK -name VTK-serial \
-no-boundary -no-internal -no-lagrangian
#------------------------------------------------------------------------------

View File

@ -1,25 +0,0 @@
// -*- C++ -*-
// Use the areaWrite function object
areaWrite
{
type areaWrite;
libs (utilityFunctionObjects);
log true;
writeControl writeTime;
writeInterval 1;
// Fields to output (words or regex)
fields (Uf_film hf_film pf_film);
surfaceFormat ensight;
formatOptions
{
default { format binary; }
}
}
// ************************************************************************* //

View File

@ -52,10 +52,5 @@ regionFaMaxCo 1;
maxDeltaT 0.1;
functions
{
#include "areaWrite"
}
// ************************************************************************* //

View File

@ -1,21 +0,0 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2306 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
FoamFile
{
version 2.0;
format ascii;
class dictionary;
object decomposeParDict;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
numberOfSubdomains 6;
method scotch;
// ************************************************************************* //

View File

@ -1,21 +0,0 @@
/*--------------------------------*- C++ -*----------------------------------*\
| ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2306 |
| \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | |
\*---------------------------------------------------------------------------*/
FoamFile
{
version 2.0;
format ascii;
class dictionary;
object decomposeParDict;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
numberOfSubdomains 7;
method scotch;
// ************************************************************************* //