Merge branch 'pstream-updates' into 'develop'

Extend some Pstream, PstreamBuffers and globalIndex functionality

See merge request Development/openfoam!640
This commit is contained in:
Mattijs Janssens
2023-11-20 10:10:49 +00:00
26 changed files with 1227 additions and 664 deletions

View File

@ -1,3 +1,3 @@
Test-CompactListList.C Test-CompactListList.cxx
EXE = $(FOAM_USER_APPBIN)/Test-CompactListList EXE = $(FOAM_USER_APPBIN)/Test-CompactListList

View File

@ -38,6 +38,8 @@ Description
#include "SpanStream.H" #include "SpanStream.H"
#include "faceList.H" #include "faceList.H"
#include <iterator> // for back_inserter
using namespace Foam; using namespace Foam;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -161,6 +163,25 @@ int main(int argc, char *argv[])
faceList fcs2 = compactFcs.unpack<face>(); faceList fcs2 = compactFcs.unpack<face>();
Info<< "deserialized:" << fcs2 << endl; Info<< "deserialized:" << fcs2 << endl;
// Unpack some faces
DynamicList<face> extracted(compactFcs.size());
compactFcs.copy_unpack<face>
(
std::back_inserter(extracted),
2, 2
);
Info<< "copy_unpack 1: " << extracted << nl;
compactFcs.copy_unpack<face>
(
std::back_inserter(extracted)
// labelRange(2, 1)
);
Info<< "copy_unpack 2: " << extracted << nl;
// From some faces // From some faces
IndirectList<face> subfaces(fcs, labelList({2, 4, 1})); IndirectList<face> subfaces(fcs, labelList({2, 4, 1}));

View File

@ -1,3 +0,0 @@
Test-globalIndex.C
EXE = $(FOAM_USER_APPBIN)/Test-globalIndex

View File

@ -1 +0,0 @@
EXE_INC = /* -DFULLDEBUG -g -O0 */

View File

@ -0,0 +1,3 @@
Test-globalIndex1.cxx
EXE = $(FOAM_USER_APPBIN)/Test-globalIndex1

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -25,7 +25,7 @@ License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application Application
globalIndexTest Test-globalIndex1
Description Description
Simple tests for the globalIndex class. Simple tests for the globalIndex class.
@ -53,7 +53,7 @@ int main(int argc, char *argv[])
#include "createPolyMesh.H" #include "createPolyMesh.H"
// Global numbering of cells (proc0 elements first, then proc1, etc.) // Global numbering of cells (proc0 elements first, then proc1, etc.)
globalIndex globalNumbering(mesh.nCells()); const globalIndex globalNumbering(mesh.nCells());
Pout<< "local-offset: " << globalIndex::calcOffset(mesh.nCells()) << nl; Pout<< "local-offset: " << globalIndex::calcOffset(mesh.nCells()) << nl;
Pout<< "local-range: " << globalIndex::calcRange(mesh.nCells()) << nl; Pout<< "local-range: " << globalIndex::calcRange(mesh.nCells()) << nl;
@ -185,7 +185,7 @@ int main(int argc, char *argv[])
// Get a few cell indices // Get a few cell indices
const label nTotalCells = globalNumbering.size(); const label nTotalCells = globalNumbering.totalSize();
Random rndGen(UPstream::myProcNo()); Random rndGen(UPstream::myProcNo());
DynamicList<label> globalIDs; DynamicList<label> globalIDs;

View File

