ENH: Pstream: expose tag

This commit is contained in:
mattijs
2011-07-29 18:12:48 +01:00
parent ff37232f55
commit 537a916dff
29 changed files with 564 additions and 283 deletions

View File

@ -66,8 +66,13 @@ void Foam::IOdictionary::readFile(const bool masterOnly)
// Master reads headerclassname from file. Make sure this gets
// transfered as well as contents.
Pstream::scatter(comms, const_cast<word&>(headerClassName()));
Pstream::scatter(comms, note());
Pstream::scatter
(
comms,
const_cast<word&>(headerClassName()),
Pstream::msgType()
);
Pstream::scatter(comms, note(), Pstream::msgType());
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
@ -88,6 +93,7 @@ void Foam::IOdictionary::readFile(const bool masterOnly)
Pstream::scheduled,
myComm.above(),
0,
Pstream::msgType(),
IOstream::ASCII
);
IOdictionary::readData(fromAbove);

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -32,6 +32,7 @@ Foam::IPstream::IPstream
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
const int tag,
streamFormat format,
versionNumber version
)
@ -43,7 +44,7 @@ Foam::IPstream::IPstream
fromProcNo,
buf_,
externalBufPosition_,
UPstream::msgType(), // tag
tag, // tag
false, // do not clear buf_ if at end
format,
version

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -68,6 +68,7 @@ public:
const commsTypes commsType,
const int fromProcNo,
const label bufSize = 0,
const int tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
);

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -97,20 +97,31 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T, class BinaryOp>
static void gather(T& Value, const BinaryOp& bop);
static void gather
(
T& Value,
const BinaryOp& bop,
const int tag = Pstream::msgType()
);
//- Scatter data. Distribute without modification. Reverse of gather
template <class T>
static void scatter(const List<commsStruct>& comms, T& Value);
static void scatter
(
const List<commsStruct>& comms,
T& Value,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T>
static void scatter(T& Value);
static void scatter(T& Value, const int tag = Pstream::msgType());
// Combine variants. Inplace combine values from processors.
@ -121,24 +132,35 @@ public:
(
const List<commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T, class CombineOp>
static void combineGather(T& Value, const CombineOp& cop);
static void combineGather
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
);
//- Scatter data. Reverse of combineGather
template <class T>
static void combineScatter
(
const List<commsStruct>& comms,
T& Value
T& Value,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T>
static void combineScatter(T& Value);
static void combineScatter
(
T& Value,
const int tag = Pstream::msgType()
);
// Combine variants working on whole List at a time.
@ -147,24 +169,35 @@ public:
(
const List<commsStruct>& comms,
List<T>& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T, class CombineOp>
static void listCombineGather(List<T>& Value, const CombineOp& cop);
static void listCombineGather
(
List<T>& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
);
//- Scatter data. Reverse of combineGather
template <class T>
static void listCombineScatter
(
const List<commsStruct>& comms,
List<T>& Value
List<T>& Value,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T>
static void listCombineScatter(List<T>& Value);
static void listCombineScatter
(
List<T>& Value,
const int tag = Pstream::msgType()
);
// Combine variants working on whole map at a time. Container needs to
// have iterators and find() defined.
@ -174,7 +207,8 @@ public:
(
const List<commsStruct>& comms,
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag
);
//- Like above but switches between linear/tree communication
@ -182,7 +216,8 @@ public:
static void mapCombineGather
(
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag = Pstream::msgType()
);
//- Scatter data. Reverse of combineGather
@ -190,12 +225,17 @@ public:
static void mapCombineScatter
(
const List<commsStruct>& comms,
Container& Values
Container& Values,
const int tag
);
//- Like above but switches between linear/tree communication
template <class Container>
static void mapCombineScatter(Container& Values);
static void mapCombineScatter
(
Container& Values,
const int tag = Pstream::msgType()
);
@ -208,24 +248,34 @@ public:
static void gatherList
(
const List<commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T>
static void gatherList(List<T>& Values);
static void gatherList
(
List<T>& Values,
const int tag = Pstream::msgType()
);
//- Scatter data. Reverse of gatherList
template <class T>
static void scatterList
(
const List<commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag
);
//- Like above but switches between linear/tree communication
template <class T>
static void scatterList(List<T>& Values);
static void scatterList
(
List<T>& Values,
const int tag = Pstream::msgType()
);
// Exchange

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -137,6 +137,11 @@ public:
// Member functions
int tag() const
{
return tag_;
}
//- Mark all sends as having been done. This will start receives
// in non-blocking mode. If block will wait for all transfers to
// finish (only relevant for nonBlocking mode)

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -52,26 +52,38 @@ void combineReduce
(
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag
)
{
Pstream::combineGather(comms, Value, cop);
Pstream::combineScatter(comms, Value);
Pstream::combineGather(comms, Value, cop, tag);
Pstream::combineScatter(comms, Value, tag);
}
template <class T, class CombineOp>
void combineReduce(T& Value, const CombineOp& cop)
void combineReduce
(
T& Value,
const CombineOp& cop,
const int tag = Pstream::msgType()
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
Pstream::combineGather(UPstream::linearCommunication(), Value, cop);
Pstream::combineScatter(UPstream::linearCommunication(), Value);
Pstream::combineGather
(
UPstream::linearCommunication(),
Value,
cop,
tag
);
Pstream::combineScatter(UPstream::linearCommunication(), Value, tag);
}
else
{
Pstream::combineGather(UPstream::treeCommunication(), Value, cop);
Pstream::combineScatter(UPstream::treeCommunication(), Value);
Pstream::combineGather(UPstream::treeCommunication(), Value, cop, tag);
Pstream::combineScatter(UPstream::treeCommunication(), Value, tag);
}
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -42,11 +42,12 @@ void reduce
(
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag
)
{
Pstream::gather(comms, Value, bop);
Pstream::scatter(comms, Value);
Pstream::gather(comms, Value, bop, tag);
Pstream::scatter(comms, Value, tag);
}
@ -55,16 +56,17 @@ template <class T, class BinaryOp>
void reduce
(
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag = Pstream::msgType()
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
reduce(UPstream::linearCommunication(), Value, bop);
reduce(UPstream::linearCommunication(), Value, bop, tag);
}
else
{
reduce(UPstream::treeCommunication(), Value, bop);
reduce(UPstream::treeCommunication(), Value, bop, tag);
}
}
@ -74,18 +76,19 @@ template <class T, class BinaryOp>
T returnReduce
(
const T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag = Pstream::msgType()
)
{
T WorkValue(Value);
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
reduce(UPstream::linearCommunication(), WorkValue, bop);
reduce(UPstream::linearCommunication(), WorkValue, bop, tag);
}
else
{
reduce(UPstream::treeCommunication(), WorkValue, bop);
reduce(UPstream::treeCommunication(), WorkValue, bop, tag);
}
return WorkValue;
@ -93,7 +96,12 @@ T returnReduce
// Insist there is a specialisation for the reduction of a scalar
void reduce(scalar& Value, const sumOp<scalar>& bop);
void reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag = Pstream::msgType()
);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -84,13 +84,17 @@ Foam::UIPstream::~UIPstream()
{
if (debug)
{
Pout<< "UIPstream::~UIPstream() : clearing externalBuf_ of size "
<< externalBuf_.size() << endl;
Pout<< "UIPstream::~UIPstream() : tag:" << tag_
<< " fromProcNo:" << fromProcNo_
<< " clearing externalBuf_ of size "
<< externalBuf_.size()
<< " messageSize_:" << messageSize_ << endl;
}
externalBuf_.clearStorage();
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Istream& Foam::UIPstream::read(token& t)

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -262,14 +262,23 @@ public:
// Spawns slave processes and initialises inter-communication
static bool init(int& argc, char**& argv);
//- Non-blocking comms: wait until all have finished.
static void waitRequests();
// Non-blocking comms
//- Get number of outstanding requests
static label nRequests();
//- Truncate number of outstanding requests
static void resetRequests(const label sz);
//- Wait until all requests (from start onwards) have finished.
static void waitRequests(const label start = 0);
//- Non-blocking comms: has request i finished?
static bool finishedRequest(const label i);
//- Is this a parallel run?
static bool parRun()
static bool& parRun()
{
return parRun_;
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -50,7 +50,8 @@ void Pstream::combineGather
(
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
const CombineOp& cop,
const int tag
)
{
if (UPstream::parRun())
@ -71,7 +72,8 @@ void Pstream::combineGather
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(&value),
sizeof(T)
sizeof(T),
tag
);
if (debug & 2)
@ -84,7 +86,7 @@ void Pstream::combineGather
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
T value(fromBelow);
if (debug & 2)
@ -113,12 +115,13 @@ void Pstream::combineGather
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Value;
}
}
@ -127,15 +130,15 @@ void Pstream::combineGather
template <class T, class CombineOp>
void Pstream::combineGather(T& Value, const CombineOp& cop)
void Pstream::combineGather(T& Value, const CombineOp& cop, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
combineGather(UPstream::linearCommunication(), Value, cop);
combineGather(UPstream::linearCommunication(), Value, cop, tag);
}
else
{
combineGather(UPstream::treeCommunication(), Value, cop);
combineGather(UPstream::treeCommunication(), Value, cop, tag);
}
}
@ -144,7 +147,8 @@ template <class T>
void Pstream::combineScatter
(
const List<UPstream::commsStruct>& comms,
T& Value
T& Value,
const int tag
)
{
if (UPstream::parRun())
@ -162,12 +166,13 @@ void Pstream::combineScatter
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
Value = T(fromAbove);
}
@ -195,12 +200,13 @@ void Pstream::combineScatter
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
toBelow << Value;
}
}
@ -209,15 +215,15 @@ void Pstream::combineScatter
template <class T>
void Pstream::combineScatter(T& Value)
void Pstream::combineScatter(T& Value, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
combineScatter(UPstream::linearCommunication(), Value);
combineScatter(UPstream::linearCommunication(), Value, tag);
}
else
{
combineScatter(UPstream::treeCommunication(), Value);
combineScatter(UPstream::treeCommunication(), Value, tag);
}
}
@ -231,7 +237,8 @@ void Pstream::listCombineGather
(
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag
)
{
if (UPstream::parRun())
@ -253,7 +260,8 @@ void Pstream::listCombineGather
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag
);
if (debug & 2)
@ -269,7 +277,7 @@ void Pstream::listCombineGather
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
List<T> receivedValues(fromBelow);
if (debug & 2)
@ -301,12 +309,13 @@ void Pstream::listCombineGather
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Values;
}
}
@ -315,15 +324,20 @@ void Pstream::listCombineGather
template <class T, class CombineOp>
void Pstream::listCombineGather(List<T>& Values, const CombineOp& cop)
void Pstream::listCombineGather
(
List<T>& Values,
const CombineOp& cop,
const int tag
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
listCombineGather(UPstream::linearCommunication(), Values, cop);
listCombineGather(UPstream::linearCommunication(), Values, cop, tag);
}
else
{
listCombineGather(UPstream::treeCommunication(), Values, cop);
listCombineGather(UPstream::treeCommunication(), Values, cop, tag);
}
}
@ -332,7 +346,8 @@ template <class T>
void Pstream::listCombineScatter
(
const List<UPstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag
)
{
if (UPstream::parRun())
@ -350,12 +365,13 @@ void Pstream::listCombineScatter
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
fromAbove >> Values;
}
@ -383,12 +399,13 @@ void Pstream::listCombineScatter
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
Values.byteSize(),
tag
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
toBelow << Values;
}
}
@ -397,15 +414,15 @@ void Pstream::listCombineScatter
template <class T>
void Pstream::listCombineScatter(List<T>& Values)
void Pstream::listCombineScatter(List<T>& Values, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
listCombineScatter(UPstream::linearCommunication(), Values);
listCombineScatter(UPstream::linearCommunication(), Values, tag);
}
else
{
listCombineScatter(UPstream::treeCommunication(), Values);
listCombineScatter(UPstream::treeCommunication(), Values, tag);
}
}
@ -421,7 +438,8 @@ void Pstream::mapCombineGather
(
const List<UPstream::commsStruct>& comms,
Container& Values,
const CombineOp& cop
const CombineOp& cop,
const int tag
)
{
if (UPstream::parRun())
@ -434,7 +452,7 @@ void Pstream::mapCombineGather
{
label belowID = myComm.below()[belowI];
IPstream fromBelow(UPstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
Container receivedValues(fromBelow);
if (debug & 2)
@ -474,7 +492,7 @@ void Pstream::mapCombineGather
<< " data:" << Values << endl;
}
OPstream toAbove(UPstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Values;
}
}
@ -482,15 +500,20 @@ void Pstream::mapCombineGather
template <class Container, class CombineOp>
void Pstream::mapCombineGather(Container& Values, const CombineOp& cop)
void Pstream::mapCombineGather
(
Container& Values,
const CombineOp& cop,
const int tag
)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
mapCombineGather(UPstream::linearCommunication(), Values, cop);
mapCombineGather(UPstream::linearCommunication(), Values, cop, tag);
}
else
{
mapCombineGather(UPstream::treeCommunication(), Values, cop);
mapCombineGather(UPstream::treeCommunication(), Values, cop, tag);
}
}
@ -499,7 +522,8 @@ template <class Container>
void Pstream::mapCombineScatter
(
const List<UPstream::commsStruct>& comms,
Container& Values
Container& Values,
const int tag
)
{
if (UPstream::parRun())
@ -510,7 +534,7 @@ void Pstream::mapCombineScatter
// Reveive from up
if (myComm.above() != -1)
{
IPstream fromAbove(UPstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
fromAbove >> Values;
if (debug & 2)
@ -530,7 +554,7 @@ void Pstream::mapCombineScatter
Pout<< " sending to " << belowID << " data:" << Values << endl;
}
OPstream toBelow(UPstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
toBelow << Values;
}
}
@ -538,15 +562,15 @@ void Pstream::mapCombineScatter
template <class Container>
void Pstream::mapCombineScatter(Container& Values)
void Pstream::mapCombineScatter(Container& Values, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
mapCombineScatter(UPstream::linearCommunication(), Values);
mapCombineScatter(UPstream::linearCommunication(), Values, tag);
}
else
{
mapCombineScatter(UPstream::treeCommunication(), Values);
mapCombineScatter(UPstream::treeCommunication(), Values, tag);
}
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -77,14 +77,13 @@ void Pstream::exchange
nsTransPs[procI] = sendBufs[procI].size();
}
// Send sizes across.
int oldTag = UPstream::msgType();
UPstream::msgType() = tag;
combineReduce(sizes, UPstream::listEq());
UPstream::msgType() = oldTag;
// Send sizes across. Note: blocks.
combineReduce(sizes, UPstream::listEq(), tag);
if (Pstream::parRun())
{
label startOfRequests = Pstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
@ -142,7 +141,7 @@ void Pstream::exchange
if (block)
{
Pstream::waitRequests();
Pstream::waitRequests(startOfRequests);
}
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -47,7 +47,8 @@ void Pstream::gather
(
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
const BinaryOp& bop,
const int tag
)
{
if (UPstream::parRun())
@ -67,12 +68,19 @@ void Pstream::gather
UPstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<char*>(&value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
IPstream fromBelow(UPstream::scheduled, myComm.below()[belowI]);
IPstream fromBelow
(
UPstream::scheduled,
myComm.below()[belowI],
0,
tag
);
fromBelow >> value;
}
@ -89,12 +97,13 @@ void Pstream::gather
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Value;
}
}
@ -103,21 +112,26 @@ void Pstream::gather
template <class T, class BinaryOp>
void Pstream::gather(T& Value, const BinaryOp& bop)
void Pstream::gather(T& Value, const BinaryOp& bop, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
gather(UPstream::linearCommunication(), Value, bop);
gather(UPstream::linearCommunication(), Value, bop, tag);
}
else
{
gather(UPstream::treeCommunication(), Value, bop);
gather(UPstream::treeCommunication(), Value, bop, tag);
}
}
template <class T>
void Pstream::scatter(const List<UPstream::commsStruct>& comms, T& Value)
void Pstream::scatter
(
const List<UPstream::commsStruct>& comms,
T& Value,
const int tag
)
{
if (UPstream::parRun())
{
@ -134,12 +148,13 @@ void Pstream::scatter(const List<UPstream::commsStruct>& comms, T& Value)
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
fromAbove >> Value;
}
}
@ -154,12 +169,19 @@ void Pstream::scatter(const List<UPstream::commsStruct>& comms, T& Value)
UPstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<const char*>(&Value),
sizeof(T)
sizeof(T),
tag
);
}
else
{
OPstream toBelow(UPstream::scheduled,myComm.below()[belowI]);
OPstream toBelow
(
UPstream::scheduled,
myComm.below()[belowI],
0,
tag
);
toBelow << Value;
}
}
@ -168,15 +190,15 @@ void Pstream::scatter(const List<UPstream::commsStruct>& comms, T& Value)
template <class T>
void Pstream::scatter(T& Value)
void Pstream::scatter(T& Value, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
scatter(UPstream::linearCommunication(), Value);
scatter(UPstream::linearCommunication(), Value, tag);
}
else
{
scatter(UPstream::treeCommunication(), Value);
scatter(UPstream::treeCommunication(), Value, tag);
}
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -48,7 +48,8 @@ template <class T>
void Pstream::gatherList
(
const List<UPstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag
)
{
if (UPstream::parRun())
@ -83,7 +84,8 @@ void Pstream::gatherList
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag
);
Values[belowID] = receivedValues[0];
@ -95,7 +97,7 @@ void Pstream::gatherList
}
else
{
IPstream fromBelow(UPstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID, 0, tag);
fromBelow >> Values[belowID];
if (debug & 2)
@ -150,12 +152,13 @@ void Pstream::gatherList
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
sendingValues.byteSize(),
tag
);
}
else
{
OPstream toAbove(UPstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag);
toAbove << Values[UPstream::myProcNo()];
forAll(belowLeaves, leafI)
@ -177,15 +180,15 @@ void Pstream::gatherList
template <class T>
void Pstream::gatherList(List<T>& Values)
void Pstream::gatherList(List<T>& Values, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
gatherList(UPstream::linearCommunication(), Values);
gatherList(UPstream::linearCommunication(), Values, tag);
}
else
{
gatherList(UPstream::treeCommunication(), Values);
gatherList(UPstream::treeCommunication(), Values, tag);
}
}
@ -194,7 +197,8 @@ template <class T>
void Pstream::scatterList
(
const List<UPstream::commsStruct>& comms,
List<T>& Values
List<T>& Values,
const int tag
)
{
if (UPstream::parRun())
@ -228,7 +232,8 @@ void Pstream::scatterList
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
receivedValues.byteSize(),
tag
);
forAll(notBelowLeaves, leafI)
@ -238,7 +243,7 @@ void Pstream::scatterList
}
else
{
IPstream fromAbove(UPstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag);
forAll(notBelowLeaves, leafI)
{
@ -275,12 +280,13 @@ void Pstream::scatterList
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
sendingValues.byteSize(),
tag
);
}
else
{
OPstream toBelow(UPstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID, 0, tag);
// Send data destined for all other processors below belowID
forAll(notBelowLeaves, leafI)
@ -302,15 +308,15 @@ void Pstream::scatterList
template <class T>
void Pstream::scatterList(List<T>& Values)
void Pstream::scatterList(List<T>& Values, const int tag)
{
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
scatterList(UPstream::linearCommunication(), Values);
scatterList(UPstream::linearCommunication(), Values, tag);
}
else
{
scatterList(UPstream::treeCommunication(), Values);
scatterList(UPstream::treeCommunication(), Values, tag);
}
}

