mirror of
https://develop.openfoam.com/Development/openfoam.git
synced 2025-11-28 03:28:01 +00:00
- num_blocks(), test_set() as per boost - broadcast(), reduceAnd(), reduceOr() to simplify parallel operations - matrix-like output for PackedList::writeList() BUG: Pstream::broadcastList() missing resize on sub-ranks - latent bug since it was unused in any OpenFOAM code
283 lines
7.3 KiB
C++
283 lines
7.3 KiB
C++
/*---------------------------------------------------------------------------*\
|
|
========= |
|
|
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
|
|
\\ / O peration |
|
|
\\ / A nd | www.openfoam.com
|
|
\\/ M anipulation |
|
|
-------------------------------------------------------------------------------
|
|
Copyright (C) 2022-2025 OpenCFD Ltd.
|
|
-------------------------------------------------------------------------------
|
|
License
|
|
This file is part of OpenFOAM.
|
|
|
|
OpenFOAM is free software: you can redistribute it and/or modify it
|
|
under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
Application
|
|
Test-parallel-chunks
|
|
|
|
Description
|
|
Test for sending contiguous data in chunk-wise.
|
|
Largely mirrors Pstream::exchange or vice versa
|
|
|
|
\*---------------------------------------------------------------------------*/
|
|
|
|
#define Foam_PstreamExchange_debug_chunks
|
|
|
|
#include "List.H"
|
|
#include "argList.H"
|
|
#include "Time.H"
|
|
#include "IPstream.H"
|
|
#include "OPstream.H"
|
|
#include "IOstreams.H"
|
|
|
|
using namespace Foam;
|
|
|
|
|
|
template<class Container, class Type>
|
|
void broadcast_chunks
|
|
(
|
|
Container& sendData,
|
|
const int tag = UPstream::msgType(),
|
|
const label comm = UPstream::worldComm,
|
|
const int64_t maxComms_bytes = UPstream::maxCommsSize
|
|
)
|
|
{
|
|
// OR static_assert(is_contiguous_v<Type>, "Contiguous data only!")
|
|
if constexpr (!is_contiguous_v<Type>)
|
|
{
|
|
FatalErrorInFunction
|
|
<< "Contiguous data only." << sizeof(Type)
|
|
<< Foam::abort(FatalError);
|
|
}
|
|
|
|
if (maxComms_bytes == 0)
|
|
{
|
|
// Do in one go
|
|
Info<< "send " << sendData.size() << " elements in one go" << endl;
|
|
Pstream::broadcast(sendData, comm);
|
|
return;
|
|
}
|
|
|
|
label sendSize(sendData.size());
|
|
Pstream::broadcast(sendSize, comm);
|
|
|
|
label recvSize(sendSize);
|
|
|
|
sendData.resize_nocopy(recvSize); // A no-op on master
|
|
|
|
|
|
// The chunk size (number of elements) corresponding to max byte transfer
|
|
// Is zero for non-chunked exchanges.
|
|
const std::size_t chunkSize
|
|
(
|
|
PstreamDetail::maxTransferCount<Type>
|
|
(
|
|
PstreamDetail::maxTransferBytes(maxComms_bytes)
|
|
)
|
|
);
|
|
|
|
|
|
if (chunkSize)
|
|
{
|
|
// Convert from send count (elements) to number of chunks.
|
|
// Can normally calculate with (count-1), but add some safety
|
|
label nChunks = 1 + (sendSize/label(chunkSize));
|
|
|
|
Info
|
|
<< "send " << sendSize << " elements ("
|
|
<< (sendSize*sizeof(Type)) << " bytes) in " << nChunks
|
|
<< " chunks of " << label(chunkSize) << " elements ("
|
|
<< label(chunkSize*sizeof(Type)) << " bytes) for maxCommsSize:"
|
|
<< label(maxComms_bytes)
|
|
<< endl;
|
|
}
|
|
|
|
|
|
// stress-test with shortened sendSize
|
|
// will produce useless loops, but no calls
|
|
// sendSize /= 2;
|
|
|
|
typedef stdFoam::span<Type> sendType;
|
|
|
|
do
|
|
{
|
|
sendType payload(sendData.data(), sendData.size());
|
|
|
|
if (!chunkSize)
|
|
{
|
|
UPstream::broadcast
|
|
(
|
|
payload.data_bytes(),
|
|
payload.size_bytes(),
|
|
comm
|
|
);
|
|
break;
|
|
}
|
|
|
|
// Dispatch chunk-wise until there is nothing left
|
|
for (int iter = 0; /*true*/; ++iter)
|
|
{
|
|
// The begin/end for the data window
|
|
const std::size_t beg = (std::size_t(iter)*chunkSize);
|
|
const std::size_t end = (std::size_t(iter+1)*chunkSize);
|
|
|
|
if (payload.size() <= beg)
|
|
{
|
|
// No more data windows
|
|
break;
|
|
}
|
|
|
|
sendType window
|
|
(
|
|
(end < payload.size())
|
|
? payload.subspan(beg, end - beg)
|
|
: payload.subspan(beg)
|
|
);
|
|
|
|
Info<< "iter " << iter
|
|
<< ": beg=" << label(beg) << " len=" << label(window.size())
|
|
<< " (" << label(window.size_bytes()) << " bytes)" << endl;
|
|
|
|
UPstream::broadcast
|
|
(
|
|
window.data_bytes(),
|
|
window.size_bytes(),
|
|
comm
|
|
);
|
|
}
|
|
}
|
|
while (false);
|
|
|
|
Info<< "final" << endl;
|
|
}
|
|
|
|
|
|
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
|
|
|
|
int main(int argc, char *argv[])
|
|
{
|
|
argList::noCheckProcessorDirectories();
|
|
argList::addOption("comms-size", "int", "override Pstream::maxCommsSize");
|
|
|
|
#include "setRootCase.H"
|
|
|
|
if (!Pstream::parRun())
|
|
{
|
|
Info<< "\nWarning: not parallel - skipping further tests\n" << endl;
|
|
return 0;
|
|
}
|
|
|
|
labelList input1;
|
|
if (UPstream::master())
|
|
{
|
|
input1 = identity(500);
|
|
}
|
|
broadcast_chunks<labelList, label>(input1);
|
|
|
|
UPstream::maxCommsSize = 33;
|
|
|
|
args.readIfPresent("comms-size", UPstream::maxCommsSize);
|
|
|
|
broadcast_chunks<labelList, label>(input1);
|
|
|
|
// Mostly the same with PstreamBuffers
|
|
if (false)
|
|
{
|
|
PstreamBuffers pBufs;
|
|
|
|
labelList sendData;
|
|
if (UPstream::master())
|
|
{
|
|
sendData = identity(500);
|
|
|
|
for (const int proci : UPstream::subProcs())
|
|
{
|
|
UOPstream os(proci, pBufs);
|
|
os << sendData;
|
|
}
|
|
}
|
|
|
|
Info<< "call finishedSends()" << endl;
|
|
pBufs.finishedScatters();
|
|
|
|
if (UPstream::is_subrank())
|
|
{
|
|
UIPstream is(UPstream::masterNo(), pBufs);
|
|
is >> sendData;
|
|
}
|
|
}
|
|
|
|
// Manually
|
|
Info<< "perform list exchange" << endl;
|
|
{
|
|
labelListList sendBufs(UPstream::nProcs());
|
|
labelListList recvBufs(UPstream::nProcs());
|
|
labelList recvSizes;
|
|
|
|
if (UPstream::master())
|
|
{
|
|
for (const int proci : UPstream::allProcs())
|
|
{
|
|
if (proci != UPstream::myProcNo())
|
|
{
|
|
sendBufs[proci] = identity(500);
|
|
}
|
|
}
|
|
}
|
|
|
|
Pstream::exchangeSizes(sendBufs, recvSizes);
|
|
|
|
Pstream::exchange<labelList, label>
|
|
(
|
|
sendBufs,
|
|
recvSizes,
|
|
recvBufs
|
|
);
|
|
}
|
|
|
|
|
|
Info<< "perform Map exchange" << endl;
|
|
{
|
|
Map<labelList> sendBufs;
|
|
Map<labelList> recvBufs;
|
|
Map<label> recvSizes;
|
|
|
|
if (UPstream::master())
|
|
{
|
|
for (const int proci : UPstream::allProcs())
|
|
{
|
|
if (proci != UPstream::myProcNo())
|
|
{
|
|
sendBufs(proci) = identity(500);
|
|
}
|
|
}
|
|
}
|
|
|
|
Pstream::exchangeSizes(sendBufs, recvSizes);
|
|
|
|
Pstream::exchange<labelList, label>
|
|
(
|
|
sendBufs,
|
|
recvSizes,
|
|
recvBufs
|
|
);
|
|
}
|
|
|
|
Info<< "End\n" << endl;
|
|
return 0;
|
|
}
|
|
|
|
|
|
// ************************************************************************* //
|