ENH: replace PstreamBuffers with scheduled read/writes for VTK output

- use the new updates to globalIndex to manage the bookkeeping
This commit is contained in:
Mark Olesen
2021-10-29 11:36:22 +02:00
parent b6539cd02e
commit 33ff3201ea
2 changed files with 293 additions and 207 deletions

View File

@ -39,7 +39,7 @@ License
#include "globalIndex.H"
#include "instant.H"
#include "Fstream.H"
#include "PstreamBuffers.H"
#include "Pstream.H"
#include "OSspecific.H"
// * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * * //
@ -129,25 +129,17 @@ void Foam::vtk::writeListParallel
const globalIndex& procOffset
)
{
// List sizes
const globalIndex sizes(values.size());
// Gather sizes - master information, offsets are irrelevant
const globalIndex procAddr
(
UPstream::listGatherValues<label>(values.size()),
globalIndex::SIZES
);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
if (!Pstream::master())
{
UOPstream os(Pstream::masterNo(), pBufs);
os.write(values.cdata_bytes(), values.size_bytes());
}
pBufs.finishedSends();
if (Pstream::master())
{
// Master data
// Write with offset
// Write master data - with value offset
const label offsetId = procOffset.offset(0);
for (const label val : values)
{
@ -155,21 +147,38 @@ void Foam::vtk::writeListParallel
}
// Receive and write
for (const int proci : Pstream::subProcs())
DynamicList<label> recvData(procAddr.maxNonLocalSize());
for (const label proci : procAddr.subProcs())
{
List<label> recv(sizes.localSize(proci));
recvData.resize_nocopy(procAddr.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
UIPstream is(proci, pBufs);
is.read(recv.data_bytes(), recv.size_bytes());
// Write with offset
// With value offset
const label offsetId = procOffset.offset(proci);
for (const label val : recv)
for (const label val : recvData)
{
vtk::write(fmt, val + offsetId);
}
}
}
else
{
// Send
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
values.cdata_bytes(),
values.size_bytes()
);
}
}

View File

