STYLE: use send/recv serializers for globalIndex and mapDistribute etc

This commit is contained in:
Mark Olesen
2024-02-19 11:50:20 +01:00
parent 4f43f0302d
commit 04d880e2ce
12 changed files with 87 additions and 308 deletions

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2016-2022 OpenCFD Ltd.
Copyright (C) 2016-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -568,12 +568,8 @@ void syncPoints
}
}
OPstream toNbr
(
Pstream::commsTypes::blocking,
procPatch.neighbProcNo()
);
toNbr << patchInfo;
// buffered send
OPstream::bsend(patchInfo, procPatch.neighbProcNo());
}
}
@ -587,17 +583,12 @@ void syncPoints
if (pp.nPoints() && !procPatch.owner())
{
pointField nbrPatchInfo(procPatch.nPoints());
{
pointField nbrPatchInfo;
// We do not know the number of points on the other side
// so cannot use UIPstream::read
IPstream fromNbr
(
Pstream::commsTypes::blocking,
procPatch.neighbProcNo()
);
fromNbr >> nbrPatchInfo;
}
IPstream::recv(nbrPatchInfo, procPatch.neighbProcNo());
// Null any value which is not on neighbouring processor
nbrPatchInfo.setSize(procPatch.nPoints(), nullValue);

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -379,11 +379,10 @@ void getInterfaceSizes
if (Pstream::master())
{
// Receive and add to my sizes
for (const int slave : Pstream::subProcs())
for (const int proci : UPstream::subProcs())
{
IPstream fromSlave(Pstream::commsTypes::blocking, slave);
EdgeMap<Map<label>> slaveSizes(fromSlave);
EdgeMap<Map<label>> slaveSizes;
IPstream::recv(slaveSizes, proci);
forAllConstIters(slaveSizes, slaveIter)
{
@ -421,15 +420,8 @@ void getInterfaceSizes
}
else
{
// Send to master
{
OPstream toMaster
(
Pstream::commsTypes::blocking,
Pstream::masterNo()
);
toMaster << regionsToSize;
}
// buffered send to master
OPstream::bsend(regionsToSize, UPstream::masterNo());
}
}

View File

@ -36,8 +36,8 @@ Description
\*---------------------------------------------------------------------------*/
#include "OPstream.H"
#include "IPstream.H"
#include "OPstream.H"
#include "IOstreams.H"
#include "contiguous.H"
@ -129,15 +129,7 @@ void Foam::Pstream::combineGather
}
else
{
OPstream toAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0, // bufsize
tag,
comm
);
toAbove << value;
OPstream::send(value, myComm.above(), tag, comm);
}
}
}
@ -255,15 +247,7 @@ void Foam::Pstream::listCombineGather
}
else
{
OPstream toAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0, // bufsize
tag,
comm
);
toAbove << values;
OPstream::send(values, myComm.above(), tag, comm);
}
}
}
@ -357,15 +341,7 @@ void Foam::Pstream::mapCombineGather
<< " data:" << values << endl;
}
OPstream toAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0, // bufsize
tag,
comm
);
toAbove << values;
OPstream::send(values, myComm.above(), tag, comm);
}
}
}

View File

@ -32,8 +32,8 @@ Description
\*---------------------------------------------------------------------------*/
#include "OPstream.H"
#include "IPstream.H"
#include "OPstream.H"
#include "contiguous.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
@ -73,15 +73,7 @@ void Foam::Pstream::gather
}
else
{
IPstream fromBelow
(
UPstream::commsTypes::scheduled,
belowID,
0, // bufsize
tag,
comm
);
fromBelow >> received;
IPstream::recv(received, belowID, tag, comm);
}
value = bop(value, received);
@ -104,15 +96,7 @@ void Foam::Pstream::gather
}
else
{
OPstream toAbove
(
UPstream::commsTypes::scheduled,
myComm.above(),
0, // bufsize
tag,
comm
);
toAbove << value;
OPstream::send(value, myComm.above(), tag, comm);
}
}
}
@ -166,28 +150,12 @@ Foam::List<T> Foam::Pstream::listGatherValues
for (int proci = 1; proci < numProc; ++proci)
{
IPstream fromProc
(
UPstream::commsTypes::scheduled,
proci,
0, // bufsize
tag,
comm
);
fromProc >> allValues[proci];
IPstream::recv(allValues[proci], proci, tag, comm);
}
}
else if (UPstream::is_rank(comm))
{
OPstream toProc
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0, // bufsize
tag,
comm
);
toProc << localValue;
OPstream::send(localValue, UPstream::masterNo(), tag, comm);
}
}
}
@ -269,15 +237,7 @@ T Foam::Pstream::listScatterValues
}
else if (UPstream::is_rank(comm))
{
IPstream fromProc
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0, // bufsize
tag,
comm
);
fromProc >> localValue;
IPstream::recv(localValue, UPstream::masterNo(), tag, comm);
}
}
}

