/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2016-2019 OpenCFD Ltd.
     \\/     M anipulation  |
-------------------------------------------------------------------------------
                            | Copyright (C) 2011-2017 OpenFOAM Foundation
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "PstreamGlobals.H"
#include "profilingPstream.H"
#include "SubList.H"
#include "allReduce.H"
#include "int.H"
#include "collatedFileOperation.H"
#include <mpi.h>
#include <cstring>
#include <cstdlib>
#include <csignal>
#if defined(WM_SP) || defined(WM_SPDP)
#define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
#define MPI_SCALAR MPI_DOUBLE
#endif
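// Note: MPI_SCALAR maps Foam::scalar onto the matching MPI datatype for the
// current precision build (WM_SP/WM_SPDP -> MPI_FLOAT, WM_DP -> MPI_DOUBLE),
// so the reduce wrappers below can pass scalars to MPI directly.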
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// The min value and default length (in bytes) for attached MPI buffers
constexpr int minBufLen = 20000000;
// Track if we have attached MPI buffers
static bool ourBuffers = false;
// Track if we initialized MPI
static bool ourMpi = false;
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
static void attachOurBuffers()
{
if (ourBuffers)
{
return; // Already attached
}
ourBuffers = true;
// Use UPstream::mpiBufferSize (optimisationSwitch),
// but allow override with MPI_BUFFER_SIZE env variable (int value)
#ifndef SGIMPI
int len = 0;
const std::string str(Foam::getEnv("MPI_BUFFER_SIZE"));
if (str.empty() || !Foam::read(str, len) || len <= 0)
{
len = Foam::UPstream::mpiBufferSize;
}
if (len < minBufLen)
{
len = minBufLen;
}
if (Foam::UPstream::debug)
{
Foam::Pout<< "UPstream::init : buffer-size " << len << '\n';
}
char* buf = new char[len];
if (MPI_SUCCESS != MPI_Buffer_attach(buf, len))
{
delete[] buf;
Foam::Pout<< "UPstream::init : could not attach buffer\n";
}
#endif
}
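// Illustrative sketch (not part of this file): the buffer attached above
// backs MPI buffered sends elsewhere in the Pstream layer. A buffered send
// copies the message into the attached buffer and returns immediately, e.g.
//
//     // hypothetical call; rank/tag/communicator are placeholders
//     MPI_Bsend(data, count, MPI_BYTE, toProcNo, tag, MPI_COMM_WORLD);
//
// If the attached buffer is too small for the outstanding messages the send
// fails, which is why minBufLen enforces a generous lower bound.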
static void detachOurBuffers()
{
if (!ourBuffers)
{
return; // Nothing to detach
}
ourBuffers = false;
// Some MPI notes suggest that the return code is MPI_SUCCESS when
// no buffer is attached.
// Be extra careful and require a non-zero size as well.
#ifndef SGIMPI
int len = 0;
char* buf = nullptr;
if (MPI_SUCCESS == MPI_Buffer_detach(&buf, &len) && len)
{
delete[] buf;
}
#endif
}
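// Note: MPI_Buffer_detach blocks until any messages still held in the buffer
// have been transmitted, so deleting the returned pointer here is safe.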
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// Valid parallel options vary between implementations, but flag the common
// ones. If they are not removed by MPI_Init(), the subsequent argument
// processing will notice that they are wrong.
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
}
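// With these entries registered, arguments such as "-machinefile hosts" that
// some MPI launchers append to the application command line are treated as
// parallel options by the subsequent argument parsing instead of being
// rejected as unknown flags.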
bool Foam::UPstream::initNull()
{
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init\n"
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
if (debug)
{
Pout<< "UPstream::initNull : was already initialized\n";
}
}
else
{
// Not already initialized
MPI_Init_thread
(
nullptr, // argc
nullptr, // argv
MPI_THREAD_SINGLE,
&flag // provided_thread_support
);
ourMpi = true;
}
// Could also attach buffers etc.
return true;
}
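// initNull() is the light-weight path for bringing up MPI without a full
// parallel run: no argc/argv rewriting and only single-threaded support is
// requested. init() below is the normal entry point for -parallel runs.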
bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
int numprocs = 0, myRank = 0;
int provided_thread_support = 0;
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init" << endl
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
// Already initialized.
// Warn if we've called twice, but skip if initialized externally
if (ourMpi)
{
WarningInFunction
<< "MPI was already initialized - cannot perform MPI_Init" << nl
<< "This could indicate an application programming error!"
<< endl;
return true;
}
else if (debug)
{
Pout<< "UPstream::init : was already initialized\n";
}
}
else
{
MPI_Init_thread
(
&argc,
&argv,
(
needsThread
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE
),
&provided_thread_support
);
ourMpi = true;
}
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (debug)
{
Pout<< "UPstream::init : procs=" << numprocs
<< " rank:" << myRank << endl;
}
if (numprocs <= 1)
{
FatalErrorInFunction
<< "attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
// Initialise parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
attachOurBuffers();
return true;
}
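// Rough call sequence (illustrative only; the real logic lives in argList):
//
//     // sketch, assuming a -parallel style startup
//     if (parallelRun)
//     {
//         Foam::UPstream::init(argc, argv, needsThread);
//     }
//
// after which nProcs() and myProcNo() on the world communicator are valid.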
void Foam::UPstream::exit(int errnum)
{
if (debug)
{
Pout<< "UPstream::exit\n";
}
int flag = 0;
MPI_Initialized(&flag);
if (!flag)
{
// Not initialized - just exit
std::exit(errnum);
return;
}
MPI_Finalized(&flag);
if (flag)
{
// Already finalized elsewhere?
if (ourMpi)
{
WarningInFunction
<< "MPI was already finalized (by a connected program?)\n";
}
else if (debug)
{
Pout<< "UPstream::exit : was already finalized\n";
}
}
else
{
detachOurBuffers();
}
const label nOutstanding = PstreamGlobals::outstandingRequests_.size();
if (nOutstanding)
{
PstreamGlobals::outstandingRequests_.clear();
WarningInFunction
<< "There were still " << nOutstanding
<< " outstanding MPI_Requests." << nl
<< "Which means your code exited before doing a "
<< " UPstream::waitRequests()." << nl
<< "This should not happen for a normal code exit."
<< nl;
}
// Clean mpi communicators
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freePstreamCommunicator(communicator);
}
}
if (!flag)
{
// MPI not already finalized
if (!ourMpi)
{
WarningInFunction
<< "Finalizing MPI, but was initialized elsewhere\n";
}
if (errnum == 0)
{
MPI_Finalize();
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
std::exit(errnum);
}
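// Note: a zero errnum finalizes MPI cleanly via MPI_Finalize; any non-zero
// value aborts the whole MPI job via MPI_Abort before the local std::exit.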
void Foam::UPstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
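// The warnComm check above (repeated in the overloads below) is a debugging
// aid: when UPstream::warnComm is set to a communicator, reductions on any
// other communicator print the value and a stack trace so stray communicator
// usage can be traced.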
void Foam::reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}
void Foam::reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
vector2D twoScalars(Value, scalar(Count));
reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
Value = twoScalars.x();
Count = twoScalars.y();
}
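// Example (sketch): sumReduce packs the value and its count into a single
// vector2D so one reduction carries both, e.g. to form a global average:
//
//     scalar sum = localSum;       // hypothetical local contributions
//     label n = localCount;
//     Foam::sumReduce(sum, n);     // default tag/communicator assumed
//     const scalar avg = sum/n;    // sum and n are now global totals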
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator,
label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
// Assume mpich2 with non-blocking collectives extensions. Once mpi3
// is available this will change.
MPI_Request request;
scalar v = Value;
MPIX_Ireduce
(
&v,
&Value,
1,
MPI_SCALAR,
MPI_SUM,
0, //root
PstreamGlobals::MPICommunicators_[communicator],
&request
);
requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
if (UPstream::debug)
{
Pout<< "UPstream::allocateRequest for non-blocking reduce"
<< " : request:" << requestID
<< endl;
}
#else
// Non-blocking not yet implemented in mpi
reduce(Value, bop, tag, communicator);
requestID = -1;
#endif
}
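// The returned requestID indexes PstreamGlobals::outstandingRequests_ and is
// expected to be completed later (e.g. via UPstream::waitRequest); a value of
// -1 signals that the reduction already completed synchronously.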
void Foam::UPstream::allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator
)
{
label np = nProcs(communicator);
if (sendData.size() != np || recvData.size() != np)
{
FatalErrorInFunction
<< "Size of sendData " << sendData.size()
<< " or size of recvData " << recvData.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
recvData.deepCopy(sendData);
}
else
{
profilingPstream::beginTiming();
if
(
MPI_Alltoall
(
// NOTE: const_cast is a temporary hack for
// backward-compatibility with versions of OpenMPI < 1.7.4
const_cast