@ -0,0 +1,3 @@
Test-globalIndex2.cxx
EXE = $(FOAM_USER_APPBIN)/Test-globalIndex2

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -0,0 +1,156 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-globalIndex2
Description
More functional tests for the globalIndex class.
\*---------------------------------------------------------------------------*/
#include "globalIndex.H"
#include "argList.H"
#include "Time.H"
#include "IOstreams.H"
#include "Random.H"
#include "IndirectList.H"
#include "SliceList.H"
using namespace Foam;
void printTest1
(
const globalIndex& gi,
const label proci,
const label value
)
{
// With range check
Info<< " value:" << value << " on:" << gi.findProc(proci, value)
<< " below/above: ("
<< gi.findProcBelow(proci, value) << ' '
<< gi.findProcAbove(proci, value) << ')' << nl;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Main program:
int main(int argc, char *argv[])
{
Random rnd(123456);
// #include "setRootCase.H"
// #include "createTime.H"
Info<< nl
<< "simple globalIndex tests" << nl << nl;
Info<< "Construct from indirect list(s)" << nl;
{
// Some sizes
labelList rawSizes(25);
forAll(rawSizes, i)
{
rawSizes[i] = rnd.position<label>(0, 100);
}
Info<< nl
<< "plain sizes: "
<< flatOutput(rawSizes) << nl
<< " offsets: "
<< flatOutput(globalIndex::calcOffsets(rawSizes))
<< nl;
sliceRange slice(0, 5, 5);
Info<< nl
<< "range min/max " << slice.min() << '/' << slice.max() << nl;
SliceList<label> sliceSizes(rawSizes, slice);
Info<< nl
<< "indirect addr: " << sliceSizes.addressing() << nl
<< "indirect sizes: "
<< flatOutput(sliceSizes) << nl
<< " offsets: "
<< flatOutput(globalIndex::calcOffsets(sliceSizes))
<< nl;
}
{
// From some offsets
globalIndex gi;
globalIndex gi0
(
labelList({ 0, 10, 20, 30, 40, 50, 60 })
);
Info<< "offsets: " << gi0.offsets() << nl;
// Alternative to copy assigment
gi.reset(gi0);
Info<< "globalIndex :"; gi.offsets().writeList(Info) << nl;
// Resizing is fine, but also check the binary search results!
gi.resize(10);
Info<< "globalIndex :"; gi.offsets().writeList(Info) << nl;
// NB: these routines are mostly failsafe on bad addresses
// for (const label proci : { 4, 8, -1 })
for (const label proci : { 4 })
{
Info<< "proc:" << proci
<< " : [" << gi.localStart(proci)
<< "," << gi.localEnd(proci) << ")" << nl;
for (label i = 0; i < 25; ++i)
{
const label value = rnd.position<label>(-8, 100);
printTest1(gi, proci, value);
}
Info<< "other on proc:" << proci << nl;
printTest1(gi, proci, gi.localStart(proci));
printTest1(gi, proci, gi.localEnd(proci));
Info<< "other on proc:0" << nl;
printTest1(gi, 0, gi.localStart(proci));
printTest1(gi, 0, gi.localEnd(proci));
}
}
Info<< "\nEnd\n" << endl;
return 0;
}
// ************************************************************************* //

View File

@ -1,7 +1,7 @@
/*--------------------------------*- C++ -*----------------------------------*\ /*--------------------------------*- C++ -*----------------------------------*\
| ========= | | | ========= | |
| \\ / F ield | OpenFOAM: The Open Source CFD Toolbox | | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox |
| \\ / O peration | Version: v2306 | | \\ / O peration | Version: v2312 |
| \\ / A nd | Website: www.openfoam.com | | \\ / A nd | Website: www.openfoam.com |
| \\/ M anipulation | | | \\/ M anipulation | |
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
@ -174,12 +174,13 @@ OptimisationSwitches
nbx.tuning 0; nbx.tuning 0;
// Additional PstreamBuffers tuning parameters (experimental) // Additional PstreamBuffers tuning parameters (experimental)
// -1 : PEX with all-to-all for buffer sizes and point-to-point // 0 : (legacy PEX)
// for contents (legacy approach) // * all-to-all for buffer sizes [legacy approach]
// 0 : hybrid PEX with NBX for buffer sizes and point-to-point // * point-to-point for contents
// for contents (proposed new approach) // 1 : (hybrid PEX)
// 1 : full NBX for buffer sizes and contents (very experimental) // * NBX for buffer sizes [new approach]
pbufs.tuning -1; // * point-to-point for contents
pbufs.tuning 0;
// ===== // =====

View File

@ -35,16 +35,25 @@ template<class T>
void Foam::CompactListList<T>::reportOverflowAndExit void Foam::CompactListList<T>::reportOverflowAndExit
( (
const label idx, const label idx,
const labelUList& localLens const label prevOffset,
const label count
) )
{ {
if (idx < 0)
{
// No overflow tagged
return;
}
FatalErrorInFunction FatalErrorInFunction
<< "Overflow : sum of sizes exceeds labelMax (" << "Overflow : sum of sizes exceeds labelMax ("
<< labelMax << ") after index " << idx; << labelMax << ") after index " << idx;
if (!localLens.empty()) if (prevOffset >= 0 && count >= 0)
{ {
FatalError << " of " << flatOutput(localLens); FatalError
<< " while trying to add (" << count
<< ") to offset (" << prevOffset << ")";
} }
FatalError FatalError
@ -76,12 +85,14 @@ Foam::CompactListList<T> Foam::CompactListList<T>::pack_impl
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = lists[i].size();
newOffsets[i] = total; newOffsets[i] = total;
total += lists[i].size(); total += count;
if (checkOverflow && total < newOffsets[i]) if (checkOverflow && total < newOffsets[i])
{ {
reportOverflowAndExit(i); reportOverflowAndExit(i, newOffsets[i], count);
} }
} }
newOffsets[len] = total; newOffsets[len] = total;
@ -162,12 +173,14 @@ Foam::label Foam::CompactListList<T>::resize_offsets
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = listSizes[i];
offsets_[i] = total; offsets_[i] = total;
total += listSizes[i]; total += count;
if (checkOverflow && total < offsets_[i]) if (checkOverflow && total < offsets_[i])
{ {
reportOverflowAndExit(i, listSizes); reportOverflowAndExit(i, offsets_[i], count);
} }
} }
@ -229,8 +242,8 @@ Foam::label Foam::CompactListList<T>::maxNonLocalSize(const label rowi) const
{ {
if (i != rowi) if (i != rowi)
{ {
const label localLen = (offsets_[i+1] - offsets_[i]); const label count = (offsets_[i+1] - offsets_[i]);
maxLen = max(maxLen, localLen); maxLen = max(maxLen, count);
} }
} }
@ -380,6 +393,72 @@ void Foam::CompactListList<T>::transfer
} }
template<class T>
template<class SubListType, class OutputIter>
OutputIter Foam::CompactListList<T>::copy_unpack
(
OutputIter d_iter,
const label pos,
label len
) const
{
if (pos >= 0 && pos < this->size())
{
// Change sub-length to (one-past) end position
// len == -1 (like std::string::npos) - search until end
if (len > 0) len += pos;
if (len < 0 || len > this->size())
{
len = this->size();
}
for (label i = pos; i < len; ++i)
{
*d_iter = SubListType(this->localList(i));
++d_iter;
}
}
return d_iter;
}
template<class T>
template<class SubListType, class OutputIter>
OutputIter Foam::CompactListList<T>::copy_unpack
(
OutputIter d_iter,
const labelRange& range
) const
{
return this->copy_unpack<SubListType>(d_iter, range.start(), range.size());
}
template<class T>
template<class SubListType, class OutputIter>
OutputIter Foam::CompactListList<T>::copy_unpack
(
OutputIter d_iter,
const labelUList& indices
) const
{
for (label i : indices)
{
*d_iter = SubListType(this->localList(i));
++d_iter;
}
return d_iter;
}
// Could also support copy_unpack() with IndirectListBase, as required...
// or the caller can also just use copy_unpack with len = 1 and the
// desired position
template<class T> template<class T>
template<class SubListType> template<class SubListType>
Foam::List<SubListType> Foam::List<SubListType>
@ -387,10 +466,7 @@ Foam::CompactListList<T>::unpack() const
{ {
List<SubListType> lists(size()); List<SubListType> lists(size());
forAll(lists, i) this->copy_unpack<SubListType>(lists.begin());
{
lists[i] = SubListType(this->localList(i));
}
return lists; return lists;
} }
@ -403,16 +479,24 @@ Foam::CompactListList<T>::unpack(const labelRange& range) const
{ {
List<SubListType> lists(range.size()); List<SubListType> lists(range.size());
auto iter = lists.begin(); this->copy_unpack<SubListType>(lists.begin(), range.start(), range.size());
for (const label i : range)
{
*iter = SubListType(this->localList(i));
++iter;
}
return lists; return lists;
} }
template<class T>
template<class SubListType>
Foam::List<SubListType>
Foam::CompactListList<T>::unpack(const labelUList& indices) const
{
List<SubListType> lists(indices.size());
this->copy_unpack<SubListType>(lists.begin(), indices);
return lists;
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -82,11 +82,12 @@ class CompactListList
// Private Member Functions // Private Member Functions
//- Report overflow at specified index //- Report overflow at specified (non-negative) index
static void reportOverflowAndExit static void reportOverflowAndExit
( (
const label idx, const label idx,
const labelUList& listSizes = labelUList::null() const label prevOffset = -1, // The last valid offset value
const label count = 0 // The count to add to prevOffset
); );
//- Construct by packing together the list of lists //- Construct by packing together the list of lists
@ -318,18 +319,6 @@ public:
void setLocalSize(const label rowi, const label len); void setLocalSize(const label rowi, const label len);
//- Redimension - same as resize()
inline void setSize(const label mRows);
//- Redimension - same as resize()
inline void setSize(const label mRows, const label nVals);
//- Redimension - same as resize()
inline void setSize(const label mRows, const label nVals, const T&);
//- Reset sizes - same as resize()
inline void setSize(const labelUList& listSizes);
//- Swap contents //- Swap contents
void swap(CompactListList<T>& other); void swap(CompactListList<T>& other);
@ -357,6 +346,49 @@ public:
// Pack / Unpack // Pack / Unpack
//- Unpack sub-list copies in the range defined by \p pos and \p len
//- with bounding behaviour like List::slice() by copy constructing
//- beginning at the destination iterator \p d_iter.
//
// \returns Output iterator to the element in the destination range,
// one past the last element copied.
template<class SubListType, class OutputIter>
OutputIter copy_unpack
(
//! [out] The output destination
OutputIter d_iter,
//! The start of sub-region to copy (no-op if -ve or out-of-range)
const label pos = 0,
//! The length of sub-region to copy (-ve = until the end)
label len = -1
) const;
//- Unpack sub-list copies in the specified range.
//
// \returns Output iterator to the element in the destination range,
// one past the last element copied.
template<class SubListType, class OutputIter>
OutputIter copy_unpack
(
//! [out] The output destination
OutputIter d_iter,
//! The sub-region to copy
const labelRange& range
) const;
//- Unpack sub-list copies for the specified indices
//
// \returns Output iterator to the element in the destination range,
// one past the last element copied.
template<class SubListType, class OutputIter>
OutputIter copy_unpack
(
//! [out] The output destination
OutputIter d_iter,
//! The sub-regions to copy
const labelUList& indices
) const;
//- Return non-compact list of lists //- Return non-compact list of lists
template<class SubListType = List<T>> template<class SubListType = List<T>>
List<SubListType> unpack() const; List<SubListType> unpack() const;
@ -365,6 +397,10 @@ public:
template<class SubListType = List<T>> template<class SubListType = List<T>>
List<SubListType> unpack(const labelRange& range) const; List<SubListType> unpack(const labelRange& range) const;
//- Return non-compact list of lists for specified indices
template<class SubListType = List<T>>
List<SubListType> unpack(const labelUList& indices) const;
// Assignment // Assignment
@ -442,11 +478,11 @@ public:
// Housekeeping // Housekeeping
//- Const access to the packed values //- Const access to the packed values
//FOAM_DEPRECATED_STRICT(2022-03, "values()") FOAM_DEPRECATED_STRICT(2022-03, "values()")
const List<T>& m() const noexcept { return values_; } const List<T>& m() const noexcept { return values_; }
//- Non-const access to the packed values //- Non-const access to the packed values
//FOAM_DEPRECATED_STRICT(2022-03, "values()") FOAM_DEPRECATED_STRICT(2022-03, "values()")
List<T>& m() noexcept { return values_; } List<T>& m() noexcept { return values_; }
//- Return flat index into packed values //- Return flat index into packed values
@ -460,6 +496,30 @@ public:
{ {
return this->toLocal(rowi, i); return this->toLocal(rowi, i);
} }
//- Redimension - same as resize()
void setSize(const label mRows)
{
this->resize(mRows);
}
//- Redimension - same as resize()
void setSize(const label mRows, const label nVals)
{
this->resize(mRows, nVals);
}
//- Redimension - same as resize()
void setSize(const label mRows, const label nVals, const T& val)
{
this->resize(mRows, nVals, val);
}
//- Reset sizes - same as resize()
void setSize(const labelUList& listSizes)
{
this->resize(listSizes);
}
}; };

View File

@ -364,12 +364,10 @@ inline void Foam::CompactListList<T>::resize(const label mRows)
} }
else if (mRows > size()) else if (mRows > size())
{ {
// Grow // Extend number of rows, each with local size of 0
FatalErrorInFunction const label endOffset = offsets_.empty() ? 0 : offsets_.back();
<< "Cannot be used to extend the list from " << size()
<< " to " << mRows << nl offsets_.resize(mRows+1, endOffset);
<< " Please use a different resize() function"
<< abort(FatalError);
} }
} }
@ -381,9 +379,17 @@ inline void Foam::CompactListList<T>::resize
const label nVals const label nVals
) )
{ {
offsets_.resize(mRows+1, Zero); if (mRows < 1)
values_.resize(nVals); {
// Optionally: enforceSizeSanity(); // Enforce sizing sanity
offsets_.clear();
values_.clear();
}
else
{
offsets_.resize(mRows+1, Zero);
values_.resize(nVals);
}
} }
@ -394,9 +400,17 @@ inline void Foam::CompactListList<T>::resize_nocopy
const label nVals const label nVals
) )
{ {
offsets_.resize(mRows+1, Zero); if (mRows < 1)
values_.resize_nocopy(nVals); {
// Optionally: enforceSizeSanity(); // Enforce sizing sanity
offsets_.clear();
values_.clear();
}
else
{
offsets_.resize(mRows+1, Zero);
values_.resize_nocopy(nVals);
}
} }
@ -408,49 +422,17 @@ inline void Foam::CompactListList<T>::resize
const T& val const T& val
) )
{ {
offsets_.resize(mRows+1, Zero); if (mRows < 1)
values_.resize(nVals, val); {
// Optionally: enforceSizeSanity(); // Enforce sizing sanity
} offsets_.clear();
values_.clear();
}
template<class T> else
inline void Foam::CompactListList<T>::setSize(const label mRows) {
{ offsets_.resize(mRows+1, Zero);
this->resize(mRows); values_.resize(nVals, val);
} }
template<class T>
inline void Foam::CompactListList<T>::setSize
(
const label mRows,
const label nVals
)
{
this->resize(mRows+1, nVals);
}
template<class T>
inline void Foam::CompactListList<T>::setSize
(
const label mRows,
const label nVals,
const T& val
)
{
this->resize(mRows+1, nVals, val);
}
template<class T>
inline void Foam::CompactListList<T>::setSize
(
const labelUList& listSizes
)
{
this->resize(listSizes);
} }

View File

@ -599,7 +599,7 @@ public:
// Non-blocking exchange // Non-blocking exchange
//- Exchange \em contiguous data using non-blocking consensus //- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData, receives into recvData. //- Sends sendData, receives into recvData.
// //
// Each entry of the recvBufs list is cleared before receipt. // Each entry of the recvBufs list is cleared before receipt.
@ -614,10 +614,10 @@ public:
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool wait = true //!< Wait for requests to complete const bool wait = true //!< (ignored)
); );
//- Exchange \em contiguous data using non-blocking consensus //- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData, receives into recvData. //- Sends sendData, receives into recvData.
// //
// Each \em entry of the recvBufs map is cleared before receipt, // Each \em entry of the recvBufs map is cleared before receipt,
@ -636,7 +636,23 @@ public:
Map<Container>& recvBufs, Map<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool wait = true //!< Wait for requests to complete const bool wait = true //!< (ignored)
);
//- Exchange \em contiguous data using non-blocking consensus (NBX)
//- Sends sendData and returns receive information.
//
// For \b non-parallel : copy own rank (if it exists and non-empty)
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
template<class Container, class Type>
static Map<Container> exchangeConsensus
(
const Map<Container>& sendBufs,
const int tag,
const label comm,
const bool wait = true //!< (ignored)
); );
}; };

View File

@ -36,7 +36,7 @@ License
int Foam::PstreamBuffers::algorithm int Foam::PstreamBuffers::algorithm
( (
// Name may change in the future (JUN-2023) // Name may change in the future (JUN-2023)
Foam::debug::optimisationSwitch("pbufs.tuning", -1) Foam::debug::optimisationSwitch("pbufs.tuning", 0)
); );
registerOptSwitch registerOptSwitch
( (
@ -46,20 +46,19 @@ registerOptSwitch
); );
// Simple enumerations
// -------------------
static constexpr int algorithm_PEX_allToAll = -1; // Traditional PEX
//static constexpr int algorithm_PEX_hybrid = 0; // Possible new default?
static constexpr int algorithm_full_NBX = 1; // Very experimental
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
inline void Foam::PstreamBuffers::setFinished(bool on) noexcept
{
finishedSendsCalled_ = on;
}
inline void Foam::PstreamBuffers::initFinalExchange() inline void Foam::PstreamBuffers::initFinalExchange()
{ {
// Could also check that it is not called twice // Could also check that it is not called twice
// but that is used for overlapping send/recv (eg, overset) // but that is used for overlapping send/recv (eg, overset)
finishedSendsCalled_ = true; setFinished(true);
clearUnregistered(); clearUnregistered();
} }
@ -67,65 +66,149 @@ inline void Foam::PstreamBuffers::initFinalExchange()
void Foam::PstreamBuffers::finalExchange void Foam::PstreamBuffers::finalExchange
( (
enum modeOption mode,
const bool wait, const bool wait,
labelList& recvSizes labelList& recvSizes
) )
{ {
initFinalExchange(); initFinalExchange();
// Pre-flight checks
switch (mode)
{
case modeOption::DEFAULT :
{
// Choose (ALL_TO_ALL | NBX_PEX) from static settings
mode =
(
(algorithm <= 0)
? modeOption::ALL_TO_ALL
: modeOption::NBX_PEX
);
break;
}
case modeOption::GATHER :
{
// gather mode (all-to-one) : master [0] <- everyone
// - only send to master [0]
// note: master [0] is also allowed to 'send' to itself
for (label proci = 1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
break;
}
case modeOption::SCATTER :
{
// scatter mode (one-to-all) : master [0] -> everyone
if (!UPstream::master(comm_))
{
// Non-master: has no sends
clearSends();
}
break;
}
default :
break;
}
if (commsType_ == UPstream::commsTypes::nonBlocking) if (commsType_ == UPstream::commsTypes::nonBlocking)
{ {
if // PEX algorithm with different flavours of exchanging sizes
( // PEX stage 1: exchange sizes
wait
&& (algorithm >= algorithm_full_NBX) labelList sendSizes; // Not used by gather/scatter
&& (UPstream::maxCommsSize <= 0)
) switch (mode)
{ {
// NBX algorithm (nonblocking exchange) case modeOption::GATHER :
// - when requested and waiting, no data chunking etc {
// gather mode (all-to-one): master [0] <- everyone
// - presumed that MPI_Gather will be the most efficient
PstreamDetail::exchangeConsensus<DynamicList<char>, char> recvSizes =
( UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
sendBuffers_,
recvBuffers_,
recvSizes,
(tag_ + 271828), // some unique tag?
comm_,
wait
);
return; if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
break;
}
case modeOption::SCATTER :
{
// scatter mode (one-to-all): master [0] -> everyone
// - presumed that MPI_Scatter will be the most efficient
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
}
const label myRecv
(
UPstream::listScatterValues(recvSizes, comm_)
);
recvSizes = Zero;
recvSizes[0] = myRecv;
break;
}
case modeOption::NBX_PEX :
{
// Assemble the send sizes (cf. Pstream::exchangeSizes)
sendSizes.resize_nocopy(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
// Exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
break;
}
case modeOption::DEFAULT :
case modeOption::ALL_TO_ALL :
{
// Assemble the send sizes (cf. Pstream::exchangeSizes)
sendSizes.resize_nocopy(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
// Exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
break;
}
} }
// PEX algorithm with two different flavours of exchanging sizes
// Assemble the send sizes (cf. Pstream::exchangeSizes)
labelList sendSizes(nProcs_);
forAll(sendBuffers_, proci)
{
sendSizes[proci] = sendBuffers_[proci].size();
}
recvSizes.resize_nocopy(nProcs_);
if (algorithm == algorithm_PEX_allToAll)
{
// PEX stage 1: exchange sizes (all-to-all)
UPstream::allToAll(sendSizes, recvSizes, comm_);
}
else
{
// PEX stage 1: exchange sizes (non-blocking consensus)
UPstream::allToAllConsensus
(
sendSizes,
recvSizes,
(tag_ + 314159), // some unique tag?
comm_
);
}
// PEX stage 2: point-to-point data exchange // PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char> Pstream::exchange<DynamicList<char>, char>
( (
@ -166,7 +249,7 @@ void Foam::PstreamBuffers::finalExchange
recvSizes[proci] = 1; // Connected recvSizes[proci] = 1; // Connected
} }
for (label proci=0; proci < nProcs_; ++proci) for (label proci = 0; proci < nProcs_; ++proci)
{ {
if (!recvSizes[proci]) // Not connected if (!recvSizes[proci]) // Not connected
{ {
@ -200,93 +283,6 @@ void Foam::PstreamBuffers::finalExchange
} }
void Foam::PstreamBuffers::finalGatherScatter
(
const bool isGather,
const bool wait,
labelList& recvSizes
)
{
initFinalExchange();
if (isGather)
{
// gather mode (all-to-one)
// Only send to master [0]. Master is also allowed to 'send' to itself
for (label proci=1; proci < sendBuffers_.size(); ++proci)
{
sendBuffers_[proci].clear();
}
}
else
{
// scatter mode (one-to-all)
if (!UPstream::master(comm_))
{
// Non-master: has no sends
clearSends();
}
}
if (commsType_ == UPstream::commsTypes::nonBlocking)
{
// Use PEX algorithm
// - for a non-sparse gather/scatter, it is presumed that
// MPI_Gather/MPI_Scatter will be the most efficient way to
// communicate the sizes.
// PEX stage 1: exchange sizes (gather or scatter)
if (isGather)
{
// gather mode (all-to-one): master [0] <- everyone
recvSizes =
UPstream::listGatherValues(sendBuffers_[0].size(), comm_);
if (!UPstream::master(comm_))
{
recvSizes.resize_nocopy(nProcs_);
recvSizes = Zero;
}
}
else
{
// scatter mode (one-to-all): master [0] -> everyone
recvSizes.resize_nocopy(nProcs_);
if (UPstream::master(comm_))
{
forAll(sendBuffers_, proci)
{
recvSizes[proci] = sendBuffers_[proci].size();
}
}
const label myRecv(UPstream::listScatterValues(recvSizes, comm_));
recvSizes = Zero;
recvSizes[0] = myRecv;
}
// PEX stage 2: point-to-point data exchange
Pstream::exchange<DynamicList<char>, char>
(
sendBuffers_,
recvSizes,
recvBuffers_,
tag_,
comm_,
wait
);
}
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers Foam::PstreamBuffers::PstreamBuffers
@ -382,7 +378,7 @@ void Foam::PstreamBuffers::clear()
{ {
clearSends(); clearSends();
clearRecvs(); clearRecvs();
finishedSendsCalled_ = false; setFinished(false);
} }
@ -431,13 +427,13 @@ void Foam::PstreamBuffers::clearStorage()
} }
recvPositions_ = Zero; recvPositions_ = Zero;
finishedSendsCalled_ = false; setFinished(false);
} }
void Foam::PstreamBuffers::initRegisterSend() void Foam::PstreamBuffers::initRegisterSend()
{ {
if (!finishedSendsCalled_) if (!finished())
{ {
for (label proci = 0; proci < nProcs_; ++proci) for (label proci = 0; proci < nProcs_; ++proci)
{ {
@ -474,7 +470,7 @@ bool Foam::PstreamBuffers::hasSendData() const
bool Foam::PstreamBuffers::hasRecvData() const bool Foam::PstreamBuffers::hasRecvData() const
{ {
if (finishedSendsCalled_) if (finished())
{ {
forAll(recvBuffers_, proci) forAll(recvBuffers_, proci)
{ {
@ -504,7 +500,7 @@ Foam::label Foam::PstreamBuffers::sendDataCount(const label proci) const
Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const Foam::label Foam::PstreamBuffers::recvDataCount(const label proci) const
{ {
if (finishedSendsCalled_) if (finished())
{ {
const label len(recvBuffers_[proci].size() - recvPositions_[proci]); const label len(recvBuffers_[proci].size() - recvPositions_[proci]);
@ -529,7 +525,7 @@ Foam::labelList Foam::PstreamBuffers::recvDataCounts() const
{ {
labelList counts(nProcs_, Zero); labelList counts(nProcs_, Zero);
if (finishedSendsCalled_) if (finished())
{ {
forAll(recvBuffers_, proci) forAll(recvBuffers_, proci)
{ {
@ -560,7 +556,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount
{ {
label maxLen = 0; label maxLen = 0;
if (finishedSendsCalled_) if (finished())
{ {
forAll(recvBuffers_, proci) forAll(recvBuffers_, proci)
{ {
@ -599,7 +595,7 @@ Foam::label Foam::PstreamBuffers::maxNonLocalRecvCount() const
const Foam::UList<char> const Foam::UList<char>
Foam::PstreamBuffers::peekRecvData(const label proci) const Foam::PstreamBuffers::peekRecvData(const label proci) const
{ {
if (finishedSendsCalled_) if (finished())
{ {
const label pos = recvPositions_[proci]; const label pos = recvPositions_[proci];
const label len = recvBuffers_[proci].size(); const label len = recvBuffers_[proci].size();
@ -625,18 +621,17 @@ Foam::PstreamBuffers::peekRecvData(const label proci) const
} }
bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
{
bool old(allowClearRecv_);
allowClearRecv_ = on;
return old;
}
void Foam::PstreamBuffers::finishedSends(const bool wait) void Foam::PstreamBuffers::finishedSends(const bool wait)
{ {
labelList recvSizes; labelList recvSizes;
finalExchange(wait, recvSizes); finalExchange(modeOption::DEFAULT, wait, recvSizes);
}
void Foam::PstreamBuffers::finishedSendsNBX(const bool wait)
{
labelList recvSizes;
finalExchange(modeOption::NBX_PEX, wait, recvSizes);
} }
@ -649,7 +644,7 @@ void Foam::PstreamBuffers::finishedSends
// Resize for copying back // Resize for copying back
recvSizes.resize_nocopy(sendBuffers_.size()); recvSizes.resize_nocopy(sendBuffers_.size());
finalExchange(wait, recvSizes); finalExchange(modeOption::DEFAULT, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking) if (commsType_ != UPstream::commsTypes::nonBlocking)
{ {
@ -717,8 +712,9 @@ bool Foam::PstreamBuffers::finishedSends
if (changed) if (changed)
{ {
// Update send/recv topology // Update send/recv topology
labelList recvSizes; labelList recvSizes;
finishedSends(recvSizes, wait); // eg, using all-to-all finishedSends(recvSizes, wait); // modeOption::DEFAULT (eg all-to-all)
// The send ranks // The send ranks
sendProcs.clear(); sendProcs.clear();
@ -754,14 +750,14 @@ bool Foam::PstreamBuffers::finishedSends
void Foam::PstreamBuffers::finishedGathers(const bool wait) void Foam::PstreamBuffers::finishedGathers(const bool wait)
{ {
labelList recvSizes; labelList recvSizes;
finalGatherScatter(true, wait, recvSizes); finalExchange(modeOption::GATHER, wait, recvSizes);
} }
void Foam::PstreamBuffers::finishedScatters(const bool wait) void Foam::PstreamBuffers::finishedScatters(const bool wait)
{ {
labelList recvSizes; labelList recvSizes;
finalGatherScatter(false, wait, recvSizes); finalExchange(modeOption::SCATTER, wait, recvSizes);
} }
@ -771,7 +767,7 @@ void Foam::PstreamBuffers::finishedGathers
const bool wait const bool wait
) )
{ {
finalGatherScatter(true, wait, recvSizes); finalExchange(modeOption::GATHER, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking) if (commsType_ != UPstream::commsTypes::nonBlocking)
{ {
@ -793,7 +789,7 @@ void Foam::PstreamBuffers::finishedScatters
const bool wait const bool wait
) )
{ {
finalGatherScatter(false, wait, recvSizes); finalExchange(modeOption::SCATTER, wait, recvSizes);
if (commsType_ != UPstream::commsTypes::nonBlocking) if (commsType_ != UPstream::commsTypes::nonBlocking)
{ {
@ -809,4 +805,27 @@ void Foam::PstreamBuffers::finishedScatters
} }
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Controls
bool Foam::PstreamBuffers::finished() const noexcept
{
return finishedSendsCalled_;
}
bool Foam::PstreamBuffers::allowClearRecv() const noexcept
{
return allowClearRecv_;
}
bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
{
bool old(allowClearRecv_);
allowClearRecv_ = on;
return old;
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -32,7 +32,7 @@ Description
Use UOPstream to stream data into buffers, call finishedSends() to Use UOPstream to stream data into buffers, call finishedSends() to
notify that data is in buffers and then use IUPstream to get data out notify that data is in buffers and then use IUPstream to get data out
of received buffers. Works with both blocking and nonBlocking. Does of received buffers. Works with both blocking and non-blocking. Does
not make much sense with scheduled since there you would not need these not make much sense with scheduled since there you would not need these
explicit buffers. explicit buffers.
@ -151,6 +151,19 @@ class bitSet;
class PstreamBuffers class PstreamBuffers
{ {
// Private Data Types
//- Private enumeration for handling PEX stage 1 (sizing) modes
enum class modeOption : unsigned char
{
DEFAULT, //!< Use statically configured algorithm
GATHER, //!< Use all-to-one (gather) of sizes
SCATTER, //!< Use one-to-all (scatter) of sizes
ALL_TO_ALL, //!< Use allToAll to obtain sizes
NBX_PEX //!< Use consensus exchange (NBX) to obtain sizes
};
// Private Data // Private Data
//- Track if sends are complete //- Track if sends are complete
@ -190,20 +203,24 @@ class PstreamBuffers
// Private Member Functions // Private Member Functions
//- Change status of finished sends called
inline void setFinished(bool on) noexcept;
//- Clear 'unregistered' send buffers, tag as being send-ready //- Clear 'unregistered' send buffers, tag as being send-ready
inline void initFinalExchange(); inline void initFinalExchange();
//- Mark all sends as having been done. //- Mark all sends as having been done.
// This will start receives (nonBlocking comms). // This will start receives (non-blocking comms).
void finalExchange void finalExchange
( (
enum modeOption mode,
const bool wait, const bool wait,
labelList& recvSizes labelList& recvSizes
); );
//- Mark sends as done. //- Mark sends as done.
// Only exchange sizes using the neighbour ranks // Only exchange sizes using the neighbour ranks
// (nonBlocking comms). // (non-blocking comms).
void finalExchange void finalExchange
( (
const labelUList& sendProcs, const labelUList& sendProcs,
@ -212,14 +229,6 @@ class PstreamBuffers
labelList& recvSizes labelList& recvSizes
); );
//- For all-to-one or one-to-all
void finalGatherScatter
(
const bool isGather,
const bool wait,
labelList& recvSizes
);
// Friendship Access // Friendship Access
@ -343,17 +352,11 @@ public:
// Queries // Queries
//- True if finishedSends() or finishedNeighbourSends() has been called //- True if finishedSends() or finishedNeighbourSends() has been called
bool finished() const noexcept bool finished() const noexcept;
{
return finishedSendsCalled_;
}
//- Is clearStorage of individual receive buffer by external hooks //- Is clearStorage of individual receive buffer by external hooks
//- allowed? (default: true) //- allowed? (default: true)
bool allowClearRecv() const noexcept bool allowClearRecv() const noexcept;
{
return allowClearRecv_;
}
//- True if any (local) send buffers have data //- True if any (local) send buffers have data
bool hasSendData() const; bool hasSendData() const;
@ -436,74 +439,96 @@ public:
// Regular Functions // Regular Functions
//- Mark sends as done //- Mark the send phase as being finished.
// //
// Non-blocking mode: populates receive buffers (all-to-all). // Non-blocking mode: populates receive buffers using all-to-all
// \param wait wait for requests to complete (in nonBlocking mode) // or NBX (depending on tuning parameters).
// \param wait wait for requests to complete (in non-blocking mode)
void finishedSends(const bool wait = true); void finishedSends(const bool wait = true);
//- Mark sends as done. //- Mark the send phase as being finished.
//
// Non-blocking mode: populates receive buffers using NBX.
// \param wait wait for requests to complete (in non-blocking mode)
void finishedSendsNBX(const bool wait = true);
//- Mark the send phase as being finished.
//- Recovers the sizes (bytes) received. //- Recovers the sizes (bytes) received.
// //
// Non-blocking mode: populates receive buffers (all-to-all). // Non-blocking mode: populates receive buffers using all-to-all
// \param[out] recvSizes the sizes (bytes) received // or NBX (depending on tuning parameters).
// \param wait wait for requests to complete (in nonBlocking mode) // \warning currently only valid for non-blocking comms.
void finishedSends
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- Mark the send phase as being finished.
//- Recovers the sizes (bytes) received.
// //
// \warning currently only valid for nonBlocking comms. // Non-blocking mode: populates receive buffers using NBX.
void finishedSends(labelList& recvSizes, const bool wait = true); // \warning currently only valid for non-blocking comms.
void finishedSendsNBX
(
//! [out] the sizes (bytes) received
labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
// Functions with restricted neighbours // Functions with restricted neighbours
//- Mark sends as done using subset of send/recv ranks //- Mark the send phase as being finished, with communication
//- and recover the sizes (bytes) received. //- being limited to a known subset of send/recv ranks.
// //
// Non-blocking mode: populates receive buffers. // Non-blocking mode: populates receive buffers.
// //
// \param neighProcs ranks used for sends/recvs // \warning currently only valid for non-blocking comms.
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking comms.
// \note Same as finishedSends with identical sendProcs/recvProcs // \note Same as finishedSends with identical sendProcs/recvProcs
void finishedNeighbourSends void finishedNeighbourSends
( (
//! ranks used for sends/recvs
const labelUList& neighProcs, const labelUList& neighProcs,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true const bool wait = true
); );
//- Mark sends as done using subset of send/recv ranks //- Mark the send phase as being finished, with communication
//- and recover the sizes (bytes) received. //- being limited to a known subset of send/recv ranks.
//- Recovers the sizes (bytes) received.
// //
// Non-blocking mode: it will populate receive buffers. // Non-blocking mode: it will populate receive buffers.
// //
// \param neighProcs ranks used for sends/recvs // \warning currently only valid for non-blocking mode.
// \param[out] recvSizes the sizes (bytes) received
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \warning currently only valid for nonBlocking mode.
void finishedNeighbourSends void finishedNeighbourSends
( (
//! ranks used for sends/recvs
const labelUList& neighProcs, const labelUList& neighProcs,
//! [out] the sizes (bytes) received
labelList& recvSizes, labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true const bool wait = true
); );
//- A caching version that uses a limited send/recv connectivity. //- A caching version that uses a limited send/recv connectivity.
// //
// Non-blocking mode: populates receive buffers. // Non-blocking mode: populates receive buffers.
// \param sendConnections on/off for sending ranks
// \param sendProcs ranks used for sends
// \param recvProcs ranks used for recvs
// \param wait wait for requests to complete (in nonBlocking mode)
//
// \return True if the send/recv connectivity changed // \return True if the send/recv connectivity changed
// //
// \warning currently only valid for nonBlocking comms. // \warning currently only valid for non-blocking comms.
bool finishedSends bool finishedSends
( (
//! inter-rank connections (on/off) for sending ranks
bitSet& sendConnections, bitSet& sendConnections,
//! ranks used for sends
DynamicList<label>& sendProcs, DynamicList<label>& sendProcs,
//! ranks used for recvs
DynamicList<label>& recvProcs, DynamicList<label>& recvProcs,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true const bool wait = true
); );
@ -515,40 +540,46 @@ public:
// Non-blocking mode: populates receive buffers. // Non-blocking mode: populates receive buffers.
// Can use recvDataCount, maxRecvCount etc to recover sizes received. // Can use recvDataCount, maxRecvCount etc to recover sizes received.
// //
// \param wait wait for requests to complete (in nonBlocking mode) // \param wait wait for requests to complete (in non-blocking mode)
// //
// \warning currently only valid for nonBlocking comms. // \warning currently only valid for non-blocking comms.
void finishedGathers(const bool wait = true); void finishedGathers(const bool wait = true);
//- Mark all sends to master as done. //- Mark all sends to master as done.
//- Recovers the sizes (bytes) received. //- Recovers the sizes (bytes) received.
// //
// Non-blocking mode: populates receive buffers (all-to-one). // Non-blocking mode: populates receive buffers (all-to-one).
// \param[out] recvSizes the sizes (bytes) received // \warning currently only valid for non-blocking comms.
// \param wait wait for requests to complete (in nonBlocking mode) void finishedGathers
// (
// \warning currently only valid for nonBlocking comms. //! [out] the sizes (bytes) received
void finishedGathers(labelList& recvSizes, const bool wait = true); labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
//- Mark all sends to sub-procs as done. //- Mark all sends to sub-procs as done.
// //
// Non-blocking mode: populates receive buffers. // Non-blocking mode: populates receive buffers.
// Can use recvDataCount, maxRecvCount etc to recover sizes received. // Can use recvDataCount, maxRecvCount etc to recover sizes received.
// //
// \param wait wait for requests to complete (in nonBlocking mode) // \param wait wait for requests to complete (in non-blocking mode)
// //
// \warning currently only valid for nonBlocking comms. // \warning currently only valid for non-blocking comms.
void finishedScatters(const bool wait = true); void finishedScatters(const bool wait = true);
//- Mark all sends to sub-procs as done. //- Mark all sends to sub-procs as done.
//- Recovers the sizes (bytes) received. //- Recovers the sizes (bytes) received.
// //
// Non-blocking mode: populates receive buffers (all-to-one). // Non-blocking mode: populates receive buffers (all-to-one).
// \param[out] recvSizes the sizes (bytes) received // \warning currently only valid for non-blocking comms.
// \param wait wait for requests to complete (in nonBlocking mode) void finishedScatters
// (
// \warning currently only valid for nonBlocking comms. //! [out] the sizes (bytes) received
void finishedScatters(labelList& recvSizes, const bool wait = true); labelList& recvSizes,
//! wait for requests to complete (in non-blocking mode)
const bool wait = true
);
}; };

View File

@ -46,21 +46,19 @@ namespace Foam
namespace PstreamDetail namespace PstreamDetail
{ {
//- Exchange \em contiguous data using non-blocking consensus exchange //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
//- with optional tracking of the receive sizes. //- with optional tracking of the receive sizes.
// //
// No internal guards or resizing - data containers are all properly // No internal guards or resizing - data containers are all properly
// sized before calling. // sized before calling.
// //
// \param[in] sendBufs The send buffers list (size: numProcs) // \param[in] sendBufs The send buffers list (size: numProc)
// \param[out] recvBufs The recv buffers list (size: numProcs) // \param[out] recvBufs The recv buffers list (size: numProc)
// \param[out] recvSizes The recv sizes (size: 0 or numProcs). // \param[out] recvSizes The recv sizes (size: 0 or numProc).
// This parameter can be an empty list, in which case the receive sizes // This parameter can be an empty list, in which case the receive sizes
// are not returned. // are not returned.
// \param tag The message tag // \param tag The message tag
// \param comm The communicator // \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType If blocking or (default) non-blocking
template<class Container, class Type> template<class Container, class Type>
void exchangeConsensus void exchangeConsensus
@ -69,20 +67,17 @@ void exchangeConsensus
UList<Container>& recvBufs, UList<Container>& recvBufs,
labelUList& recvSizes, labelUList& recvSizes,
const int tag, const int tag,
const label comm, const label comm
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
) )
{ {
static_assert(is_contiguous<Type>::value, "Contiguous data only!"); static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const bool initialBarrier = (UPstream::tuning_NBX_ > 0); const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const label startOfRequests = UPstream::nRequests();
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::nProcs(comm); const label numProc = UPstream::nProcs(comm);
// Initial: clear all receive information // Initial: clear all receive locations
for (auto& buf : recvBufs) for (auto& buf : recvBufs)
{ {
buf.clear(); buf.clear();
@ -98,28 +93,37 @@ void exchangeConsensus
if (sendBufs.size() > numProc) if (sendBufs.size() > numProc)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Send buffers:" << sendBufs.size() << " > numProcs:" << numProc << "Send buffers:" << sendBufs.size() << " > numProc:" << numProc
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
if (recvBufs.size() < numProc) if (recvBufs.size() < numProc)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Recv buffers:" << recvBufs.size() << " < numProcs:" << numProc << "Recv buffers:" << recvBufs.size() << " < numProc:" << numProc
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
// #endif // #endif
if (!UPstream::is_parallel(comm)) // Fake send/recv for myself - parallel or non-parallel
{ {
// Do myself
recvBufs[myProci] = sendBufs[myProci]; recvBufs[myProci] = sendBufs[myProci];
if (myProci < recvSizes.size()) if (myProci < recvSizes.size())
{ {
recvSizes[myProci] = recvBufs.size(); recvSizes[myProci] = recvBufs.size();
} }
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return; return;
} }
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// An initial barrier may help to avoid synchronisation problems // An initial barrier may help to avoid synchronisation problems
// caused elsewhere // caused elsewhere
if (initialBarrier) if (initialBarrier)
@ -127,11 +131,12 @@ void exchangeConsensus
UPstream::barrier(comm); UPstream::barrier(comm);
} }
// Algorithm NBX: Nonblocking consensus with List containers // Algorithm NBX: Nonblocking consensus with List containers
DynamicList<UPstream::Request> sendRequests(sendBufs.size()); DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// Start nonblocking synchronous send to processor dest // Start nonblocking synchronous send to destination ranks
for (label proci = 0; proci < numProc; ++proci) for (label proci = 0; proci < numProc; ++proci)
{ {
const auto& sendData = sendBufs[proci]; const auto& sendData = sendBufs[proci];
@ -140,19 +145,8 @@ void exchangeConsensus
{ {
// Do not send/recv empty data // Do not send/recv empty data
} }
else if (proci == myProci) else if (proci != myProci)
{ {
// Do myself
recvBufs[proci] = sendData;
if (proci < recvSizes.size())
{
recvSizes[proci] = sendData.size();
}
}
else
{
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write UOPstream::write
( (
sendRequests.emplace_back(), sendRequests.emplace_back(),
@ -167,7 +161,15 @@ void exchangeConsensus
} }
// ------------------------------------------------------------------------
// Probe and receive // Probe and receive
// ------------------------------------------------------------------------
//
// When receiving can use resize() instead of resize_nocopy() since the
// slots were already initially cleared.
// The resize() also works fine with FixedList since it will
// corresponds to a no-op: send and recv sizes will always be
// identical to its fixed size() / max_size()
UPstream::Request barrierRequest; UPstream::Request barrierRequest;
@ -191,17 +193,16 @@ void exchangeConsensus
const label count = (probed.second / sizeof(Type)); const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs[proci]; auto& recvData = recvBufs[proci];
recvData.resize_nocopy(count); recvData.resize(count); // OK with resize() instead of _nocopy()
if (proci < recvSizes.size()) if (proci < recvSizes.size())
{ {
recvSizes[proci] = count; recvSizes[proci] = count;
} }
// Any non-blocking MPI recv requests are tracked on internal stack
UIPstream::read UIPstream::read
( (
recvCommType, UPstream::commsTypes::blocking,
proci, proci,
recvData.data_bytes(), recvData.data_bytes(),
recvData.size_bytes(), recvData.size_bytes(),
@ -229,26 +230,18 @@ void exchangeConsensus
} }
} }
} }
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
} }
//- Exchange \em contiguous data using non-blocking consensus exchange. //- Exchange \em contiguous data using non-blocking consensus exchange (NBX)
// //
// No internal guards - the sending Map corresponds to a segment of // No internal guards - the sending Map corresponds to a segment of
// 0-numProcs. // 0-numProc.
// //
// \param[in] sendBufs The send buffers map (addr: 0-numProcs) // \param[in] sendBufs The send buffers map (addr: 0-numProc)
// \param[out] recvBufs The recv buffers map // \param[out] recvBufs The recv buffers map
// \param tag The message tag // \param tag The message tag
// \param comm The communicator // \param comm The communicator
// \param wait Wait for non-blocking receives to complete
// \param recvCommType If blocking or (default) non-blocking
template<class Container, class Type> template<class Container, class Type>
void exchangeConsensus void exchangeConsensus
@ -256,17 +249,17 @@ void exchangeConsensus
const Map<Container>& sendBufs, const Map<Container>& sendBufs,
Map<Container>& recvBufs, Map<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm
const bool wait = true,
const UPstream::commsTypes recvCommType = UPstream::commsTypes::nonBlocking
) )
{ {
static_assert(is_contiguous<Type>::value, "Contiguous data only!"); static_assert(is_contiguous<Type>::value, "Contiguous data only!");
const label startOfRequests = UPstream::nRequests(); // TDB: const bool initialBarrier = (UPstream::tuning_NBX_ > 0);
const label myProci = UPstream::myProcNo(comm);
// Initial: clear out receive 'slots' const label myProci = UPstream::myProcNo(comm);
const label numProc = UPstream::myProcNo(comm);
// Initial: clear all receive locations
// Preferrable to clear out the map entries instead of the map itself // Preferrable to clear out the map entries instead of the map itself
// since this can potentially preserve allocated space // since this can potentially preserve allocated space
// (eg DynamicList entries) between calls // (eg DynamicList entries) between calls
@ -276,9 +269,13 @@ void exchangeConsensus
iter.val().clear(); iter.val().clear();
} }
if (!UPstream::is_parallel(comm)) if (!UPstream::is_rank(comm))
{
return; // Process not in communicator
}
// Fake send/recv for myself - parallel or non-parallel
{ {
// Do myself
const auto iter = sendBufs.find(myProci); const auto iter = sendBufs.find(myProci);
if (iter.good()) if (iter.good())
{ {
@ -290,43 +287,38 @@ void exchangeConsensus
recvBufs(iter.key()) = sendData; recvBufs(iter.key()) = sendData;
} }
} }
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return; return;
} }
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// TDB: initialBarrier ...
// Algorithm NBX: Nonblocking consensus with Map (HashTable) containers // Algorithm NBX: Nonblocking consensus with Map (HashTable) containers
DynamicList<UPstream::Request> sendRequests(sendBufs.size()); DynamicList<UPstream::Request> sendRequests(sendBufs.size());
// Start nonblocking synchronous send to process dest // Start nonblocking synchronous send to destination ranks
forAllConstIters(sendBufs, iter) forAllConstIters(sendBufs, iter)
{ {
const label proci = iter.key(); const label proci = iter.key();
const auto& sendData = iter.val(); const auto& sendData = iter.val();
#ifdef FULLDEBUG if (sendData.empty() || proci < 0 || proci >= numProc)
if (proci >= UPstream::nProcs(comm))
{ {
FatalErrorInFunction // Do not send/recv empty data or invalid destinations
<< "Send buffer:" << proci << " >= numProcs:"
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
} }
#endif else if (proci != myProci)
if (sendData.empty())
{ {
// Do not send/recv empty data
}
else if (proci == myProci)
{
// Do myself: insert_or_assign
recvBufs(proci) = sendData;
}
else
{
// Has data to send.
// The MPI send requests are tracked on a local list
UOPstream::write UOPstream::write
( (
sendRequests.emplace_back(), sendRequests.emplace_back(),
@ -341,7 +333,15 @@ void exchangeConsensus
} }
// ------------------------------------------------------------------------
// Probe and receive // Probe and receive
// ------------------------------------------------------------------------
//
// When receiving can use resize() instead of resize_nocopy() since the
// slots were already initially cleared.
// The resize() also works fine with FixedList since it will
// corresponds to a no-op: send and recv sizes will always be
// identical to its fixed size() / max_size()
UPstream::Request barrierRequest; UPstream::Request barrierRequest;
@ -365,12 +365,11 @@ void exchangeConsensus
const label count = (probed.second / sizeof(Type)); const label count = (probed.second / sizeof(Type));
auto& recvData = recvBufs(proci); auto& recvData = recvBufs(proci);
recvData.resize_nocopy(count); recvData.resize(count); // OK with resize() instead of _nocopy()
// Any non-blocking MPI recv requests are tracked on internal stack
UIPstream::read UIPstream::read
( (
recvCommType, UPstream::commsTypes::blocking,
proci, proci,
recvData.data_bytes(), recvData.data_bytes(),
recvData.size_bytes(), recvData.size_bytes(),
@ -397,12 +396,6 @@ void exchangeConsensus
} }
} }
} }
// Wait for non-blocking receives to finish
if (wait && recvCommType == UPstream::commsTypes::nonBlocking)
{
UPstream::waitRequests(startOfRequests);
}
} }
} // namespace PstreamDetail } // namespace PstreamDetail
@ -418,7 +411,7 @@ void Foam::Pstream::exchangeConsensus
List<Container>& recvBufs, List<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool wait const bool /* wait (ignored) */
) )
{ {
static_assert(is_contiguous<Type>::value, "Contiguous data only!"); static_assert(is_contiguous<Type>::value, "Contiguous data only!");
@ -427,7 +420,7 @@ void Foam::Pstream::exchangeConsensus
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Send buffers size:" << sendBufs.size() << "Send buffers size:" << sendBufs.size()
<< " != numProcs:" << UPstream::nProcs(comm) << " != numProc:" << UPstream::nProcs(comm)
<< Foam::abort(FatalError); << Foam::abort(FatalError);
} }
@ -435,14 +428,13 @@ void Foam::Pstream::exchangeConsensus
recvBufs.resize_nocopy(sendBufs.size()); recvBufs.resize_nocopy(sendBufs.size());
labelList dummyRecvSizes; labelList dummyRecvSizes;
PstreamDetail::exchangeConsensus PstreamDetail::exchangeConsensus<Container, Type>
( (
sendBufs, sendBufs,
recvBufs, recvBufs,
dummyRecvSizes, dummyRecvSizes,
tag, tag,
comm, comm
wait
); );
} }
@ -454,20 +446,45 @@ void Foam::Pstream::exchangeConsensus
Map<Container>& recvBufs, Map<Container>& recvBufs,
const int tag, const int tag,
const label comm, const label comm,
const bool wait const bool /* wait (ignored) */
) )
{ {
static_assert(is_contiguous<Type>::value, "Contiguous data only!"); static_assert(is_contiguous<Type>::value, "Contiguous data only!");
PstreamDetail::exchangeConsensus PstreamDetail::exchangeConsensus<Container, Type>
( (
sendBufs, sendBufs,
recvBufs, recvBufs,
tag, tag,
comm, comm
wait
); );
} }
template<class Container, class Type>
Foam::Map<Container>
Foam::Pstream::exchangeConsensus
(
const Map<Container>& sendBufs,
const int tag,
const label comm,
const bool /* wait (ignored) */
)
{
Map<Container> recvBufs;
static_assert(is_contiguous<Type>::value, "Contiguous data only!");
PstreamDetail::exchangeConsensus<Container, Type>
(
sendBufs,
recvBufs,
tag,
comm
);
return recvBufs;
}
// ************************************************************************* // // ************************************************************************* //