View File

@ -203,8 +203,13 @@ bool Foam::regIOobject::read()
// Master reads headerclassname from file. Make sure this gets
// transfered as well as contents.
Pstream::scatter(comms, const_cast<word&>(headerClassName()));
Pstream::scatter(comms, note());
Pstream::scatter
(
comms,
const_cast<word&>(headerClassName()),
Pstream::msgType()
);
Pstream::scatter(comms, note(), Pstream::msgType());
// Get my communication order
@ -228,6 +233,7 @@ bool Foam::regIOobject::read()
Pstream::scheduled,
myComm.above(),
0,
Pstream::msgType(),
IOstream::ASCII
);
ok = readData(fromAbove);

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -340,6 +340,8 @@ evaluate()
|| Pstream::defaultCommsType == Pstream::nonBlocking
)
{
label nReq = Pstream::nRequests();
forAll(*this, patchi)
{
this->operator[](patchi).initEvaluate(Pstream::defaultCommsType);
@ -352,7 +354,7 @@ evaluate()
&& Pstream::defaultCommsType == Pstream::nonBlocking
)
{
Pstream::waitRequests();
Pstream::waitRequests(nReq);
}
forAll(*this, patchi)

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -27,14 +27,14 @@ License
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::globalIndex::globalIndex(const label localSize)
Foam::globalIndex::globalIndex(const label localSize, const int tag)
:
offsets_(Pstream::nProcs()+1)
{
labelList localSizes(Pstream::nProcs());
localSizes[Pstream::myProcNo()] = localSize;
Pstream::gatherList(localSizes);
Pstream::scatterList(localSizes);
Pstream::gatherList(localSizes, tag);
Pstream::scatterList(localSizes, tag);
label offset = 0;
offsets_[0] = 0;

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -72,7 +72,7 @@ public:
// Constructors
//- Construct from local max size
globalIndex(const label localSize);
globalIndex(const label localSize, const int tag = Pstream::msgType());
//- Construct from Istream
globalIndex(Istream& is);

View File

@ -133,7 +133,8 @@ void Foam::mapDistribute::transform::operator()
Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
(
const labelListList& subMap,
const labelListList& constructMap
const labelListList& constructMap,
const int tag
)
{
// Communications: send and receive processor
@ -174,7 +175,7 @@ Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
slave++
)
{
IPstream fromSlave(Pstream::scheduled, slave);
IPstream fromSlave(Pstream::scheduled, slave, 0, tag);
List<labelPair> nbrData(fromSlave);
forAll(nbrData, i)
@ -195,18 +196,24 @@ Foam::List<Foam::labelPair> Foam::mapDistribute::schedule
slave++
)
{
OPstream toSlave(Pstream::scheduled, slave);
OPstream toSlave(Pstream::scheduled, slave, 0, tag);
toSlave << allComms;
}
}
else
{
{
OPstream toMaster(Pstream::scheduled, Pstream::masterNo());
OPstream toMaster(Pstream::scheduled, Pstream::masterNo(), 0, tag);
toMaster << allComms;
}
{
IPstream fromMaster(Pstream::scheduled, Pstream::masterNo());
IPstream fromMaster
(
Pstream::scheduled,
Pstream::masterNo(),
0,
tag
);
fromMaster >> allComms;
}
}
@ -257,7 +264,7 @@ const Foam::List<Foam::labelPair>& Foam::mapDistribute::schedule() const
(
new List<labelPair>
(
schedule(subMap_, constructMap_)
schedule(subMap_, constructMap_, Pstream::msgType())
)
);
}
@ -471,6 +478,7 @@ void Foam::mapDistribute::calcCompactAddressing
void Foam::mapDistribute::exchangeAddressing
(
const int tag,
const globalIndex& globalNumbering,
labelList& elements,
List<Map<label> >& compactMap,
@ -535,7 +543,8 @@ void Foam::mapDistribute::exchangeAddressing
(
wantedRemoteElements,
subMap_,
sendSizes
sendSizes,
tag
);
// Renumber elements
@ -548,6 +557,7 @@ void Foam::mapDistribute::exchangeAddressing
void Foam::mapDistribute::exchangeAddressing
(
const int tag,
const globalIndex& globalNumbering,
labelListList& cellCells,
List<Map<label> >& compactMap,
@ -612,7 +622,8 @@ void Foam::mapDistribute::exchangeAddressing
(
wantedRemoteElements,
subMap_,
sendSizes
sendSizes,
tag
);
// Renumber elements
@ -750,7 +761,8 @@ Foam::mapDistribute::mapDistribute
(
const globalIndex& globalNumbering,
labelList& elements,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag
)
:
constructSize_(0),
@ -789,6 +801,7 @@ Foam::mapDistribute::mapDistribute
labelList compactStart;
exchangeAddressing
(
tag,
globalNumbering,
elements,
compactMap,
@ -806,7 +819,8 @@ Foam::mapDistribute::mapDistribute
(
const globalIndex& globalNumbering,
labelListList& cellCells,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag
)
:
constructSize_(0),
@ -845,6 +859,7 @@ Foam::mapDistribute::mapDistribute
labelList compactStart;
exchangeAddressing
(
tag,
globalNumbering,
cellCells,
compactMap,
@ -865,7 +880,8 @@ Foam::mapDistribute::mapDistribute
const globalIndexAndTransform& globalTransforms,
const labelPairList& transformedElements,
labelList& transformedIndices,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag
)
:
constructSize_(0),
@ -900,6 +916,7 @@ Foam::mapDistribute::mapDistribute
labelList compactStart;
exchangeAddressing
(
tag,
globalNumbering,
elements,
compactMap,
@ -969,7 +986,8 @@ Foam::mapDistribute::mapDistribute
const globalIndexAndTransform& globalTransforms,
const List<labelPairList>& transformedElements,
labelListList& transformedIndices,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag
)
:
constructSize_(0),
@ -1008,6 +1026,7 @@ Foam::mapDistribute::mapDistribute
labelList compactStart;
exchangeAddressing
(
tag,
globalNumbering,
cellCells,
compactMap,
@ -1150,7 +1169,7 @@ Foam::label Foam::mapDistribute::renumber
}
void Foam::mapDistribute::compact(const boolList& elemIsUsed)
void Foam::mapDistribute::compact(const boolList& elemIsUsed, const int tag)
{
// 1. send back to sender. Have sender delete the corresponding element
// from the submap and do the same to the constructMap locally
@ -1160,6 +1179,31 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
// mapDistribute but in reverse order.
if (Pstream::parRun())
{
label startOfRequests = Pstream::nRequests();
// Set up receives from neighbours
List<boolList> recvFields(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap_[domain];
if (domain != Pstream::myProcNo() && map.size())
{
recvFields[domain].setSize(map.size());
IPstream::read
(
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(bool),
tag
);
}
}
List<boolList> sendFields(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -1180,31 +1224,12 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.size()*sizeof(bool)
subField.size()*sizeof(bool),
tag
);
}
}
// Set up receives from neighbours
List<boolList> recvFields(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap_[domain];
if (domain != Pstream::myProcNo() && map.size())
{
recvFields[domain].setSize(map.size());
IPstream::read
(
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(bool)
);
}
}
// Set up 'send' to myself - write directly into recvFields
@ -1222,7 +1247,7 @@ void Foam::mapDistribute::compact(const boolList& elemIsUsed)
// Wait for all to finish
Pstream::waitRequests();
Pstream::waitRequests(startOfRequests);
// Compact out all submap entries that are referring to unused elements

View File

@ -201,6 +201,7 @@ class mapDistribute
void exchangeAddressing
(
const int tag,
const globalIndex& globalNumbering,
labelList& elements,
List<Map<label> >& compactMap,
@ -208,6 +209,7 @@ class mapDistribute
);
void exchangeAddressing
(
const int tag,
const globalIndex& globalNumbering,
labelListList& elements,
List<Map<label> >& compactMap,
@ -380,7 +382,8 @@ public:
(
const globalIndex&,
labelList& elements,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag = Pstream::msgType()
);
//- Special variant that works with the info sorted into bins
@ -390,7 +393,8 @@ public:
(
const globalIndex&,
labelListList& cellCells,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag = Pstream::msgType()
);
//- Construct from list of (possibly remote) untransformed elements
@ -406,7 +410,8 @@ public:
const globalIndexAndTransform&,
const labelPairList& transformedElements,
labelList& transformedIndices,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag = Pstream::msgType()
);
//- As above but with ListLists.
@ -417,7 +422,8 @@ public:
const globalIndexAndTransform&,
const List<labelPairList>& transformedElements,
labelListList& transformedIndices,
List<Map<label> >& compactMap
List<Map<label> >& compactMap,
const int tag = Pstream::msgType()
);
//- Construct by transferring parameter content
@ -487,7 +493,8 @@ public:
static List<labelPair> schedule
(
const labelListList& subMap,
const labelListList& constructMap
const labelListList& constructMap,
const int tag
);
//- Return a schedule. Demand driven. See above.
@ -514,7 +521,7 @@ public:
//- Compact maps. Gets per field a bool whether it is used (locally)
// and works out itself what this side and sender side can remove
// from maps.
void compact(const boolList& elemIsUsed);
void compact(const boolList& elemIsUsed, const int tag);
//- Distribute data. Note:schedule only used for Pstream::scheduled
@ -527,7 +534,8 @@ public:
const label constructSize,
const labelListList& subMap,
const labelListList& constructMap,
List<T>&
List<T>&,
const int tag = UPstream::msgType()
);
//- Distribute data. If multiple processors writing to same
@ -542,13 +550,18 @@ public:
const labelListList& constructMap,
List<T>&,
const CombineOp& cop,
const T& nullValue
const T& nullValue,
const int tag = UPstream::msgType()
);
//- Distribute data using default commsType.
template<class T>
void distribute(List<T>& fld, const bool dummyTransform = true)
const;
void distribute
(
List<T>& fld,
const bool dummyTransform = true,
const int tag = UPstream::msgType()
) const;
//- Same but with transforms
template<class T, class TransformOp>
@ -556,7 +569,8 @@ public:
(
const globalIndexAndTransform&,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag = UPstream::msgType()
) const;
//- Reverse distribute data using default commsType.
@ -565,7 +579,8 @@ public:
(
const label constructSize,
List<T>&,
const bool dummyTransform = true
const bool dummyTransform = true,
const int tag = UPstream::msgType()
) const;
//- Same but with transforms
@ -575,7 +590,8 @@ public:
const globalIndexAndTransform&,
const label constructSize,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag = UPstream::msgType()
) const;
//- Reverse distribute data using default commsType.
@ -587,7 +603,8 @@ public:
const label constructSize,
const T& nullValue,
List<T>& fld,
const bool dummyTransform = true
const bool dummyTransform = true,
const int tag = UPstream::msgType()
) const;
//- Same but with transforms
@ -598,7 +615,8 @@ public:
const label constructSize,
const T& nullValue,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag = UPstream::msgType()
) const;
//- Do all sends using PstreamBuffers

