ENH: allow configurable field send/receive for surfaceNoise (#2639)

- replaced PstreamBuffers mechanism with globalIndex for both gather
  and scatter operations. Use scheduled communication by default, but
  is selectable.

- reduced communication with ensemble averaging and no-write
This commit is contained in:
Mark Olesen
2022-11-22 17:07:40 +01:00
parent 9114e01de9
commit d69ac516e8
5 changed files with 255 additions and 224 deletions

View File

@ -735,7 +735,7 @@ bool Foam::noiseModel::read(const dictionary& dict)
);
}
Info<< nl << endl;
Info<< endl;
return true;
}

View File

@ -74,7 +74,8 @@ void pointNoise::processData
const Function1Types::CSV<scalar>& data
)
{
Info<< "Reading data file " << data.fName() << endl;
Info<< "Reading data file: "
<< fileObr_.time().relativePath(data.fName()) << endl;
const word fNameBase(data.fName().stem());

View File

@ -28,6 +28,7 @@ License
#include "surfaceNoise.H"
#include "surfaceReader.H"
#include "surfaceWriter.H"
#include "globalIndex.H"
#include "argList.H"
#include "addToRunTimeSelectionTable.H"
@ -47,7 +48,8 @@ addToRunTimeSelectionTable(noiseModel, surfaceNoise, dictionary);
void surfaceNoise::initialise(const fileName& fName)
{
Info<< "Reading data file " << fName << endl;
Info<< "Reading data file: "
<< fileObr_.time().relativePath(fName) << endl;
instantList allTimes;
label nAvailableTimes = 0;
@ -107,7 +109,8 @@ void surfaceNoise::initialise(const fileName& fName)
// Read the surface geometry
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
nFace_ = surf.nFaces();
nFaces_ = surf.nFaces();
}
Pstream::broadcasts
@ -115,14 +118,14 @@ void surfaceNoise::initialise(const fileName& fName)
UPstream::worldComm,
times_,
deltaT_,
nFace_
nFaces_
);
}
void surfaceNoise::readSurfaceData
(
const labelList& procFaceOffset,
const globalIndex& procFaceAddr,
List<scalarField>& pData
)
{
@ -131,12 +134,23 @@ void surfaceNoise::readSurfaceData
// surface face. In serial mode, this results in all pressure data being
// loaded into memory (!)
Info << "Reading pressure data" << endl;
const label nLocalFace = procFaceAddr.localSize();
// Complete pressure time history data for subset of faces
pData.resize_nocopy(nLocalFace);
const label nTimes = times_.size();
for (scalarField& pf : pData)
{
pf.resize_nocopy(nTimes);
}
Info<< "Reading pressure data" << endl;
// Master only
scalarField allData;
if (Pstream::parRun())
{
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Procedure:
// 1. Master processor reads pressure data for all faces for all times
// 2. Master sends each processor a subset of faces
@ -144,96 +158,89 @@ void surfaceNoise::readSurfaceData
// of faces
// Note: reading all data on master to avoid potential NFS problems...
const label myProcI = Pstream::myProcNo();
const label nLocalFace =
procFaceOffset[myProcI + 1] - procFaceOffset[myProcI];
scalarField scratch;
// Complete pressure time history data for subset of faces
pData.setSize(nLocalFace);
const label nTimes = times_.size();
for (scalarField& pf : pData)
if (!useBroadcast_)
{
pf.setSize(nTimes);
scratch.resize(nLocalFace);
}
// Read and send data
forAll(times_, i)
// Read data and send to sub-ranks
forAll(times_, timei)
{
pBufs.clear();
const label fileTimeIndex = timei + startTimeIndex_;
if (Pstream::master())
{
label timeI = i + startTimeIndex_;
Info<< " time: " << times_[i] << endl;
Info<< " time: " << times_[timei] << endl;
// Read pressure at all faces for time timeI
scalarField p(readerPtr_->field(timeI, pIndex_, scalar(0)));
// Apply conversions
p *= rhoRef_;
// Send subset of faces to each processor
for (const int procI : Pstream::allProcs())
{
label face0 = procFaceOffset[procI];
label nLocalFace =
procFaceOffset[procI + 1] - procFaceOffset[procI];
UOPstream toProc(procI, pBufs);
toProc << SubList<scalar>(p, nLocalFace, face0);
}
allData = readerPtr_->field(fileTimeIndex, pIndex_, scalar(0));
}
pBufs.finishedScatters();
// Receive data from the master
UIPstream fromProc(Pstream::masterNo(), pBufs);
scalarList pSlice(fromProc);
forAll(pSlice, faceI)
if (useBroadcast_)
{
pData[faceI][i] = pSlice[faceI];
Pstream::broadcast(allData);
}
else
{
procFaceAddr.scatter
(
allData,
scratch,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
}
}
forAll(pData, faceI)
{
pData[faceI] -= average(pData[faceI]);
scalarField::subField procData =
(
useBroadcast_
? allData.slice(procFaceAddr.range())
: scratch.slice(0, nLocalFace)
);
// Apply conversions
procData *= rhoRef_;
// Transcribe this time snapshot (transpose)
forAll(procData, facei)
{
pData[facei][timei] = procData[facei];
}
}
}
else
{
const label nLocalFace = procFaceOffset[0];
pData.setSize(nLocalFace);
for (scalarField& pf : pData)
// Read data - no sub-ranks
forAll(times_, timei)
{
pf.setSize(times_.size());
}
const label fileTimeIndex = timei + startTimeIndex_;
forAll(times_, i)
{
label timeI = i + startTimeIndex_;
Info<< " time: " << times_[timei] << endl;
Info<< " time: " << times_[i] << endl;
scalarField p(readerPtr_->field(timeI, pIndex_, scalar(0)));
allData = readerPtr_->field(fileTimeIndex, pIndex_, scalar(0));
auto& procData = allData;
// Apply conversions
p *= rhoRef_;
procData *= rhoRef_;
forAll(p, faceI)
// Transcribe this time snapshot (transpose)
forAll(procData, facei)
{
pData[faceI][i] = p[faceI];
pData[facei][timei] = procData[facei];
}
}
forAll(pData, faceI)
{
pData[faceI] -= average(pData[faceI]);
}
}
forAll(pData, facei)
{
pData[facei] -= average(pData[facei]);
}
Info<< "Read "
<< returnReduce(pData.size(), sumOp<label>())
<< " pressure traces with "
@ -242,6 +249,69 @@ void surfaceNoise::readSurfaceData
}
scalar surfaceNoise::surfaceAverage
(
const scalarField& data,
const globalIndex& procFaceAddr
) const
{
if (!nFaces_)
{
// Already reduced, can use as sanity check
return 0;
}
scalar areaAverage = 0;
if (areaAverage_)
{
if (Pstream::parRun())
{
// Collect the surface data so that we can output the surfaces
scalarField allData;
procFaceAddr.gather
(
data,
allData,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
if (Pstream::master())
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
areaAverage = sum(allData*surf.magSf())/sum(surf.magSf());
}
}
else
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
areaAverage = sum(data*surf.magSf())/sum(surf.magSf());
}
Pstream::broadcast(areaAverage);
}
else
{
// Ensemble averaged
// - same as gAverage, but already know number of faces
areaAverage = sum(data);
reduce(areaAverage, sumOp<scalar>());
areaAverage /= (scalar(nFaces_) + ROOTVSMALL);
}
return areaAverage;
}
scalar surfaceNoise::writeSurfaceData
(
const fileName& outDirBase,
@ -249,7 +319,7 @@ scalar surfaceNoise::writeSurfaceData
const word& title,
const scalar freq,
const scalarField& data,
const labelList& procFaceOffset,
const globalIndex& procFaceAddr,
const bool writeSurface
) const
{
@ -257,43 +327,39 @@ scalar surfaceNoise::writeSurfaceData
const instant freqInst(freq, Foam::name(freq));
if (!writeSurface)
{
return surfaceAverage(data, procFaceAddr);
}
scalar areaAverage = 0;
if (Pstream::parRun())
{
// Collect the surface data so that we can output the surfaces
scalarField allData;
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
procFaceAddr.gather
(
data,
allData,
UPstream::msgType(),
commType_,
UPstream::worldComm
);
if (!Pstream::master())
{
UOPstream toProc(Pstream::masterNo(), pBufs);
toProc << data;
}
pBufs.finishedGathers();
scalar areaAverage = 0;
if (Pstream::master())
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
scalarField allData(surf.size());
forAll(data, faceI)
if (areaAverage_)
{
// Master procFaceOffset is zero...
allData[faceI] = data[faceI];
areaAverage = sum(allData*surf.magSf())/sum(surf.magSf());
}
for (const int procI : Pstream::subProcs())
else
{
UIPstream fromProc(procI, pBufs);
scalarList dataSlice(fromProc);
forAll(dataSlice, i)
{
label faceI = procFaceOffset[procI] + i;
allData[faceI] = dataSlice[i];
}
areaAverage = sum(allData)/(allData.size() + ROOTVSMALL);
}
if (writeSurface)
@ -315,25 +381,22 @@ scalar surfaceNoise::writeSurfaceData
writerPtr_->endTime();
writerPtr_->clear();
}
if (areaAverage_)
{
areaAverage = sum(allData*surf.magSf())/sum(surf.magSf());
}
else
{
areaAverage = sum(allData)/(allData.size() + ROOTVSMALL);
}
}
Pstream::broadcast(areaAverage);
return areaAverage;
}
else
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
if (areaAverage_)
{
areaAverage = sum(data*surf.magSf())/sum(surf.magSf());
}
else
{
areaAverage = sum(data)/(data.size() + ROOTVSMALL);
}
if (writeSurface)
{
// Time-aware, with time spliced into the output path
@ -353,88 +416,10 @@ scalar surfaceNoise::writeSurfaceData
writerPtr_->endTime();
writerPtr_->clear();
}
if (areaAverage_)
{
return sum(data*surf.magSf())/sum(surf.magSf());
}
else
{
return sum(data)/(data.size() + ROOTVSMALL);
}
}
}
scalar surfaceNoise::surfaceAverage
(
const scalarField& data,
const labelList& procFaceOffset
) const
{
if (Pstream::parRun())
{
// Collect the surface data so that we can output the surfaces
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
if (!Pstream::master())
{
UOPstream toProc(Pstream::masterNo(), pBufs);
toProc << data;
}
pBufs.finishedGathers();
scalar areaAverage = 0;
if (Pstream::master())
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
scalarField allData(surf.size());
forAll(data, faceI)
{
// Master procFaceOffset is zero...
allData[faceI] = data[faceI];
}
for (const int procI : Pstream::subProcs())
{
UIPstream fromProc(procI, pBufs);
scalarList dataSlice(fromProc);
forAll(dataSlice, i)
{
label faceI = procFaceOffset[procI] + i;
allData[faceI] = dataSlice[i];
}
}
if (areaAverage_)
{
areaAverage = sum(allData*surf.magSf())/sum(surf.magSf());
}
else
{
areaAverage = sum(allData)/allData.size();
}
}
Pstream::broadcast(areaAverage);
return areaAverage;
}
else
{
if (areaAverage_)
{
// Note: hard-coded to read mesh from first time index
const meshedSurface& surf = readerPtr_->geometry(0);
return sum(data*surf.magSf())/sum(surf.magSf());
}
return sum(data)/data.size();
}
Pstream::broadcast(areaAverage);
return areaAverage;
}
@ -455,10 +440,12 @@ surfaceNoise::surfaceNoise
times_(),
deltaT_(0),
startTimeIndex_(0),
nFace_(0),
nFaces_(0),
fftWriteInterval_(1),
areaAverage_(false),
readerType_(word::null),
useBroadcast_(false),
commType_(UPstream::commsTypes::scheduled),
readerType_(),
readerPtr_(nullptr),
writerPtr_(nullptr)
{
@ -484,7 +471,8 @@ bool surfaceNoise::read(const dictionary& dict)
dict.readIfPresent("p", pName_);
dict.readIfPresent("fftWriteInterval", fftWriteInterval_);
Info<< " Pressure field name: " << pName_ << nl
Info<< this->type() << nl
<< " Pressure field name: " << pName_ << nl
<< " FFT write interval: " << fftWriteInterval_ << nl;
dict.readIfPresent("areaAverage", areaAverage_);
@ -498,6 +486,22 @@ bool surfaceNoise::read(const dictionary& dict)
Info<< " Averaging: ensemble" << endl;
}
useBroadcast_ = false;
dict.readIfPresent("broadcast", useBroadcast_);
UPstream::commsTypeNames.readIfPresent("commsType", dict, commType_);
if (Pstream::parRun())
{
Info<< " Distribute fields: "
<< UPstream::commsTypeNames[commType_];
if (useBroadcast_)
{
Info<< " (broadcast all)";
}
Info<< endl;
}
readerType_ = dict.get<word>("reader");
// Surface writer (keywords: writer, writeOptions)
@ -513,7 +517,7 @@ bool surfaceNoise::read(const dictionary& dict)
// Use outputDir/TIME/surface-name
writerPtr_->useTimeDir(true);
Info << endl;
Info<< endl;
return true;
}
@ -536,29 +540,36 @@ void surfaceNoise::calculate()
initialise(fName);
// Container for pressure time history data per face
List<scalarField> pData;
// Processor face addressing
globalIndex procFaceAddr;
// Processor procFaceOffsets
labelList procFaceOffset;
if (Pstream::parRun())
{
const label nProcs = Pstream::nProcs();
const label nFacePerProc = floor(nFace_/nProcs) + 1;
// Calculate face/proc offsets manually
labelList procFaceOffsets(Pstream::nProcs() + 1);
const label nFacePerProc = floor(nFaces_/Pstream::nProcs()) + 1;
procFaceOffset.setSize(nProcs + 1, 0);
for (label i = 1; i < procFaceOffset.size(); ++i)
procFaceOffsets[0] = 0;
for (label proci = 1; proci < procFaceOffsets.size(); ++proci)
{
procFaceOffset[i] = min(i*nFacePerProc, nFace_);
procFaceOffsets[proci] = min(proci*nFacePerProc, nFaces_);
}
procFaceAddr.offsets() = std::move(procFaceOffsets);
// Don't need to broadcast. Already identical on all ranks
}
else
{
procFaceOffset.setSize(1, nFace_);
// Local data only
procFaceAddr.reset(globalIndex::gatherNone{}, nFaces_);
}
// Pressure time history data per face
List<scalarField> pData;
// Read pressure data from file
readSurfaceData(procFaceOffset, pData);
readSurfaceData(procFaceAddr, pData);
// Process the pressure data, and store results as surface values per
// frequency so that it can be output using the surface writer
@ -567,9 +578,11 @@ void surfaceNoise::calculate()
const scalarField freq1(uniformFrequencies(deltaT_, true));
const scalar maxFreq1 = max(freq1);
// Reset desired frequency range if outside actual frequency range
fLower_ = min(fLower_, max(freq1));
fUpper_ = min(fUpper_, max(freq1));
fLower_ = min(fLower_, maxFreq1);
fUpper_ = min(fUpper_, maxFreq1);
// Storage for FFT data
const label nLocalFace = pData.size();
@ -682,6 +695,7 @@ void surfaceNoise::calculate()
}
Pstream::broadcast(surfArea);
Pstream::broadcast(surfSize);
List<Tuple2<string, token>> commonInfo =
{
{"Area average", token(word(Switch::name(areaAverage_)))},
@ -725,7 +739,7 @@ void surfaceNoise::calculate()
"Prmsf",
freq1[freqI],
surfPrmsf[i + f0],
procFaceOffset,
procFaceAddr,
writePrmsf_
);
@ -736,7 +750,7 @@ void surfaceNoise::calculate()
"PSDf",
freq1[freqI],
surfPSDf[i + f0],
procFaceOffset,
procFaceAddr,
writePSDf_
);
writeSurfaceData
@ -746,7 +760,7 @@ void surfaceNoise::calculate()
"PSD",
freq1[freqI],
PSD(surfPSDf[i + f0]),
procFaceOffset,
procFaceAddr,
writePSD_
);
writeSurfaceData
@ -756,7 +770,7 @@ void surfaceNoise::calculate()
"SPL",
freq1[freqI],
SPL(surfPSDf[i + f0]*deltaf, freq1[freqI]),
procFaceOffset,
procFaceAddr,
writeSPL_
);
}
@ -839,12 +853,12 @@ void surfaceNoise::calculate()
"SPL13",
octave13FreqCentre[i],
SPL(surfPrms13f[i], octave13FreqCentre[i]),
procFaceOffset,
procFaceAddr,
writeOctaves_
);
Prms13fAve[i] =
surfaceAverage(surfPrms13f[i], procFaceOffset);
surfaceAverage(surfPrms13f[i], procFaceAddr);
}
if (Pstream::master())