View File

@ -44,6 +44,7 @@ SourceFiles
#include "labelList.H" #include "labelList.H"
#include "DynamicList.H" #include "DynamicList.H"
#include "HashTable.H" #include "HashTable.H"
#include "Map.H"
#include "Enum.H" #include "Enum.H"
#include "ListOps.H" #include "ListOps.H"
@ -55,9 +56,6 @@ namespace Foam
//- Implementation details for UPstream/Pstream/MPI etc. //- Implementation details for UPstream/Pstream/MPI etc.
namespace PstreamDetail {} namespace PstreamDetail {}
// Forward Declarations
template<class T> class Map;
/*---------------------------------------------------------------------------*\ /*---------------------------------------------------------------------------*\
Class UPstream Declaration Class UPstream Declaration
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
@ -968,119 +966,76 @@ public:
//- Shutdown (finalize) MPI as required and exit program with errNo. //- Shutdown (finalize) MPI as required and exit program with errNo.
static void exit(int errNo = 1); static void exit(int errNo = 1);
//- Exchange integer data with all processors (in the communicator).
// \c sendData[proci] is the value to send to proci.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
static void allToAll
(
const UList<int32_t>& sendData,
UList<int32_t>& recvData,
const label communicator = worldComm
);
//- Exchange integer data with all processors (in the communicator). #undef Pstream_CommonRoutines
// \c sendData[proci] is the value to send to proci. #define Pstream_CommonRoutines(Native) \
// After return recvData contains the data from the other processors. \
// \n /*!\brief Exchange \c Native data with all ranks in communicator */ \
// For \b non-parallel : does a simple copy of sendData to recvData /*! \em non-parallel : simple copy of \p sendData to \p recvData */ \
static void allToAll static void allToAll \
( ( \
const UList<int64_t>& sendData, /*! [in] The value at [proci] is sent to proci */ \
UList<int64_t>& recvData, const UList<Native>& sendData, \
const label communicator = worldComm /*! [out] The data received from the other ranks */ \
); UList<Native>& recvData, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \em non-zero \c Native data between ranks [NBX] */ \
/*! \p recvData is always initially assigned zero and no non-zero */ \
/*! values are sent/received from other ranks. */ \
/*! \em non-parallel : simple copy of \p sendData to \p recvData */ \
/*! \note The message tag should be chosen to be a unique value */ \
/*! since the implementation uses probing with ANY_SOURCE !! */ \
/*! An initial barrier may help to avoid synchronisation problems */ \
/*! caused elsewhere (See "nbx.tuning" opt switch) */ \
static void allToAllConsensus \
( \
/*! [in] The \em non-zero value at [proci] is sent to proci */ \
const UList<Native>& sendData, \
/*! [out] The non-zero value received from each rank */ \
UList<Native>& recvData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \c Native data between ranks [NBX] */ \
/*! \p recvData map is always cleared initially so a simple check */ \
/*! of its keys is sufficient to determine connectivity. */ \
/*! \em non-parallel : copy own rank (if it exists) */ \
/*! See notes about message tags and "nbx.tuning" opt switch */ \
static void allToAllConsensus \
( \
/*! [in] The value at [proci] is sent to proci. */ \
const Map<Native>& sendData, \
/*! [out] The values received from given ranks. */ \
Map<Native>& recvData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
); \
\
/*!\brief Exchange \c Native data between ranks [NBX] */ \
/*! \returns any received data as a Map */ \
static Map<Native> allToAllConsensus \
( \
/*! [in] The value at [proci] is sent to proci. */ \
const Map<Native>& sendData, \
/*! Message tag for the communication */ \
const int tag, \
const label communicator = worldComm \
) \
{ \
Map<Native> recvData; \
allToAllConsensus(sendData, recvData, tag, communicator); \
return recvData; \
}
//- Exchange \b non-zero integer data with all ranks in the communicator Pstream_CommonRoutines(int32_t);
//- using non-blocking consensus exchange. Pstream_CommonRoutines(int64_t);
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. The recvData list is always assigned zero before
// receipt and values of zero are never transmitted.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const UList<int32_t>& sendData,
UList<int32_t>& recvData,
const int tag,
const label communicator = worldComm
);
//- Exchange \b non-zero integer data with all ranks in the communicator #undef Pstream_CommonRoutines
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. The recvData list is always assigned zero before
// receipt and values of zero are never transmitted.
// After return recvData contains the data from the other processors.
// \n
// For \b non-parallel : does a simple copy of sendData to recvData
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const UList<int64_t>& sendData,
UList<int64_t>& recvData,
const int tag,
const label communicator = worldComm
);
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. Since the recvData map always cleared before
// receipt and values of zero are never transmitted, a simple check
// of its keys is sufficient to determine connectivity.
// \n
// For \b non-parallel : copy own rank (if it exists and non-zero)
// from sendData to recvData.
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const Map<int32_t>& sendData,
Map<int32_t>& recvData,
const int tag,
const label communicator = worldComm
);
//- Exchange \b non-zero integer data with all ranks in the communicator
//- using non-blocking consensus exchange.
// The \c sendData[proci] is the (non-zero) value to send to proci.
// After return recvData contains the non-zero values sent from the
// other processors. Since the recvData map always cleared before
// receipt and values of zero are never transmitted, a simple check
// of its keys is sufficient to determine connectivity.
// \n
// For \b non-parallel : copy own rank (if it exists and non-zero)
// from sendData to recvData.
//
// \note The message tag should be chosen to be a unique value
// since the implementation uses probing with ANY_SOURCE !!
// An initial barrier may help to avoid synchronisation problems
// caused elsewhere (See "nbx.tuning" opt switch)
static void allToAllConsensus
(
const Map<int64_t>& sendData,
Map<int64_t>& recvData,
const int tag,
const label communicator = worldComm
);
// Low-level gather/scatter routines // Low-level gather/scatter routines
@ -1121,13 +1076,7 @@ public:
/*! Number of send/recv data per rank. Globally consistent! */ \ /*! Number of send/recv data per rank. Globally consistent! */ \
int count, \ int count, \
const label communicator = worldComm \ const label communicator = worldComm \
); ); \
Pstream_CommonRoutines(char);
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
\ \
/*! \brief Receive variable length \c Native data from all ranks */ \ /*! \brief Receive variable length \c Native data from all ranks */ \
static void gather \ static void gather \