View File

@ -40,7 +40,8 @@ void Foam::mapDistribute::distribute
const label constructSize,
const labelListList& subMap,
const labelListList& constructMap,
List<T>& field
List<T>& field,
const int tag
)
{
if (!Pstream::parRun())
@ -79,7 +80,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
OPstream toNbr(Pstream::blocking, domain);
OPstream toNbr(Pstream::blocking, domain, 0, tag);
toNbr << UIndirectList<T>(field, map);
}
}
@ -110,7 +111,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
IPstream fromNbr(Pstream::blocking, domain);
IPstream fromNbr(Pstream::blocking, domain, 0, tag);
List<T> subField(fromNbr);
checkReceivedSize(domain, map.size(), subField.size());
@ -154,11 +155,11 @@ void Foam::mapDistribute::distribute
{
// I am send first, receive next
{
OPstream toNbr(Pstream::scheduled, recvProc);
OPstream toNbr(Pstream::scheduled, recvProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[recvProc]);
}
{
IPstream fromNbr(Pstream::scheduled, recvProc);
IPstream fromNbr(Pstream::scheduled, recvProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[recvProc];
@ -175,7 +176,7 @@ void Foam::mapDistribute::distribute
{
// I am receive first, send next
{
IPstream fromNbr(Pstream::scheduled, sendProc);
IPstream fromNbr(Pstream::scheduled, sendProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[sendProc];
@ -188,7 +189,7 @@ void Foam::mapDistribute::distribute
}
}
{
OPstream toNbr(Pstream::scheduled, sendProc);
OPstream toNbr(Pstream::scheduled, sendProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[sendProc]);
}
}
@ -197,9 +198,11 @@ void Foam::mapDistribute::distribute
}
else if (commsType == Pstream::nonBlocking)
{
label nOutstanding = Pstream::nRequests();
if (!contiguous<T>())
{
PstreamBuffers pBufs(Pstream::nonBlocking);
PstreamBuffers pBufs(Pstream::nonBlocking, tag);
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -214,8 +217,8 @@ void Foam::mapDistribute::distribute
}
}
// Start receiving
pBufs.finishedSends();
// Start receiving. Do not block.
pBufs.finishedSends(false);
{
// Set up 'send' to myself
@ -238,6 +241,9 @@ void Foam::mapDistribute::distribute
}
}
// Block ourselves, waiting only for the current comms
Pstream::waitRequests(nOutstanding);
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
@ -281,7 +287,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.byteSize()
subField.byteSize(),
tag
);
}
}
@ -302,7 +309,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].byteSize()
recvFields[domain].byteSize(),
tag
);
}
}
@ -341,7 +349,8 @@ void Foam::mapDistribute::distribute
// Wait for all to finish
Pstream::waitRequests();
Pstream::waitRequests(nOutstanding);
// Collect neighbour fields
@ -383,7 +392,8 @@ void Foam::mapDistribute::distribute
const labelListList& constructMap,
List<T>& field,
const CombineOp& cop,
const T& nullValue
const T& nullValue,
const int tag
)
{
if (!Pstream::parRun())
@ -423,7 +433,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
OPstream toNbr(Pstream::blocking, domain);
OPstream toNbr(Pstream::blocking, domain, 0, tag);
toNbr << UIndirectList<T>(field, map);
}
}
@ -455,7 +465,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
IPstream fromNbr(Pstream::blocking, domain);
IPstream fromNbr(Pstream::blocking, domain, 0, tag);
List<T> subField(fromNbr);
checkReceivedSize(domain, map.size(), subField.size());
@ -499,11 +509,11 @@ void Foam::mapDistribute::distribute
{
// I am send first, receive next
{
OPstream toNbr(Pstream::scheduled, recvProc);
OPstream toNbr(Pstream::scheduled, recvProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[recvProc]);
}
{
IPstream fromNbr(Pstream::scheduled, recvProc);
IPstream fromNbr(Pstream::scheduled, recvProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[recvProc];
@ -519,7 +529,7 @@ void Foam::mapDistribute::distribute
{
// I am receive first, send next
{
IPstream fromNbr(Pstream::scheduled, sendProc);
IPstream fromNbr(Pstream::scheduled, sendProc, 0, tag);
List<T> subField(fromNbr);
const labelList& map = constructMap[sendProc];
@ -531,7 +541,7 @@ void Foam::mapDistribute::distribute
}
}
{
OPstream toNbr(Pstream::scheduled, sendProc);
OPstream toNbr(Pstream::scheduled, sendProc, 0, tag);
toNbr << UIndirectList<T>(field, subMap[sendProc]);
}
}
@ -540,9 +550,11 @@ void Foam::mapDistribute::distribute
}
else if (commsType == Pstream::nonBlocking)
{
label nOutstanding = Pstream::nRequests();
if (!contiguous<T>())
{
PstreamBuffers pBufs(Pstream::nonBlocking);
PstreamBuffers pBufs(Pstream::nonBlocking, tag);
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -557,8 +569,8 @@ void Foam::mapDistribute::distribute
}
}
// Start receiving
pBufs.finishedSends();
// Start receiving. Do not block.
pBufs.finishedSends(false);
{
// Set up 'send' to myself
@ -577,9 +589,8 @@ void Foam::mapDistribute::distribute
}
}
// Wait till all finished
UPstream::waitRequests();
// Block ourselves, waiting only for the current comms
Pstream::waitRequests(nOutstanding);
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -624,7 +635,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>(subField.begin()),
subField.size()*sizeof(T)
subField.size()*sizeof(T),
tag
);
}
}
@ -645,7 +657,8 @@ void Foam::mapDistribute::distribute
Pstream::nonBlocking,
domain,
reinterpret_cast<char*>(recvFields[domain].begin()),
recvFields[domain].size()*sizeof(T)
recvFields[domain].size()*sizeof(T),
tag
);
}
}
@ -683,7 +696,8 @@ void Foam::mapDistribute::distribute
// Wait for all to finish
Pstream::waitRequests();
Pstream::waitRequests(nOutstanding);
// Collect neighbour fields
@ -878,7 +892,8 @@ template<class T>
void Foam::mapDistribute::distribute
(
List<T>& fld,
const bool dummyTransform
const bool dummyTransform,
const int tag
) const
{
if (Pstream::defaultCommsType == Pstream::nonBlocking)
@ -890,7 +905,8 @@ void Foam::mapDistribute::distribute
constructSize_,
subMap_,
constructMap_,
fld
fld,
tag
);
}
else if (Pstream::defaultCommsType == Pstream::scheduled)
@ -902,7 +918,8 @@ void Foam::mapDistribute::distribute
constructSize_,
subMap_,
constructMap_,
fld
fld,
tag
);
}
else
@ -914,7 +931,8 @@ void Foam::mapDistribute::distribute
constructSize_,
subMap_,
constructMap_,
fld
fld,
tag
);
}
@ -932,7 +950,8 @@ void Foam::mapDistribute::reverseDistribute
(
const label constructSize,
List<T>& fld,
const bool dummyTransform
const bool dummyTransform,
const int tag
) const
{
if (dummyTransform)
@ -949,7 +968,8 @@ void Foam::mapDistribute::reverseDistribute
constructSize,
constructMap_,
subMap_,
fld
fld,
tag
);
}
else if (Pstream::defaultCommsType == Pstream::scheduled)
@ -961,7 +981,8 @@ void Foam::mapDistribute::reverseDistribute
constructSize,
constructMap_,
subMap_,
fld
fld,
tag
);
}
else
@ -973,7 +994,8 @@ void Foam::mapDistribute::reverseDistribute
constructSize,
constructMap_,
subMap_,
fld
fld,
tag
);
}
}
@ -988,7 +1010,8 @@ void Foam::mapDistribute::reverseDistribute
const label constructSize,
const T& nullValue,
List<T>& fld,
const bool dummyTransform
const bool dummyTransform,
const int tag
) const
{
if (dummyTransform)
@ -1007,7 +1030,8 @@ void Foam::mapDistribute::reverseDistribute
subMap_,
fld,
eqOp<T>(),
nullValue
nullValue,
tag
);
}
else if (Pstream::defaultCommsType == Pstream::scheduled)
@ -1021,7 +1045,8 @@ void Foam::mapDistribute::reverseDistribute
subMap_,
fld,
eqOp<T>(),
nullValue
nullValue,
tag
);
}
else
@ -1035,7 +1060,8 @@ void Foam::mapDistribute::reverseDistribute
subMap_,
fld,
eqOp<T>(),
nullValue
nullValue,
tag
);
}
}
@ -1047,11 +1073,12 @@ void Foam::mapDistribute::distribute
(
const globalIndexAndTransform& git,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag
) const
{
// Distribute. Leave out dummy transforms since we're doing them ourselves
distribute(fld, false);
distribute(fld, false, tag);
// Do transforms
applyTransforms(git, fld, top);
}
@ -1063,7 +1090,8 @@ void Foam::mapDistribute::reverseDistribute
const globalIndexAndTransform& git,
const label constructSize,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag
) const
{
// Fill slots with reverse-transformed data. Note that it also copies
@ -1072,7 +1100,7 @@ void Foam::mapDistribute::reverseDistribute
applyInverseTransforms(git, fld, top);
// And send back (the remote slots). Disable dummy transformations.
reverseDistribute(constructSize, fld, false);
reverseDistribute(constructSize, fld, false, tag);
}
@ -1083,7 +1111,8 @@ void Foam::mapDistribute::reverseDistribute
const label constructSize,
const T& nullValue,
List<T>& fld,
const TransformOp& top
const TransformOp& top,
const int tag
) const
{
// Fill slots with reverse-transformed data Note that it also copies
@ -1092,7 +1121,7 @@ void Foam::mapDistribute::reverseDistribute
applyInverseTransforms(git, fld, top); //, eqOp<T>());
// And send back (the remote slots) Disable dummy transformations.
reverseDistribute(constructSize, nullValue, fld, false);
reverseDistribute(constructSize, nullValue, fld, false, tag);
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -55,12 +55,22 @@ void Foam::UPstream::abort()
}
void Foam::reduce(scalar&, const sumOp<scalar>&)
void Foam::reduce(scalar&, const sumOp<scalar>&, const int)
{}
void Foam::UPstream::waitRequests()
Foam::label Foam::UPstream::nRequests()
{
return 0;
}
void Foam::UPstream::resetRequests(const label i)
{}
void Foam::UPstream::waitRequests(const label start)
{}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -145,6 +145,14 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
{
// Message is already received into externalBuf
messageSize_ = buffers.recvBuf_[fromProcNo].size();
if (debug)
{
Pout<< "UIPstream::UIPstream PstreamBuffers :"
<< " fromProcNo:" << fromProcNo
<< " tag:" << tag_ << " receive buffer size:" << messageSize_
<< Foam::endl;
}
}
else
{

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2004-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2004-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -29,6 +29,7 @@ License
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include "SubList.H"
#include <cstring>
#include <cstdlib>
@ -172,7 +173,7 @@ void Foam::UPstream::abort()
}
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag)
{
if (Pstream::debug)
{
@ -205,7 +206,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(slave),
UPstream::msgType(),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -231,7 +232,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(UPstream::masterNo()),
UPstream::msgType(),
tag,
MPI_COMM_WORLD
)
)
@ -262,7 +263,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(slave),
UPstream::msgType(),
tag,
MPI_COMM_WORLD
)
)
@ -285,7 +286,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(UPstream::masterNo()),
UPstream::msgType(),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -337,7 +338,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(childProcId),
UPstream::msgType(),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -373,7 +374,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(parentId),
UPstream::msgType(),
tag,
MPI_COMM_WORLD
)
)
@ -393,7 +394,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(parentId),
UPstream::msgType(),
tag,
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -429,7 +430,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
1,
MPI_SCALAR,
UPstream::procID(childProcId),
UPstream::msgType(),
tag,
MPI_COMM_WORLD
)
)
@ -456,23 +457,45 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
}
void Foam::UPstream::waitRequests()
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();
}
void Foam::UPstream::resetRequests(const label i)
{
if (i < PstreamGlobals::outstandingRequests_.size())
{
PstreamGlobals::outstandingRequests_.setSize(i);
}
}
void Foam::UPstream::waitRequests(const label start)
{
if (debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
<< PstreamGlobals::outstandingRequests_.size()
<< " outstanding requests." << endl;
<< PstreamGlobals::outstandingRequests_.size()-start
<< " outstanding requests starting at " << start << endl;
}
if (PstreamGlobals::outstandingRequests_.size())
{
SubList<MPI_Request> waitRequests
(
PstreamGlobals::outstandingRequests_,
PstreamGlobals::outstandingRequests_.size() - start,
start
);
if
(
MPI_Waitall
(
PstreamGlobals::outstandingRequests_.size(),
PstreamGlobals::outstandingRequests_.begin(),
waitRequests.size(),
waitRequests.begin(),
MPI_STATUSES_IGNORE
)
)
@ -483,7 +506,7 @@ void Foam::UPstream::waitRequests()
) << "MPI_Waitall returned with error" << Foam::endl;
}
PstreamGlobals::outstandingRequests_.clear();
resetRequests(start);
}
if (debug)

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2008-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2008-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -68,7 +68,7 @@ void Foam::extendedCentredCellToFaceStencil::compact()
}
}
mapPtr_().compact(isInStencil);
mapPtr_().compact(isInStencil, Pstream::msgType());
}

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2008-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2008-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -68,7 +68,7 @@ void Foam::extendedCentredFaceToCellStencil::compact()
}
}
mapPtr_().compact(isInStencil);
mapPtr_().compact(isInStencil, Pstream::msgType());
}