@ -26,7 +26,7 @@ License
\*---------------------------------------------------------------------------*/
#include "globalIndex.H"
#include "PstreamBuffers.H"
#include "Pstream.H"
#include "ListOps.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -131,33 +131,31 @@ void Foam::vtk::writeValueParallel
const label count
)
{
if (!is_contiguous<Type>::value)
{
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
// Gather [count, value] tuples, including from master
const List<std::pair<label, Type>> countValues
(
UPstream::listGatherValues<std::pair<label, Type>>
(
std::pair<label, Type>(count, val)
)
);
if (Pstream::master())
{
vtk::write(fmt, val, count);
label subCount;
Type subValue;
// Receive each [size, value] tuple
for (const int proci : Pstream::subProcs())
for (const auto& countVal : countValues)
{
IPstream is(Pstream::commsTypes::blocking, proci);
is >> subCount >> subValue;
vtk::write(fmt, subValue, subCount);
// Write [count, value] tuple
vtk::write(fmt, countVal.first, countVal.second);
}
}
else
{
OPstream os
(
Pstream::commsTypes::blocking,
Pstream::masterNo()
);
// Send [size, value] tuple
os << count << val;
}
}
@ -168,26 +166,22 @@ void Foam::vtk::writeListParallel
const UList<Type>& values
)
{
// List sizes
const globalIndex sizes(values.size());
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
if (!Pstream::master())
if (!is_contiguous<Type>::value)
{
UOPstream os(Pstream::masterNo(), pBufs);
if (is_contiguous<Type>::value)
{
os.write(values.cdata_bytes(), values.size_bytes());
}
else
{
os << values;
}
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
pBufs.finishedSends();
// Gather sizes - master information, offsets are irrelevant
const globalIndex procAddr
(
UPstream::listGatherValues<label>(values.size()),
globalIndex::SIZES
);
if (Pstream::master())
{
@ -195,25 +189,32 @@ void Foam::vtk::writeListParallel
vtk::writeList(fmt, values);
// Receive and write
for (const int proci : Pstream::subProcs())
DynamicList<Type> recvData(procAddr.maxNonLocalSize());
for (const label proci : procAddr.subProcs())
{
UIPstream is(proci, pBufs);
{
List<Type> recv(sizes.localSize(proci));
if (is_contiguous<Type>::value)
{
is.read(recv.data_bytes(), recv.size_bytes());
}
else
{
is >> recv;
}
vtk::writeList(fmt, recv);
}
recvData.resize_nocopy(procAddr.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
}
}
else
{
// Send
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
values.cdata_bytes(),
values.size_bytes()
);
}
}
@ -225,21 +226,28 @@ void Foam::vtk::writeListParallel
const labelUList& addressing
)
{
UIndirectList<Type> send(values, addressing);
// List sizes
const globalIndex sizes(send.size());
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
if (!Pstream::master())
if (!is_contiguous<Type>::value)
{
UOPstream os(Pstream::masterNo(), pBufs);
os << send;
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
pBufs.finishedSends();
List<Type> sendData;
if (!Pstream::master())
{
sendData = UIndirectList<Type>(values, addressing);
}
// Gather sizes - master information, offsets are irrelevant
const globalIndex procAddr
(
UPstream::listGatherValues<label>(sendData.size()),
globalIndex::SIZES
);
if (Pstream::master())
{
@ -247,17 +255,31 @@ void Foam::vtk::writeListParallel
vtk::writeList(fmt, values, addressing);
// Receive and write
for (const int proci : Pstream::subProcs())
{
UIPstream is(proci, pBufs);
DynamicList<Type> recvData(procAddr.maxNonLocalSize());
{
List<Type> recv;
is >> recv;
vtk::writeList(fmt, recv);
}
for (const label proci : procAddr.subProcs())
{
recvData.resize_nocopy(procAddr.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
}
}
else
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
sendData.cdata_bytes(),
sendData.size_bytes()
);
}
}
@ -269,33 +291,28 @@ void Foam::vtk::writeListParallel
const bitSet& selected
)
{
List<Type> send;
if (!Pstream::master())
if (!is_contiguous<Type>::value)
{
send = subset(selected, values);
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
// List sizes.
// NOTE okay to skip proc0 since we only need sizes (not offsets)
const globalIndex sizes(send.size());
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
List<Type> sendData;
if (!Pstream::master())
{
UOPstream os(Pstream::masterNo(), pBufs);
if (is_contiguous<Type>::value)
{
os.write(send.cdata_bytes(), send.size_bytes());
}
else
{
os << send;
}
sendData = subset(selected, values);
}
pBufs.finishedSends();
// Gather sizes - master information, offsets are irrelevant
const globalIndex procAddr
(
UPstream::listGatherValues<label>(sendData.size()),
globalIndex::SIZES
);
if (Pstream::master())
{
@ -303,25 +320,31 @@ void Foam::vtk::writeListParallel
vtk::writeList(fmt, values, selected);
// Receive and write
for (const int proci : Pstream::subProcs())
DynamicList<Type> recvData(procAddr.maxNonLocalSize());
for (const label proci : procAddr.subProcs())
{
UIPstream is(proci, pBufs);
{
List<Type> recv(sizes.localSize(proci));
if (is_contiguous<Type>::value)
{
is.read(recv.data_bytes(), recv.size_bytes());
}
else
{
is >> recv;
}
vtk::writeList(fmt, recv);
}
recvData.resize_nocopy(procAddr.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
}
}
else
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
sendData.cdata_bytes(),
sendData.size_bytes()
);
}
}
@ -333,28 +356,27 @@ void Foam::vtk::writeListsParallel
const UList<Type>& values2
)
{
// List sizes
const globalIndex sizes1(values1.size());
const globalIndex sizes2(values2.size());
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
if (!Pstream::master())
if (!is_contiguous<Type>::value)
{
UOPstream os(Pstream::masterNo(), pBufs);
if (is_contiguous<Type>::value)
{
os.write(values1.cdata_bytes(), values1.size_bytes());
os.write(values2.cdata_bytes(), values2.size_bytes());
}
else
{
os << values1 << values2;
}
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
pBufs.finishedSends();
// Gather sizes - master information and offsets are irrelevant
const globalIndex procAddr1
(
UPstream::listGatherValues<label>(values1.size()),
globalIndex::SIZES
);
const globalIndex procAddr2
(
UPstream::listGatherValues<label>(values2.size()),
globalIndex::SIZES
);
if (Pstream::master())
{
@ -362,46 +384,55 @@ void Foam::vtk::writeListsParallel
vtk::writeList(fmt, values1);
vtk::writeList(fmt, values2);
// Reserve max receive size
DynamicList<Type> recv
// Receive and write
DynamicList<Type> recvData
(
max(sizes1.maxNonLocalSize(), sizes2.maxNonLocalSize())
max(procAddr1.maxNonLocalSize(), procAddr2.maxNonLocalSize())
);
// Receive and write
for (const int proci : Pstream::subProcs())
for (const label proci : procAddr1.subProcs())
{
UIPstream is(proci, pBufs);
// values1
{
List<Type> recv(sizes1.localSize(proci));
if (is_contiguous<Type>::value)
{
is.read(recv.data_bytes(), recv.size_bytes());
}
else
{
is >> recv;
}
vtk::writeList(fmt, recv);
}
recvData.resize_nocopy(procAddr1.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
// values2
{
List<Type> recv(sizes2.localSize(proci));
if (is_contiguous<Type>::value)
{
is.read(recv.data_bytes(), recv.size_bytes());
}
else
{
is >> recv;
}
vtk::writeList(fmt, recv);
}
recvData.resize_nocopy(procAddr2.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
}
}
else
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
values1.cdata_bytes(),
values1.size_bytes()
);
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
values2.cdata_bytes(),
values2.size_bytes()
);
}
}
@ -414,45 +445,91 @@ void Foam::vtk::writeListsParallel
const labelUList& addressing
)
{
UIndirectList<Type> send2(values2, addressing);
PstreamBuffers pBufs(Pstream::commsTypes::nonBlocking);
// Send to master
if (!Pstream::master())
if (!is_contiguous<Type>::value)
{
UOPstream os(Pstream::masterNo(), pBufs);
os << values1 << send2;
// Non-contiguous data does not make sense
FatalErrorInFunction
<< "Contiguous data only" << endl
<< Foam::exit(FatalError);
}
pBufs.finishedSends();
List<Type> sendData2;
if (!Pstream::master())
{
sendData2 = UIndirectList<Type>(values2, addressing);
}
// Gather sizes - master information, offsets are irrelevant
const globalIndex procAddr1
(
UPstream::listGatherValues<label>(values1.size()),
globalIndex::SIZES
);
const globalIndex procAddr2
(
UPstream::listGatherValues<label>(sendData2.size()),
globalIndex::SIZES
);
if (Pstream::master())
{
// Write master data
vtk::writeList(fmt, values1);
vtk::writeList(fmt, values2, addressing);
// Receive and write
for (const int proci : Pstream::subProcs())
DynamicList<Type> recvData
(
max(procAddr1.maxNonLocalSize(), procAddr2.maxNonLocalSize())
);
for (const label proci : procAddr1.subProcs())
{
UIPstream is(proci, pBufs);
// values1
{
List<Type> recv;
is >> recv;
vtk::writeList(fmt, recv);
}
recvData.resize_nocopy(procAddr1.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
// values2 (send2)
{
List<Type> recv;
is >> recv;
vtk::writeList(fmt, recv);
}
// values2
recvData.resize_nocopy(procAddr2.localSize(proci));
UIPstream::read
(
UPstream::commsTypes::scheduled,
proci,
recvData.data_bytes(),
recvData.size_bytes()
);
vtk::writeList(fmt, recvData);
}
}
else
{
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
values1.cdata_bytes(),
values1.size_bytes()
);
UOPstream::write
(
UPstream::commsTypes::scheduled,
Pstream::masterNo(),
sendData2.cdata_bytes(),
sendData2.size_bytes()
);
}
}