View File

@ -34,16 +34,25 @@ License
void Foam::globalIndex::reportOverflowAndExit void Foam::globalIndex::reportOverflowAndExit
( (
const label idx, const label idx,
const labelUList& localLens const label prevOffset,
const label count
) )
{ {
if (idx < 0)
{
// No overflow tagged
return;
}
FatalErrorInFunction FatalErrorInFunction
<< "Overflow : sum of sizes exceeds labelMax (" << "Overflow : sum of sizes exceeds labelMax ("
<< labelMax << ") after index " << idx; << labelMax << ") after index " << idx;
if (!localLens.empty()) if (prevOffset >= 0 && count >= 0)
{ {
FatalError << " of " << flatOutput(localLens); FatalError
<< " while trying to add (" << count
<< ") to offset (" << prevOffset << ")";
} }
FatalError FatalError
@ -70,27 +79,29 @@ Foam::globalIndex::calcRange
} }
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(comm);
const labelList localLens = UPstream::allGatherValues(localSize, comm); const labelList counts = UPstream::allGatherValues(localSize, comm);
if (checkOverflow) if (checkOverflow)
{ {
const label len = localLens.size(); const label len = counts.size();
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = counts[i];
if (i == myProci) if (i == myProci)
{ {
myRange.start() = start; myRange.start() = start;
} }
const label prev = start; const label prev = start;
start += localLens[i]; start += count;
if (start < prev) if (start < prev)
{ {
reportOverflowAndExit(i, localLens); reportOverflowAndExit(i, prev, count);
} }
} }
} }
@ -98,8 +109,8 @@ Foam::globalIndex::calcRange
{ {
// std::accumulate // std::accumulate
// ( // (
// localLens.cbegin(), // counts.cbegin(),
// localLens.cbegin(myProci), // counts.cbegin(myProci),
// label(0) // label(0)
// ); // );
@ -107,7 +118,7 @@ Foam::globalIndex::calcRange
for (label i = 0; i < myProci; ++i) for (label i = 0; i < myProci; ++i)
{ {
start += localLens[i]; start += counts[i];
} }
myRange.start() = start; myRange.start() = start;
} }
@ -133,27 +144,28 @@ Foam::globalIndex::calcOffset
} }
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(comm);
const labelList localLens = UPstream::allGatherValues(localSize, comm); const labelList counts = UPstream::allGatherValues(localSize, comm);
if (checkOverflow) if (checkOverflow)
{ {
const label len = localLens.size(); const label len = counts.size();
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = counts[i];
if (i == myProci) if (i == myProci)
{ {
myOffset = start; myOffset = start;
} }
const label prev = start; const label prev = start;
start += localLens[i]; start += count;
if (start < prev) if (start < prev)
{ {
reportOverflowAndExit(i, localLens); reportOverflowAndExit(i, prev, count);
} }
} }
} }
@ -161,8 +173,8 @@ Foam::globalIndex::calcOffset
{ {
// std::accumulate // std::accumulate
// ( // (
// localLens.cbegin(), // counts.cbegin(),
// localLens.cbegin(myProci), // counts.cbegin(myProci),
// label(0) // label(0)
// ); // );
@ -170,7 +182,7 @@ Foam::globalIndex::calcOffset
for (label i = 0; i < myProci; ++i) for (label i = 0; i < myProci; ++i)
{ {
start += localLens[i]; start += counts[i];
} }
myOffset = start; myOffset = start;
} }
@ -182,13 +194,13 @@ Foam::globalIndex::calcOffset
Foam::labelList Foam::labelList
Foam::globalIndex::calcOffsets Foam::globalIndex::calcOffsets
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow const bool checkOverflow
) )
{ {
labelList values; labelList values;
const label len = localLens.size(); const label len = counts.size();
if (len) if (len)
{ {
@ -197,12 +209,13 @@ Foam::globalIndex::calcOffsets
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = counts[i];
values[i] = start; values[i] = start;
start += localLens[i]; start += count;
if (checkOverflow && start < values[i]) if (checkOverflow && start < values[i])
{ {
reportOverflowAndExit(i, localLens); reportOverflowAndExit(i, values[i], count);
} }
} }
values[len] = start; values[len] = start;
@ -215,13 +228,13 @@ Foam::globalIndex::calcOffsets
Foam::List<Foam::labelRange> Foam::List<Foam::labelRange>
Foam::globalIndex::calcRanges Foam::globalIndex::calcRanges
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow const bool checkOverflow
) )
{ {
List<labelRange> values; List<labelRange> values;
const label len = localLens.size(); const label len = counts.size();
if (len) if (len)
{ {
@ -230,12 +243,18 @@ Foam::globalIndex::calcRanges
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
values[i].reset(start, localLens[i]); const label count = counts[i];
start += localLens[i]; values[i].reset(start, count);
start += count;
if (checkOverflow && start < values[i].start()) if
(
checkOverflow
&& (start < values[i].start())
&& (i < len-1) // Do not check the one beyond the end range
)
{ {
reportOverflowAndExit(i, localLens); reportOverflowAndExit(i, values[i].start(), count);
} }
} }
} }
@ -317,6 +336,19 @@ Foam::globalIndex::bin
} }
void Foam::globalIndex::resize(const label n)
{
if (n < 1)
{
offsets_.clear();
}
else
{
offsets_.resize(n+1, end_value());
}
}
void Foam::globalIndex::reset void Foam::globalIndex::reset
( (
const label localSize, const label localSize,
@ -328,11 +360,11 @@ void Foam::globalIndex::reset
if (len) if (len)
{ {
labelList localLens; labelList counts;
if (parallel && UPstream::parRun()) // or UPstream::is_parallel(comm) if (parallel && UPstream::parRun()) // or UPstream::is_parallel(comm)
{ {
localLens = UPstream::allGatherValues(localSize, comm); counts = UPstream::allGatherValues(localSize, comm);
} }
else else
{ {
@ -340,11 +372,11 @@ void Foam::globalIndex::reset
// TBD: check for (proci >= 0) ? // TBD: check for (proci >= 0) ?
const auto proci = UPstream::myProcNo(comm); const auto proci = UPstream::myProcNo(comm);
localLens.resize(len, Zero); counts.resize(len, Zero);
localLens[proci] = localSize; counts[proci] = localSize;
} }
reset(localLens, true); // checkOverflow = true reset(counts, true); // checkOverflow = true
} }
else else
{ {
@ -356,11 +388,11 @@ void Foam::globalIndex::reset
void Foam::globalIndex::reset void Foam::globalIndex::reset
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow const bool checkOverflow
) )
{ {
const label len = localLens.size(); const label len = counts.size();
if (len) if (len)
{ {
@ -369,12 +401,13 @@ void Foam::globalIndex::reset
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = counts[i];
offsets_[i] = start; offsets_[i] = start;
start += localLens[i]; start += count;
if (checkOverflow && start < offsets_[i]) if (checkOverflow && start < offsets_[i])
{ {
reportOverflowAndExit(i, localLens); reportOverflowAndExit(i, offsets_[i], count);
} }
} }
offsets_[len] = start; offsets_[len] = start;
@ -468,8 +501,8 @@ Foam::label Foam::globalIndex::maxNonLocalSize(const label proci) const
{ {
if (i != proci) if (i != proci)
{ {
const label localLen = (offsets_[i+1] - offsets_[i]); const label count = (offsets_[i+1] - offsets_[i]);
maxLen = max(maxLen, localLen); maxLen = max(maxLen, count);
} }
} }
@ -506,17 +539,17 @@ Foam::labelRange Foam::globalIndex::back() const
} }
// * * * * * * * * * * * * * * * Friend Operators * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * IOstream Operators * * * * * * * * * * * * //
Foam::Istream& Foam::operator>>(Istream& is, globalIndex& gi) Foam::Istream& Foam::operator>>(Istream& is, globalIndex& gi)
{ {
return is >> gi.offsets_; return is >> gi.offsets();
} }
Foam::Ostream& Foam::operator<<(Ostream& os, const globalIndex& gi) Foam::Ostream& Foam::operator<<(Ostream& os, const globalIndex& gi)
{ {
return os << gi.offsets_; return os << gi.offsets();
} }

View File

@ -83,11 +83,12 @@ class globalIndex
DynamicList<label>& validBins DynamicList<label>& validBins
); );
//- Report overflow at specified index //- Report overflow at specified (non-negative) index
static void reportOverflowAndExit static void reportOverflowAndExit
( (
const label idx, const label idx,
const labelUList& localLens = labelUList::null() const label prevOffset = -1, // The last valid offset value
const label count = 0 // The count to add to prevOffset
); );
public: public:
@ -122,7 +123,7 @@ public:
//- No communication required //- No communication required
inline explicit globalIndex(labelList&& listOffsets); inline explicit globalIndex(labelList&& listOffsets);
//- Copy construct from a list of sizes. //- Construct from a list of sizes and calculate the offsets.
//- No communication required //- No communication required
inline globalIndex inline globalIndex
( (
@ -256,6 +257,10 @@ public:
//- Reset to be empty (no offsets) //- Reset to be empty (no offsets)
inline void clear(); inline void clear();
//- Change the number of entries (nProcs) in the offsets table.
//- Extending will fill with empty local sizes.
void resize(const label n);
//- Reset from local size, using gather/broadcast //- Reset from local size, using gather/broadcast
//- with default/specified communicator if parallel. //- with default/specified communicator if parallel.
void reset void reset
@ -265,12 +270,12 @@ public:
const bool parallel = UPstream::parRun() //!< use parallel comms const bool parallel = UPstream::parRun() //!< use parallel comms
); );
//- Reset from list of local sizes, //- Reset offsets from a list of local sizes,
//- with optional check for label overflow. //- with optional check for label overflow.
//- No communication required //- No communication required
void reset void reset
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow = false const bool checkOverflow = false
); );
@ -307,12 +312,18 @@ public:
const label comm = UPstream::worldComm //!< communicator const label comm = UPstream::worldComm //!< communicator
); );
//- Reset the globalIndex. Same as copy assignment.
inline void reset(const globalIndex& gi);
//- Alter local size for given processor //- Alter local size for given processor
void setLocalSize(const label proci, const label len); void setLocalSize(const label proci, const label len);
// Queries and renumbering // Queries and renumbering
//- True if contained within the offsets range
inline bool contains(const label i) const noexcept;
//- Start of proci data //- Start of proci data
inline label localStart(const label proci) const; inline label localStart(const label proci) const;
@ -352,14 +363,26 @@ public:
//- From global to local on proci //- From global to local on proci
inline label toLocal(const label proci, const label i) const; inline label toLocal(const label proci, const label i) const;
//- Which processor does global id come from? //- Find processor with specified global id.
// Does an initial check for isLocal first (assumed to occur //- Check proci first, followed by binary search.
// reasonably frequently) followed by a binary search. // \return the processor number or -1 (not found)
//- Fatal for out of range ids (eg, negative or >= totalSize() inline label findProc(const label proci, const label i) const;
inline label whichProcID(const label i) const;
//- Find processor above proci with specified global id - binary search.
// \return the processor above proci or -1 otherwise
// (including for out-of-range indices)
inline label findProcAbove(const label proci, const label i) const;
//- Find processor below proci with specified global id - binary search.
// Binary search.
// \return the processor below proci or -1 otherwise
// (including for out-of-range indices)
inline label findProcBelow(const label proci, const label i) const;
//- Which processor does global id come from? //- Which processor does global id come from?
//- Checks proci first. //- Checks proci first (assumed to occur reasonably frequently)
//- followed by a binary search.
//- Fatal for out-of-range indices
inline label whichProcID(const label proci, const label i) const; inline label whichProcID(const label proci, const label i) const;
@ -396,10 +419,17 @@ public:
// FatalError if not on local processor. // FatalError if not on local processor.
inline label toLocal(const label i) const; inline label toLocal(const label i) const;
//- Which processor does global id come from?
// Uses myProcNo for the initial local check.
inline label whichProcID(const label i) const;
// Iteration // Iteration
//- Forward input iterator with const access //- Forward input iterator with const access that is used to
//- iterate across the globalIndex offsets() table.
// The de-referenced value is the range() with (start, size),
// but it also provides separate index, start, size information.
class const_iterator class const_iterator
{ {
//- The parent class for which this is an iterator //- The parent class for which this is an iterator
@ -504,7 +534,16 @@ public:
//- with optional check for label overflow //- with optional check for label overflow
static labelList calcOffsets static labelList calcOffsets
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow = false
);
//- Calculate offsets from an indirect list of local sizes,
//- with optional check for label overflow
template<class Addr>
static labelList calcOffsets
(
const IndirectListBase<label, Addr>& counts,
const bool checkOverflow = false const bool checkOverflow = false
); );
@ -521,7 +560,7 @@ public:
//- with optional check for label overflow //- with optional check for label overflow
static List<labelRange> calcRanges static List<labelRange> calcRanges
( (
const labelUList& localLens, const labelUList& counts,
const bool checkOverflow = false const bool checkOverflow = false
); );
@ -955,6 +994,27 @@ public:
) const; ) const;
// Member Operators
//- Compare for equality - uses the offsets
bool operator==(const globalIndex& rhs) const
{
return (this->offsets() == rhs.offsets());
}
//- Compare for inequality - uses the offsets
bool operator!=(const globalIndex& rhs) const
{
return !(*this == rhs);
}
//- Compare for less-than - uses the offsets
bool operator<(const globalIndex& rhs) const
{
return (this->offsets() < rhs.offsets());
}
// IOstream Operators // IOstream Operators
friend Istream& operator>>(Istream& is, globalIndex& gi); friend Istream& operator>>(Istream& is, globalIndex& gi);