View File

@ -68,7 +68,7 @@ Description
reader ensight;
// Surface writer
writer ensight;
writer ensight; // none
// Collate times for ensight output - ensures geometry is only written once
writeOptions
@ -96,21 +96,27 @@ Description
\endverbatim
Communication Options
\table
Property | Description | Type | Req'd | Dflt
commsType | Communication type for send/receive field | word | no | scheduled
broadcast | (Experimental) broadcast all fields | bool | no | false
\endtable
SourceFiles
surfaceNoise.C
SeeAlso
noiseModel.H
\*---------------------------------------------------------------------------*/
#ifndef Foam_noiseModels_surfaceNoise_H
#define Foam_noiseModels_surfaceNoise_H
#include "noiseModel.H"
#include "labelList.H"
#include "scalarField.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -118,6 +124,7 @@ namespace Foam
{
// Forward Declarations
class globalIndex;
class surfaceReader;
class surfaceWriter;
@ -154,8 +161,8 @@ protected:
//- Start time index
label startTimeIndex_;
//- Number of surface faces
label nFace_;
//- Global number of surface faces
label nFaces_;
//- Frequency data output interval, default = 1
// nSamples/2 data points are returned from the FFT, which can
@ -166,6 +173,12 @@ protected:
//- compatibility
bool areaAverage_;
//- Use broadcast to send entire field to sub-ranks
bool useBroadcast_;
//- Communication type (for sending/receiving fields)
UPstream::commsTypes commType_;
//- Reader type
word readerType_;
@ -184,10 +197,17 @@ protected:
//- Read surface data
void readSurfaceData
(
const labelList& procFaceOffset,
const globalIndex& procFaceAddr,
List<scalarField>& pData
);
//- Calculate the area average value
scalar surfaceAverage
(
const scalarField& data,
const globalIndex& procFaceAddr
) const;
//- Write surface data to file
// Returns the area average value
scalar writeSurfaceData
@ -197,17 +217,10 @@ protected:
const word& title,
const scalar freq,
const scalarField& data,
const labelList& procFaceOffset,
const globalIndex& procFaceAddr,
const bool writeSurface
) const;
//- Calculate the area average value
scalar surfaceAverage
(
const scalarField& data,
const labelList& procFaceOffset
) const;
public: