openfoam/src/Pstream/mpi/UPstream.C

/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2012 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "mpi.h"
#include "UPstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include "SubList.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
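
// Map Foam::scalar onto the matching MPI datatype for the current build
// precision (WM_SP -> MPI_FLOAT, WM_DP -> MPI_DOUBLE).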
#if defined(WM_SP)
# define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
# define MPI_SCALAR MPI_DOUBLE
#endif
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
namespace Foam
{
    typedef struct
    {
        scalar value;
        label count;
    } CountAndValue;
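
    // User-defined MPI reduction operation: element-wise sum of the value
    // and count fields of each CountAndValue pair (used by sumReduce below).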
    void reduceSum
    (
        void *in,
        void *inOut,
        int *len,
        MPI_Datatype *dptr
    )
    {
        CountAndValue* inPtr =
            reinterpret_cast<CountAndValue*>(in);
        CountAndValue* inOutPtr =
            reinterpret_cast<CountAndValue*>(inOut);

        for (int i=0; i< *len; ++i)
        {
            inOutPtr->value += inPtr->value;
            inOutPtr->count += inPtr->count;
            inPtr++;
            inOutPtr++;
        }
    }
}

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// Valid parallel options vary between MPI implementations, but flag the
// common ones here. If they are not removed by MPI_Init(), the subsequent
// argument processing will notice that they are wrong.
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("GAMMANP", "number of instances");
    validParOptions.insert("machinefile", "machine file");
}

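
// Initialise MPI, record the rank and communicator size, and set up the
// parallel run state. Unless SGIMPI is defined, a buffer of MPI_BUFFER_SIZE
// bytes is attached for buffered sends; the environment variable must be set.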
bool Foam::UPstream::init(int& argc, char**& argv)
{
    MPI_Init(&argc, &argv);

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);

    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myProcNo:" << myProcNo_ << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorIn("UPstream::init(int& argc, char**& argv)")
            << "UPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }

    procIDs_.setSize(numprocs);

    forAll(procIDs_, procNo)
    {
        procIDs_[procNo] = procNo;
    }

    setParRun();

#   ifndef SGIMPI
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        FatalErrorIn("UPstream::init(int& argc, char**& argv)")
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << Foam::abort(FatalError);
    }
#   endif

    int processorNameLen;
    char processorName[MPI_MAX_PROCESSOR_NAME];

    MPI_Get_processor_name(processorName, &processorNameLen);

    //signal(SIGABRT, stop);

    // Now that nprocs is known construct communication tables.
    initCommunicationSchedule();

    return true;
}

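
// Shut down the parallel run: detach and free the attached MPI buffer
// (unless SGIMPI), warn about any outstanding non-blocking requests, then
// MPI_Finalize() on a clean exit or MPI_Abort() on a non-zero error number.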
void Foam::UPstream::exit(int errnum)
{
    if (debug)
    {
        Pout<< "UPstream::exit." << endl;
    }

#   ifndef SGIMPI
    int size;
    char* buff;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;
#   endif

    if (PstreamGlobals::outstandingRequests_.size())
    {
        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningIn("UPstream::exit(int)")
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
            << endl;
    }

    if (errnum == 0)
    {
        MPI_Finalize();
        ::exit(errnum);
    }
    else
    {
        MPI_Abort(MPI_COMM_WORLD, errnum);
    }
}

void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}

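
// Global sum of a scalar across all processors. For nProcs() up to
// nProcsSimpleSum the master gathers the slave values with point-to-point
// receives and then re-distributes the result (linear gather + broadcast);
// for larger runs a single MPI_Allreduce is used instead.
//
// Typical use (a sketch; assumes the default message tag declared in the
// Pstream headers):
//     scalar s = localContribution;
//     Foam::reduce(s, sumOp<scalar>());   // s now holds the global sum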
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag)
{
    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : value:" << Value << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                scalar value;

                if
                (
                    MPI_Recv
                    (
                        &value,
                        1,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD,
                        MPI_STATUS_IGNORE
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    )   << "MPI_Recv failed"
                        << Foam::abort(FatalError);
                }

                Value = bop(Value, value);
            }
        }
        else
        {
            if
            (
                MPI_Send
                (
                    &Value,
                    1,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                )   << "MPI_Send failed"
                    << Foam::abort(FatalError);
            }
        }

        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                if
                (
                    MPI_Send
                    (
                        &Value,
                        1,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    )   << "MPI_Send failed"
                        << Foam::abort(FatalError);
                }
            }
        }
        else
        {
            if
            (
                MPI_Recv
                (
                    &Value,
                    1,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD,
                    MPI_STATUS_IGNORE
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                )   << "MPI_Recv failed"
                    << Foam::abort(FatalError);
            }
        }
    }
    else
    {
        scalar sum;
        MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
        Value = sum;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : reduced value:" << Value << endl;
    }
}

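
// Global sum of a vector2D across all processors, using the same scheme as
// the scalar reduce above but transferring two scalars per message.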
void Foam::reduce(vector2D& Value, const sumOp<vector2D>& bop, const int tag)
{
    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : value:" << Value << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                vector2D value;

                if
                (
                    MPI_Recv
                    (
                        &value,
                        2,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD,
                        MPI_STATUS_IGNORE
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                    )   << "MPI_Recv failed"
                        << Foam::abort(FatalError);
                }

                Value = bop(Value, value);
            }
        }
        else
        {
            if
            (
                MPI_Send
                (
                    &Value,
                    2,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                )   << "MPI_Send failed"
                    << Foam::abort(FatalError);
            }
        }

        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                if
                (
                    MPI_Send
                    (
                        &Value,
                        2,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                    )   << "MPI_Send failed"
                        << Foam::abort(FatalError);
                }
            }
        }
        else
        {
            if
            (
                MPI_Recv
                (
                    &Value,
                    2,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD,
                    MPI_STATUS_IGNORE
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                )   << "MPI_Recv failed"
                    << Foam::abort(FatalError);
            }
        }
    }
    else
    {
        vector2D sum;
        MPI_Allreduce(&Value, &sum, 2, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
        Value = sum;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : reduced value:" << Value << endl;
    }
}

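
// Combined reduction of a scalar sum and a label count. For small runs this
// falls back to two ordinary reduce() calls; for larger runs a user-defined
// MPI struct datatype and reduction operator (reduceSum above) are created
// once, cached in function-local statics, and used in a single MPI_Allreduce.
// Note that the struct is described as MPI_DOUBLE + MPI_INT, i.e. it assumes
// double-precision scalars and int-sized labels.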
void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag
)
{
    static bool hasDataType_ = false;
    static MPI_Datatype mesg_mpi_strct_;
    static MPI_Op myOp_;

    if (Pstream::debug)
    {
        Pout<< "Foam::sumReduce : value:" << Value
            << " count:" << Count << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        reduce(Value, sumOp<scalar>(), tag);
        reduce(Count, sumOp<label>(), tag);
    }
    else
    {
        CountAndValue in, out;

        if (!hasDataType_)
        {
            int lengths[2];
            lengths[0] = 1;
            lengths[1] = 1;

            MPI_Datatype types[2];
            types[0] = MPI_DOUBLE;
            types[1] = MPI_INT;

            MPI_Aint addresses[2];
            MPI_Address(&in.value, &addresses[0]);
            MPI_Address(&in.count, &addresses[1]);

            MPI_Aint offsets[2];
            offsets[0] = 0;
            offsets[1] = addresses[1]-addresses[0];

            if
            (
                MPI_Type_create_struct
                (
                    2,
                    lengths,
                    offsets,
                    types,
                    &mesg_mpi_strct_
                )
            )
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Type_create_struct" << abort(FatalError);
            }

            if (MPI_Type_commit(&mesg_mpi_strct_))
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Type_commit" << abort(FatalError);
            }

            if (MPI_Op_create(reduceSum, true, &myOp_))
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Op_create" << abort(FatalError);
            }

            hasDataType_ = true;
        }

        in.value = Value;
        in.count = Count;

        if
        (
            MPI_Allreduce
            (
                &in,
                &out,
                1,
                mesg_mpi_strct_,
                myOp_,
                MPI_COMM_WORLD
            )
        )
        {
            FatalErrorIn("Foam::sumReduce(..)")
                << "Problem." << abort(FatalError);
        }

        Value = out.value;
        Count = out.count;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::sumReduce : reduced value:" << Value
            << " reduced count:" << Count << endl;
    }
}

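
// Non-blocking variant of the scalar reduce. If the MPI library provides the
// MPIX non-blocking collectives extensions (detected here via
// MPIX_COMM_TYPE_SHARED), an MPIX_Ireduce is posted and its request appended
// to PstreamGlobals::outstandingRequests_, with requestID set to its index;
// otherwise the blocking reduce is used and requestID is set to -1.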
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Assume mpich2 with non-blocking collectives extensions. Once mpi3
    // is available this will change.
    MPI_Request request;
    scalar v = Value;
    MPIX_Ireduce
    (
        &v,
        &Value,
        1,
        MPI_SCALAR,
        MPI_SUM,
        0,              // root
        MPI_COMM_WORLD,
        &request
    );

    requestID = PstreamGlobals::outstandingRequests_.size();
    PstreamGlobals::outstandingRequests_.append(request);
#else
    // Non-blocking not yet implemented in mpi
    reduce(Value, bop, tag);
    requestID = -1;
#endif
}

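
// Bookkeeping for outstanding non-blocking requests: nRequests() returns the
// current number, resetRequests() truncates the list, waitRequests() waits on
// all requests from a given start index, waitRequest() waits on a single one
// and finishedRequest() tests a single one without blocking.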
Foam::label Foam::UPstream::nRequests()
{
    return PstreamGlobals::outstandingRequests_.size();
}


void Foam::UPstream::resetRequests(const label i)
{
    if (i < PstreamGlobals::outstandingRequests_.size())
    {
        PstreamGlobals::outstandingRequests_.setSize(i);
    }
}

void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.begin(),
                MPI_STATUSES_IGNORE
            )
        )
        {
            FatalErrorIn
            (
                "UPstream::waitRequests()"
            )   << "MPI_Waitall returned with error" << Foam::endl;
        }

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}

void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorIn
        (
            "UPstream::waitRequest(const label)"
        )   << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    if
    (
        MPI_Wait
        (
            &PstreamGlobals::outstandingRequests_[i],
            MPI_STATUS_IGNORE
        )
    )
    {
        FatalErrorIn
        (
            "UPstream::waitRequest()"
        )   << "MPI_Wait returned with error" << Foam::endl;
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}

bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::finishedRequest : checking request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorIn
        (
            "UPstream::finishedRequest(const label)"
        )   << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    int flag;
    MPI_Test
    (
        &PstreamGlobals::outstandingRequests_[i],
        &flag,
        MPI_STATUS_IGNORE
    );

    if (debug)
    {
        Pout<< "UPstream::finishedRequest : finished checking request:" << i
            << endl;
    }

    return flag != 0;
}

// ************************************************************************* //