View File

@ -163,6 +163,12 @@ inline Foam::label Foam::globalIndex::end_value() const noexcept
} }
inline bool Foam::globalIndex::contains(const label i) const noexcept
{
return (begin_value() <= i) && (i < end_value());
}
inline Foam::label Foam::globalIndex::totalSize() const noexcept inline Foam::label Foam::globalIndex::totalSize() const noexcept
{ {
return end_value(); return end_value();
@ -295,7 +301,7 @@ inline Foam::labelRange Foam::globalIndex::range() const
inline bool Foam::globalIndex::isLocal(const label proci, const label i) const inline bool Foam::globalIndex::isLocal(const label proci, const label i) const
{ {
// range contains() // range contains()
return i >= offsets_[proci] && i < offsets_[proci+1]; return (offsets_[proci] <= i) && (i < offsets_[proci+1]);
} }
@ -377,16 +383,16 @@ inline void Foam::globalIndex::inplaceToGlobal(labelUList& labels) const
inline Foam::label inline Foam::label
Foam::globalIndex::toLocal(const label proci, const label i) const Foam::globalIndex::toLocal(const label proci, const label i) const
{ {
const label locali = i - offsets_[proci]; // range !contains()
if (i < offsets_[proci] || i >= offsets_[proci+1])
if (locali < 0 || i >= offsets_[proci+1])
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Global " << i << " does not belong on processor " << "Global id:" << i << " does not belong on processor "
<< proci << nl << "Offsets:" << offsets_ << proci << nl
<< " Offsets:" << offsets_
<< abort(FatalError); << abort(FatalError);
} }
return locali; return (i - offsets_[proci]);
} }
@ -396,21 +402,109 @@ inline Foam::label Foam::globalIndex::toLocal(const label i) const
} }
inline Foam::label Foam::globalIndex::findProc
(
const label proci,
const label i
) const
{
// Simple checks first
if
(
(proci < 0) // Invalid proc
|| (proci+1 > offsets_.size()) // Invalid proc
|| (i < offsets_.front()) // Below the start
|| (i >= offsets_.back()) // Beyond the end
)
{
return -1;
}
// Assume that in many cases we have many queries for the local proci,
// so query that first but also make it the split point for
// restricting the binary searches
if (isLocal(proci, i))
{
return proci;
}
if (i < offsets_[proci])
{
// Can restrict search to procs below proci
const labelList::subList slice(offsets_, proci);
return findLower(slice, i+1);
}
// Search starts at proci+1 (and above)
return findLower(offsets_, i+1, proci+1);
}
inline Foam::label Foam::globalIndex::findProcAbove
(
const label proci,
const label i
) const
{
// Simple checks first
if
(
(proci < 0) // Invalid proci
|| (proci+1 >= offsets_.size()) // Nothing above last proc
|| (i < offsets_[proci+1]) // Exclude proc-local and below
|| (i >= offsets_.back()) // Beyond the end
)
{
return -1;
}
// Search starts at proci+1 (and above)
return findLower(offsets_, i+1, (proci+1));
}
inline Foam::label Foam::globalIndex::findProcBelow
(
const label proci,
const label i
) const
{
if
(
(proci <= 0) // Nothing below first proc
|| (proci >= offsets_.size()) // Invalid proci
|| (i >= offsets_[proci]) // Exclude proc-local and above
|| (i < offsets_[0]) // Beyond the begin
)
{
return -1;
}
// Restrict search to procs below proci
const labelList::subList slice(offsets_, proci);
return findLower(slice, i+1);
}
// Note: could split this into failsafe/non-failsafe versions
inline Foam::label Foam::globalIndex::whichProcID inline Foam::label Foam::globalIndex::whichProcID
( (
const label proci, const label proci,
const label i const label i
) const ) const
{ {
if (i < 0 || i >= totalSize()) label foundProc = findProc(proci, i);
if (foundProc < 0)
{ {
FatalErrorInFunction FatalErrorInFunction
<< "Global " << i << " does not belong on any processor." << "Global id:" << i << " does not belong on any processor." << nl
<< " Offsets:" << offsets_ << "Offsets:" << offsets_
<< abort(FatalError); << abort(FatalError);
} }
return isLocal(proci, i) ? proci : findLower(offsets_, i+1); return foundProc;
} }
@ -467,6 +561,16 @@ inline void Foam::globalIndex::reset
} }
inline void Foam::globalIndex::reset(const globalIndex& rhs)
{
if (this == &rhs)
{
return; // Self-assignment is a no-op
}
this->offsets_ = rhs.offsets_;
}
// * * * * * * * * * * * * * * * * Iterators * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * Iterators * * * * * * * * * * * * * * * * //
inline Foam::globalIndex::const_iterator:: inline Foam::globalIndex::const_iterator::

