ENH: communicators: initial version - extended Pstream API

Author: mattijs
Date:   2013-02-04 10:17:37 +00:00
Parent: e676f9472f
Commit: ea8d290191
56 changed files with 1250 additions and 395 deletions

View File

@ -150,45 +150,48 @@ int main(int argc, char *argv[])
top.append(i);
}
Pout<< "bottom:" << bottom << endl;
Pout<< "top:" << top << endl;
//Pout<< "bottom:" << bottom << endl;
Pout<< "top :" << top << endl;
scalar localValue = 111*UPstream::myProcNo(UPstream::worldComm);
Pout<< "localValue:" << localValue << endl;
Pout<< "localValue :" << localValue << endl;
if (Pstream::myProcNo(UPstream::worldComm) < n/2)
label comm = Pstream::allocateCommunicator
(
UPstream::worldComm,
top
);
Pout<< "allocated comm :" << comm << endl;
Pout<< "comm myproc :" << Pstream::myProcNo(comm)
<< endl;
if (Pstream::myProcNo(comm) != -1)
{
label comm = Pstream::allocateCommunicator
//scalar sum = sumReduce(comm, localValue);
//scalar sum = localValue;
//reduce
//(
// UPstream::treeCommunication(comm),
// sum,
// sumOp<scalar>(),
// Pstream::msgType(),
// comm
//);
scalar sum = returnReduce
(
UPstream::worldComm,
bottom
localValue,
sumOp<scalar>(),
Pstream::msgType(),
comm
);
Pout<< "allocated bottom comm:" << comm << endl;
Pout<< "comm myproc :" << Pstream::myProcNo(comm)
<< endl;
scalar sum = sumReduce(comm, localValue);
Pout<< "sum :" << sum << endl;
Pstream::freeCommunicator(comm);
Pout<< "sum :" << sum << endl;
}
else
{
label comm = Pstream::allocateCommunicator
(
UPstream::worldComm,
top
);
Pout<< "allocated top comm:" << comm << endl;
Pout<< "comm myproc :" << Pstream::myProcNo(comm)
<< endl;
scalar sum = sumReduce(comm, localValue);
Pout<< "sum :" << sum << endl;
Pstream::freeCommunicator(comm);
}
Pstream::freeCommunicator(comm);
Pout<< "End\n" << endl;

View File

@ -496,6 +496,7 @@ int main(int argc, char *argv[])
mesh.nFaces(), // start
patchI, // index
mesh.boundaryMesh(),// polyBoundaryMesh
Pstream::worldComm, // communicator
Pstream::myProcNo(),// myProcNo
nbrProcI // neighbProcNo
)

View File

@ -448,6 +448,7 @@ bool Foam::domainDecomposition::writeDecomposition()
curStart,
nPatches,
procMesh.boundaryMesh(),
Pstream::worldComm,
procI,
curNeighbourProcessors[procPatchI]
);
@ -475,6 +476,7 @@ bool Foam::domainDecomposition::writeDecomposition()
curStart,
nPatches,
procMesh.boundaryMesh(),
Pstream::worldComm,
procI,
curNeighbourProcessors[procPatchI],
referPatch,

View File

@ -32,7 +32,7 @@
#------------------------------------------------------------------------------
export WM_PROJECT=OpenFOAM
export WM_PROJECT_VERSION=dev
export WM_PROJECT_VERSION=dev.procAgglom
################################################################################
# USER EDITABLE PART: Changes made here may be lost with the next upgrade

View File

@ -31,7 +31,7 @@
#------------------------------------------------------------------------------
setenv WM_PROJECT OpenFOAM
setenv WM_PROJECT_VERSION dev
setenv WM_PROJECT_VERSION dev.procAgglom
################################################################################
# USER EDITABLE PART: Changes made here may be lost with the next upgrade

View File

@ -78,9 +78,10 @@ void Foam::IOdictionary::readFile(const bool masterOnly)
(
comms,
const_cast<word&>(headerClassName()),
Pstream::msgType()
Pstream::msgType(),
Pstream::worldComm
);
Pstream::scatter(comms, note(), Pstream::msgType());
Pstream::scatter(comms, note(), Pstream::msgType(), Pstream::worldComm);
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];

View File

@ -33,6 +33,7 @@ Foam::IPstream::IPstream
const int fromProcNo,
const label bufSize,
const int tag,
const label comm,
streamFormat format,
versionNumber version
)
@ -45,6 +46,7 @@ Foam::IPstream::IPstream
buf_,
externalBufPosition_,
tag, // tag
comm,
false, // do not clear buf_ if at end
format,
version

View File

@ -69,6 +69,7 @@ public:
const int fromProcNo,
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
streamFormat format=BINARY,
versionNumber version=currentVersion
);

View File

@ -33,12 +33,13 @@ Foam::OPstream::OPstream
const int toProcNo,
const label bufSize,
const int tag,
const label comm,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
UOPstream(commsType, toProcNo, buf_, tag, true, format, version)
UOPstream(commsType, toProcNo, buf_, tag, comm, true, format, version)
{}

View File

@ -66,6 +66,7 @@ public:
const int toProcNo,
const label bufSize = 0,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
streamFormat format=BINARY,
versionNumber version=currentVersion
);

View File

@ -98,7 +98,8 @@ public:
const List<commsStruct>& comms,
T& Value,
const BinaryOp& bop,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -107,7 +108,8 @@ public:
(
T& Value,
const BinaryOp& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Distribute without modification. Reverse of gather
@ -116,12 +118,18 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
template <class T>
static void scatter(T& Value, const int tag = Pstream::msgType());
static void scatter
(
T& Value,
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants. Inplace combine values from processors.
@ -133,7 +141,8 @@ public:
const List<commsStruct>& comms,
T& Value,
const CombineOp& cop,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -142,7 +151,8 @@ public:
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of combineGather
@ -151,7 +161,8 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -159,7 +170,8 @@ public:
static void combineScatter
(
T& Value,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants working on whole List at a time.
@ -170,7 +182,8 @@ public:
const List<commsStruct>& comms,
List<T>& Value,
const CombineOp& cop,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -179,7 +192,8 @@ public:
(
List<T>& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
//- Scatter data. Reverse of combineGather
@ -188,7 +202,8 @@ public:
(
const List<commsStruct>& comms,
List<T>& Value,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -196,7 +211,8 @@ public:
static void listCombineScatter
(
List<T>& Value,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
);
// Combine variants working on whole map at a time. Container needs to
@ -208,7 +224,8 @@ public:
const List<commsStruct>& comms,
Container& Values,
const CombineOp& cop,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -217,7 +234,8 @@ public:
(
Container& Values,
const CombineOp& cop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
//- Scatter data. Reverse of combineGather
@ -226,7 +244,8 @@ public:
(
const List<commsStruct>& comms,
Container& Values,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -234,7 +253,8 @@ public:
static void mapCombineScatter
(
Container& Values,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
@ -249,7 +269,8 @@ public:
(
const List<commsStruct>& comms,
List<T>& Values,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -257,7 +278,8 @@ public:
static void gatherList
(
List<T>& Values,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
//- Scatter data. Reverse of gatherList
@ -266,7 +288,8 @@ public:
(
const List<commsStruct>& comms,
List<T>& Values,
const int tag
const int tag,
const label comm
);
//- Like above but switches between linear/tree communication
@ -274,7 +297,8 @@ public:
static void scatterList
(
List<T>& Values,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
@ -291,6 +315,7 @@ public:
List<Container >&,
labelListList& sizes,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool block = true
);
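
Because the new comm parameter is appended after tag with a UPstream::worldComm default throughout, existing call sites compile unchanged; only code that needs a sub-communicator has to be touched. A minimal sketch, assuming subComm is a previously allocated communicator label and residual some local scalar:

    // Unchanged call: tag and comm both take their defaults
    reduce(residual, sumOp<scalar>());

    // Fully explicit equivalent, and the sub-communicator form
    reduce(residual, sumOp<scalar>(), Pstream::msgType(), UPstream::worldComm);
    reduce(residual, sumOp<scalar>(), Pstream::msgType(), subComm);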

View File

@ -40,17 +40,19 @@ Foam::PstreamBuffers::PstreamBuffers
(
const UPstream::commsTypes commsType,
const int tag,
const label comm,
IOstream::streamFormat format,
IOstream::versionNumber version
)
:
commsType_(commsType),
tag_(tag),
comm_(comm),
format_(format),
version_(version),
sendBuf_(UPstream::nProcs()),
recvBuf_(UPstream::nProcs()),
recvBufPos_(UPstream::nProcs(), 0),
sendBuf_(UPstream::nProcs(comm)),
recvBuf_(UPstream::nProcs(comm)),
recvBufPos_(UPstream::nProcs(comm), 0),
finishedSendsCalled_(false)
{}
@ -90,6 +92,7 @@ void Foam::PstreamBuffers::finishedSends(const bool block)
recvBuf_,
sizes,
tag_,
comm_,
block
);
}
@ -108,6 +111,7 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
recvBuf_,
sizes,
tag_,
comm_,
block
);
}
@ -123,9 +127,9 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block)
// Note: possible only if using different tag from write started
// by ~UOPstream. Needs some work.
//sizes.setSize(UPstream::nProcs());
//labelList& nsTransPs = sizes[UPstream::myProcNo()];
//nsTransPs.setSize(UPstream::nProcs());
//sizes.setSize(UPstream::nProcs(comm));
//labelList& nsTransPs = sizes[UPstream::myProcNo(comm)];
//nsTransPs.setSize(UPstream::nProcs(comm));
//
//forAll(sendBuf_, procI)
//{
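
Since PstreamBuffers now stores comm_ and sizes its buffers with nProcs(comm), any UOPstream/UIPstream attached to it inherits the communicator automatically. A sketch of a non-blocking exchange restricted to a sub-communicator, assuming subComm is an allocated communicator label:

    PstreamBuffers pBufs(Pstream::nonBlocking, Pstream::msgType(), subComm);

    scalarList localData(2, 1.0);   // illustrative payload

    // UOPstream picks up buffers.comm_ in its constructor, so no
    // comm argument is needed on the individual streams
    for (label procI = 0; procI < Pstream::nProcs(subComm); procI++)
    {
        if (procI != Pstream::myProcNo(subComm))
        {
            UOPstream toProc(procI, pBufs);
            toProc << localData;
        }
    }

    pBufs.finishedSends();   // size exchange now also runs on comm_

    for (label procI = 0; procI < Pstream::nProcs(subComm); procI++)
    {
        if (procI != Pstream::myProcNo(subComm))
        {
            UIPstream fromProc(procI, pBufs);
            scalarList received(fromProc);
        }
    }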

View File

@ -95,6 +95,8 @@ class PstreamBuffers
const int tag_;
const label comm_;
const IOstream::streamFormat format_;
const IOstream::versionNumber version_;
@ -127,6 +129,7 @@ public:
(
const UPstream::commsTypes commsType,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
IOstream::streamFormat format=IOstream::BINARY,
IOstream::versionNumber version=IOstream::currentVersion
);

View File

@ -53,11 +53,12 @@ void combineReduce
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
Pstream::combineGather(comms, Value, cop, tag);
Pstream::combineScatter(comms, Value, tag);
Pstream::combineGather(comms, Value, cop, tag, comm);
Pstream::combineScatter(comms, Value, tag, comm);
}
@ -66,24 +67,45 @@ void combineReduce
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = Pstream::worldComm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
Pstream::combineGather
(
UPstream::linearCommunication(),
UPstream::linearCommunication(comm),
Value,
cop,
tag
tag,
comm
);
Pstream::combineScatter
(
UPstream::linearCommunication(comm),
Value,
tag,
comm
);
Pstream::combineScatter(UPstream::linearCommunication(), Value, tag);
}
else
{
Pstream::combineGather(UPstream::treeCommunication(), Value, cop, tag);
Pstream::combineScatter(UPstream::treeCommunication(), Value, tag);
Pstream::combineGather
(
UPstream::treeCommunication(comm),
Value,
cop,
tag,
comm
);
Pstream::combineScatter
(
UPstream::treeCommunication(comm),
Value,
tag,
comm
);
}
}

View File

@ -44,11 +44,12 @@ void reduce
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop,
const int tag
const int tag,
const label comm
)
{
Pstream::gather(comms, Value, bop, tag);
Pstream::scatter(comms, Value, tag);
Pstream::gather(comms, Value, bop, tag, comm);
Pstream::scatter(comms, Value, tag, comm);
}
@ -58,16 +59,17 @@ void reduce
(
T& Value,
const BinaryOp& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
reduce(UPstream::linearCommunication(), Value, bop, tag);
reduce(UPstream::linearCommunication(comm), Value, bop, tag, comm);
}
else
{
reduce(UPstream::treeCommunication(), Value, bop, tag);
reduce(UPstream::treeCommunication(comm), Value, bop, tag, comm);
}
}
@ -78,18 +80,33 @@ T returnReduce
(
const T& Value,
const BinaryOp& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
)
{
T WorkValue(Value);
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
reduce(UPstream::linearCommunication(), WorkValue, bop, tag);
reduce
(
UPstream::linearCommunication(comm),
WorkValue,
bop,
tag,
comm
);
}
else
{
reduce(UPstream::treeCommunication(), WorkValue, bop, tag);
reduce
(
UPstream::treeCommunication(comm),
WorkValue,
bop,
tag,
comm
);
}
return WorkValue;
@ -102,11 +119,12 @@ void sumReduce
(
T& Value,
label& Count,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
)
{
reduce(Value, sumOp<T>(), tag);
reduce(Count, sumOp<label>(), tag);
reduce(Value, sumOp<T>(), tag, comm);
reduce(Count, sumOp<label>(), tag, comm);
}
@ -117,10 +135,14 @@ void reduce
T& Value,
const BinaryOp& bop,
const int tag,
const label comm,
label& request
)
{
notImplemented("reduce(T&, const BinaryOp&, const int, label&");
notImplemented
(
"reduce(T&, const BinaryOp&, const int, const label, label&"
);
}
@ -129,28 +151,32 @@ void reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
void reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
void reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
void sumReduce
(
scalar& Value,
label& Count,
const int tag = Pstream::msgType()
const int tag = Pstream::msgType(),
const label comm = UPstream::worldComm
);
void reduce
@ -158,6 +184,7 @@ void reduce
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label comm,
label& request
);
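
One idiom this enables is averaging over only the ranks of a sub-communicator. A sketch, assuming subComm is an allocated communicator label and localContribution a local scalar:

    scalar value = localContribution;
    label count = 1;

    // Sums both 'value' and 'count' across the ranks of subComm
    sumReduce(value, count, Pstream::msgType(), subComm);

    const scalar mean = value/count;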

View File

@ -330,7 +330,9 @@ Foam::Istream& Foam::UIPstream::rewind()
void Foam::UIPstream::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " to processor " << myProcNo() << Foam::endl;
<< " to processor " << myProcNo << " using communicator " << comm_
<< " and tag " << tag_
<< Foam::endl;
}

View File

@ -66,6 +66,8 @@ class UIPstream
const int tag_;
const label comm_;
const bool clearAtEnd_;
int messageSize_;
@ -97,6 +99,7 @@ public:
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool clearAtEnd = false, // destroy externalBuf if at end
streamFormat format=BINARY,
versionNumber version=currentVersion
@ -131,7 +134,8 @@ public:
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType()
const int tag = UPstream::msgType(),
const label communicator = 0
);
//- Return next token from stream

View File

@ -91,6 +91,7 @@ Foam::UOPstream::UOPstream
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag,
const label comm,
const bool sendAtDestruct,
streamFormat format,
versionNumber version
@ -101,6 +102,7 @@ Foam::UOPstream::UOPstream
toProcNo_(toProcNo),
sendBuf_(sendBuf),
tag_(tag),
comm_(comm),
sendAtDestruct_(sendAtDestruct)
{
setOpened();
@ -115,6 +117,7 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
toProcNo_(toProcNo),
sendBuf_(buffers.sendBuf_[toProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
sendAtDestruct_(buffers.commsType_ != UPstream::nonBlocking)
{
setOpened();
@ -136,7 +139,8 @@ Foam::UOPstream::~UOPstream()
toProcNo_,
sendBuf_.begin(),
sendBuf_.size(),
tag_
tag_,
comm_
)
)
{
@ -287,7 +291,8 @@ Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count)
void Foam::UOPstream::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << Foam::endl;
<< " to processor " << myProcNo() << " in communicator " << comm_
<< " and tag " << tag_ << Foam::endl;
}

View File

@ -65,6 +65,8 @@ class UOPstream
const int tag_;
const label comm_;
const bool sendAtDestruct_;
@ -93,6 +95,7 @@ public:
const int toProcNo,
DynamicList<char>& sendBuf,
const int tag = UPstream::msgType(),
const label comm = UPstream::worldComm,
const bool sendAtDestruct = true,
streamFormat format=BINARY,
versionNumber version=currentVersion
@ -126,7 +129,8 @@ public:
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag = UPstream::msgType()
const int tag = UPstream::msgType(),
const label communicator = 0
);
//- Write next token to stream

View File

@ -54,18 +54,33 @@ const Foam::NamedEnum<Foam::UPstream::commsTypes, 3>
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun()
void Foam::UPstream::setParRun(const label nProcs)
{
parRun_ = true;
Pout.prefix() = '[' + name(myProcNo()) + "] ";
Perr.prefix() = '[' + name(myProcNo()) + "] ";
// Redo worldComm communicator (this has been created at static
// initialisation time)
freeCommunicator(UPstream::worldComm);
label comm = allocateCommunicator(-1, identity(nProcs), true);
if (comm != UPstream::worldComm)
{
FatalErrorIn("UPstream::setParRun(const label)")
<< "problem : comm:" << comm
<< " UPstream::worldComm:" << UPstream::worldComm
<< Foam::exit(FatalError);
}
Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
}
void Foam::UPstream::calcLinearComm(const label nProcs)
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm
(
const label nProcs
)
{
linearCommunication_.setSize(nProcs);
List<commsStruct> linearCommunication(nProcs);
// Master
labelList belowIDs(nProcs - 1);
@ -74,7 +89,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs)
belowIDs[i] = i + 1;
}
linearCommunication_[0] = commsStruct
linearCommunication[0] = commsStruct
(
nProcs,
0,
@ -86,7 +101,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs)
// Slaves. Have no below processors, only communicate up to master
for (label procID = 1; procID < nProcs; procID++)
{
linearCommunication_[procID] = commsStruct
linearCommunication[procID] = commsStruct
(
nProcs,
procID,
@ -95,6 +110,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs)
labelList(0)
);
}
return linearCommunication;
}
@ -142,7 +158,10 @@ void Foam::UPstream::collectReceives
// 5 - 4
// 6 7 4
// 7 - 6
void Foam::UPstream::calcTreeComm(label nProcs)
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm
(
label nProcs
)
{
label nLevels = 1;
while ((1 << nLevels) < nProcs)
@ -188,11 +207,11 @@ void Foam::UPstream::calcTreeComm(label nProcs)
}
treeCommunication_.setSize(nProcs);
List<commsStruct> treeCommunication(nProcs);
for (label procID = 0; procID < nProcs; procID++)
{
treeCommunication_[procID] = commsStruct
treeCommunication[procID] = commsStruct
(
nProcs,
procID,
@ -201,37 +220,159 @@ void Foam::UPstream::calcTreeComm(label nProcs)
allReceives[procID].shrink()
);
}
return treeCommunication;
}
// Callback from UPstream::init() : initialize linear and tree communication
// schedules now that nProcs is known.
void Foam::UPstream::initCommunicationSchedule()
//// Callback from UPstream::init() : initialize linear and tree communication
//// schedules now that nProcs is known.
//void Foam::UPstream::initCommunicationSchedule()
//{
// calcLinearComm(nProcs());
// calcTreeComm(nProcs());
//}
Foam::label Foam::UPstream::allocateCommunicator
(
const label parentIndex,
const labelList& subRanks,
const bool doPstream
)
{
calcLinearComm(nProcs());
calcTreeComm(nProcs());
label index;
if (!freeComms_.empty())
{
index = freeComms_.pop();
}
else
{
// Extend storage
index = parentCommunicator_.size();
myProcNo_.append(-1);
procIDs_.append(List<int>(0));
parentCommunicator_.append(-1);
linearCommunication_.append(List<commsStruct>(0));
treeCommunication_.append(List<commsStruct>(0));
}
Pout<< "Communicators : Allocating communicator " << index << endl
<< " parent : " << parentIndex << endl
<< " procs : " << subRanks << endl
<< endl;
// Initialise; overwritten by allocatePstreamCommunicator
myProcNo_[index] = 0;
// Convert from label to int
procIDs_[index].setSize(subRanks.size());
forAll(procIDs_[index], i)
{
procIDs_[index][i] = subRanks[i];
}
parentCommunicator_[index] = parentIndex;
linearCommunication_[index] = calcLinearComm(procIDs_[index].size());
treeCommunication_[index] = calcTreeComm(procIDs_[index].size());
if (doPstream)
{
allocatePstreamCommunicator(parentIndex, index);
}
return index;
}
void Foam::UPstream::freeCommunicator
(
const label communicator,
const bool doPstream
)
{
Pout<< "Communicators : Freeing communicator " << communicator << endl
<< " parent : " << parentCommunicator_[communicator] << endl
<< " myProcNo : " << myProcNo_[communicator] << endl
<< endl;
if (doPstream)
{
freePstreamCommunicator(communicator);
}
myProcNo_[communicator] = -1;
//procIDs_[communicator].clear();
parentCommunicator_[communicator] = -1;
linearCommunication_[communicator].clear();
treeCommunication_[communicator].clear();
freeComms_.push(communicator);
}
void Foam::UPstream::freeCommunicators(const bool doPstream)
{
Pout<< "Communicators : Freeing all communicators" << endl
<< endl;
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freeCommunicator(communicator, doPstream);
}
}
}
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Initialise my process number to 0 (the master)
int Foam::UPstream::myProcNo_(0);
// By default this is not a parallel run
bool Foam::UPstream::parRun_(false);
//// Initialise my process number to 0 (the master)
//int Foam::UPstream::myProcNo_(0);
//
//// List of process IDs
//Foam::List<int> Foam::UPstream::procIDs_(label(1), 0);
// Free communicators
Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
// My processor number
Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
// List of process IDs
Foam::List<int> Foam::UPstream::procIDs_(label(1), 0);
Foam::DynamicList<Foam::List<int> > Foam::UPstream::procIDs_(10);
// Parent communicator
Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
// Standard transfer message type
int Foam::UPstream::msgType_(1);
//// Linear communication schedule
//Foam::List<Foam::UPstream::commsStruct>
// Foam::UPstream::linearCommunication_(0);
//// Multi level communication schedule
//Foam::List<Foam::UPstream::commsStruct>
// Foam::UPstream::treeCommunication_(0);
// Linear communication schedule
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::linearCommunication_(0);
Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct> >
Foam::UPstream::linearCommunication_(10);
// Multi level communication schedule
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::treeCommunication_(0);
Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct> >
Foam::UPstream::treeCommunication_(10);
// Allocate a serial communicator. This gets overwritten in parallel mode
// (by UPstream::setParRun())
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, 0), false);
// Should compact transfer be used in which floats replace doubles
// reducing the bandwidth requirement at the expense of some loss
@ -292,6 +433,10 @@ public:
addcommsTypeToOpt addcommsTypeToOpt_("commsType");
// Default communicator
Foam::label Foam::UPstream::worldComm(0);
// Number of polling cycles in processor updates
int Foam::UPstream::nPollProcInterfaces
(
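
The allocation scheme is a plain index registry: all per-communicator state (myProcNo_, procIDs_, parentCommunicator_ and the two schedules) lives in parallel DynamicLists indexed by the communicator label, with freed labels recycled through the freeComms_ LIFO stack. That LIFO behaviour is what setParRun relies on: freeing worldComm (label 0) and immediately re-allocating must hand the same label back. Paraphrasing the hunk above:

    // Inside UPstream::setParRun:
    freeCommunicator(UPstream::worldComm);   // pushes label 0 onto freeComms_
    label comm = allocateCommunicator(-1, identity(nProcs), true);
    // pop() returns the most recently freed label, so comm == 0 == worldComm;
    // anything else indicates a programming error and triggers FatalErrorIn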

View File

@ -29,7 +29,6 @@ Description
SourceFiles
UPstream.C
UPstreamsPrint.C
UPstreamCommsStruct.C
gatherScatter.C
combineGatherScatter.C
@ -45,6 +44,8 @@ SourceFiles
#include "HashTable.H"
#include "string.H"
#include "NamedEnum.H"
#include "ListOps.H"
#include "LIFOStack.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -180,26 +181,33 @@ private:
// Private data
static int myProcNo_;
static bool parRun_;
static List<int> procIDs_;
static int msgType_;
static List<commsStruct> linearCommunication_;
static List<commsStruct> treeCommunication_;
// static int myProcNo_;
// static List<int> procIDs_;
// static List<commsStruct> linearCommunication_;
// static List<commsStruct> treeCommunication_;
// Communicator specific data
static LIFOStack<label> freeComms_;
static DynamicList<int> myProcNo_;
static DynamicList<List<int> > procIDs_;
static DynamicList<label> parentCommunicator_;
static DynamicList<List<commsStruct> > linearCommunication_;
static DynamicList<List<commsStruct> > treeCommunication_;
// Private Member Functions
//- Set data for parallel running
static void setParRun();
static void setParRun(const label nProcs);
//- Calculate linear communication schedule
static void calcLinearComm(const label nProcs);
static List<commsStruct> calcLinearComm(const label nProcs);
//- Calculate tree communication schedule
static void calcTreeComm(const label nProcs);
static List<commsStruct> calcTreeComm(const label nProcs);
//- Helper function for tree communication schedule determination
// Collects all processorIDs below a processor
@ -210,9 +218,22 @@ private:
DynamicList<label>& allReceives
);
//- Initialize all communication schedules. Callback from
// UPstream::init()
static void initCommunicationSchedule();
//- Allocate a communicator with index
static void allocatePstreamCommunicator
(
const label parentIndex,
const label index
);
//- Free a communicator
static void freePstreamCommunicator
(
const label index
);
// //- Initialize all communication schedules. Callback from
// // UPstream::init()
// static void initCommunicationSchedule();
protected:
@ -245,6 +266,9 @@ public:
//- Number of polling cycles in processor updates
static int nPollProcInterfaces;
//- Default communicator (all processors)
static label worldComm;
// Constructors
//- Construct given optional buffer size
@ -256,6 +280,69 @@ public:
// Member functions
//- Allocate a new communicator
static label allocateCommunicator
(
const label parent,
const labelList& subRanks,
const bool doPstream = true
);
//- Free a previously allocated communicator
static void freeCommunicator
(
const label communicator,
const bool doPstream = true
);
//- Free all communicators
static void freeCommunicators(const bool doPstream);
//- Helper class for allocating/freeing communicators
class communicator
{
label comm_;
//- Disallow copy and assignment
communicator(const communicator&);
void operator=(const communicator&);
public:
communicator
(
const label parent,
const labelList& subRanks,
const bool doPstream
)
:
comm_(allocateCommunicator(parent, subRanks, doPstream))
{}
communicator(const label parent)
:
comm_
(
allocateCommunicator
(
parent,
identity(UPstream::nProcs(parent))
)
)
{}
~communicator()
{
freeCommunicator(comm_);
}
operator label() const
{
return comm_;
}
};
//- Add the valid option this type of communications library
// adds/requires on the command line
static void addValidParOptions(HashTable<string>& validParOptions);
@ -281,6 +368,14 @@ public:
//- Non-blocking comms: has request i finished?
static bool finishedRequest(const label i);
static int allocateTag(const char*);
static int allocateTag(const word&);
static void freeTag(const char*, const int tag);
static void freeTag(const word&, const int tag);
//- Is this a parallel run?
static bool& parRun()
@ -289,15 +384,9 @@ public:
}
//- Number of processes in parallel run
static label nProcs()
static label nProcs(const label communicator = 0)
{
return procIDs_.size();
}
//- Am I the master process
static bool master()
{
return myProcNo_ == 0;
return procIDs_[communicator].size();
}
//- Process index of the master
@ -306,23 +395,29 @@ public:
return 0;
}
//- Am I the master process
static bool master(const label communicator = 0)
{
return myProcNo_[communicator] == masterNo();
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo()
static int myProcNo(const label communicator = 0)
{
return myProcNo_;
return myProcNo_[communicator];
}
//- Process IDs
static const List<int>& procIDs()
{
return procIDs_;
}
//- Process ID of given process index
static int procID(int procNo)
{
return procIDs_[procNo];
}
// //- Process IDs
// static const List<int>& procIDs()
// {
// return procIDs_;
// }
//
// //- Process ID of given process index
// static int procID(int procNo)
// {
// return procIDs_[procNo];
// }
//- Process index of first slave
static int firstSlave()
@ -331,21 +426,27 @@ public:
}
//- Process index of last slave
static int lastSlave()
static int lastSlave(const label communicator = 0)
{
return nProcs() - 1;
return nProcs(communicator) - 1;
}
//- Communication schedule for linear all-to-master (proc 0)
static const List<commsStruct>& linearCommunication()
static const List<commsStruct>& linearCommunication
(
const label communicator = 0
)
{
return linearCommunication_;
return linearCommunication_[communicator];
}
//- Communication schedule for tree all-to-master (proc 0)
static const List<commsStruct>& treeCommunication()
static const List<commsStruct>& treeCommunication
(
const label communicator = 0
)
{
return treeCommunication_;
return treeCommunication_[communicator];
}
//- Message tag of standard messages
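
The communicator helper class wraps allocate/free in RAII style, and its operator label() lets it be passed wherever a communicator label is expected. A usage sketch, assuming firstHalfRanks is a labelList of world ranks:

    {
        // Allocated on construction, freed again when the scope exits
        UPstream::communicator halfComm
        (
            UPstream::worldComm,
            firstHalfRanks,
            true            // also create the underlying MPI communicator
        );

        scalar myValue = 1.0;

        if (UPstream::myProcNo(halfComm) != -1)
        {
            reduce(myValue, sumOp<scalar>(), Pstream::msgType(), halfComm);
        }
    }   // ~communicator() calls freeCommunicator()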

View File

@ -51,13 +51,14 @@ void Pstream::combineGather
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -73,7 +74,8 @@ void Pstream::combineGather
belowID,
reinterpret_cast<char*>(&value),
sizeof(T),
tag
tag,
comm
);
if (debug & 2)
@ -86,7 +88,7 @@ void Pstream::combineGather
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm);
T value(fromBelow);
if (debug & 2)
@ -116,12 +118,20 @@ void Pstream::combineGather
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
OPstream toAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Value;
}
}
@ -130,15 +140,35 @@ void Pstream::combineGather
template <class T, class CombineOp>
void Pstream::combineGather(T& Value, const CombineOp& cop, const int tag)
void Pstream::combineGather
(
T& Value,
const CombineOp& cop,
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
combineGather(UPstream::linearCommunication(), Value, cop, tag);
combineGather
(
UPstream::linearCommunication(comm),
Value,
cop,
tag,
comm
);
}
else
{
combineGather(UPstream::treeCommunication(), Value, cop, tag);
combineGather
(
UPstream::treeCommunication(comm),
Value,
cop,
tag,
comm
);
}
}
@ -148,13 +178,14 @@ void Pstream::combineScatter
(
const List<UPstream::commsStruct>& comms,
T& Value,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -167,12 +198,20 @@ void Pstream::combineScatter
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
IPstream fromAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
Value = T(fromAbove);
}
@ -201,12 +240,13 @@ void Pstream::combineScatter
belowID,
reinterpret_cast<const char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm);
toBelow << Value;
}
}
@ -215,15 +255,20 @@ void Pstream::combineScatter
template <class T>
void Pstream::combineScatter(T& Value, const int tag)
void Pstream::combineScatter
(
T& Value,
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
combineScatter(UPstream::linearCommunication(), Value, tag);
combineScatter(UPstream::linearCommunication(comm), Value, tag, comm);
}
else
{
combineScatter(UPstream::treeCommunication(), Value, tag);
combineScatter(UPstream::treeCommunication(comm), Value, tag, comm);
}
}
@ -238,13 +283,14 @@ void Pstream::listCombineGather
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -261,7 +307,8 @@ void Pstream::listCombineGather
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize(),
tag
tag,
comm
);
if (debug & 2)
@ -277,7 +324,7 @@ void Pstream::listCombineGather
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm);
List<T> receivedValues(fromBelow);
if (debug & 2)
@ -310,12 +357,20 @@ void Pstream::listCombineGather
myComm.above(),
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize(),
tag
tag,
comm
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
OPstream toAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Values;
}
}
@ -328,16 +383,31 @@ void Pstream::listCombineGather
(
List<T>& Values,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
listCombineGather(UPstream::linearCommunication(), Values, cop, tag);
listCombineGather
(
UPstream::linearCommunication(comm),
Values,
cop,
tag,
comm
);
}
else
{
listCombineGather(UPstream::treeCommunication(), Values, cop, tag);
listCombineGather
(
UPstream::treeCommunication(comm),
Values,
cop,
tag,
comm
);
}
}
@ -347,13 +417,14 @@ void Pstream::listCombineScatter
(
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -366,12 +437,20 @@ void Pstream::listCombineScatter
myComm.above(),
reinterpret_cast<char*>(Values.begin()),
Values.byteSize(),
tag
tag,
comm
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
IPstream fromAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Values;
}
@ -400,12 +479,13 @@ void Pstream::listCombineScatter
belowID,
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize(),
tag
tag,
comm
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm);
toBelow << Values;
}
}
@ -414,15 +494,32 @@ void Pstream::listCombineScatter
template <class T>
void Pstream::listCombineScatter(List<T>& Values, const int tag)
void Pstream::listCombineScatter
(
List<T>& Values,
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
listCombineScatter(UPstream::linearCommunication(), Values, tag);
listCombineScatter
(
UPstream::linearCommunication(comm),
Values,
tag,
comm
);
}
else
{
listCombineScatter(UPstream::treeCommunication(), Values, tag);
listCombineScatter
(
UPstream::treeCommunication(comm),
Values,
tag,
comm
);
}
}
@ -439,20 +536,21 @@ void Pstream::mapCombineGather
const List<UPstream::commsStruct>& comms,
Container& Values,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm);
Container receivedValues(fromBelow);
if (debug & 2)
@ -492,7 +590,7 @@ void Pstream::mapCombineGather
<< " data:" << Values << endl;
}
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag, comm);
toAbove << Values;
}
}
@ -504,16 +602,31 @@ void Pstream::mapCombineGather
(
Container& Values,
const CombineOp& cop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
mapCombineGather(UPstream::linearCommunication(), Values, cop, tag);
mapCombineGather
(
UPstream::linearCommunication(comm),
Values,
cop,
tag,
comm
);
}
else
{
mapCombineGather(UPstream::treeCommunication(), Values, cop, tag);
mapCombineGather
(
UPstream::treeCommunication(comm),
Values,
cop,
tag,
comm
);
}
}
@ -523,18 +636,26 @@ void Pstream::mapCombineScatter
(
const List<UPstream::commsStruct>& comms,
Container& Values,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
{
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
IPstream fromAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Values;
if (debug & 2)
@ -554,7 +675,7 @@ void Pstream::mapCombineScatter
Pout<< " sending to " << belowID << " data:" << Values << endl;
}
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm);
toBelow << Values;
}
}
@ -562,15 +683,32 @@ void Pstream::mapCombineScatter
template <class Container>
void Pstream::mapCombineScatter(Container& Values, const int tag)
void Pstream::mapCombineScatter
(
Container& Values,
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
mapCombineScatter(UPstream::linearCommunication(), Values, tag);
mapCombineScatter
(
UPstream::linearCommunication(comm),
Values,
tag,
comm
);
}
else
{
mapCombineScatter(UPstream::treeCommunication(), Values, tag);
mapCombineScatter
(
UPstream::treeCommunication(comm),
Values,
tag,
comm
);
}
}
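
The map variants follow the same pattern, e.g. merging per-processor counts keyed by name within a sub-communicator. A sketch, assuming subComm is an allocated communicator label and nLocalInletFaces a local count:

    HashTable<label, word> nFacesPerZone;
    nFacesPerZone.insert("inlet", nLocalInletFaces);

    // The master of subComm ends up with the merged table ...
    Pstream::mapCombineGather
    (
        nFacesPerZone,
        plusEqOp<label>(),
        Pstream::msgType(),
        subComm
    );

    // ... and redistributes it to the other ranks of subComm
    Pstream::mapCombineScatter(nFacesPerZone, Pstream::msgType(), subComm);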

View File

@ -46,6 +46,7 @@ void Pstream::exchange
List<Container>& recvBufs,
labelListList& sizes,
const int tag,
const label comm,
const bool block
)
{
@ -57,20 +58,20 @@ void Pstream::exchange
) << "Continuous data only." << Foam::abort(FatalError);
}
if (sendBufs.size() != UPstream::nProcs())
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorIn
(
"Pstream::exchange(..)"
) << "Size of list:" << sendBufs.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs()
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
sizes.setSize(UPstream::nProcs());
labelList& nsTransPs = sizes[UPstream::myProcNo()];
nsTransPs.setSize(UPstream::nProcs());
sizes.setSize(UPstream::nProcs(comm));
labelList& nsTransPs = sizes[UPstream::myProcNo(comm)];
nsTransPs.setSize(UPstream::nProcs(comm));
forAll(sendBufs, procI)
{
@ -78,7 +79,7 @@ void Pstream::exchange
}
// Send sizes across. Note: blocks.
combineReduce(sizes, UPstream::listEq(), tag);
combineReduce(sizes, UPstream::listEq(), tag, comm);
if (Pstream::parRun())
{
@ -90,9 +91,9 @@ void Pstream::exchange
recvBufs.setSize(sendBufs.size());
forAll(sizes, procI)
{
label nRecv = sizes[procI][UPstream::myProcNo()];
label nRecv = sizes[procI][UPstream::myProcNo(comm)];
if (procI != Pstream::myProcNo() && nRecv > 0)
if (procI != Pstream::myProcNo(comm) && nRecv > 0)
{
recvBufs[procI].setSize(nRecv);
UIPstream::read
@ -101,7 +102,8 @@ void Pstream::exchange
procI,
reinterpret_cast<char*>(recvBufs[procI].begin()),
nRecv*sizeof(T),
tag
tag,
comm
);
}
}
@ -112,7 +114,7 @@ void Pstream::exchange
forAll(sendBufs, procI)
{
if (procI != Pstream::myProcNo() && sendBufs[procI].size() > 0)
if (procI != Pstream::myProcNo(comm) && sendBufs[procI].size() > 0)
{
if
(
@ -122,7 +124,8 @@ void Pstream::exchange
procI,
reinterpret_cast<const char*>(sendBufs[procI].begin()),
sendBufs[procI].size()*sizeof(T),
tag
tag,
comm
)
)
{
@ -146,7 +149,7 @@ void Pstream::exchange
}
// Do myself
recvBufs[Pstream::myProcNo()] = sendBufs[Pstream::myProcNo()];
recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)];
}
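
exchange now sizes everything from nProcs(comm) and negotiates transfer sizes through combineReduce on the same communicator, so all-to-all transfers can be scoped as well. A sketch, assuming subComm is an allocated communicator label:

    // One send buffer per rank of subComm; empty lists mean nothing to send
    List<labelList> sendBufs(Pstream::nProcs(subComm));
    List<labelList> recvBufs;
    labelListList sizes;

    Pstream::exchange<labelList, label>
    (
        sendBufs,
        recvBufs,
        sizes,
        Pstream::msgType(),
        subComm,
        true        // block until all transfers have completed
    );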

View File

@ -48,13 +48,14 @@ void Pstream::gather
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -69,7 +70,8 @@ void Pstream::gather
myComm.below()[belowI],
reinterpret_cast<char*>(&value),
sizeof(T),
tag
tag,
comm
);
}
else
@ -79,7 +81,8 @@ void Pstream::gather
UPstream::scheduled,
myComm.below()[belowI],
0,
tag
tag,
comm
);
fromBelow >> value;
}
@ -98,12 +101,20 @@ void Pstream::gather
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
OPstream toAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Value;
}
}
@ -112,15 +123,21 @@ void Pstream::gather
template <class T, class BinaryOp>
void Pstream::gather(T& Value, const BinaryOp& bop, const int tag)
void Pstream::gather
(
T& Value,
const BinaryOp& bop,
const int tag,
const label comm
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
gather(UPstream::linearCommunication(), Value, bop, tag);
gather(UPstream::linearCommunication(comm), Value, bop, tag, comm);
}
else
{
gather(UPstream::treeCommunication(), Value, bop, tag);
gather(UPstream::treeCommunication(comm), Value, bop, tag, comm);
}
}
@ -130,13 +147,14 @@ void Pstream::scatter
(
const List<UPstream::commsStruct>& comms,
T& Value,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -149,12 +167,20 @@ void Pstream::scatter
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
IPstream fromAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
fromAbove >> Value;
}
}
@ -170,7 +196,8 @@ void Pstream::scatter
myComm.below()[belowI],
reinterpret_cast<const char*>(&Value),
sizeof(T),
tag
tag,
comm
);
}
else
@ -180,7 +207,8 @@ void Pstream::scatter
UPstream::scheduled,
myComm.below()[belowI],
0,
tag
tag,
comm
);
toBelow << Value;
}
@ -190,15 +218,15 @@ void Pstream::scatter
template <class T>
void Pstream::scatter(T& Value, const int tag)
void Pstream::scatter(T& Value, const int tag, const label comm)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
scatter(UPstream::linearCommunication(), Value, tag);
scatter(UPstream::linearCommunication(comm), Value, tag, comm);
}
else
{
scatter(UPstream::treeCommunication(), Value, tag);
scatter(UPstream::treeCommunication(comm), Value, tag, comm);
}
}
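
For non-contiguous types the gather/scatter path constructs (I|O)Pstream objects with the comm argument, so e.g. a decision taken on the sub-master can be broadcast within its communicator. A sketch, assuming subComm is an allocated communicator label:

    word method;
    if (Pstream::master(subComm))
    {
        method = "tree";    // hypothetical choice made on the sub-master
    }

    // Every rank of subComm receives the master's value
    Pstream::scatter(method, Pstream::msgType(), subComm);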

View File

@ -26,7 +26,7 @@ Description
communication schedule (usually linear-to-master or tree-to-master).
The gathered data will be a list in which element procID holds the data from
processor procID. Before calling, every processor should insert its value into
Values[UPstream::myProcNo()].
Values[UPstream::myProcNo(comm)].
Note: after gather every processor only knows its own data and that of the
processors below it. Only the 'master' of the communication schedule holds
a fully filled List. Use scatter to distribute the data.
@ -49,12 +49,13 @@ void Pstream::gatherList
(
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
if (Values.size() != UPstream::nProcs())
if (Values.size() != UPstream::nProcs(comm))
{
FatalErrorIn
(
@ -62,12 +63,12 @@ void Pstream::gatherList
", List<T>)"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs()
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -85,7 +86,8 @@ void Pstream::gatherList
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize(),
tag
tag,
comm
);
Values[belowID] = receivedValues[0];
@ -97,7 +99,7 @@ void Pstream::gatherList
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm);
fromBelow >> Values[belowID];
if (debug & 2)
@ -133,14 +135,14 @@ void Pstream::gatherList
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
<< " data from me:" << UPstream::myProcNo()
<< " data:" << Values[UPstream::myProcNo()] << endl;
<< " data from me:" << UPstream::myProcNo(comm)
<< " data:" << Values[UPstream::myProcNo(comm)] << endl;
}
if (contiguous<T>())
{
List<T> sendingValues(belowLeaves.size() + 1);
sendingValues[0] = Values[UPstream::myProcNo()];
sendingValues[0] = Values[UPstream::myProcNo(comm)];
forAll(belowLeaves, leafI)
{
@ -153,13 +155,21 @@ void Pstream::gatherList
myComm.above(),
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize(),
tag
tag,
comm
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Values[UPstream::myProcNo()];
OPstream toAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
toAbove << Values[UPstream::myProcNo(comm)];
forAll(belowLeaves, leafI)
{
@ -180,15 +190,15 @@ void Pstream::gatherList
template <class T>
void Pstream::gatherList(List<T>& Values, const int tag)
void Pstream::gatherList(List<T>& Values, const int tag, const label comm)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
gatherList(UPstream::linearCommunication(), Values, tag);
gatherList(UPstream::linearCommunication(comm), Values, tag, comm);
}
else
{
gatherList(UPstream::treeCommunication(), Values, tag);
gatherList(UPstream::treeCommunication(comm), Values, tag, comm);
}
}
@ -198,12 +208,13 @@ void Pstream::scatterList
(
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const int tag
const int tag,
const label comm
)
{
if (UPstream::parRun())
{
if (Values.size() != UPstream::nProcs())
if (Values.size() != UPstream::nProcs(comm))
{
FatalErrorIn
(
@ -211,12 +222,12 @@ void Pstream::scatterList
", List<T>)"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs()
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
// Get my communication order
const commsStruct& myComm = comms[UPstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo(comm)];
// Receive from up
if (myComm.above() != -1)
@ -233,7 +244,8 @@ void Pstream::scatterList
myComm.above(),
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize(),
tag
tag,
comm
);
forAll(notBelowLeaves, leafI)
@ -243,7 +255,14 @@ void Pstream::scatterList
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
IPstream fromAbove
(
UPstream::scheduled,
myComm.above(),
0,
tag,
comm
);
forAll(notBelowLeaves, leafI)
{
@ -281,12 +300,13 @@ void Pstream::scatterList
belowID,
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize(),
tag
tag,
comm
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm);
// Send data destined for all other processors below belowID
forAll(notBelowLeaves, leafI)
@ -308,15 +328,15 @@ void Pstream::scatterList
template <class T>
void Pstream::scatterList(List<T>& Values, const int tag)
void Pstream::scatterList(List<T>& Values, const int tag, const label comm)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum)
{
scatterList(UPstream::linearCommunication(), Values, tag);
scatterList(UPstream::linearCommunication(comm), Values, tag, comm);
}
else
{
scatterList(UPstream::treeCommunication(), Values, tag);
scatterList(UPstream::treeCommunication(comm), Values, tag, comm);
}
}
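
The list variants keep their sizing contract, now checked against nProcs(comm): the list needs one slot per rank of the communicator, indexed by myProcNo(comm). A sketch, assuming subComm is an allocated communicator label and nLocalCells a local count:

    labelList nCells(Pstream::nProcs(subComm));
    nCells[Pstream::myProcNo(subComm)] = nLocalCells;

    // After gatherList only the sub-master holds the full list;
    // scatterList then distributes it back to every rank of subComm
    Pstream::gatherList(nCells, Pstream::msgType(), subComm);
    Pstream::scatterList(nCells, Pstream::msgType(), subComm);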

View File

@ -215,9 +215,10 @@ bool Foam::regIOobject::read()
(
comms,
const_cast<word&>(headerClassName()),
Pstream::msgType()
Pstream::msgType(),
Pstream::worldComm
);
Pstream::scatter(comms, note(), Pstream::msgType());
Pstream::scatter(comms, note(), Pstream::msgType(), Pstream::worldComm);
// Get my communication order

View File

@ -40,7 +40,8 @@ Foam::procLduInterface::procLduInterface
coeffs_(coeffs),
myProcNo_(-1),
neighbProcNo_(-1),
tag_(-1)
tag_(-1),
comm_(-1)
{
if (isA<processorLduInterface>(interface.interface()))
{
@ -50,6 +51,7 @@ Foam::procLduInterface::procLduInterface
myProcNo_ = pldui.myProcNo();
neighbProcNo_ = pldui.neighbProcNo();
tag_ = pldui.tag();
comm_ = pldui.comm();
}
else if (isA<cyclicLduInterface>(interface.interface()))
{
@ -73,7 +75,8 @@ Foam::procLduInterface::procLduInterface(Istream& is)
coeffs_(is),
myProcNo_(readLabel(is)),
neighbProcNo_(readLabel(is)),
tag_(readLabel(is))
tag_(readLabel(is)),
comm_(readLabel(is))
{}
@ -85,7 +88,8 @@ Foam::Ostream& Foam::operator<<(Ostream& os, const procLduInterface& cldui)
<< cldui.coeffs_
<< cldui.myProcNo_
<< cldui.neighbProcNo_
<< cldui.tag_;
<< cldui.tag_
<< cldui.comm_;
return os;
}

View File

@ -25,7 +25,7 @@ Class
Foam::procLduInterface
Description
Foam::procLduInterface
IO interface for processorLduInterface
SourceFiles
procLduInterface.C
@ -58,6 +58,7 @@ class procLduInterface
label myProcNo_;
label neighbProcNo_;
label tag_;
label comm_;
// Private Member Functions

View File

@ -84,10 +84,13 @@ public:
// Access
//- Return processor number
//- Return communicator used for sending
virtual int comm() const = 0;
//- Return processor number (rank in communicator)
virtual int myProcNo() const = 0;
//- Return neighbour processor number
//- Return neighbour processor number (rank in communicator)
virtual int neighbProcNo() const = 0;
//- Return face transformation tensor

View File

@ -46,7 +46,8 @@ void Foam::processorLduInterface::send
neighbProcNo(),
reinterpret_cast<const char*>(f.begin()),
nBytes,
tag()
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -59,7 +60,8 @@ void Foam::processorLduInterface::send
neighbProcNo(),
receiveBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
resizeBuf(sendBuf_, nBytes);
@ -71,7 +73,8 @@ void Foam::processorLduInterface::send
neighbProcNo(),
sendBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
}
else
@ -98,7 +101,8 @@ void Foam::processorLduInterface::receive
neighbProcNo(),
reinterpret_cast<char*>(f.begin()),
f.byteSize(),
tag()
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -162,7 +166,8 @@ void Foam::processorLduInterface::compressedSend
neighbProcNo(),
sendBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
}
else if (commsType == Pstream::nonBlocking)
@ -175,7 +180,8 @@ void Foam::processorLduInterface::compressedSend
neighbProcNo(),
receiveBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
OPstream::write
@ -184,7 +190,8 @@ void Foam::processorLduInterface::compressedSend
neighbProcNo(),
sendBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
}
else
@ -225,7 +232,8 @@ void Foam::processorLduInterface::compressedReceive
neighbProcNo(),
receiveBuf_.begin(),
nBytes,
tag()
tag(),
comm()
);
}
else if (commsType != Pstream::nonBlocking)

View File

@ -71,6 +71,9 @@ public:
// Access
//- Return communicator used for comms
virtual int comm() const = 0;
//- Return processor number
virtual int myProcNo() const = 0;

View File

@ -146,6 +146,12 @@ public:
//- Processor interface functions
//- Return communicator used for comms
virtual int comm() const
{
return procInterface_.comm();
}
//- Return processor number
virtual int myProcNo() const
{

View File

@ -55,7 +55,7 @@ class processorGAMGInterface
{
// Private data
//- Reference tor the processorLduInterface from which this is
//- Reference for the processorLduInterface from which this is
// agglomerated
const processorLduInterface& fineProcInterface_;
@ -137,6 +137,12 @@ public:
{
return fineProcInterface_.tag();
}
//- Return communicator used for sending
virtual int comm() const
{
return fineProcInterface_.comm();
}
};

View File

@ -34,6 +34,7 @@ License
template<class Patch, class ProcPatch>
Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours
(
const label nProcs,
const PtrList<Patch>& patches
)
{
@ -43,7 +44,7 @@ Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours
label maxNb = 0;
boolList isNeighbourProc(Pstream::nProcs(), false);
boolList isNeighbourProc(nProcs, false);
forAll(patches, patchi)
{
@ -106,20 +107,21 @@ Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours
template<class Patch, class ProcPatch>
Foam::ProcessorTopology<Patch, ProcPatch>::ProcessorTopology
(
const PtrList<Patch>& patches
const PtrList<Patch>& patches,
const label comm
)
:
labelListList(Pstream::nProcs()),
labelListList(Pstream::nProcs(comm)),
patchSchedule_(2*patches.size())
{
if (Pstream::parRun())
{
// Fill my 'slot' with my neighbours
operator[](Pstream::myProcNo()) = procNeighbours(patches);
operator[](Pstream::myProcNo()) = procNeighbours(this->size(), patches);
// Distribute to all processors
Pstream::gatherList(*this);
Pstream::scatterList(*this);
Pstream::gatherList(*this, Pstream::msgType(), comm);
Pstream::scatterList(*this, Pstream::msgType(), comm);
}
if (Pstream::parRun() && Pstream::defaultCommsType == Pstream::scheduled)
@ -172,7 +174,7 @@ Foam::ProcessorTopology<Patch, ProcPatch>::ProcessorTopology
(
commSchedule
(
Pstream::nProcs(),
Pstream::nProcs(comm),
comms
).procSchedule()[Pstream::myProcNo()]
);

View File

@ -76,14 +76,14 @@ private:
//- Return all neighbouring processors of this processor. Set
// procPatchMap_.
labelList procNeighbours(const PtrList<Patch>&);
labelList procNeighbours(const label nProcs, const PtrList<Patch>&);
public:
// Constructors
//- Construct from boundaryMesh
ProcessorTopology(const PtrList<Patch>& patches);
ProcessorTopology(const PtrList<Patch>& patches, const label comm);
// Member Functions

View File

@ -1740,7 +1740,7 @@ void Foam::globalMeshData::calcGlobalCoPointSlaves() const
// Construct from polyMesh
Foam::globalMeshData::globalMeshData(const polyMesh& mesh)
:
processorTopology(mesh.boundaryMesh()),
processorTopology(mesh.boundaryMesh(), UPstream::worldComm),
mesh_(mesh),
nTotalPoints_(-1),
nTotalFaces_(-1),

View File

@ -548,7 +548,8 @@ void Foam::mapDistribute::exchangeAddressing
wantedRemoteElements,
subMap_,
sendSizes,
tag
tag,
Pstream::worldComm //TBD
);
// Renumber elements
@ -627,7 +628,8 @@ void Foam::mapDistribute::exchangeAddressing
wantedRemoteElements,
subMap_,
sendSizes,
tag
tag,
Pstream::worldComm //TBD
);
// Renumber elements

View File

@ -54,12 +54,14 @@ Foam::processorPolyPatch::processorPolyPatch
const label start,
const label index,
const polyBoundaryMesh& bm,
const label comm,
const int myProcNo,
const int neighbProcNo,
const transformType transform
)
:
coupledPolyPatch(name, size, start, index, bm, typeName, transform),
comm_(comm),
myProcNo_(myProcNo),
neighbProcNo_(neighbProcNo),
neighbFaceCentres_(),
@ -78,6 +80,10 @@ Foam::processorPolyPatch::processorPolyPatch
)
:
coupledPolyPatch(name, dict, index, bm, patchType),
comm_
(
dict.lookupOrDefault("communicator", UPstream::worldComm)
),
myProcNo_(readLabel(dict.lookup("myProcNo"))),
neighbProcNo_(readLabel(dict.lookup("neighbProcNo"))),
neighbFaceCentres_(),
@ -93,6 +99,7 @@ Foam::processorPolyPatch::processorPolyPatch
)
:
coupledPolyPatch(pp, bm),
comm_(pp.comm_),
myProcNo_(pp.myProcNo_),
neighbProcNo_(pp.neighbProcNo_),
neighbFaceCentres_(),
@ -111,6 +118,7 @@ Foam::processorPolyPatch::processorPolyPatch
)
:
coupledPolyPatch(pp, bm, index, newSize, newStart),
comm_(pp.comm_),
myProcNo_(pp.myProcNo_),
neighbProcNo_(pp.neighbProcNo_),
neighbFaceCentres_(),
@ -129,6 +137,7 @@ Foam::processorPolyPatch::processorPolyPatch
)
:
coupledPolyPatch(pp, bm, index, mapAddressing, newStart),
comm_(pp.comm_),
myProcNo_(pp.myProcNo_),
neighbProcNo_(pp.neighbProcNo_),
neighbFaceCentres_(),
@ -1082,6 +1091,11 @@ bool Foam::processorPolyPatch::order
void Foam::processorPolyPatch::write(Ostream& os) const
{
coupledPolyPatch::write(os);
if (comm_ != UPstream::worldComm)
{
os.writeKeyword("communicator") << comm_
<< token::END_STATEMENT << nl;
}
os.writeKeyword("myProcNo") << myProcNo_
<< token::END_STATEMENT << nl;
os.writeKeyword("neighbProcNo") << neighbProcNo_

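Because write() only emits the communicator keyword when it differs from worldComm, boundary files of existing decomposed cases remain unchanged. For a patch on a non-world communicator, an entry in processor*/constant/polyMesh/boundary would read roughly (name and values illustrative):

    procBoundary0to1
    {
        type            processor;
        nFaces          20;          // illustrative sizes
        startFace       3000;
        myProcNo        0;
        neighbProcNo    1;
        communicator    1;           // omitted when == UPstream::worldComm
    }
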
View File

@ -58,6 +58,9 @@ class processorPolyPatch
{
// Private data
//- Communicator to use
label comm_;
int myProcNo_;
int neighbProcNo_;
@ -133,6 +136,7 @@ public:
const label start,
const label index,
const polyBoundaryMesh& bm,
const label comm,
const int myProcNo,
const int neighbProcNo,
const transformType transform = UNKNOWN // transformation type
@ -244,6 +248,13 @@ public:
}
}
//- Return communicator used for communication
label comm() const
{
return comm_;
}
//- Return processor number
int myProcNo() const
{

View File

@ -49,6 +49,7 @@ Foam::processorCyclicPolyPatch::processorCyclicPolyPatch
const label start,
const label index,
const polyBoundaryMesh& bm,
const label comm,
const int myProcNo,
const int neighbProcNo,
const word& referPatchName,
@ -62,6 +63,7 @@ Foam::processorCyclicPolyPatch::processorCyclicPolyPatch
start,
index,
bm,
comm,
myProcNo,
neighbProcNo,
transform

View File

@ -121,6 +121,7 @@ public:
const label start,
const label index,
const polyBoundaryMesh& bm,
const label comm,
const int myProcNo,
const int neighbProcNo,
const word& referPatchName,

View File

@ -37,6 +37,7 @@ Foam::UIPstream::UIPstream
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
streamFormat format,
versionNumber version
@ -48,6 +49,7 @@ Foam::UIPstream::UIPstream
externalBuf_(externalBuf),
externalBufPosition_(externalBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
messageSize_(0)
{
@ -60,6 +62,7 @@ Foam::UIPstream::UIPstream
"DynamicList<char>&,\n"
"label&,\n"
"const int,\n"
"const label,\n"
"const bool,\n"
"streamFormat,\n"
"versionNumber\n"
@ -68,11 +71,7 @@ Foam::UIPstream::UIPstream
}
Foam::UIPstream::UIPstream
(
const int fromProcNo,
PstreamBuffers& buffers
)
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
@ -80,6 +79,7 @@ Foam::UIPstream::UIPstream
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),
messageSize_(0)
{
@ -102,7 +102,8 @@ Foam::label Foam::UIPstream::read
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag
const int tag,
const label communicator
)
{
notImplemented
@ -113,7 +114,8 @@ Foam::label Foam::UIPstream::read
"const int fromProcNo,"
"char* buf,"
"const label bufSize,"
"const int tag"
"const int tag,"
"const label communicator"
")"
);

View File

@ -36,7 +36,8 @@ bool Foam::UOPstream::write
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag
const int tag,
const label communicator
)
{
notImplemented
@ -47,7 +48,8 @@ bool Foam::UOPstream::write
"const int fromProcNo,"
"char* buf,"
"const label bufSize,"
"const int tag"
"const int tag,"
"const label communicator"
")"
);

View File

@ -55,28 +55,41 @@ void Foam::UPstream::abort()
}
void Foam::reduce(scalar&, const sumOp<scalar>&, const int)
void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label)
{}
void Foam::reduce(scalar&, const minOp<scalar>&, const int)
void Foam::reduce(scalar&, const minOp<scalar>&, const int, const label)
{}
void Foam::reduce(vector2D&, const sumOp<vector2D>&, const int)
void Foam::reduce(vector2D&, const sumOp<vector2D>&, const int, const label)
{}
void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag
scalar&,
label&,
const int,
const label
)
{}
void Foam::reduce(scalar&, const sumOp<scalar>&, const int, label&)
void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label, label&)
{}
void Foam::UPstream::allocatePstreamCommunicator
(
const label,
const label
)
{}
void Foam::UPstream::freePstreamCommunicator(const label)
{}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2013 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -37,6 +37,35 @@ namespace Foam
DynamicList<MPI_Request> PstreamGlobals::outstandingRequests_;
//! \endcond
// Allocated communicators.
//! \cond fileScope
DynamicList<MPI_Comm> PstreamGlobals::MPICommunicators_;
DynamicList<MPI_Group> PstreamGlobals::MPIGroups_;
//! \endcond
void PstreamGlobals::checkCommunicator
(
const label comm,
const label otherProcNo
)
{
if
(
comm < 0
|| comm >= PstreamGlobals::MPICommunicators_.size()
)
{
FatalErrorIn
(
"PstreamGlobals::checkCommunicator(const label, const label)"
) << "otherProcNo:" << otherProcNo << " : illegal communicator "
<< comm << endl
<< "Communicator should be within range 0.."
<< PstreamGlobals::MPICommunicators_.size()-1 << abort(FatalError);
}
}
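
checkCommunicator gives raw MPI call sites a cheap guard that a Foam communicator label maps to an allocated entry before indexing the table. The intended call pattern, as used by UOPstream::write below (comm and toProcNo assumed in scope):

    // Validate the label, then fetch the matching MPI communicator
    PstreamGlobals::checkCommunicator(comm, toProcNo);
    MPI_Comm mpiComm = PstreamGlobals::MPICommunicators_[comm];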
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2013 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -26,7 +26,7 @@ Namespace
Description
Global functions and variables for working with parallel streams,
but principally for gamma/mpi
but principally for mpi
SourceFiles
PstreamGlobals.C
@ -54,6 +54,13 @@ namespace PstreamGlobals
extern DynamicList<MPI_Request> outstandingRequests_;
// Current communicators. First element will be MPI_COMM_WORLD
extern DynamicList<MPI_Comm> MPICommunicators_;
extern DynamicList<MPI_Group> MPIGroups_;
void checkCommunicator(const label, const label procNo);
};

View File

@ -41,6 +41,7 @@ Foam::UIPstream::UIPstream
DynamicList<char>& externalBuf,
label& externalBufPosition,
const int tag,
const label comm,
const bool clearAtEnd,
streamFormat format,
versionNumber version
@ -52,6 +53,7 @@ Foam::UIPstream::UIPstream
externalBuf_(externalBuf),
externalBufPosition_(externalBufPosition),
tag_(tag),
comm_(comm),
clearAtEnd_(clearAtEnd),
messageSize_(0)
{
@ -80,7 +82,7 @@ Foam::UIPstream::UIPstream
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
MPI_Probe(fromProcNo_, tag_, PstreamGlobals::MPICommunicators_[comm_], &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
@ -99,7 +101,8 @@ Foam::UIPstream::UIPstream
fromProcNo_,
externalBuf_.begin(),
wantedSize,
tag_
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
@ -121,6 +124,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
tag_(buffers.tag_),
comm_(buffers.comm_),
clearAtEnd_(true),
messageSize_(0)
{
@ -167,7 +171,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
MPI_Probe(fromProcNo_, tag_, PstreamGlobals::MPICommunicators_[comm_], &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
@ -186,7 +190,8 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
fromProcNo_,
externalBuf_.begin(),
wantedSize,
tag_
tag_,
comm_
);
// Set addressed size. Leave actual allocated memory intact.
@ -208,13 +213,15 @@ Foam::label Foam::UIPstream::read
const int fromProcNo,
char* buf,
const std::streamsize bufSize,
const int tag
const int tag,
const label communicator
)
{
if (debug)
{
Pout<< "UIPstream::read : starting read from:" << fromProcNo
<< " tag:" << tag << " wanted size:" << label(bufSize)
<< " tag:" << tag << " comm:" << communicator
<< " wanted size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< Foam::endl;
}
@ -230,9 +237,9 @@ Foam::label Foam::UIPstream::read
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
fromProcNo,
tag,
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
&status
)
)
@ -286,9 +293,9 @@ Foam::label Foam::UIPstream::read
buf,
bufSize,
MPI_PACKED,
procID(fromProcNo),
fromProcNo,
tag,
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
&request
)
)

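With the communicator threaded through, callers pair the non-blocking read with the request bookkeeping this commit keeps in outstandingRequests_. A hedged sketch (recvBuf, nbrProcNo and comm assumed in scope):

    // Remember the request count, then post the non-blocking receive
    label startOfRequests = UPstream::nRequests();

    UIPstream::read
    (
        Pstream::nonBlocking,
        nbrProcNo,
        reinterpret_cast<char*>(recvBuf.begin()),
        recvBuf.byteSize(),
        Pstream::msgType(),
        comm
    );

    // ... complete requests issued since startOfRequests (e.g. via
    // UPstream::waitRequests) before reading recvBuf
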
View File

@ -39,17 +39,22 @@ bool Foam::UOPstream::write
const int toProcNo,
const char* buf,
const std::streamsize bufSize,
const int tag
const int tag,
const label communicator
)
{
if (debug)
{
Pout<< "UOPstream::write : starting write to:" << toProcNo
<< " tag:" << tag << " size:" << label(bufSize)
<< " tag:" << tag
<< " comm:" << communicator << " size:" << label(bufSize)
<< " commsType:" << UPstream::commsTypeNames[commsType]
<< Foam::endl;
}
PstreamGlobals::checkCommunicator(communicator, toProcNo);
bool transferFailed = true;
if (commsType == blocking)
@ -59,9 +64,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
toProcNo,
tag,
MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
);
if (debug)
@ -79,9 +84,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
toProcNo,
tag,
MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
);
if (debug)
@ -101,9 +106,9 @@ bool Foam::UOPstream::write
const_cast<char*>(buf),
bufSize,
MPI_PACKED,
procID(toProcNo),
toProcNo,
tag,
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
&request
);

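The matching send side accepts the same trailing communicator; a blocking variant sketch (toProcNo, sendBuf and comm assumed in scope):

    // Blocking send of raw bytes on an explicit communicator
    UOPstream::write
    (
        Pstream::blocking,
        toProcNo,
        reinterpret_cast<const char*>(sendBuf.begin()),
        sendBuf.byteSize(),
        Pstream::msgType(),
        comm
    );
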
View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2012 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2011-2013 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -55,7 +55,6 @@ void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("GAMMANP", "number of instances");
validParOptions.insert("machinefile", "machine file");
}
@ -66,12 +65,13 @@ bool Foam::UPstream::init(int& argc, char**& argv)
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (debug)
{
Pout<< "UPstream::init : initialised with numProcs:" << numprocs
<< " myProcNo:" << myProcNo_ << endl;
<< " myRank:" << myRank << endl;
}
if (numprocs <= 1)
@ -82,14 +82,9 @@ bool Foam::UPstream::init(int& argc, char**& argv)
<< Foam::abort(FatalError);
}
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
setParRun();
// Initialise parallel structure
setParRun(numprocs);
# ifndef SGIMPI
string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
@ -116,11 +111,9 @@ bool Foam::UPstream::init(int& argc, char**& argv)
char processorName[MPI_MAX_PROCESSOR_NAME];
MPI_Get_processor_name(processorName, &processorNameLen);
processorName[processorNameLen] = '\0';
//signal(SIGABRT, stop);
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
Pout<< "Processor name:" << processorName << endl;
return true;
}
@ -153,6 +146,15 @@ void Foam::UPstream::exit(int errnum)
<< endl;
}
// Clean mpi communicators
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freePstreamCommunicator(communicator);
}
}
if (errnum == 0)
{
MPI_Finalize();
@ -171,21 +173,39 @@ void Foam::UPstream::abort()
}
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag)
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator
)
{
allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag);
allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
void Foam::reduce(scalar& Value, const minOp<scalar>& bop, const int tag)
void Foam::reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag,
const label communicator
)
{
allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag);
allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}
void Foam::reduce(vector2D& Value, const sumOp<vector2D>& bop, const int tag)
void Foam::reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag,
const label communicator
)
{
allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag);
allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
@ -193,11 +213,12 @@ void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag
const int tag,
const label communicator
)
{
vector2D twoScalars(Value, scalar(Count));
reduce(twoScalars, sumOp<vector2D>());
reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
Value = twoScalars.x();
Count = twoScalars.y();
@ -209,6 +230,7 @@ void Foam::reduce
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator,
label& requestID
)
{
@ -225,7 +247,7 @@ void Foam::reduce
MPI_SCALAR,
MPI_SUM,
0, //root
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
&request
);
@ -233,12 +255,141 @@ void Foam::reduce
PstreamGlobals::outstandingRequests_.append(request);
#else
// Non-blocking not yet implemented in mpi
reduce(Value, bop, tag);
reduce(Value, bop, tag, communicator);
requestID = -1;
#endif
}
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
const label index
)
{
if (index == PstreamGlobals::MPIGroups_.size())
{
// Extend storage with dummy values
MPI_Group newGroup;
PstreamGlobals::MPIGroups_.append(newGroup);
MPI_Comm newComm;
PstreamGlobals::MPICommunicators_.append(newComm);
}
else if (index > PstreamGlobals::MPIGroups_.size())
{
FatalErrorIn
(
"UPstream::allocatePstreamCommunicator\n"
"(\n"
" const label parentIndex,\n"
" const labelList& subRanks\n"
")\n"
) << "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::exit(FatalError);
}
if (parentIndex == -1)
{
// Allocate world communicator
if (index != UPstream::worldComm)
{
FatalErrorIn
(
"UPstream::allocateCommunicator\n"
"(\n"
" const label parentIndex,\n"
" const labelList& subRanks\n"
")\n"
) << "world communicator should always be index "
<< UPstream::worldComm << Foam::exit(FatalError);
}
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
// Set the number of processes to the actual number
int numProcs;
MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
procIDs_[index] = identity(numProcs);
}
else
{
// Create new group
MPI_Group_incl
(
PstreamGlobals::MPIGroups_[parentIndex],
procIDs_[index].size(),
procIDs_[index].begin(),
&PstreamGlobals::MPIGroups_[index]
);
// Create new communicator
MPI_Comm_create
(
PstreamGlobals::MPICommunicators_[parentIndex],
PstreamGlobals::MPIGroups_[index],
&PstreamGlobals::MPICommunicators_[index]
);
if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
{
myProcNo_[index] = -1;
}
else
{
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
}
}
}
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
if (communicator != UPstream::worldComm)
{
if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
{
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
}
MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
}
}
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();

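allocatePstreamCommunicator/freePstreamCommunicator back the higher-level UPstream::allocateCommunicator/freeCommunicator calls. A hedged end-to-end sketch of the lifecycle (the choice of subRanks is illustrative):

    // Allocate a sub-communicator over the first half of the world ranks
    labelList subRanks(identity(UPstream::nProcs()/2));
    label comm = UPstream::allocateCommunicator(UPstream::worldComm, subRanks);

    if (UPstream::myProcNo(comm) != -1)    // -1 : not a member of comm
    {
        scalar x = 1.0;
        reduce(x, sumOp<scalar>(), UPstream::msgType(), comm);
    }

    UPstream::freeCommunicator(comm);
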
View File

@ -52,7 +52,8 @@ void allReduce
MPI_Datatype MPIType,
MPI_Op op,
const BinaryOp& bop,
const int tag
const int tag,
const label communicator
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -35,7 +35,8 @@ void Foam::allReduce
MPI_Datatype MPIType,
MPI_Op MPIOp,
const BinaryOp& bop,
const int tag
const int tag,
const label communicator
)
{
if (!UPstream::parRun())
@ -43,14 +44,14 @@ void Foam::allReduce
return;
}
if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
if (UPstream::nProcs(communicator) <= UPstream::nProcsSimpleSum)
{
if (UPstream::master())
if (UPstream::master(communicator))
{
for
(
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave<=UPstream::lastSlave(communicator);
slave++
)
{
@ -63,9 +64,9 @@ void Foam::allReduce
&value,
MPICount,
MPIType,
UPstream::procID(slave),
slave,
tag,
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
MPI_STATUS_IGNORE
)
)
@ -97,9 +98,9 @@ void Foam::allReduce
&Value,
MPICount,
MPIType,
UPstream::procID(UPstream::masterNo()),
UPstream::masterNo(),
tag,
MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
@ -120,12 +121,12 @@ void Foam::allReduce
}
if (UPstream::master())
if (UPstream::master(communicator))
{
for
(
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave<=UPstream::lastSlave(communicator);
slave++
)
{
@ -136,9 +137,9 @@ void Foam::allReduce
&Value,
MPICount,
MPIType,
UPstream::procID(slave),
slave,
tag,
MPI_COMM_WORLD
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
@ -167,9 +168,9 @@ void Foam::allReduce
&Value,
MPICount,
MPIType,
UPstream::procID(UPstream::masterNo()),
UPstream::masterNo(),
tag,
MPI_COMM_WORLD,
PstreamGlobals::MPICommunicators_[communicator],
MPI_STATUS_IGNORE
)
)
@ -193,7 +194,15 @@ void Foam::allReduce
else
{
Type sum;
MPI_Allreduce(&Value, &sum, MPICount, MPIType, MPIOp, MPI_COMM_WORLD);
MPI_Allreduce
(
&Value,
&sum,
MPICount,
MPIType,
MPIOp,
PstreamGlobals::MPICommunicators_[communicator]
);
Value = sum;
}
}
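
allReduce thus picks its strategy per communicator: up to nProcsSimpleSum ranks it hand-rolls a master gather/broadcast with point-to-point messages on MPICommunicators_[communicator]; beyond that it defers to MPI_Allreduce on the same communicator. Both paths hide behind the same Foam-level call, e.g. the two-in-one reduction used by sumReduce (vol and nCells illustrative):

    // Pack two reductions into one message, restricted to comm's ranks
    vector2D both(vol, scalar(nCells));
    reduce(both, sumOp<vector2D>(), Pstream::msgType(), comm);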

View File

@ -945,6 +945,7 @@ void Foam::fvMeshDistribute::addProcPatches
mesh_.nFaces(),
mesh_.boundaryMesh().size(),
mesh_.boundaryMesh(),
Pstream::worldComm,
Pstream::myProcNo(),
nbrProc[bFaceI]
);
@ -988,6 +989,7 @@ void Foam::fvMeshDistribute::addProcPatches
mesh_.nFaces(),
mesh_.boundaryMesh().size(),
mesh_.boundaryMesh(),
Pstream::worldComm,
Pstream::myProcNo(),
nbrProc[bFaceI],
cycName,

View File

@ -244,23 +244,25 @@ void processorFvPatchField<Type>::initEvaluate
// Fast path. Receive into *this
this->setSize(sendBuf_.size());
outstandingRecvRequest_ = UPstream::nRequests();
IPstream::read
UIPstream::read
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<char*>(this->begin()),
this->byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
outstandingSendRequest_ = UPstream::nRequests();
OPstream::write
UOPstream::write
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<const char*>(sendBuf_.begin()),
this->byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
}
else
@ -342,23 +344,25 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate
scalarReceiveBuf_.setSize(scalarSendBuf_.size());
outstandingRecvRequest_ = UPstream::nRequests();
IPstream::read
UIPstream::read
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<char*>(scalarReceiveBuf_.begin()),
scalarReceiveBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
outstandingSendRequest_ = UPstream::nRequests();
OPstream::write
UOPstream::write
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<const char*>(scalarSendBuf_.begin()),
scalarSendBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
}
else
@ -467,7 +471,8 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate
procPatch_.neighbProcNo(),
reinterpret_cast<char*>(receiveBuf_.begin()),
receiveBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
outstandingSendRequest_ = UPstream::nRequests();
@ -477,7 +482,8 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate
procPatch_.neighbProcNo(),
reinterpret_cast<const char*>(sendBuf_.begin()),
sendBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
}
else

View File

@ -244,6 +244,12 @@ public:
//- Processor coupled interface functions
//- Return communicator used for comms
virtual int comm() const
{
return procPatch_.comm();
}
//- Return processor number
virtual int myProcNo() const
{

View File

@ -60,23 +60,25 @@ void processorFvPatchField<scalar>::initInterfaceMatrixUpdate
scalarReceiveBuf_.setSize(scalarSendBuf_.size());
outstandingRecvRequest_ = UPstream::nRequests();
IPstream::read
UIPstream::read
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<char*>(scalarReceiveBuf_.begin()),
scalarReceiveBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
outstandingSendRequest_ = UPstream::nRequests();
OPstream::write
UOPstream::write
(
Pstream::nonBlocking,
procPatch_.neighbProcNo(),
reinterpret_cast<const char*>(scalarSendBuf_.begin()),
scalarSendBuf_.byteSize(),
procPatch_.tag()
procPatch_.tag(),
procPatch_.comm()
);
}
else

View File

@ -84,14 +84,20 @@ public:
// Member functions
//- Return communicator used for comms
virtual int comm() const
{
return procPolyPatch_.comm();
}
//- Return processor number
int myProcNo() const
virtual int myProcNo() const
{
return procPolyPatch_.myProcNo();
}
//- Return neighbour processor number
int neighbProcNo() const
virtual int neighbProcNo() const
{
return procPolyPatch_.neighbProcNo();
}
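
With comm(), myProcNo() and neighbProcNo() all virtual, generic coupling code can interrogate any processor patch through the base interface. A hedged sketch:

    // Hypothetical: report a processor fvPatch's communicator and ranks
    const fvPatch& patch = mesh.boundary()[patchI];

    if (isA<processorFvPatch>(patch))
    {
        const processorFvPatch& pp =
            refCast<const processorFvPatch>(patch);

        Pout<< patch.name()
            << " comm:" << pp.comm()
            << " myProcNo:" << pp.myProcNo()
            << " neighbProcNo:" << pp.neighbProcNo() << endl;
    }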