ENH: remove blocking communication for gather patterns

ENH: eliminate unnecessary duplicate communicator

- in globalMeshData previously had a comm_dup hack to avoid clashes
  with deltaCoeffs calculations. However, this was largely due to a
  manual implementation of reduce() that used point-to-point
  communication. This has since been updated to use an MPI_Allreduce
  and now an MPI_Allgather, neither of which need this hack.
This commit is contained in:
Mark Olesen
2024-02-29 18:35:03 +01:00
parent b98f53ceca
commit 7006056eae
29 changed files with 272 additions and 595 deletions

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -30,6 +30,7 @@ License
#include "addToRunTimeSelectionTable.H"
#include "OSspecific.H"
#include "Fstream.H"
#include "SpanStream.H"
#include "volFields.H"
#include "globalIndex.H"
#include "fvMesh.H"
@ -110,15 +111,15 @@ void Foam::functionObjects::externalCoupled::readColumns
// Get sizes for all processors
const globalIndex globalFaces(globalIndex::gatherOnly{}, nRows);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
if (Pstream::master())
if (UPstream::master())
{
// Read data from file and send to destination processor
auto& ifile = masterFilePtr();
string line;
// Read data from file and send to destination processor
for (const int proci : Pstream::allProcs())
for (const int proci : UPstream::allProcs())
{
// Temporary storage
List<scalarField> values(nColumns);
@ -136,9 +137,9 @@ void Foam::functionObjects::externalCoupled::readColumns
// Get a line
do
{
if (!masterFilePtr().good())
if (!ifile.good())
{
FatalIOErrorInFunction(masterFilePtr())
FatalIOErrorInFunction(ifile)
<< "Trying to read data for processor " << proci
<< " row " << rowi
<< ". Does your file have as many rows as there are"
@ -146,15 +147,15 @@ void Foam::functionObjects::externalCoupled::readColumns
<< ") ?" << exit(FatalIOError);
}
masterFilePtr().getLine(line);
ifile.getLine(line);
}
while (line.empty() || line[0] == '#');
IStringStream lineStr(line);
ISpanStream isstr(line);
for (label columni = 0; columni < nColumns; ++columni)
{
lineStr >> values[columni][rowi];
isstr >> values[columni][rowi];
}
}
@ -165,7 +166,7 @@ void Foam::functionObjects::externalCoupled::readColumns
}
pBufs.finishedScatters();
// Get scattered data from PstreamBuffers
// Get scattered data
UIPstream fromMaster(UPstream::masterNo(), pBufs);
fromMaster >> data;
}
@ -175,21 +176,21 @@ void Foam::functionObjects::externalCoupled::readLines
(
const label nRows,
autoPtr<IFstream>& masterFilePtr,
OStringStream& lines
std::string& lines
) const
{
// Get sizes for all processors
const globalIndex globalFaces(globalIndex::gatherOnly{}, nRows);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
PstreamBuffers pBufs(UPstream::commsTypes::nonBlocking);
if (Pstream::master())
if (UPstream::master())
{
// Read line from file and send to destination processor
auto& ifile = masterFilePtr();
string line;
// Read line from file and send to destination processor
for (const int proci : Pstream::allProcs())
for (const int proci : UPstream::allProcs())
{
// Number of rows to read for processor proci
const label procNRows = globalFaces.localSize(proci);
@ -201,9 +202,9 @@ void Foam::functionObjects::externalCoupled::readLines
// Get a line
do
{
if (!masterFilePtr().good())
if (!ifile.good())
{
FatalIOErrorInFunction(masterFilePtr())
FatalIOErrorInFunction(ifile)
<< "Trying to read data for processor " << proci
<< " row " << rowi
<< ". Does your file have as many rows as there are"
@ -211,11 +212,11 @@ void Foam::functionObjects::externalCoupled::readLines
<< ") ?" << exit(FatalIOError);
}
masterFilePtr().getLine(line);
ifile.getLine(line);
}
while (line.empty() || line[0] == '#');
// Send line to the destination processor
// Send line (without newline) to the destination processor
toProc << line;
}
}
@ -223,12 +224,16 @@ void Foam::functionObjects::externalCoupled::readLines
pBufs.finishedScatters();
// Get scattered data from PstreamBuffers
// Sizing is approximate (slightly too large)
lines.reserve(pBufs.recvDataCount(UPstream::masterNo()));
// Get scattered data
UIPstream fromMaster(UPstream::masterNo(), pBufs);
for (label rowi = 0; rowi < nRows; ++rowi)
{
string line(fromMaster);
lines << line.c_str() << nl;
lines += line;
lines += '\n';
}
}
@ -253,9 +258,9 @@ void Foam::functionObjects::externalCoupled::writeGeometry
autoPtr<OFstream> osPointsPtr;
autoPtr<OFstream> osFacesPtr;
if (Pstream::master())
if (UPstream::master())
{
mkDir(dir);
Foam::mkDir(dir);
osPointsPtr.reset(new OFstream(dir/"patchPoints"));
osFacesPtr.reset(new OFstream(dir/"patchFaces"));
@ -278,7 +283,7 @@ void Foam::functionObjects::externalCoupled::writeGeometry
(
mesh.boundaryMesh().patchSet
(
wordRes(one{}, groupName)
wordRes(Foam::one{}, groupName)
).sortedToc()
);
@ -303,13 +308,13 @@ void Foam::functionObjects::externalCoupled::writeGeometry
List<faceList> collectedFaces(Pstream::nProcs());
faceList& patchFaces = collectedFaces[proci];
patchFaces = p.localFaces();
forAll(patchFaces, facei)
for (auto& f : patchFaces)
{
inplaceRenumber(pointToGlobal, patchFaces[facei]);
inplaceRenumber(pointToGlobal, f);
}
Pstream::gatherList(collectedFaces);
if (Pstream::master())
if (UPstream::master())
{
allPoints.clear();
allFaces.clear();

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2020 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -167,8 +167,8 @@ SourceFiles
\*---------------------------------------------------------------------------*/
#ifndef functionObjects_externalCoupled_H
#define functionObjects_externalCoupled_H
#ifndef Foam_functionObjects_externalCoupled_H
#define Foam_functionObjects_externalCoupled_H
#include "timeFunctionObject.H"
#include "externalFileCoupler.H"
@ -198,7 +198,6 @@ class externalCoupled
public functionObjects::timeFunctionObject,
public externalFileCoupler
{
// Private Member Data
//- Calculation frequency
@ -259,8 +258,9 @@ class externalCoupled
void initCoupling();
//- Read (and distribute) scalar columns from stream. Every processor
// gets nRows (= patch size) of these. Note: could make its argument
//- Read (and distribute) scalar columns from stream.
//- Every processor gets nRows (= patch size) of these.
// Note: could make its argument
// ISstream& but then would need additional logic to construct valid
// stream on all processors.
void readColumns
@ -271,21 +271,17 @@ class externalCoupled
List<scalarField>& data
) const;
//- Read (and distribute) lines from stream. Every processor
// gets nRows (= patch size) of these. Data kept as stream (instead
// of strings) for ease of interfacing to readData routines that take
// an Istream.
//- Read (and distribute) lines from stream.
//- Every processor gets nRows (= patch size) of these
//- (newline terminated).
void readLines
(
const label nRows,
autoPtr<IFstream>& masterFilePtr,
OStringStream& data
//! [out] the nRows lines read (for this rank) - newline terminated
std::string& lines
) const;
//- Helper: append data from all processors onto master
template<class Type>
static tmp<Field<Type>> gatherAndCombine(const Field<Type>& fld);
static void checkOrder(const wordList& regionNames);
//- Perform the coupling with necessary initialization etc.

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2021 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -33,6 +33,7 @@ License
#include "mixedFvPatchFields.H"
#include "fixedGradientFvPatchFields.H"
#include "fixedValueFvPatchFields.H"
#include "SpanStream.H"
#include "StringStream.H"
#include "globalIndex.H"
@ -58,7 +59,7 @@ bool Foam::functionObjects::externalCoupled::readData
// File only opened on master; contains data for all processors, for all
// patchIDs.
autoPtr<IFstream> masterFilePtr;
if (Pstream::master())
if (UPstream::master())
{
const fileName transferFile
(
@ -80,13 +81,13 @@ bool Foam::functionObjects::externalCoupled::readData
}
const wordRes patchSelection(one{}, groupName);
const wordRes patchSelection(Foam::one{}, groupName);
label nFound = 0;
for (const fvMesh& mesh : meshes)
{
const volFieldType* vfptr = mesh.findObject<volFieldType>(fieldName);
auto* vfptr = mesh.getObjectPtr<volFieldType>(fieldName);
if (!vfptr)
{
@ -94,8 +95,7 @@ bool Foam::functionObjects::externalCoupled::readData
}
++nFound;
typename volFieldType::Boundary& bf =
const_cast<volFieldType*>(vfptr)->boundaryFieldRef();
auto& bf = vfptr->boundaryFieldRef();
// Get the patches
const labelList patchIDs
@ -117,16 +117,18 @@ bool Foam::functionObjects::externalCoupled::readData
);
// Read from master into local stream
OStringStream os;
std::string lines;
readLines
(
bf[patchi].size(), // number of lines to read
bf[patchi].size(), // number of lines to read
masterFilePtr,
os
lines
);
ISpanStream isstr(lines);
// Pass responsibility for all reading over to bc
pf.readData(IStringStream(os.str())());
pf.readData(isstr);
// Update the value from the read coefficient. Bypass any
// additional processing by derived type.
@ -265,51 +267,6 @@ bool Foam::functionObjects::externalCoupled::readData
}
template<class Type>
Foam::tmp<Foam::Field<Type>>
Foam::functionObjects::externalCoupled::gatherAndCombine
(
const Field<Type>& fld
)
{
// Collect values from all processors
List<Field<Type>> gatheredValues(Pstream::nProcs());
gatheredValues[Pstream::myProcNo()] = fld;
Pstream::gatherList(gatheredValues);
auto tresult = tmp<Field<Type>>::New();
auto& result = tresult.ref();
if (Pstream::master())
{
// Combine values into single field
label globalElemi = 0;
forAll(gatheredValues, lsti)
{
globalElemi += gatheredValues[lsti].size();
}
result.setSize(globalElemi);
globalElemi = 0;
forAll(gatheredValues, lsti)
{
const Field<Type>& sub = gatheredValues[lsti];
forAll(sub, elemi)
{
result[globalElemi++] = sub[elemi];
}
}
}
return tresult;
}
template<class Type>
bool Foam::functionObjects::externalCoupled::writeData
(
@ -352,14 +309,14 @@ bool Foam::functionObjects::externalCoupled::writeData
}
const wordRes patchSelection(one{}, groupName);
const wordRes patchSelection(Foam::one{}, groupName);
bool headerDone = false;
label nFound = 0;
for (const fvMesh& mesh : meshes)
{
const volFieldType* vfptr = mesh.findObject<volFieldType>(fieldName);
const auto* vfptr = mesh.getObjectPtr<volFieldType>(fieldName);
if (!vfptr)
{
@ -367,8 +324,7 @@ bool Foam::functionObjects::externalCoupled::writeData
}
++nFound;
const typename volFieldType::Boundary& bf =
vfptr->boundaryField();
const auto& bf = vfptr->boundaryField();
// Get the patches
const labelList patchIDs
@ -379,7 +335,7 @@ bool Foam::functionObjects::externalCoupled::writeData
// Handle column-wise writing of patch data. Supports most easy types
for (const label patchi : patchIDs)
{
const globalIndex globalFaces(bf[patchi].size());
// const globalIndex globalFaces(bf[patchi].size());
if (isA<patchFieldType>(bf[patchi]))
{
@ -397,7 +353,7 @@ bool Foam::functionObjects::externalCoupled::writeData
// Collect contributions from all processors and output them on
// master
if (Pstream::master())
if (UPstream::master())
{
// Output master data first
if (!headerDone)
@ -407,27 +363,17 @@ bool Foam::functionObjects::externalCoupled::writeData
}
masterFilePtr() << os.str().c_str();
for (const int proci : Pstream::subProcs())
for (const int proci : UPstream::subProcs())
{
IPstream fromSlave
(
Pstream::commsTypes::scheduled,
proci
);
string str;
IPstream::recv(str, proci);
string str(fromSlave);
masterFilePtr() << str.c_str();
}
}
else
{
OPstream toMaster
(
Pstream::commsTypes::scheduled,
Pstream::masterNo()
);
toMaster << os.str();
OPstream::send(os.str(), UPstream::masterNo());
}
}
else if (isA<mixedFvPatchField<Type>>(bf[patchi]))
@ -435,15 +381,21 @@ bool Foam::functionObjects::externalCoupled::writeData
const mixedFvPatchField<Type>& pf =
refCast<const mixedFvPatchField<Type>>(bf[patchi]);
Field<Type> value(gatherAndCombine(pf));
Field<Type> snGrad(gatherAndCombine(pf.snGrad()()));
Field<Type> refValue(gatherAndCombine(pf.refValue()));
Field<Type> refGrad(gatherAndCombine(pf.refGrad()));
scalarField valueFraction(gatherAndCombine(pf.valueFraction()));
const globalIndex glob
(
globalIndex::gatherOnly{},
pf.size()
);
if (Pstream::master())
Field<Type> value(glob.gather(pf));
Field<Type> snGrad(glob.gather(pf.snGrad()()));
Field<Type> refValue(glob.gather(pf.refValue()));
Field<Type> refGrad(glob.gather(pf.refGrad()));
scalarField valueFraction(glob.gather(pf.valueFraction()));
if (UPstream::master())
{
forAll(refValue, facei)
forAll(value, facei)
{
masterFilePtr()
<< value[facei] << token::SPACE
@ -457,9 +409,17 @@ bool Foam::functionObjects::externalCoupled::writeData
else
{
// Output the value and snGrad
Field<Type> value(gatherAndCombine(bf[patchi]));
Field<Type> snGrad(gatherAndCombine(bf[patchi].snGrad()()));
if (Pstream::master())
const globalIndex glob
(
globalIndex::gatherOnly{},
bf[patchi].size()
);
Field<Type> value(glob.gather(bf[patchi]));
Field<Type> snGrad(glob.gather(bf[patchi].snGrad()()));
if (UPstream::master())
{
forAll(value, facei)
{

View File

@ -5,7 +5,7 @@
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2018-2022 OpenCFD Ltd.
Copyright (C) 2018-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -77,7 +77,7 @@ void Foam::functionObjects::dataCloud::writeListParallel
const List<Type>& field
)
{
if (Pstream::master())
if (UPstream::master())
{
writeList(os, points, field);
@ -85,9 +85,9 @@ void Foam::functionObjects::dataCloud::writeListParallel
Field<Type> recvField;
// Receive and write
for (const int proci : Pstream::subProcs())
for (const int proci : UPstream::subProcs())
{
IPstream fromProc(Pstream::commsTypes::blocking, proci);
IPstream fromProc(UPstream::commsTypes::scheduled, proci);
fromProc >> recvPoints >> recvField;
@ -99,8 +99,8 @@ void Foam::functionObjects::dataCloud::writeListParallel
// Send to master
OPstream toMaster
(
Pstream::commsTypes::blocking,
Pstream::masterNo()
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster
@ -134,7 +134,7 @@ void Foam::functionObjects::dataCloud::writeListParallel
const bitSet& selected
)
{
if (Pstream::master())
if (UPstream::master())
{
writeList(os, points, field, selected);
@ -142,9 +142,9 @@ void Foam::functionObjects::dataCloud::writeListParallel
Field<Type> recvField;
// Receive and write
for (const int proci : Pstream::subProcs())
for (const int proci : UPstream::subProcs())
{
IPstream fromProc(Pstream::commsTypes::blocking, proci);
IPstream fromProc(UPstream::commsTypes::scheduled, proci);
fromProc >> recvPoints >> recvField;
@ -156,8 +156,8 @@ void Foam::functionObjects::dataCloud::writeListParallel
// Send to master
OPstream toMaster
(
Pstream::commsTypes::blocking,
Pstream::masterNo()
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster
@ -195,7 +195,7 @@ bool Foam::functionObjects::dataCloud::writeField
autoPtr<OFstream> osPtr;
if (Pstream::master())
if (UPstream::master())
{
osPtr.reset(new OFstream(outputName));
osPtr->precision(precision_);