View File

@ -30,6 +30,42 @@ License
// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // // * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * //
template<class Addr>
Foam::labelList
Foam::globalIndex::calcOffsets
(
const IndirectListBase<label, Addr>& counts,
const bool checkOverflow
)
{
labelList values;
const label len = counts.size();
if (len)
{
values.resize(len+1);
label start = 0;
for (label i = 0; i < len; ++i)
{
const label count = counts[i];
values[i] = start;
start += count;
if (checkOverflow && start < values[i])
{
reportOverflowAndExit(i, values[i], count);
}
}
values[len] = start;
}
return values;
}
template<class SubListType> template<class SubListType>
Foam::labelList Foam::labelList
Foam::globalIndex::calcListOffsets Foam::globalIndex::calcListOffsets
@ -49,12 +85,14 @@ Foam::globalIndex::calcListOffsets
label start = 0; label start = 0;
for (label i = 0; i < len; ++i) for (label i = 0; i < len; ++i)
{ {
const label count = lists[i].size();
values[i] = start; values[i] = start;
start += lists[i].size(); start += count;
if (checkOverflow && start < values[i]) if (checkOverflow && start < values[i])
{ {
reportOverflowAndExit(i); reportOverflowAndExit(i, values[i], count);
} }
} }
values[len] = start; values[len] = start;
@ -1005,12 +1043,12 @@ OutputContainer Foam::globalIndex::scatter
// The globalIndex might be correct on master only, // The globalIndex might be correct on master only,
// so scatter local sizes to ensure consistency // so scatter local sizes to ensure consistency
const label localLen const label count
( (
UPstream::listScatterValues<label>(this->localSizes(), comm) UPstream::listScatterValues<label>(this->localSizes(), comm)
); );
OutputContainer localData(localLen); OutputContainer localData(count);
this->scatter(allData, localData, tag, commsType, comm); this->scatter(allData, localData, tag, commsType, comm);
return localData; return localData;