View File

@ -99,7 +99,7 @@ void Foam::Pstream::gatherList
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);
@ -167,7 +167,7 @@ void Foam::Pstream::gatherList
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -248,7 +248,7 @@ void Foam::Pstream::scatterList
(
UPstream::commsTypes::scheduled,
myComm.above(),
0,
0, // bufsize
tag,
comm
);
@ -298,7 +298,7 @@ void Foam::Pstream::scatterList
(
UPstream::commsTypes::scheduled,
belowID,
0,
0, // bufsize
tag,
comm
);

View File

@ -50,10 +50,9 @@ const Foam::Enum
>
Foam::UPstream::commsTypeNames
({
{ commsTypes::blocking, "blocking" },
{ commsTypes::blocking, "blocking" }, // "buffered"
{ commsTypes::scheduled, "scheduled" },
// { commsTypes::nonBlocking, "non-blocking" },
{ commsTypes::nonBlocking, "nonBlocking" },
{ commsTypes::nonBlocking, "nonBlocking" }, // "immediate"
});

View File

@ -70,9 +70,9 @@ public:
//- Communications types
enum class commsTypes : char
{
blocking, //!< "blocking" : (MPI_Bsend, MPI_Recv)
scheduled, //!< "scheduled" : (MPI_Send, MPI_Recv)
nonBlocking //!< "nonBlocking" : (MPI_Isend, MPI_Irecv)
blocking, //!< "blocking" (buffered) : (MPI_Bsend, MPI_Recv)
scheduled, //!< "scheduled" (MPI standard) : (MPI_Send, MPI_Recv)
nonBlocking //!< "nonBlocking" (immediate) : (MPI_Isend, MPI_Irecv)
};
//- Enumerated names for the communication types

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -227,15 +227,8 @@ Foam::List<Foam::labelPair> Foam::mapDistributeBase::schedule
// Receive and merge
for (const int proci : UPstream::subProcs(comm))
{
IPstream fromProc
(
UPstream::commsTypes::scheduled,
proci,
0,
tag,
comm
);
List<labelPair> nbrData(fromProc);
List<labelPair> nbrData;
IPstream::recv(nbrData, proci, tag, comm);
for (const labelPair& connection : nbrData)
{
@ -247,15 +240,7 @@ Foam::List<Foam::labelPair> Foam::mapDistributeBase::schedule
{
if (UPstream::parRun())
{
OPstream toMaster
(
UPstream::commsTypes::scheduled,
UPstream::masterNo(),
0,
tag,
comm
);
toMaster << allComms;
OPstream::send(allComms, UPstream::masterNo(), tag, comm);
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2015-2017 OpenFOAM Foundation
Copyright (C) 2015-2023 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -503,20 +503,13 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
OPstream os
(
UPstream::commsTypes::blocking,
proci,
0,
tag,
comm
);
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
// buffered send
OPstream::bsend(subField, proci, tag, comm);
}
}
@ -552,15 +545,8 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
IPstream is
(
UPstream::commsTypes::blocking,
proci,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, proci, tag, comm);
checkReceivedSize(proci, map.size(), subField.size());
@ -620,15 +606,6 @@ void Foam::mapDistributeBase::distribute
const label nbrProc = twoProcs.second();
{
OPstream os
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
const labelList& map = subMap[nbrProc];
List<T> subField
@ -636,18 +613,12 @@ void Foam::mapDistributeBase::distribute
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
OPstream::send(subField, nbrProc, tag, comm);
}
{
IPstream is
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, nbrProc, tag, comm);
const labelList& map = constructMap[nbrProc];
checkReceivedSize(nbrProc, map.size(), subField.size());
@ -669,15 +640,9 @@ void Foam::mapDistributeBase::distribute
const label nbrProc = twoProcs.first();
{
IPstream is
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, nbrProc, tag, comm);
const labelList& map = constructMap[nbrProc];
checkReceivedSize(nbrProc, map.size(), subField.size());
@ -693,15 +658,6 @@ void Foam::mapDistributeBase::distribute
);
}
{
OPstream os
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
const labelList& map = subMap[nbrProc];
List<T> subField
@ -709,7 +665,7 @@ void Foam::mapDistributeBase::distribute
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
OPstream::send(subField, nbrProc, tag, comm);
}
}
}
@ -730,13 +686,12 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
UOPstream os(proci, pBufs);
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
UOPstream os(proci, pBufs);
os << subField;
}
}
@ -992,21 +947,13 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
OPstream os
(
UPstream::commsTypes::blocking,
proci,
0,
tag,
comm
);
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
// buffered send
OPstream::bsend(subField, proci, tag, comm);
}
}
@ -1041,15 +988,8 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
IPstream is
(
UPstream::commsTypes::blocking,
proci,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, proci, tag, comm);
checkReceivedSize(proci, map.size(), subField.size());
@ -1107,33 +1047,17 @@ void Foam::mapDistributeBase::distribute
const label nbrProc = twoProcs.second();
{
OPstream os
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
const labelList& map = subMap[nbrProc];
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
OPstream::send(subField, nbrProc, tag, comm);
}
{
IPstream is
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, nbrProc, tag, comm);
const labelList& map = constructMap[nbrProc];
@ -1156,15 +1080,8 @@ void Foam::mapDistributeBase::distribute
const label nbrProc = twoProcs.first();
{
IPstream is
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
List<T> subField(is);
List<T> subField;
IPstream::recv(subField, nbrProc, tag, comm);
const labelList& map = constructMap[nbrProc];
@ -1181,22 +1098,13 @@ void Foam::mapDistributeBase::distribute
);
}
{
OPstream os
(
UPstream::commsTypes::scheduled,
nbrProc,
0,
tag,
comm
);
const labelList& map = subMap[nbrProc];
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
os << subField;
OPstream::send(subField, nbrProc, tag, comm);
}
}
}
@ -1217,13 +1125,12 @@ void Foam::mapDistributeBase::distribute
if (proci != myRank && map.size())
{
UOPstream os(proci, pBufs);
List<T> subField
(
accessAndFlip(field, map, subHasFlip, negOp)
);
UOPstream os(proci, pBufs);
os << subField;
}
}
@ -1433,13 +1340,12 @@ void Foam::mapDistributeBase::send
if (map.size())
{
UOPstream os(proci, pBufs);
List<T> subField
(
accessAndFlip(field, map, subHasFlip_, flipOp())
);
UOPstream os(proci, pBufs);
os << subField;
}
}

