ENH: new broadcast version of Pstreams (#2371)

- The idea of broadcast streams is to replace multiple master to
  subProcs communications with a single MPI_Bcast.

    if (Pstream::master())
    {
        OPBstream toAll(Pstream::masterNo());
        toAll << data;
    }
    else
    {
        IPBstream fromMaster(Pstream::masterNo());
        fromMaster >> data;
    }

    // vs.
    if (Pstream::master())
    {
        for (const int proci : Pstream::subProcs())
        {
            OPstream os(Pstream::commsTypes::scheduled, proci);
            os << data;
        }
    }
    else
    {
        IPstream is(Pstream::commsTypes::scheduled, Pstream::masterNo());
        is >> data;
    }

  Can simply use UPstream::broadcast() directly for contiguous data
  with known lengths.

Based on ideas from T.Aoyagi(RIST), A.Azami(RIST)
This commit is contained in:
Mark Olesen
2022-02-20 19:26:13 +01:00
committed by Andrew Heather
parent a9cdaa1bae
commit 8478595a15
16 changed files with 956 additions and 8 deletions

View File

@ -0,0 +1,3 @@
Test-parallel-broadcast.C
EXE = $(FOAM_USER_APPBIN)/Test-parallel-broadcast

View File

@ -0,0 +1,2 @@
/* EXE_INC = */
/* EXE_LIBS = */

View File

@ -0,0 +1,147 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Application
Test-parallel-broadcast
Description
Test for various broadcast routines.
\*---------------------------------------------------------------------------*/
#include "List.H"
#include "argList.H"
#include "Time.H"
#include "vector.H"
#include "IPstream.H"
#include "OPstream.H"
#include "IOstreams.H"
using namespace Foam;
// This is what our new scatter will look like inside
template<class Type>
void testBroadcastStream
(
Type& value,
const label comm = UPstream::worldComm
)
{
Info<< nl << "is_contiguous:" << is_contiguous<Type>::value << endl;
Pout<< "pre-broadcast: " << value << endl;
if (is_contiguous<Type>::value)
{
UPstream::broadcast
(
reinterpret_cast<char*>(&value),
sizeof(Type),
comm,
UPstream::masterNo()
);
}
else
{
if (UPstream::master())
{
OPBstream toAll(UPstream::masterNo(), comm);
toAll << value;
}
else
{
IPBstream fromMaster(UPstream::masterNo(), comm);
fromMaster >> value;
}
}
Pout<< "post-broadcast: " << value << endl;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
int main(int argc, char *argv[])
{
argList::noCheckProcessorDirectories();
#include "setRootCase.H"
#include "createTime.H"
{
label value = -1;
if (Pstream::master())
{
value = UPstream::nProcs();
}
testBroadcastStream(value);
}
{
labelList values;
if (Pstream::master())
{
values = identity(UPstream::nProcs());
}
testBroadcastStream(values);
}
{
wordList values;
if (Pstream::master())
{
values.resize(UPstream::nProcs());
forAll(values, i)
{
values[i] = "value_" + Foam::name(i);
}
}
testBroadcastStream(values);
}
{
vector values(vector::uniform(-1));
if (Pstream::master())
{
values = vector(1,2,3);
}
testBroadcastStream(values);
}
{
FixedList<vector, 3> values(vector::uniform(-1));
if (Pstream::master())
{
values = vector(1,2,3);
}
testBroadcastStream(values);
}
Info<< "End\n" << endl;
return 0;
}
// ************************************************************************* //

View File

@ -274,6 +274,8 @@ $(Pstreams)/UIPstreamBase.C
$(Pstreams)/UOPstreamBase.C $(Pstreams)/UOPstreamBase.C
$(Pstreams)/IPstreams.C $(Pstreams)/IPstreams.C
$(Pstreams)/OPstreams.C $(Pstreams)/OPstreams.C
$(Pstreams)/IPBstreams.C
$(Pstreams)/OPBstreams.C
dictionary = db/dictionary dictionary = db/dictionary
$(dictionary)/dictionary.C $(dictionary)/dictionary.C

View File

@ -0,0 +1,107 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UIPstream.H"
#include "IPstream.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UIPBstream::UIPBstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
IOstreamOption::streamFormat fmt
)
:
UIPstreamBase
(
commsType,
fromProcNo,
receiveBuf,
receiveBufPosition,
tag,
comm,
clearAtEnd,
fmt
)
{
bufferIPCrecv();
}
Foam::IPBstream::IPBstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
const int tag,
const label comm,
IOstreamOption::streamFormat fmt
)
:
Pstream(commsType, bufSize),
UIPBstream
(
commsType,
fromProcNo,
Pstream::transferBuf_,
transferBufPosition_,
tag,
comm,
false, // Do not clear Pstream::transferBuf_ if at end
fmt
),
transferBufPosition_(0)
{}
Foam::IPBstream::IPBstream
(
const int fromProcNo,
const label comm,
IOstreamOption::streamFormat fmt
)
:
IPBstream
(
UPstream::commsTypes::scheduled, // irrelevant
fromProcNo,
label(0), // bufSize
UPstream::msgType(), // irrelevant
comm,
fmt
)
{}
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -31,14 +31,14 @@ Description
Input inter-processor communications stream. Input inter-processor communications stream.
SourceFiles SourceFiles
IPstream.C IPstreams.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "Pstream.H" #include "Pstream.H"
#ifndef IPstream_H #ifndef Foam_IPstream_H
#define IPstream_H #define Foam_IPstream_H
#include "UIPstream.H" #include "UIPstream.H"
@ -79,6 +79,48 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class IPBstream Declaration
\*---------------------------------------------------------------------------*/
//- Input inter-processor communications stream
//- using MPI broadcast.
class IPBstream
:
public Pstream,
public UIPBstream
{
// Private Data
//- Receive index into Pstream::transferBuf_
label transferBufPosition_;
public:
// Constructors
//- Construct for broadcast root, optional buffer size, read format
IPBstream
(
const commsTypes commsType,
const int fromProcNo, //!< UPstream::masterNo() - root procNo
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct for broadcast root with optional communicator,
//- write format
explicit IPBstream
(
const int fromProcNo, //!< UPstream::masterNo() - root procNo
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -0,0 +1,109 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UOPstream.H"
#include "OPstream.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UOPBstream::UOPBstream
(
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,
const label comm,
const bool sendAtDestruct,
IOstreamOption::streamFormat fmt
)
:
UOPstreamBase(commsType, toProcNo, sendBuf, tag, comm, sendAtDestruct, fmt)
{}
Foam::OPBstream::OPBstream
(
const commsTypes commsType,
const int toProcNo,
const label bufSize,
const int tag,
const label comm,
IOstreamOption::streamFormat fmt
)
:
Pstream(commsType, bufSize),
UOPBstream
(
commsType,
toProcNo,
Pstream::transferBuf_,
tag,
comm,
true, // sendAtDestruct
fmt
)
{}
Foam::OPBstream::OPBstream
(
const int toProcNo,
const label comm,
IOstreamOption::streamFormat fmt
)
:
OPBstream
(
UPstream::commsTypes::scheduled, // irrelevant
toProcNo,
label(0), // bufSize
UPstream::msgType(), // irrelevant
comm,
fmt
)
{}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPBstream::~UOPBstream()
{
if (sendAtDestruct_)
{
if (!bufferIPCsend())
{
FatalErrorInFunction
<< "Failed broadcast message of size "
<< sendBuf_.size() << " root: " << toProcNo_
<< Foam::abort(FatalError);
}
}
}
// ************************************************************************* //

View File

@ -6,7 +6,7 @@
\\/ M anipulation | \\/ M anipulation |
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
Copyright (C) 2011-2013 OpenFOAM Foundation Copyright (C) 2011-2013 OpenFOAM Foundation
Copyright (C) 2021 OpenCFD Ltd. Copyright (C) 2021-2022 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -31,14 +31,14 @@ Description
Output inter-processor communications stream. Output inter-processor communications stream.
SourceFiles SourceFiles
OPstream.C OPstreams.C
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "Pstream.H" #include "Pstream.H"
#ifndef OPstream_H #ifndef Foam_OPstream_H
#define OPstream_H #define Foam_OPstream_H
#include "UOPstream.H" #include "UOPstream.H"
@ -74,6 +74,43 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class OPBstream Declaration
\*---------------------------------------------------------------------------*/
//- Output inter-processor communications stream
//- using MPI broadcast.
class OPBstream
:
public Pstream,
public UOPBstream
{
public:
// Constructors
//- Construct for broadcast root, optional buffer size, read format
OPBstream
(
const commsTypes commsType,
const int toProcNo, //!< UPstream::masterNo() - root procNo
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Construct for broadcast root with optional communicator,
//- write format
explicit OPBstream
(
const int toProcNo, //!< UPstream::masterNo() - root procNo
const label comm = UPstream::worldComm,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -262,6 +262,68 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class UIPBstream Declaration
\*---------------------------------------------------------------------------*/
//- Input inter-processor communications stream
//- using MPI broadcast - operating on external buffer.
class UIPBstream
:
public UIPstreamBase
{
// Private Member Functions
//- Initial buffer recv via broadcast, called by constructor
void bufferIPCrecv();
public:
// Constructors
//- Construct given process index to read from using the given
//- attached receive buffer, optional communication characteristics
//- and IO format
UIPBstream
(
const commsTypes commsType, //!< ignored
const int fromProcNo, //!< UPstream::masterNo()
DynamicList<char>& receiveBuf,
label& receiveBufPosition,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool clearAtEnd = false, // destroy receiveBuf if at end
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Destructor
virtual ~UIPBstream() = default;
// Member Functions
//- Use all read methods from base
using UIPstreamBase::read;
// Static Functions
//- Wrapped version of UPstream::broadcast
// \return the message size
static label read
(
const commsTypes commsTypes, //!< ignored
const int rootProcNo, //!< UPstream::masterNo()
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -331,6 +331,69 @@ public:
}; };
/*---------------------------------------------------------------------------*\
Class UOPBstream Declaration
\*---------------------------------------------------------------------------*/
//- Output inter-processor communications stream
//- using MPI broadcast - operating on external buffer.
//
// \note does not use commsType, tag etc.
class UOPBstream
:
public UOPstreamBase
{
// Private Member Functions
//- Final buffer send, called by destructor
bool bufferIPCsend();
public:
// Constructors
//- Construct given process index to write to using the given
//- attached send buffer, optional communication characteristics
//- and IO format
UOPBstream
(
const commsTypes commsType, //!< ignored
const int toProcNo, //!< UPstream::masterNo()
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm,
const bool sendAtDestruct = true,
IOstreamOption::streamFormat fmt = IOstreamOption::BINARY
);
//- Destructor, usually sends buffer on destruct.
virtual ~UOPBstream();
// Member Functions
//- Use all write methods from base
using UOPstreamBase::write;
// Static Functions
//- Wrapped version of UPstream::broadcast with const-cast
// \return True on success
static bool write
(
const commsTypes commsType, //!< ignored
const int rootProcNo, //!< UPstream::masterNo()
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType(), //!< ignored
const label comm = UPstream::worldComm
);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam } // End namespace Foam

View File

@ -4,5 +4,7 @@ UPstreamReduce.C
UIPstreamRead.C UIPstreamRead.C
UOPstreamWrite.C UOPstreamWrite.C
UIPBstreamRead.C
UOPBstreamWrite.C
LIB = $(FOAM_LIBBIN)/dummy/libPstream LIB = $(FOAM_LIBBIN)/dummy/libPstream

View File

@ -0,0 +1,55 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UIPstream.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UIPBstream::bufferIPCrecv()
{
NotImplemented;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UIPBstream::read
(
const commsTypes commsType,
const int rootProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
)
{
NotImplemented;
return 0;
}
// ************************************************************************* //

View File

@ -0,0 +1,56 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UOPstream.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPBstream::bufferIPCsend()
{
NotImplemented;
return false;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UOPBstream::write
(
const commsTypes commsType,
const int rootProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag,
const label communicator
)
{
NotImplemented;
return false;
}
// ************************************************************************* //

View File

@ -5,5 +5,7 @@ UPstreamReduce.C
UIPstreamRead.C UIPstreamRead.C
UOPstreamWrite.C UOPstreamWrite.C
UIPBstreamRead.C
UOPBstreamWrite.C
LIB = $(FOAM_MPI_LIBBIN)/libPstream LIB = $(FOAM_MPI_LIBBIN)/libPstream

View File

@ -0,0 +1,140 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UIPstream.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UIPBstream::bufferIPCrecv()
{
// Uses double broadcast. Symmetric with UOPBstream::bufferIPCsend()
// 1. for the data size
// 2. for the data itself
// Expected message size, similar to MPI_Probe
// Same type must be expected in UOPBstream::bufferIPCsend()
label bufSize(0);
// Broadcast #1 - data size
if
(
!UPstream::broadcast
(
reinterpret_cast<char*>(&bufSize),
sizeof(label),
comm_,
fromProcNo_ //< is actually rootProcNo
)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure receiving buffer size" << nl
<< Foam::abort(FatalError);
}
if (debug)
{
Pout<< "UOPBstream IPC read buffer :"
<< " root:" << fromProcNo_
<< " comm:" << comm_
<< " probed size:" << bufSize
<< " wanted size:" << recvBuf_.capacity()
<< Foam::endl;
}
// No buffer size allocated/specified
if (!recvBuf_.capacity())
{
recvBuf_.resize(bufSize);
}
// This is the only real information we can trust
messageSize_ = bufSize;
// Broadcast #2 - data content
// - skip if there is no data to receive
if (messageSize_)
{
if
(
!UPstream::broadcast
(
recvBuf_.data(),
messageSize_, // same as bufSize
comm_,
fromProcNo_ //< is actually rootProcNo
)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure receiving buffer data:" << bufSize << nl
<< Foam::abort(FatalError);
}
}
// Set addressed size. Leave actual allocated memory intact.
recvBuf_.resize(messageSize_);
if (!messageSize_)
{
setEof();
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::UIPBstream::read
(
const commsTypes commsType,
const int rootProcNo,
char* buf,
const std::streamsize bufSize,
const int tag,
const label comm
)
{
if
(
!UPstream::broadcast(buf, bufSize, comm, rootProcNo)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure receiving data:" << label(bufSize) << nl
<< Foam::abort(FatalError);
return 0;
}
return bufSize;
}
// ************************************************************************* //

View File

@ -0,0 +1,119 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2022 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "UOPstream.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include <mpi.h>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
bool Foam::UOPBstream::bufferIPCsend()
{
// Uses double broadcast
// 1. for the data size
// 2. for the data itself
// With this information, can determine and resize receive buffer
PstreamGlobals::checkCommunicator(comm_, toProcNo_);
// Same type must be expected in UIPBstream::bufferIPCrecv()
label bufSize(sendBuf_.size());
// Broadcast #1 - data size
if
(
!UPstream::broadcast
(
reinterpret_cast<char*>(&bufSize),
sizeof(label),
comm_,
toProcNo_ //< is actually rootProcNo
)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure sending buffer size:" << bufSize << nl
<< Foam::abort(FatalError);
return false;
}
// Broadcast #2 - data content
// - skip if there is no data to send
if (bufSize)
{
if
(
!Pstream::broadcast
(
sendBuf_.data(),
sendBuf_.size(), // same as bufSize
comm_,
toProcNo_ //< is actually rootProcNo
)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure sending buffer data:" << bufSize << nl
<< Foam::abort(FatalError);
return false;
}
}
return true;
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::UOPBstream::write
(
const commsTypes commsType, /* unused */
const int rootProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag, /* unused */
const label comm
)
{
if
(
!UPstream::broadcast(const_cast<char*>(buf), bufSize, comm, rootProcNo)
)
{
FatalErrorInFunction
<< "MPI_Bcast failure sending buffer data:" << label(bufSize) << nl
<< Foam::abort(FatalError);
return false;
}
return true;
}
// ************************************************************************* //