View File

@ -63,17 +63,9 @@ void Foam::UPstream::mpiAllGather \
int count, \ int count, \
const label comm \ const label comm \
) \ ) \
{} {} \
\
Pstream_CommonRoutines(char); \
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native) \
void Foam::UPstream::gather \ void Foam::UPstream::gather \
( \ ( \
const Native* sendData, \ const Native* sendData, \

View File

@ -79,17 +79,8 @@ void Foam::UPstream::mpiAllGather \
allData, count, \ allData, count, \
TaggedType, comm \ TaggedType, comm \
); \ ); \
} } \
\
Pstream_CommonRoutines(char, MPI_BYTE);
#undef Pstream_CommonRoutines
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#undef Pstream_CommonRoutines
#define Pstream_CommonRoutines(Native, TaggedType) \
void Foam::UPstream::gather \ void Foam::UPstream::gather \
( \ ( \
const Native* sendData, \ const Native* sendData, \

View File

@ -507,7 +507,7 @@ void Foam::PstreamDetail::allToAllConsensus
if (!UPstream::is_rank(comm)) if (!UPstream::is_rank(comm))
{ {
return; return; // Process not in communicator
} }
const label myProci = UPstream::myProcNo(comm); const label myProci = UPstream::myProcNo(comm);
@ -539,11 +539,15 @@ void Foam::PstreamDetail::allToAllConsensus
if (!UPstream::is_parallel(comm)) if (!UPstream::is_parallel(comm))
{ {
// deep copy // Non-parallel : deep copy
recvData.deepCopy(sendData); recvData.deepCopy(sendData);
return; return;
} }
// Fake send/recv for myself
{
recvData[myProci] = sendData[myProci];
}
// Implementation description // Implementation description
// -------------------------- // --------------------------
@ -562,7 +566,10 @@ void Foam::PstreamDetail::allToAllConsensus
// This is because we are dealing with a flat list of entries to // This is because we are dealing with a flat list of entries to
// send and not a sparse Map etc. // send and not a sparse Map etc.
DynamicList<MPI_Request> sendRequests(sendData.size());
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
profilingPstream::beginTiming(); profilingPstream::beginTiming();
@ -573,20 +580,16 @@ void Foam::PstreamDetail::allToAllConsensus
MPI_Barrier(PstreamGlobals::MPICommunicators_[comm]); MPI_Barrier(PstreamGlobals::MPICommunicators_[comm]);
} }
DynamicList<MPI_Request> sendRequests(sendData.size());
// Start nonblocking synchronous send to process dest // Start nonblocking synchronous send to destination rank
for (label proci = 0; proci < numProc; ++proci) for (label proci = 0; proci < numProc; ++proci)
{ {
if (sendData[proci] == zeroValue) if (sendData[proci] == zeroValue)
{ {
// Do not send/recv empty data // Do not send/recv empty data
} }
else if (proci == myProci) else if (proci != myProci)
{
// Do myself
recvData[proci] = sendData[proci];
}
else
{ {
// Has data to send // Has data to send
@ -604,7 +607,9 @@ void Foam::PstreamDetail::allToAllConsensus
} }
// ------------------------------------------------------------------------
// Probe and receive // Probe and receive
// ------------------------------------------------------------------------
MPI_Request barrierRequest; MPI_Request barrierRequest;
@ -721,22 +726,29 @@ void Foam::PstreamDetail::allToAllConsensus
} }
// Initial: clear out everything // Initial: clear out everything
const Type zeroValue = pTraits<Type>::zero;
recvBufs.clear(); recvBufs.clear();
if (!UPstream::is_parallel(comm)) // Fake send/recv for myself - parallel or non-parallel
{ {
// Do myself
const auto iter = sendBufs.find(myProci); const auto iter = sendBufs.find(myProci);
if (iter.good() && (iter.val() != zeroValue)) if (iter.good())
{ {
// Do myself: insert_or_assign // Do myself: insert_or_assign
recvBufs(iter.key()) = iter.val(); recvBufs(iter.key()) = iter.val();
} }
}
if (!UPstream::is_parallel(comm))
{
// Nothing left to do
return; return;
} }
// ------------------------------------------------------------------------
// Setup sends
// ------------------------------------------------------------------------
// Algorithm NBX: Nonblocking consensus // Algorithm NBX: Nonblocking consensus
// Implementation like above, but sending map data. // Implementation like above, but sending map data.
@ -752,7 +764,7 @@ void Foam::PstreamDetail::allToAllConsensus
} }
// Start nonblocking synchronous send to process dest // Start nonblocking synchronous send to destination ranks
// Same as forAllConstIters() // Same as forAllConstIters()
const auto endIter = sendBufs.cend(); const auto endIter = sendBufs.cend();
@ -761,19 +773,8 @@ void Foam::PstreamDetail::allToAllConsensus
const label proci = iter.key(); const label proci = iter.key();
const auto& sendData = iter.val(); const auto& sendData = iter.val();
if (sendData == zeroValue) if (proci != myProci && proci >= 0 && proci < numProc)
{ {
// Do not send/recv empty/zero data
}
else if (proci == myProci)
{
// Do myself: insert_or_assign
recvBufs(proci) = sendData;
}
else
{
// Has data to send
MPI_Issend MPI_Issend
( (
&sendData, &sendData,
@ -788,7 +789,9 @@ void Foam::PstreamDetail::allToAllConsensus
} }
// ------------------------------------------------------------------------
// Probe and receive // Probe and receive
// ------------------------------------------------------------------------
MPI_Request barrierRequest; MPI_Request barrierRequest;