View File

@ -324,8 +324,8 @@ void Foam::syncTools::syncPointMap
// Receive the edges using shared points from other procs
for (const int proci : UPstream::subProcs())
{
IPstream fromProc(UPstream::commsTypes::scheduled, proci);
Map<T> nbrValues(fromProc);
Map<T> nbrValues;
IPstream::recv(nbrValues, proci);
// Merge neighbouring values with my values
forAllConstIters(nbrValues, iter)
@ -343,12 +343,7 @@ void Foam::syncTools::syncPointMap
else
{
// Send to master
OPstream toMaster
(
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster << sharedPointValues;
OPstream::send(sharedPointValues, UPstream::masterNo());
}
// Broadcast: send merged values to all
@ -687,8 +682,8 @@ void Foam::syncTools::syncEdgeMap
// Receive the edges using shared points from other procs
for (const int proci : UPstream::subProcs())
{
IPstream fromProc(UPstream::commsTypes::scheduled, proci);
EdgeMap<T> nbrValues(fromProc);
EdgeMap<T> nbrValues;
IPstream::recv(nbrValues, proci);
// Merge neighbouring values with my values
forAllConstIters(nbrValues, iter)
@ -706,14 +701,7 @@ void Foam::syncTools::syncEdgeMap
else
{
// Send to master
{
OPstream toMaster
(
UPstream::commsTypes::scheduled,
UPstream::masterNo()
);
toMaster << sharedEdgeValues;
}
OPstream::send(sharedEdgeValues, UPstream::masterNo());
}
// Broadcast: send merged values to all

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2013-2017 OpenFOAM Foundation
Copyright (C) 2019-2023 OpenCFD Ltd.
Copyright (C) 2019-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -170,8 +170,7 @@ void Foam::globalIndex::gatherValues
}
else
{
IPstream fromProc(commsType, procIDs[i], 0, tag, comm);
fromProc >> allValues[i];
IPstream::recv(allValues[i], procIDs[i], tag, comm);
}
}
}
@ -193,8 +192,7 @@ void Foam::globalIndex::gatherValues
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
toMaster << localValue;
OPstream::send(localValue, commsType, masterProci, tag, comm);
}
}
@ -269,8 +267,7 @@ void Foam::globalIndex::gather
}
else
{
IPstream fromProc(commsType, procIDs[i], 0, tag, comm);
fromProc >> procSlot;
IPstream::recv(procSlot, procIDs[i], tag, comm);
}
}
}
@ -294,8 +291,7 @@ void Foam::globalIndex::gather
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
toMaster << fld;
OPstream::send(fld, commsType, masterProci, tag, comm);
}
}
@ -378,8 +374,7 @@ void Foam::globalIndex::gather
}
else
{
IPstream fromProc(commsType, procIDs[i], 0, tag, comm);
fromProc >> procSlot;
IPstream::recv(procSlot, procIDs[i], tag, comm);
}
}
}
@ -391,8 +386,7 @@ void Foam::globalIndex::gather
}
else
{
OPstream toMaster(commsType, masterProci, 0, tag, comm);
toMaster << fld;
OPstream::send(fld, commsType, masterProci, tag, comm);
}
}
@ -962,8 +956,7 @@ void Foam::globalIndex::scatter
}
else
{
OPstream toProc(commsType, procIDs[i], 0, tag, comm);
toProc << procSlot;
OPstream::send(procSlot, commsType, procIDs[i], tag, comm);
}
}
@ -999,8 +992,7 @@ void Foam::globalIndex::scatter
}
else
{
IPstream fromMaster(commsType, masterProci, 0, tag, comm);
fromMaster >> fld;
IPstream::recv(fld, masterProci, tag, comm);
}
}

View File

@ -6,7 +6,7 @@
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2011-2017 OpenFOAM Foundation
Copyright (C) 2015-2022 OpenCFD Ltd.
Copyright (C) 2015-2024 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
@ -164,12 +164,8 @@ void Foam::isoSurfacePoint::syncUnseparatedPoints
patchInfo[nbrPointi] = pointValues[meshPts[pointi]];
}
OPstream toNbr
(
Pstream::commsTypes::blocking,
procPatch.neighbProcNo()
);
toNbr << patchInfo;
// buffered send
OPstream::bsend(patchInfo, procPatch.neighbProcNo());
}
}
@ -181,17 +177,11 @@ void Foam::isoSurfacePoint::syncUnseparatedPoints
if (pp.nPoints() && collocatedPatch(pp))
{
pointField nbrPatchInfo(procPatch.nPoints());
{
pointField nbrPatchInfo;
// We do not know the number of points on the other side
// so cannot use UIPstream::read
IPstream fromNbr
(
Pstream::commsTypes::blocking,
procPatch.neighbProcNo()
);
fromNbr >> nbrPatchInfo;
}
IPstream::recv(nbrPatchInfo, procPatch.neighbProcNo());
const labelList& meshPts = procPatch.meshPoints();