mirror of
https://develop.openfoam.com/Development/openfoam.git
synced 2025-11-28 03:28:01 +00:00
- now largely encapsulated using PstreamBuffers methods, which makes it simpler to centralize and maintain
- avoid building intermediate structures when sending data, remove unused methods/data
TUT: parallel version of depthCharge2D
STYLE: minor update in ProcessorTopology
379 lines
8.6 KiB
C
379 lines
8.6 KiB
C
/*---------------------------------------------------------------------------*\
|
|
========= |
|
|
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
|
|
\\ / O peration |
|
|
\\ / A nd | www.openfoam.com
|
|
\\/ M anipulation |
|
|
-------------------------------------------------------------------------------
|
|
Copyright (C) 2011-2017 OpenFOAM Foundation
|
|
Copyright (C) 2021-2022 OpenCFD Ltd.
|
|
-------------------------------------------------------------------------------
|
|
License
|
|
This file is part of OpenFOAM.
|
|
|
|
OpenFOAM is free software: you can redistribute it and/or modify it
|
|
under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
\*---------------------------------------------------------------------------*/
|
|
|
|
#include "PstreamBuffers.H"
|
|
#include "bitSet.H"
|
|
|
|
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
|
|
|
|
void Foam::PstreamBuffers::finalExchange
|
|
(
|
|
labelList& recvSizes,
|
|
const bool wait
|
|
)
|
|
{
|
|
// Could also check that it is not called twice
|
|
// but that is used for overlapping send/recv (eg, overset)
|
|
finishedSendsCalled_ = true;
|
|
|
|
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
|
{
|
|
// all-to-all
|
|
Pstream::exchangeSizes(sendBuf_, recvSizes, comm_);
|
|
|
|
Pstream::exchange<DynamicList<char>, char>
|
|
(
|
|
sendBuf_,
|
|
recvSizes,
|
|
recvBuf_,
|
|
tag_,
|
|
comm_,
|
|
wait
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::finalExchange
|
|
(
|
|
const labelUList& sendProcs,
|
|
const labelUList& recvProcs,
|
|
labelList& recvSizes,
|
|
const bool wait
|
|
)
|
|
{
|
|
// Could also check that it is not called twice
|
|
// but that is used for overlapping send/recv (eg, overset)
|
|
finishedSendsCalled_ = true;
|
|
|
|
if (commsType_ == UPstream::commsTypes::nonBlocking)
|
|
{
|
|
Pstream::exchangeSizes
|
|
(
|
|
sendProcs,
|
|
recvProcs,
|
|
sendBuf_,
|
|
recvSizes,
|
|
tag_,
|
|
comm_
|
|
);
|
|
|
|
Pstream::exchange<DynamicList<char>, char>
|
|
(
|
|
sendBuf_,
|
|
recvSizes,
|
|
recvBuf_,
|
|
tag_,
|
|
comm_,
|
|
wait
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
|
|
|
|
// Construct for the given communication type, message tag, communicator
// and stream format.
//
// The send/recv buffer lists and the receive positions are each sized
// to UPstream::nProcs(comm); the receive positions start at Zero.
// Sends are flagged as not yet finished and clearing of receive
// buffers is flagged as allowed.
Foam::PstreamBuffers::PstreamBuffers
(
    const UPstream::commsTypes commsType,
    const int tag,
    const label comm,
    IOstreamOption::streamFormat fmt
)
:
    // State flags
    finishedSendsCalled_(false),
    allowClearRecv_(true),

    // Communication settings
    format_(fmt),
    commsType_(commsType),
    tag_(tag),
    comm_(comm),

    // Per-rank storage
    sendBuf_(UPstream::nProcs(comm)),
    recvBuf_(UPstream::nProcs(comm)),
    recvBufPos_(UPstream::nProcs(comm), Zero)
{}
|
|
|
|
|
|
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
|
|
|
|
// Destructor. Verifies that every receive buffer has been read to its
// end; a partially consumed buffer raises a fatal error (abort).
Foam::PstreamBuffers::~PstreamBuffers()
{
    // Check that all data has been consumed.
    for (label proci = 0; proci < recvBufPos_.size(); ++proci)
    {
        const label nConsumed = recvBufPos_[proci];
        const label nAvailable = recvBuf_[proci].size();

        if (nConsumed < nAvailable)
        {
            FatalErrorInFunction
                << "Message from processor " << proci
                << " Only consumed " << nConsumed << " of "
                << nAvailable << " bytes" << nl
                << Foam::abort(FatalError);
        }
    }
}
|
|
|
|
|
|
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
|
|
|
|
void Foam::PstreamBuffers::clear()
|
|
{
|
|
for (DynamicList<char>& buf : sendBuf_)
|
|
{
|
|
buf.clear();
|
|
}
|
|
for (DynamicList<char>& buf : recvBuf_)
|
|
{
|
|
buf.clear();
|
|
}
|
|
recvBufPos_ = 0;
|
|
|
|
finishedSendsCalled_ = false;
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::clearStorage()
|
|
{
|
|
// Could also clear out entire sendBuf_, recvBuf_ and reallocate.
|
|
// Not sure if it makes much difference
|
|
for (DynamicList<char>& buf : sendBuf_)
|
|
{
|
|
buf.clearStorage();
|
|
}
|
|
for (DynamicList<char>& buf : recvBuf_)
|
|
{
|
|
buf.clearStorage();
|
|
}
|
|
recvBufPos_ = 0;
|
|
|
|
finishedSendsCalled_ = false;
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::hasSendData() const
|
|
{
|
|
for (const DynamicList<char>& buf : sendBuf_)
|
|
{
|
|
if (!buf.empty())
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::hasSendData(const label proci) const
|
|
{
|
|
return !sendBuf_[proci].empty();
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::hasRecvData() const
|
|
{
|
|
if (finishedSendsCalled_)
|
|
{
|
|
for (const DynamicList<char>& buf : recvBuf_)
|
|
{
|
|
if (!buf.empty())
|
|
{
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
#ifdef FULLDEBUG
|
|
else
|
|
{
|
|
FatalErrorInFunction
|
|
<< "Call finishedSends first" << exit(FatalError);
|
|
}
|
|
#endif
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::hasRecvData(const label proci) const
|
|
{
|
|
if (finishedSendsCalled_)
|
|
{
|
|
return !recvBuf_[proci].empty();
|
|
}
|
|
#ifdef FULLDEBUG
|
|
else
|
|
{
|
|
FatalErrorInFunction
|
|
<< "Call finishedSends first" << exit(FatalError);
|
|
}
|
|
#endif
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::allowClearRecv(bool on) noexcept
|
|
{
|
|
bool old(allowClearRecv_);
|
|
allowClearRecv_ = on;
|
|
return old;
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::finishedSends(const bool wait)
|
|
{
|
|
labelList recvSizes;
|
|
finalExchange(recvSizes, wait);
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::finishedSends
|
|
(
|
|
labelList& recvSizes,
|
|
const bool wait
|
|
)
|
|
{
|
|
finalExchange(recvSizes, wait);
|
|
|
|
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
|
{
|
|
FatalErrorInFunction
|
|
<< "Obtaining sizes not supported in "
|
|
<< UPstream::commsTypeNames[commsType_] << endl
|
|
<< " since transfers already in progress. Use non-blocking instead."
|
|
<< exit(FatalError);
|
|
|
|
// Note: maybe possible only if using different tag from write started
|
|
// by ~UOPstream. Needs some work.
|
|
}
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::finishedSends
|
|
(
|
|
const labelUList& sendProcs,
|
|
const labelUList& recvProcs,
|
|
const bool wait
|
|
)
|
|
{
|
|
labelList recvSizes;
|
|
finalExchange(sendProcs, recvProcs, recvSizes, wait);
|
|
}
|
|
|
|
|
|
void Foam::PstreamBuffers::finishedSends
|
|
(
|
|
const labelUList& sendProcs,
|
|
const labelUList& recvProcs,
|
|
labelList& recvSizes,
|
|
const bool wait
|
|
)
|
|
{
|
|
finalExchange(sendProcs, recvProcs, recvSizes, wait);
|
|
|
|
if (commsType_ != UPstream::commsTypes::nonBlocking)
|
|
{
|
|
FatalErrorInFunction
|
|
<< "Obtaining sizes not supported in "
|
|
<< UPstream::commsTypeNames[commsType_] << endl
|
|
<< " since transfers already in progress. Use non-blocking instead."
|
|
<< exit(FatalError);
|
|
|
|
// Note: maybe possible only if using different tag from write started
|
|
// by ~UOPstream. Needs some work.
|
|
}
|
|
}
|
|
|
|
|
|
bool Foam::PstreamBuffers::finishedSends
|
|
(
|
|
bitSet& sendConnections,
|
|
DynamicList<label>& sendProcs,
|
|
DynamicList<label>& recvProcs,
|
|
const bool wait
|
|
)
|
|
{
|
|
bool changed = (sendConnections.size() != nProcs());
|
|
|
|
if (changed)
|
|
{
|
|
sendConnections.resize(nProcs());
|
|
}
|
|
|
|
// Update send connections
|
|
// - reasonable to assume there are no self-sends on UPstream::myProcNo
|
|
forAll(sendBuf_, proci)
|
|
{
|
|
// ie, hasSendData(proci)
|
|
if (sendConnections.set(proci, !sendBuf_[proci].empty()))
|
|
{
|
|
// The state changed
|
|
changed = true;
|
|
}
|
|
}
|
|
|
|
reduce(changed, orOp<bool>());
|
|
|
|
if (changed)
|
|
{
|
|
// Create send/recv topology
|
|
|
|
// The send ranks
|
|
sendProcs.clear();
|
|
forAll(sendBuf_, proci)
|
|
{
|
|
// ie, hasSendData(proci)
|
|
if (!sendBuf_[proci].empty())
|
|
{
|
|
sendProcs.append(proci);
|
|
}
|
|
}
|
|
|
|
finishedSends(wait); // All-to-all
|
|
|
|
// The recv ranks
|
|
recvProcs.clear();
|
|
forAll(recvBuf_, proci)
|
|
{
|
|
// ie, hasRecvData(proci)
|
|
if (!recvBuf_[proci].empty())
|
|
{
|
|
recvProcs.append(proci);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Use existing send/recv ranks
|
|
|
|
finishedSends(sendProcs, recvProcs, wait);
|
|
}
|
|
|
|
return changed;
|
|
}
|
|
|
|
|
|
// ************************************************************************* //
|