View File

@ -1208,10 +1208,11 @@ void Foam::InteractionLists<ParticleType>::sendReferredData
template<class ParticleType>
void Foam::InteractionLists<ParticleType>::receiveReferredData
(
PstreamBuffers& pBufs
PstreamBuffers& pBufs,
const label startOfRequests
)
{
Pstream::waitRequests();
Pstream::waitRequests(startOfRequests);
referredParticles_.setSize(cellMap().constructSize());

View File

@ -38,9 +38,10 @@ Description
\verbatim
PstreamBuffers pBufs(Pstream::nonBlocking);
label startOfRequests = Pstream::nRequests();
il_.sendReferredData(cellOccupancy_, pBufs);
// Do other things
il_.receiveReferredData(pBufs);
il_.receiveReferredData(pBufs, startOfRequests);
\endverbatim
Requiring data:
@ -237,7 +238,11 @@ public:
);
//- Receive referred data
void receiveReferredData(PstreamBuffers& pBufs);
void receiveReferredData
(
PstreamBuffers& pBufs,
const label startReq = 0
);
// Access

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2009-2010 OpenCFD Ltd.
\\ / A nd | Copyright (C) 2009-2011 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -59,11 +59,13 @@ void Foam::PairCollision<CloudType>::parcelInteraction()
{
PstreamBuffers pBufs(Pstream::nonBlocking);
label startOfRequests = Pstream::nRequests();
il_.sendReferredData(this->owner().cellOccupancy(), pBufs);
realRealInteraction();
il_.receiveReferredData(pBufs);
il_.receiveReferredData(pBufs, startOfRequests);
realReferredInteraction();
}

View File

@ -130,6 +130,7 @@ void Foam::moleculeCloud::calculatePairForce()
PstreamBuffers pBufs(Pstream::nonBlocking);
// Start sending referred data
label startOfRequests = Pstream::nRequests();
il_.sendReferredData(cellOccupancy(), pBufs);
molecule* molI = NULL;
@ -173,7 +174,7 @@ void Foam::moleculeCloud::calculatePairForce()
}
// Receive referred data
il_.receiveReferredData(pBufs);
il_.receiveReferredData(pBufs, startOfRequests);
{
// Real-Referred interactions
@ -361,10 +362,12 @@ void Foam::moleculeCloud::removeHighEnergyOverlaps()
PstreamBuffers pBufs(Pstream::nonBlocking);
// Start sending referred data
label startOfRequests = Pstream::nRequests();
il_.sendReferredData(cellOccupancy(), pBufs);
// Receive referred data
il_.receiveReferredData(pBufs);
il_.receiveReferredData(pBufs, startOfRequests);
// Real-Referred interaction
@ -451,10 +454,12 @@ void Foam::moleculeCloud::removeHighEnergyOverlaps()
buildCellOccupancy();
// Start sending referred data
startOfRequests = Pstream::nRequests();
il_.sendReferredData(cellOccupancy(), pBufs);
// Receive referred data
il_.receiveReferredData(pBufs);
il_.receiveReferredData(pBufs, startOfRequests);
label molsRemoved = initialSize - this->size();