unallocated Pstream

This commit is contained in:
mattijs
2009-10-26 09:59:07 +00:00
parent 86af21c833
commit 46f1df810c
43 changed files with 2701 additions and 1811 deletions

View File

@ -78,10 +78,7 @@ int main(int argc, char *argv[])
Serr<< "slave sending to master "
<< Pstream::masterNo() << endl;
OPstream toMaster
(
Pstream::blocking, Pstream::masterNo(), IOstream::ASCII
);
OPstream toMaster(Pstream::blocking, Pstream::masterNo());
FixedList<label, 2> list3;
list3[0] = 0;
@ -98,10 +95,7 @@ int main(int argc, char *argv[])
)
{
Serr << "master receiving from slave " << slave << endl;
IPstream fromSlave
(
Pstream::blocking, slave, IOstream::ASCII
);
IPstream fromSlave(Pstream::blocking, slave);
FixedList<label, 2> list3(fromSlave);
Serr<< list3 << endl;

View File

@ -22,13 +22,9 @@ License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
\*---------------------------------------------------------------------------*/
#include "GatherBase.H"
#include "IPstream.H"
#include "OPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -32,7 +32,7 @@
#------------------------------------------------------------------------------
setenv WM_PROJECT OpenFOAM
setenv WM_PROJECT_VERSION dev
setenv WM_PROJECT_VERSION UPstream
################################################################################
# USER EDITABLE PART
@ -96,8 +96,8 @@ if ( ! $?WM_PRECISION_OPTION ) setenv WM_PRECISION_OPTION DP
if ( ! $?WM_COMPILE_OPTION ) setenv WM_COMPILE_OPTION Opt
# WM_MPLIB = | OPENMPI | MPICH | MPICH-GM | HPMPI | GAMMA | MPI | QSMPI
if ( ! $?WM_MPLIB ) setenv WM_MPLIB OPENMPI
#if ( ! $?WM_MPLIB ) setenv WM_MPLIB OPENMPI
setenv WM_MPLIB OPENMPI
# Run options (floating-point signal handling and memory initialisation)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -108,11 +108,14 @@ StringStreams = $(Streams)/StringStreams
$(StringStreams)/StringStreamsPrint.C
Pstreams = $(Streams)/Pstreams
$(Pstreams)/Pstream.C
$(Pstreams)/PstreamCommsStruct.C
$(Pstreams)/UIPstream.C
$(Pstreams)/IPstream.C
$(Pstreams)/UPstream.C
$(Pstreams)/UPstreamCommsStruct.C
$(Pstreams)/Pstream.C
$(Pstreams)/UOPstream.C
$(Pstreams)/OPstream.C
$(Pstreams)/PstreamsPrint.C
$(Pstreams)/PstreamBuffers.C
dictionary = db/dictionary
$(dictionary)/dictionary.C

View File

@ -114,7 +114,7 @@ void Foam::ParSortableList<Type>::checkAndSend
}
{
OPstream toSlave(destProcI);
OPstream toSlave(Pstream::blocking, destProcI);
toSlave << values << indices;
}
}
@ -311,7 +311,7 @@ void Foam::ParSortableList<Type>::sort()
Pout<< "Receiving from " << procI << endl;
}
IPstream fromSlave(procI);
IPstream fromSlave(Pstream::blocking, procI);
fromSlave >> recValues >> recIndices;

View File

@ -24,291 +24,22 @@ License
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "IPstream.H"
#include "int.H"
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * //
inline void Foam::IPstream::checkEof()
{
if (bufPosition_ == messageSize_)
{
setEof();
}
}
template<class T>
inline void Foam::IPstream::readFromBuffer(T& t)
{
const size_t align = sizeof(T);
bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1));
t = reinterpret_cast<T&>(buf_[bufPosition_]);
bufPosition_ += sizeof(T);
checkEof();
}
inline void Foam::IPstream::readFromBuffer
Foam::IPstream::IPstream
(
void* data,
size_t count,
size_t align
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
{
if (align > 1)
{
bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1));
}
register const char* bufPtr = &buf_[bufPosition_];
register char* dataPtr = reinterpret_cast<char*>(data);
register size_t i = count;
while (i--) *dataPtr++ = *bufPtr++;
bufPosition_ += count;
checkEof();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::IPstream::~IPstream()
{
if (bufPosition_ < messageSize_)
{
FatalErrorIn("IPstream::~IPstream()")
<< "Message not fully consumed. messageSize:" << messageSize_
<< " bytes of which only " << bufPosition_
<< " consumed." << Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Istream& Foam::IPstream::read(token& t)
{
// Return the put back token if it exists
if (Istream::getBack(t))
{
return *this;
}
char c;
// return on error
if (!read(c))
{
t.setBad();
return *this;
}
// Set the line number of this token to the current stream line number
t.lineNumber() = lineNumber();
// Analyse input starting with this character.
switch (c)
{
// Punctuation
case token::END_STATEMENT :
case token::BEGIN_LIST :
case token::END_LIST :
case token::BEGIN_SQR :
case token::END_SQR :
case token::BEGIN_BLOCK :
case token::END_BLOCK :
case token::COLON :
case token::COMMA :
case token::ASSIGN :
case token::ADD :
case token::SUBTRACT :
case token::MULTIPLY :
case token::DIVIDE :
{
t = token::punctuationToken(c);
return *this;
}
// Word
case token::WORD :
{
word* pval = new word;
if (read(*pval))
{
if (token::compound::isCompound(*pval))
{
t = token::compound::New(*pval, *this).ptr();
delete pval;
}
else
{
t = pval;
}
}
else
{
delete pval;
t.setBad();
}
return *this;
}
// String
case token::STRING :
{
string* pval = new string;
if (read(*pval))
{
t = pval;
}
else
{
delete pval;
t.setBad();
}
return *this;
}
// Label
case token::LABEL :
{
label val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// floatScalar
case token::FLOAT_SCALAR :
{
floatScalar val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// doubleScalar
case token::DOUBLE_SCALAR :
{
doubleScalar val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// Character (returned as a single character word) or error
default:
{
if (isalpha(c))
{
t = word(c);
return *this;
}
setBad();
t.setBad();
return *this;
}
}
}
Foam::Istream& Foam::IPstream::read(char& c)
{
c = buf_[bufPosition_];
bufPosition_++;
checkEof();
return *this;
}
Foam::Istream& Foam::IPstream::read(word& str)
{
size_t len;
readFromBuffer(len);
str = &buf_[bufPosition_];
bufPosition_ += len + 1;
checkEof();
return *this;
}
Foam::Istream& Foam::IPstream::read(string& str)
{
size_t len;
readFromBuffer(len);
str = &buf_[bufPosition_];
bufPosition_ += len + 1;
checkEof();
return *this;
}
Foam::Istream& Foam::IPstream::read(label& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::IPstream::read(floatScalar& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::IPstream::read(doubleScalar& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::IPstream::read(char* data, std::streamsize count)
{
if (format() != BINARY)
{
FatalErrorIn("IPstream::read(char*, std::streamsize)")
<< "stream format not binary"
<< Foam::abort(FatalError);
}
readFromBuffer(data, count, 8);
return *this;
}
Foam::Istream& Foam::IPstream::rewind()
{
bufPosition_ = 0;
return *this;
}
:
Pstream(commsType, bufSize),
UIPstream(commsType, fromProcNo, buf_)
{}
// ************************************************************************* //

View File

@ -38,7 +38,7 @@ SourceFiles
#ifndef IPstream_H
#define IPstream_H
#include "Istream.H"
#include "UIPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -52,26 +52,8 @@ namespace Foam
class IPstream
:
public Pstream,
public Istream
public UIPstream
{
// Private data
int fromProcNo_;
label messageSize_;
// Private member functions
//- Check the bufferPosition_ against messageSize_ for EOF
inline void checkEof();
//- Read a T from the transfer buffer
template<class T>
inline void readFromBuffer(T&);
//- Read data from the transfer buffer
inline void readFromBuffer(void* data, size_t count, size_t align);
public:
@ -88,76 +70,6 @@ public:
versionNumber version=currentVersion
);
// Destructor
~IPstream();
// Member functions
// Inquiry
//- Return flags of output stream
ios_base::fmtflags flags() const
{
return ios_base::fmtflags(0);
}
// Read functions
//- Read into given buffer from given processor and return the
// message size
static label read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
);
//- Return next token from stream
Istream& read(token&);
//- Read a character
Istream& read(char&);
//- Read a word
Istream& read(word&);
// Read a string (including enclosing double-quotes)
Istream& read(string&);
//- Read a label
Istream& read(label&);
//- Read a floatScalar
Istream& read(floatScalar&);
//- Read a doubleScalar
Istream& read(doubleScalar&);
//- Read binary block
Istream& read(char*, std::streamsize);
//- Rewind and return the stream so that it may be read again
Istream& rewind();
// Edit
//- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags)
{
return ios_base::fmtflags(0);
}
// Print
//- Print description of IOstream to Ostream
void print(Ostream&) const;
};

View File

@ -22,68 +22,9 @@ License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Write primitive and binary block from OPstream
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "OPstream.H"
#include "int.H"
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * //
template<class T>
inline void Foam::OPstream::writeToBuffer(const T& t)
{
writeToBuffer(&t, sizeof(T), sizeof(T));
}
inline void Foam::OPstream::writeToBuffer(const char& c)
{
if (size_t(buf_.size()) < bufPosition_ + 1U)
{
enlargeBuffer(1);
}
buf_[bufPosition_] = c;
bufPosition_ ++;
}
inline void Foam::OPstream::writeToBuffer
(
const void* data,
size_t count,
size_t align
)
{
label oldPos = bufPosition_;
if (align > 1)
{
// Align bufPosition. Pads bufPosition_ - oldPos characters.
bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1));
}
if (size_t(buf_.size()) < bufPosition_ + count)
{
enlargeBuffer(bufPosition_ - oldPos + count);
}
register char* bufPtr = &buf_[bufPosition_];
register const char* dataPtr = reinterpret_cast<const char*>(data);
register size_t i = count;
while (i--) *bufPtr++ = *dataPtr++;
bufPosition_ += count;
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
@ -92,137 +33,14 @@ Foam::OPstream::OPstream
const commsTypes commsType,
const int toProcNo,
const label bufSize,
const label tag,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Ostream(format, version),
toProcNo_(toProcNo)
{
setOpened();
setGood();
if (!bufSize)
{
buf_.setSize(1000);
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Ostream& Foam::OPstream::write(const token&)
{
notImplemented("Ostream& OPstream::write(const token&)");
setBad();
return *this;
}
Foam::Ostream& Foam::OPstream::write(const char c)
{
if (!isspace(c))
{
writeToBuffer(c);
}
return *this;
}
Foam::Ostream& Foam::OPstream::write(const char* str)
{
word nonWhiteChars(string::validate<word>(str));
if (nonWhiteChars.size() == 1)
{
return write(nonWhiteChars.c_str()[1]);
}
else if (nonWhiteChars.size())
{
return write(nonWhiteChars);
}
else
{
return *this;
}
}
Foam::Ostream& Foam::OPstream::write(const word& str)
{
write(char(token::WORD));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::OPstream::write(const string& str)
{
write(char(token::STRING));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::OPstream::writeQuoted(const std::string& str, const bool)
{
write(char(token::STRING));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::OPstream::write(const label val)
{
write(char(token::LABEL));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::OPstream::write(const floatScalar val)
{
write(char(token::FLOAT_SCALAR));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::OPstream::write(const doubleScalar val)
{
write(char(token::DOUBLE_SCALAR));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::OPstream::write(const char* data, std::streamsize count)
{
if (format() != BINARY)
{
FatalErrorIn("Ostream::write(const char*, std::streamsize)")
<< "stream format not binary"
<< Foam::abort(FatalError);
}
writeToBuffer(data, count, 8);
return *this;
}
UOPstream(commsType, toProcNo, buf_, tag, true, format, version)
{}
// ************************************************************************* //

View File

@ -38,7 +38,7 @@ SourceFiles
#ifndef OPstream_H
#define OPstream_H
#include "Ostream.H"
#include "UOPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -52,25 +52,8 @@ namespace Foam
class OPstream
:
public Pstream,
public Ostream
public UOPstream
{
// Private data
int toProcNo_;
// Private member functions
//- Write a T to the transfer buffer
template<class T>
inline void writeToBuffer(const T&);
//- Write a char to the transfer buffer
inline void writeToBuffer(const char&);
//- Write data to the transfer buffer
inline void writeToBuffer(const void* data, size_t count, size_t align);
public:
@ -83,126 +66,11 @@ public:
const commsTypes commsType,
const int toProcNo,
const label bufSize = 0,
const label tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
);
// Destructor
~OPstream();
// Member functions
// Inquiry
//- Return flags of output stream
ios_base::fmtflags flags() const
{
return ios_base::fmtflags(0);
}
// Write functions
//- Write given buffer to given processor
static bool write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
);
//- Write next token to stream
Ostream& write(const token&);
//- Write character
Ostream& write(const char);
//- Write character string
Ostream& write(const char*);
//- Write word
Ostream& write(const word&);
//- Write string
Ostream& write(const string&);
//- Write std::string surrounded by quotes.
// Optional write without quotes.
Ostream& writeQuoted
(
const std::string&,
const bool quoted=true
);
//- Write label
Ostream& write(const label);
//- Write floatScalar
Ostream& write(const floatScalar);
//- Write doubleScalar
Ostream& write(const doubleScalar);
//- Write binary block
Ostream& write(const char*, std::streamsize);
//- Add indentation characters
void indent()
{}
// Stream state functions
//- Flush stream
void flush()
{}
//- Add newline and flush stream
void endl()
{}
//- Get width of output field
int width() const
{
return 0;
}
//- Set width of output field (and return old width)
int width(const int)
{
return 0;
}
//- Get precision of output field
int precision() const
{
return 0;
}
//- Set precision of output field (and return old precision)
int precision(const int)
{
return 0;
}
// Edit
//- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags)
{
return ios_base::fmtflags(0);
}
// Print
//- Print description of IOstream to Ostream
void print(Ostream&) const;
};

View File

@ -25,226 +25,16 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "debug.H"
#include "dictionary.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
defineTypeNameAndDebug(Foam::Pstream, 0);
template<>
const char* Foam::NamedEnum<Foam::Pstream::commsTypes, 3>::names[] =
{
"blocking",
"scheduled",
"nonBlocking"
};
const Foam::NamedEnum<Foam::Pstream::commsTypes, 3>
Foam::Pstream::commsTypeNames;
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::Pstream::setParRun()
{
parRun_ = true;
Pout.prefix() = '[' + name(myProcNo()) + "] ";
Perr.prefix() = '[' + name(myProcNo()) + "] ";
}
void Foam::Pstream::calcLinearComm(const label nProcs)
{
linearCommunication_.setSize(nProcs);
// Master
labelList belowIDs(nProcs - 1);
forAll(belowIDs, i)
{
belowIDs[i] = i + 1;
}
linearCommunication_[0] = commsStruct
(
nProcs,
0,
-1,
belowIDs,
labelList(0)
);
// Slaves. Have no below processors, only communicate up to master
for (label procID = 1; procID < nProcs; procID++)
{
linearCommunication_[procID] = commsStruct
(
nProcs,
procID,
0,
labelList(0),
labelList(0)
);
}
}
// Append my children (and my children children etc.) to allReceives.
void Foam::Pstream::collectReceives
(
const label procID,
const List<DynamicList<label> >& receives,
DynamicList<label>& allReceives
)
{
const DynamicList<label>& myChildren = receives[procID];
forAll(myChildren, childI)
{
allReceives.append(myChildren[childI]);
collectReceives(myChildren[childI], receives, allReceives);
}
}
// Tree like schedule. For 8 procs:
// (level 0)
// 0 receives from 1
// 2 receives from 3
// 4 receives from 5
// 6 receives from 7
// (level 1)
// 0 receives from 2
// 4 receives from 6
// (level 2)
// 0 receives from 4
//
// The sends/receives for all levels are collected per processor (one send per
// processor; multiple receives possible) creating a table:
//
// So per processor:
// proc receives from sends to
// ---- ------------- --------
// 0 1,2,4 -
// 1 - 0
// 2 3 0
// 3 - 2
// 4 5 0
// 5 - 4
// 6 7 4
// 7 - 6
void Foam::Pstream::calcTreeComm(label nProcs)
{
label nLevels = 1;
while ((1 << nLevels) < nProcs)
{
nLevels++;
}
List<DynamicList<label> > receives(nProcs);
labelList sends(nProcs, -1);
// Info<< "Using " << nLevels << " communication levels" << endl;
label offset = 2;
label childOffset = offset/2;
for (label level = 0; level < nLevels; level++)
{
label receiveID = 0;
while (receiveID < nProcs)
{
// Determine processor that sends and we receive from
label sendID = receiveID + childOffset;
if (sendID < nProcs)
{
receives[receiveID].append(sendID);
sends[sendID] = receiveID;
}
receiveID += offset;
}
offset <<= 1;
childOffset <<= 1;
}
// For all processors find the processors it receives data from
// (and the processors they receive data from etc.)
List<DynamicList<label> > allReceives(nProcs);
for (label procID = 0; procID < nProcs; procID++)
{
collectReceives(procID, receives, allReceives[procID]);
}
treeCommunication_.setSize(nProcs);
for (label procID = 0; procID < nProcs; procID++)
{
treeCommunication_[procID] = commsStruct
(
nProcs,
procID,
sends[procID],
receives[procID].shrink(),
allReceives[procID].shrink()
);
}
}
// Callback from Pstream::init() : initialize linear and tree communication
// schedules now that nProcs is known.
void Foam::Pstream::initCommunicationSchedule()
{
calcLinearComm(nProcs());
calcTreeComm(nProcs());
}
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Initialise my process number to 0 (the master)
int Foam::Pstream::myProcNo_(0);
// By default this is not a parallel run
bool Foam::Pstream::parRun_(false);
// List of process IDs
Foam::List<int> Foam::Pstream::procIDs_(1, 0);
// Standard transfer message type
int Foam::Pstream::msgType_(1);
// Linear communication schedule
Foam::List<Foam::Pstream::commsStruct> Foam::Pstream::linearCommunication_(0);
// Multi level communication schedule
Foam::List<Foam::Pstream::commsStruct> Foam::Pstream::treeCommunication_(0);
// Should compact transfer be used in which floats replace doubles
// reducing the bandwidth requirement at the expense of some loss
// in accuracy
bool Foam::Pstream::floatTransfer
(
debug::optimisationSwitch("floatTransfer", 0)
);
// Number of processors at which the reduce algorithm changes from linear to
// tree
int Foam::Pstream::nProcsSimpleSum
(
debug::optimisationSwitch("nProcsSimpleSum", 16)
);
// Default commsType
Foam::Pstream::commsTypes Foam::Pstream::defaultCommsType
(
commsTypeNames.read(debug::optimisationSwitches().lookup("commsType"))
);
// ************************************************************************* //

View File

@ -30,22 +30,18 @@ Description
SourceFiles
Pstream.C
PstreamsPrint.C
PstreamCommsStruct.C
gatherScatter.C
combineGatherScatter.C
gatherScatterList.C
exchange.C
\*---------------------------------------------------------------------------*/
#ifndef Pstream_H
#define Pstream_H
#include "labelList.H"
#include "UPstream.H"
#include "DynamicList.H"
#include "HashTable.H"
#include "string.H"
#include "NamedEnum.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -57,162 +53,16 @@ namespace Foam
\*---------------------------------------------------------------------------*/
class Pstream
:
public UPstream
{
public:
//- Types of communications
enum commsTypes
{
blocking,
scheduled,
nonBlocking
};
static const NamedEnum<commsTypes, 3> commsTypeNames;
//- Structure for communicating between processors
class commsStruct
{
// Private data
//- procID of above processor
label above_;
//- procIDs of processors directly below me
labelList below_;
//- procIDs of all processors below (so not just directly below)
labelList allBelow_;
//- procIDs of all processors not below. (inverse set of allBelow_
// and minus myProcNo)
labelList allNotBelow_;
public:
// Constructors
//- Construct null
commsStruct();
//- Construct from components
commsStruct
(
const label,
const labelList&,
const labelList&,
const labelList&
);
//- Construct from components; construct allNotBelow_
commsStruct
(
const label nProcs,
const label myProcID,
const label,
const labelList&,
const labelList&
);
// Member Functions
// Access
label above() const
{
return above_;
}
const labelList& below() const
{
return below_;
}
const labelList& allBelow() const
{
return allBelow_;
}
const labelList& allNotBelow() const
{
return allNotBelow_;
}
// Member operators
bool operator==(const commsStruct&) const;
bool operator!=(const commsStruct&) const;
// Ostream Operator
friend Ostream& operator<<(Ostream&, const commsStruct&);
};
private:
// Private data
static int myProcNo_;
static bool parRun_;
static List<int> procIDs_;
static int msgType_;
static List<commsStruct> linearCommunication_;
static List<commsStruct> treeCommunication_;
// Private member functions
//- Set data for parallel running
static void setParRun();
//- Calculate linear communication schedule
static void calcLinearComm(const label nProcs);
//- Calculate tree communication schedule
static void calcTreeComm(const label nProcs);
//- Helper function for tree communication schedule determination
// Collects all processorIDs below a processor
static void collectReceives
(
const label procID,
const List<DynamicList<label> >& receives,
DynamicList<label>& allReceives
);
//- Initialize all communication schedules. Callback from
// Pstream::init()
static void initCommunicationSchedule();
protected:
// Protected data
//- Communications type of this stream
commsTypes commsType_;
//- Transfer buffer
List<char> buf_;
//- Current buffer read/write location
int bufPosition_;
// Protected member functions
//- Increase the size of the transfer buffer
inline void enlargeBuffer(size_t count);
DynamicList<char> buf_;
public:
@ -220,21 +70,6 @@ public:
ClassName("Pstream");
// Static data
//- Should compact transfer be used in which floats replace doubles
// reducing the bandwidth requirement at the expense of some loss
// in accuracy
static bool floatTransfer;
//- Number of processors at which the sum algorithm changes from linear
// to tree
static int nProcsSimpleSum;
//- Default commsType
static commsTypes defaultCommsType;
// Constructors
//- Construct given optional buffer size
@ -244,151 +79,16 @@ public:
const label bufSize = 0
)
:
commsType_(commsType),
bufPosition_(0)
UPstream(commsType),
buf_(0)
{
if (bufSize)
{
buf_.setSize(bufSize + 2*sizeof(scalar) + 1);
buf_.setCapacity(bufSize + 2*sizeof(scalar) + 1);
}
}
// Member functions
//- Add the valid option this type of communications library
// adds/requires on the command line
static void addValidParOptions(HashTable<string>& validParOptions);
//- Initialisation function called from main
// Spawns slave processes and initialises inter-communication
static bool init(int& argc, char**& argv);
//- Non-blocking comms: wait until all have finished.
static void waitRequests();
//- Non-blocking comms: has request i finished?
static bool finishedRequest(const label i);
//- Is this a parallel run?
static bool parRun()
{
return parRun_;
}
//- Number of processes in parallel run
static label nProcs()
{
return procIDs_.size();
}
//- Am I the master process
static bool master()
{
return myProcNo_ == 0;
}
//- Process index of the master
static int masterNo()
{
return 0;
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo()
{
return myProcNo_;
}
//- Process IDs
static const List<int>& procIDs()
{
return procIDs_;
}
//- Process ID of given process index
static int procID(int procNo)
{
return procIDs_[procNo];
}
//- Process index of first slave
static int firstSlave()
{
return 1;
}
//- Process index of last slave
static int lastSlave()
{
return nProcs() - 1;
}
//- Communication schedule for linear all-to-master (proc 0)
static const List<commsStruct>& linearCommunication()
{
return linearCommunication_;
}
//- Communication schedule for tree all-to-master (proc 0)
static const List<commsStruct>& treeCommunication()
{
return treeCommunication_;
}
//- Message tag of standard messages
static int& msgType()
{
return msgType_;
}
//- Get the communications type of the stream
commsTypes commsType() const
{
return commsType_;
}
//- Set the communications type of the stream
commsTypes commsType(const commsTypes ct)
{
commsTypes oldCommsType = commsType_;
commsType_ = ct;
return oldCommsType;
}
//- Transfer buffer
const List<char>& buf() const
{
return buf_;
}
//- Transfer buffer
List<char>& buf()
{
return buf_;
}
//- Current buffer read/write location
int bufPosition() const
{
return bufPosition_;
}
//- Current buffer read/write location
int& bufPosition()
{
return bufPosition_;
}
//- Exit program
static void exit(int errnum = 1);
//- Abort program
static void abort();
// Gather and scatter
//- Gather data. Apply bop to combine Value
@ -501,8 +201,8 @@ public:
// Gather/scatter keeping the individual processor data separate.
// Values is a List of size Pstream::nProcs() where
// Values[Pstream::myProcNo()] is the data for the current processor.
// Values is a List of size UPstream::nProcs() where
// Values[UPstream::myProcNo()] is the data for the current processor.
//- Gather data but keep individual values separate
template <class T>
@ -527,18 +227,26 @@ public:
//- Like above but switches between linear/tree communication
template <class T>
static void scatterList(List<T>& Values);
// Exchange
//- Exchange data. Sends sendData, receives into recvData, sets
// sizes (not bytes). sizes[p0][p1] is what processor p0 has
// sent to p1. Continuous data only.
//template <template<class> class ListType, class T>
template <class Container, class T>
static void exchange
(
const List<Container >&,
List<Container >&,
labelListList& sizes,
const label tag = UPstream::msgType()
);
};
inline void Pstream::enlargeBuffer(size_t count)
{
buf_.setSize(max(int(buf_.size() + count), 2*buf_.size()));
}
Ostream& operator<<(Ostream&, const Pstream::commsStruct&);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
@ -549,9 +257,9 @@ Ostream& operator<<(Ostream&, const Pstream::commsStruct&);
# include "gatherScatter.C"
# include "combineGatherScatter.C"
# include "gatherScatterList.C"
# include "exchange.C"
#endif
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif

View File

@ -0,0 +1,115 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
\*---------------------------------------------------------------------------*/
#include "PstreamBuffers.H"
/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
namespace Foam
{
DynamicList<char> PstreamBuffers::nullBuf(0);
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::PstreamBuffers::PstreamBuffers
(
const UPstream::commsTypes commsType,
const label tag,
IOstream::streamFormat format,
IOstream::versionNumber version
)
:
commsType_(commsType),
tag_(tag),
format_(format),
version_(version),
sendBuf_(UPstream::nProcs()),
recvBuf_(UPstream::nProcs()),
finishedSendsCalled_(false)
{}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
void Foam::PstreamBuffers::finishedSends()
{
finishedSendsCalled_ = true;
if (commsType_ == UPstream::nonBlocking)
{
labelListList sizes;
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvBuf_,
sizes,
tag_
);
}
}
void Foam::PstreamBuffers::finishedSends(labelListList& sizes)
{
finishedSendsCalled_ = true;
if (commsType_ == UPstream::nonBlocking)
{
labelListList sizes;
labelListList send,recv;
Pstream::exchange<DynamicList<char>, char>
(
sendBuf_,
recvBuf_,
sizes,
tag_
);
}
else
{
sizes.setSize(UPstream::nProcs());
labelList& nsTransPs = sizes[UPstream::myProcNo()];
nsTransPs.setSize(UPstream::nProcs());
forAll(sendBuf_, procI)
{
nsTransPs[procI] = sendBuf_[procI].size();
}
// Send sizes across.
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
combineReduce(sizes, UPstream::listEq());
UPstream::msgType() = oldTag;
}
}
// ************************************************************************* //

View File

@ -0,0 +1,155 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Class
Foam::PstreamBuffers
Description
Buffers for inter-processor communications streams (UOPstream, UIPstream).
Use UOPstream to stream data into buffers, call finishedSends() to
notify that data is in buffers and then use IUPstream to get data out
of received buffers. Works with both blocking and nonBlocking. Does
not make much sense with scheduled since there you would not need these
explicit buffers.
Example usage:
PstreamBuffers pBuffers(Pstream::nonBlocking);
for (label procI = 0; procI < Pstream::nProcs(); procI++)
{
if (procI != Pstream::myProcNo())
{
someObject vals;
UOPstream str(procI, pBuffers);
str << vals;
}
}
pBuffers.finishedSends(); // no-op for blocking
Pstream::waitRequests(); // no-op for blocking
for (label procI = 0; procI < Pstream::nProcs(); procI++)
{
if (procI != Pstream::myProcNo())
{
UIPstream str(procI, pBuffers);
someObject vals(str);
}
}
SourceFiles
PstreamBuffers.C
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#ifndef PstreamBuffers_H
#define PstreamBuffers_H
#include "DynamicList.H"
#include "UPstream.H"
#include "IOstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class PstreamBuffers Declaration
\*---------------------------------------------------------------------------*/
class PstreamBuffers
{
friend class UOPstream;
friend class UIPstream;
// Private data
//- Communications type of this stream
const UPstream::commsTypes commsType_;
const label tag_;
const IOstream::streamFormat format_;
const IOstream::versionNumber version_;
//- send buffer
List<DynamicList<char> > sendBuf_;
//- receive buffer
List<DynamicList<char> > recvBuf_;
bool finishedSendsCalled_;
// Private member functions
public:
// Static data
static DynamicList<char> nullBuf;
// Constructors
//- Construct given comms type,
// write format and IO version
PstreamBuffers
(
const UPstream::commsTypes commsType,
const label tag = UPstream::msgType(),
IOstream::streamFormat format=IOstream::BINARY,
IOstream::versionNumber version=IOstream::currentVersion
);
// Member functions
//- Mark all sends as having been done. This will start receives
// in non-blocking mode.
void finishedSends();
//- Mark all sends as having been done. Same as above but also returns
// sizes (bytes) transferred.
void finishedSends(labelListList& sizes);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View File

@ -23,7 +23,7 @@ License
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
InClass
Foam::Pstream
Foam
Description
Combination-Reduction operation for a parallel run. The
@ -37,6 +37,7 @@ Description
#ifndef PstreamCombineReduceOps_H
#define PstreamCombineReduceOps_H
#include "UPstream.H"
#include "Pstream.H"
#include "ops.H"
@ -50,7 +51,7 @@ namespace Foam
template <class T, class CombineOp>
void combineReduce
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
)
@ -63,15 +64,15 @@ void combineReduce
template <class T, class CombineOp>
void combineReduce(T& Value, const CombineOp& cop)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
Pstream::combineGather(Pstream::linearCommunication(), Value, cop);
Pstream::combineScatter(Pstream::linearCommunication(), Value);
Pstream::combineGather(UPstream::linearCommunication(), Value, cop);
Pstream::combineScatter(UPstream::linearCommunication(), Value);
}
else
{
Pstream::combineGather(Pstream::treeCommunication(), Value, cop);
Pstream::combineScatter(Pstream::treeCommunication(), Value);
Pstream::combineGather(UPstream::treeCommunication(), Value, cop);
Pstream::combineScatter(UPstream::treeCommunication(), Value);
}
}

View File

@ -41,7 +41,7 @@ namespace Foam
template <class T, class BinaryOp>
void reduce
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
)
@ -59,13 +59,13 @@ void reduce
const BinaryOp& bop
)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
reduce(Pstream::linearCommunication(), Value, bop);
reduce(UPstream::linearCommunication(), Value, bop);
}
else
{
reduce(Pstream::treeCommunication(), Value, bop);
reduce(UPstream::treeCommunication(), Value, bop);
}
}
@ -80,13 +80,13 @@ T returnReduce
{
T WorkValue(Value);
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
reduce(Pstream::linearCommunication(), WorkValue, bop);
reduce(UPstream::linearCommunication(), WorkValue, bop);
}
else
{
reduce(Pstream::treeCommunication(), WorkValue, bop);
reduce(UPstream::treeCommunication(), WorkValue, bop);
}
return WorkValue;

View File

@ -1,49 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Prints out a description of the streams
\*---------------------------------------------------------------------------*/
#include "IPstream.H"
#include "OPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::IPstream::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
void Foam::OPstream::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
// ************************************************************************* //

View File

@ -0,0 +1,179 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Read token and binary block from UIPstream
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "UIPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag,
streamFormat format,
versionNumber version
)
:
UPstream(commsType),
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
tag_(tag),
messageSize_(0)
{
setOpened();
setGood();
if (commsType == UPstream::nonBlocking)
{
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
messageSize_ = UIPstream::read
(
commsType,
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
if (!messageSize_)
{
FatalErrorIn
(
"UIPstream::UIPstream(const commsTypes, const int, "
"DynamicList<char>&, streamFormat, versionNumber)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(0),
tag_(buffers.tag_),
messageSize_(0)
{
if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
{
FatalErrorIn("UIPstream::UIPstream(const int, PstreamBuffers&)")
<< "PstreamBuffers::finishedSends() never called." << endl
<< "Please call PstreamBuffers::finishedSends() after doing"
<< " all your sends (using UOPstream) and before doing any"
<< " receives (using UIPstream)" << Foam::exit(FatalError);
}
setOpened();
setGood();
if (commsType() == UPstream::nonBlocking)
{
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
messageSize_ = UIPstream::read
(
commsType(),
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
if (!messageSize_)
{
FatalErrorIn
(
"UIPstream::UIPstream(const int, PstreamBuffers&)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
// ************************************************************************* //

View File

@ -0,0 +1,322 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "UIPstream.H"
#include "int.H"
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * //
inline void Foam::UIPstream::checkEof()
{
if (externalBufPosition_ == messageSize_)
{
setEof();
}
}
template<class T>
inline void Foam::UIPstream::readFromBuffer(T& t)
{
const size_t align = sizeof(T);
externalBufPosition_ = align + ((externalBufPosition_ - 1) & ~(align - 1));
t = reinterpret_cast<T&>(externalBuf_[externalBufPosition_]);
externalBufPosition_ += sizeof(T);
checkEof();
}
inline void Foam::UIPstream::readFromBuffer
(
void* data,
size_t count,
size_t align
)
{
if (align > 1)
{
externalBufPosition_ =
align
+ ((externalBufPosition_ - 1) & ~(align - 1));
}
register const char* bufPtr = &externalBuf_[externalBufPosition_];
register char* dataPtr = reinterpret_cast<char*>(data);
register size_t i = count;
while (i--) *dataPtr++ = *bufPtr++;
externalBufPosition_ += count;
checkEof();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UIPstream::~UIPstream()
{
if (externalBufPosition_ < messageSize_)
{
FatalErrorIn("UIPstream::~UIPstream()")
<< "Message not fully consumed. messageSize:" << messageSize_
<< " bytes of which only " << externalBufPosition_
<< " consumed." << Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Istream& Foam::UIPstream::read(token& t)
{
// Return the put back token if it exists
if (Istream::getBack(t))
{
return *this;
}
char c;
// return on error
if (!read(c))
{
t.setBad();
return *this;
}
// Set the line number of this token to the current stream line number
t.lineNumber() = lineNumber();
// Analyse input starting with this character.
switch (c)
{
// Punctuation
case token::END_STATEMENT :
case token::BEGIN_LIST :
case token::END_LIST :
case token::BEGIN_SQR :
case token::END_SQR :
case token::BEGIN_BLOCK :
case token::END_BLOCK :
case token::COLON :
case token::COMMA :
case token::ASSIGN :
case token::ADD :
case token::SUBTRACT :
case token::MULTIPLY :
case token::DIVIDE :
{
t = token::punctuationToken(c);
return *this;
}
// Word
case token::WORD :
{
word* pval = new word;
if (read(*pval))
{
if (token::compound::isCompound(*pval))
{
t = token::compound::New(*pval, *this).ptr();
delete pval;
}
else
{
t = pval;
}
}
else
{
delete pval;
t.setBad();
}
return *this;
}
// String
case token::STRING :
{
string* pval = new string;
if (read(*pval))
{
t = pval;
}
else
{
delete pval;
t.setBad();
}
return *this;
}
// Label
case token::LABEL :
{
label val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// floatScalar
case token::FLOAT_SCALAR :
{
floatScalar val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// doubleScalar
case token::DOUBLE_SCALAR :
{
doubleScalar val;
if (read(val))
{
t = val;
}
else
{
t.setBad();
}
return *this;
}
// Character (returned as a single character word) or error
default:
{
if (isalpha(c))
{
t = word(c);
return *this;
}
setBad();
t.setBad();
return *this;
}
}
}
Foam::Istream& Foam::UIPstream::read(char& c)
{
c = externalBuf_[externalBufPosition_];
externalBufPosition_++;
checkEof();
return *this;
}
Foam::Istream& Foam::UIPstream::read(word& str)
{
size_t len;
readFromBuffer(len);
str = &externalBuf_[externalBufPosition_];
externalBufPosition_ += len + 1;
checkEof();
return *this;
}
Foam::Istream& Foam::UIPstream::read(string& str)
{
size_t len;
readFromBuffer(len);
str = &externalBuf_[externalBufPosition_];
externalBufPosition_ += len + 1;
checkEof();
return *this;
}
Foam::Istream& Foam::UIPstream::read(label& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::UIPstream::read(floatScalar& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::UIPstream::read(doubleScalar& val)
{
readFromBuffer(val);
return *this;
}
Foam::Istream& Foam::UIPstream::read(char* data, std::streamsize count)
{
if (format() != BINARY)
{
FatalErrorIn("UIPstream::read(char*, std::streamsize)")
<< "stream format not binary"
<< Foam::abort(FatalError);
}
readFromBuffer(data, count, 8);
return *this;
}
Foam::Istream& Foam::UIPstream::rewind()
{
externalBufPosition_ = 0;
return *this;
}
void Foam::UIPstream::print(Ostream& os) const
{
os << "Reading from processor " << fromProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
// ************************************************************************* //

View File

@ -0,0 +1,186 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Class
Foam::UIPstream
Description
Input inter-processor communications stream operating on external
buffer.
SourceFiles
UIPstream.C
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#ifndef UIPstream_H
#define UIPstream_H
#include "UPstream.H"
#include "Istream.H"
#include "PstreamBuffers.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class UIPstream Declaration
\*---------------------------------------------------------------------------*/
class UIPstream
:
public UPstream,
public Istream
{
// Private data
int fromProcNo_;
DynamicList<char>& externalBuf_;
label externalBufPosition_;
const label tag_;
label messageSize_;
// Private member functions
//- Check the bufferPosition against messageSize_ for EOF
inline void checkEof();
//- Read a T from the transfer buffer
template<class T>
inline void readFromBuffer(T&);
//- Read data from the transfer buffer
inline void readFromBuffer(void* data, size_t count, size_t align);
public:
// Constructors
//- Construct given process index to read from and optional buffer size,
// read format and IO version
UIPstream
(
const commsTypes commsType,
const int fromProcNo,
DynamicList<char>& externalBuf,
const label tag = UPstream::msgType(),
streamFormat format=BINARY,
versionNumber version=currentVersion
);
//- Construct given buffers
UIPstream(const int fromProcNo, PstreamBuffers&);
// Destructor
~UIPstream();
// Member functions
// Inquiry
//- Return flags of output stream
ios_base::fmtflags flags() const
{
return ios_base::fmtflags(0);
}
// Read functions
//- Read into given buffer from given processor and return the
// message size
static label read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
);
//- Return next token from stream
Istream& read(token&);
//- Read a character
Istream& read(char&);
//- Read a word
Istream& read(word&);
// Read a string (including enclosing double-quotes)
Istream& read(string&);
//- Read a label
Istream& read(label&);
//- Read a floatScalar
Istream& read(floatScalar&);
//- Read a doubleScalar
Istream& read(doubleScalar&);
//- Read binary block
Istream& read(char*, std::streamsize);
//- Rewind and return the stream so that it may be read again
Istream& rewind();
// Edit
//- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags)
{
return ios_base::fmtflags(0);
}
// Print
//- Print description of IOstream to Ostream
void print(Ostream&) const;
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View File

@ -0,0 +1,279 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Write primitive and binary block from UOPstream
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "UOPstream.H"
#include "int.H"
#include "token.H"
#include <cctype>
// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * //
template<class T>
inline void Foam::UOPstream::writeToBuffer(const T& t)
{
writeToBuffer(&t, sizeof(T), sizeof(T));
}
inline void Foam::UOPstream::writeToBuffer(const char& c)
{
if (!sendBuf_.capacity())
{
sendBuf_.setCapacity(1000);
}
sendBuf_.append(c);
}
inline void Foam::UOPstream::writeToBuffer
(
const void* data,
size_t count,
size_t align
)
{
if (!sendBuf_.capacity())
{
sendBuf_.setCapacity(1000);
}
label alignedPos = sendBuf_.size();
if (align > 1)
{
// Align bufPosition. Pads sendBuf_.size() - oldPos characters.
alignedPos = align + ((sendBuf_.size() - 1) & ~(align - 1));
}
// Extend if necessary
sendBuf_.setSize(alignedPos + count);
register const char* dataPtr = reinterpret_cast<const char*>(data);
register size_t i = count;
while (i--) sendBuf_[alignedPos++] = *dataPtr++;
}
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::UOPstream::UOPstream
(
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const label tag,
const bool sendAtDestruct,
streamFormat format,
versionNumber version
)
:
UPstream(commsType),
Ostream(format, version),
toProcNo_(toProcNo),
sendBuf_(sendBuf),
tag_(tag),
sendAtDestruct_(sendAtDestruct)
{
setOpened();
setGood();
}
Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Ostream(buffers.format_, buffers.version_),
toProcNo_(toProcNo),
sendBuf_(buffers.sendBuf_[toProcNo]),
tag_(buffers.tag_),
sendAtDestruct_(buffers.commsType_ != UPstream::nonBlocking)
{
setOpened();
setGood();
}
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::UOPstream::~UOPstream()
{
if (sendAtDestruct_)
{
label oldTag = Pstream::msgType();
Pstream::msgType() = tag_;
if
(
!UOPstream::write
(
commsType_,
toProcNo_,
sendBuf_.begin(),
sendBuf_.size()
)
)
{
FatalErrorIn("UOPstream::~UOPstream()")
<< "Failed sending outgoing message of size " << sendBuf_.size()
<< " to processor " << toProcNo_
<< Foam::abort(FatalError);
}
UPstream::msgType() = oldTag;
}
}
// * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * * //
Foam::Ostream& Foam::UOPstream::write(const token&)
{
notImplemented("Ostream& UOPstream::write(const token&)");
setBad();
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const char c)
{
if (!isspace(c))
{
writeToBuffer(c);
}
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const char* str)
{
word nonWhiteChars(string::validate<word>(str));
if (nonWhiteChars.size() == 1)
{
return write(nonWhiteChars.c_str()[1]);
}
else if (nonWhiteChars.size())
{
return write(nonWhiteChars);
}
else
{
return *this;
}
}
Foam::Ostream& Foam::UOPstream::write(const word& str)
{
write(char(token::WORD));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const string& str)
{
write(char(token::STRING));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::UOPstream::writeQuoted(const std::string& str, const bool)
{
write(char(token::STRING));
size_t len = str.size();
writeToBuffer(len);
writeToBuffer(str.c_str(), len + 1, 1);
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const label val)
{
write(char(token::LABEL));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const floatScalar val)
{
write(char(token::FLOAT_SCALAR));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const doubleScalar val)
{
write(char(token::DOUBLE_SCALAR));
writeToBuffer(val);
return *this;
}
Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count)
{
if (format() != BINARY)
{
FatalErrorIn("Ostream::write(const char*, std::streamsize)")
<< "stream format not binary"
<< Foam::abort(FatalError);
}
writeToBuffer(data, count, 8);
return *this;
}
void Foam::UOPstream::print(Ostream& os) const
{
os << "Writing from processor " << toProcNo_
<< " to processor " << myProcNo() << Foam::endl;
}
// ************************************************************************* //

View File

@ -0,0 +1,232 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Class
Foam::UOPstream
Description
Output inter-processor communications stream operating on external
buffer.
SourceFiles
UOPstream.C
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#ifndef UOPstream_H
#define UOPstream_H
#include "UPstream.H"
#include "Ostream.H"
#include "DynamicList.H"
#include "PstreamBuffers.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class UOPstream Declaration
\*---------------------------------------------------------------------------*/
class UOPstream
:
public UPstream,
public Ostream
{
// Private data
int toProcNo_;
DynamicList<char>& sendBuf_;
const label tag_;
const bool sendAtDestruct_;
// Private member functions
//- Write a T to the transfer buffer
template<class T>
inline void writeToBuffer(const T&);
//- Write a char to the transfer buffer
inline void writeToBuffer(const char&);
//- Write data to the transfer buffer
inline void writeToBuffer(const void* data, size_t count, size_t align);
public:
// Constructors
//- Construct given process index to send to and optional buffer size,
// write format and IO version
UOPstream
(
const commsTypes commsType,
const int toProcNo,
DynamicList<char>& sendBuf,
const label tag = UPstream::msgType(),
const bool sendAtDestruct = true,
streamFormat format=BINARY,
versionNumber version=currentVersion
);
//- Construct given buffers
UOPstream(const int toProcNo, PstreamBuffers&);
// Destructor
~UOPstream();
// Member functions
// Inquiry
//- Return flags of output stream
ios_base::fmtflags flags() const
{
return ios_base::fmtflags(0);
}
// Write functions
//- Write given buffer to given processor
static bool write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
);
//- Write next token to stream
Ostream& write(const token&);
//- Write character
Ostream& write(const char);
//- Write character string
Ostream& write(const char*);
//- Write word
Ostream& write(const word&);
//- Write string
Ostream& write(const string&);
//- Write std::string surrounded by quotes.
// Optional write without quotes.
Ostream& writeQuoted
(
const std::string&,
const bool quoted=true
);
//- Write label
Ostream& write(const label);
//- Write floatScalar
Ostream& write(const floatScalar);
//- Write doubleScalar
Ostream& write(const doubleScalar);
//- Write binary block
Ostream& write(const char*, std::streamsize);
//- Add indentation characters
void indent()
{}
// Stream state functions
//- Flush stream
void flush()
{}
//- Add newline and flush stream
void endl()
{}
//- Get width of output field
int width() const
{
return 0;
}
//- Set width of output field (and return old width)
int width(const int)
{
return 0;
}
//- Get precision of output field
int precision() const
{
return 0;
}
//- Set precision of output field (and return old precision)
int precision(const int)
{
return 0;
}
// Edit
//- Set flags of stream
ios_base::fmtflags flags(const ios_base::fmtflags)
{
return ios_base::fmtflags(0);
}
// Print
//- Print description of IOstream to Ostream
void print(Ostream&) const;
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View File

@ -0,0 +1,251 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
\*---------------------------------------------------------------------------*/
#include "UPstream.H"
#include "debug.H"
#include "dictionary.H"
#include "IOstreams.H"
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
defineTypeNameAndDebug(Foam::UPstream, 0);
template<>
const char* Foam::NamedEnum<Foam::UPstream::commsTypes, 3>::names[] =
{
"blocking",
"scheduled",
"nonBlocking"
};
const Foam::NamedEnum<Foam::UPstream::commsTypes, 3>
Foam::UPstream::commsTypeNames;
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
void Foam::UPstream::setParRun()
{
parRun_ = true;
Pout.prefix() = '[' + name(myProcNo()) + "] ";
Perr.prefix() = '[' + name(myProcNo()) + "] ";
}
void Foam::UPstream::calcLinearComm(const label nProcs)
{
linearCommunication_.setSize(nProcs);
// Master
labelList belowIDs(nProcs - 1);
forAll(belowIDs, i)
{
belowIDs[i] = i + 1;
}
linearCommunication_[0] = commsStruct
(
nProcs,
0,
-1,
belowIDs,
labelList(0)
);
// Slaves. Have no below processors, only communicate up to master
for (label procID = 1; procID < nProcs; procID++)
{
linearCommunication_[procID] = commsStruct
(
nProcs,
procID,
0,
labelList(0),
labelList(0)
);
}
}
// Append my children (and my children children etc.) to allReceives.
void Foam::UPstream::collectReceives
(
const label procID,
const List<DynamicList<label> >& receives,
DynamicList<label>& allReceives
)
{
const DynamicList<label>& myChildren = receives[procID];
forAll(myChildren, childI)
{
allReceives.append(myChildren[childI]);
collectReceives(myChildren[childI], receives, allReceives);
}
}
// Tree like schedule. For 8 procs:
// (level 0)
// 0 receives from 1
// 2 receives from 3
// 4 receives from 5
// 6 receives from 7
// (level 1)
// 0 receives from 2
// 4 receives from 6
// (level 2)
// 0 receives from 4
//
// The sends/receives for all levels are collected per processor (one send per
// processor; multiple receives possible) creating a table:
//
// So per processor:
// proc receives from sends to
// ---- ------------- --------
// 0 1,2,4 -
// 1 - 0
// 2 3 0
// 3 - 2
// 4 5 0
// 5 - 4
// 6 7 4
// 7 - 6
void Foam::UPstream::calcTreeComm(label nProcs)
{
label nLevels = 1;
while ((1 << nLevels) < nProcs)
{
nLevels++;
}
List<DynamicList<label> > receives(nProcs);
labelList sends(nProcs, -1);
// Info<< "Using " << nLevels << " communication levels" << endl;
label offset = 2;
label childOffset = offset/2;
for (label level = 0; level < nLevels; level++)
{
label receiveID = 0;
while (receiveID < nProcs)
{
// Determine processor that sends and we receive from
label sendID = receiveID + childOffset;
if (sendID < nProcs)
{
receives[receiveID].append(sendID);
sends[sendID] = receiveID;
}
receiveID += offset;
}
offset <<= 1;
childOffset <<= 1;
}
// For all processors find the processors it receives data from
// (and the processors they receive data from etc.)
List<DynamicList<label> > allReceives(nProcs);
for (label procID = 0; procID < nProcs; procID++)
{
collectReceives(procID, receives, allReceives[procID]);
}
treeCommunication_.setSize(nProcs);
for (label procID = 0; procID < nProcs; procID++)
{
treeCommunication_[procID] = commsStruct
(
nProcs,
procID,
sends[procID],
receives[procID].shrink(),
allReceives[procID].shrink()
);
}
}
// Callback from UPstream::init() : initialize linear and tree communication
// schedules now that nProcs is known.
void Foam::UPstream::initCommunicationSchedule()
{
calcLinearComm(nProcs());
calcTreeComm(nProcs());
}
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Initialise my process number to 0 (the master)
int Foam::UPstream::myProcNo_(0);
// By default this is not a parallel run
bool Foam::UPstream::parRun_(false);
// List of process IDs
Foam::List<int> Foam::UPstream::procIDs_(1, 0);
// Standard transfer message type
int Foam::UPstream::msgType_(1);
// Linear communication schedule
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::linearCommunication_(0);
// Multi level communication schedule
Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::treeCommunication_(0);
// Should compact transfer be used in which floats replace doubles
// reducing the bandwidth requirement at the expense of some loss
// in accuracy
bool Foam::UPstream::floatTransfer
(
debug::optimisationSwitch("floatTransfer", 0)
);
// Number of processors at which the reduce algorithm changes from linear to
// tree
int Foam::UPstream::nProcsSimpleSum
(
debug::optimisationSwitch("nProcsSimpleSum", 16)
);
// Default commsType
Foam::UPstream::commsTypes Foam::UPstream::defaultCommsType
(
commsTypeNames.read(debug::optimisationSwitches().lookup("commsType"))
);
// ************************************************************************* //

View File

@ -0,0 +1,381 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Class
Foam::UPstream
Description
Inter-processor communications stream
SourceFiles
UPstream.C
UPstreamsPrint.C
UPstreamCommsStruct.C
gatherScatter.C
combineGatherScatter.C
gatherScatterList.C
\*---------------------------------------------------------------------------*/
#ifndef UPstream_H
#define UPstream_H
#include "labelList.H"
#include "DynamicList.H"
#include "HashTable.H"
#include "string.H"
#include "NamedEnum.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class UPstream Declaration
\*---------------------------------------------------------------------------*/
class UPstream
{
public:
//- Types of communications
enum commsTypes
{
blocking,
scheduled,
nonBlocking
};
static const NamedEnum<commsTypes, 3> commsTypeNames;
// Public classes
//- Structure for communicating between processors
class commsStruct
{
// Private data
//- procID of above processor
label above_;
//- procIDs of processors directly below me
labelList below_;
//- procIDs of all processors below (so not just directly below)
labelList allBelow_;
//- procIDs of all processors not below. (inverse set of
// allBelow_ and minus myProcNo)
labelList allNotBelow_;
public:
// Constructors
//- Construct null
commsStruct();
//- Construct from components
commsStruct
(
const label,
const labelList&,
const labelList&,
const labelList&
);
//- Construct from components; construct allNotBelow_
commsStruct
(
const label nProcs,
const label myProcID,
const label,
const labelList&,
const labelList&
);
// Member Functions
// Access
label above() const
{
return above_;
}
const labelList& below() const
{
return below_;
}
const labelList& allBelow() const
{
return allBelow_;
}
const labelList& allNotBelow() const
{
return allNotBelow_;
}
// Member operators
bool operator==(const commsStruct&) const;
bool operator!=(const commsStruct&) const;
// Ostream Operator
friend Ostream& operator<<(Ostream&, const commsStruct&);
};
//- combineReduce operator for lists. Used for counting.
class listEq
{
public:
template<class T>
void operator()(T& x, const T& y) const
{
forAll(y, i)
{
if (y[i].size())
{
x[i] = y[i];
}
}
}
};
private:
// Private data
static int myProcNo_;
static bool parRun_;
static List<int> procIDs_;
static int msgType_;
static List<commsStruct> linearCommunication_;
static List<commsStruct> treeCommunication_;
// Private member functions
//- Set data for parallel running
static void setParRun();
//- Calculate linear communication schedule
static void calcLinearComm(const label nProcs);
//- Calculate tree communication schedule
static void calcTreeComm(const label nProcs);
//- Helper function for tree communication schedule determination
// Collects all processorIDs below a processor
static void collectReceives
(
const label procID,
const List<DynamicList<label> >& receives,
DynamicList<label>& allReceives
);
//- Initialize all communication schedules. Callback from
// UPstream::init()
static void initCommunicationSchedule();
protected:
// Protected data
//- Communications type of this stream
commsTypes commsType_;
public:
// Declare name of the class and its debug switch
ClassName("UPstream");
// Static data
//- Should compact transfer be used in which floats replace doubles
// reducing the bandwidth requirement at the expense of some loss
// in accuracy
static bool floatTransfer;
//- Number of processors at which the sum algorithm changes from linear
// to tree
static int nProcsSimpleSum;
//- Default commsType
static commsTypes defaultCommsType;
// Constructors
//- Construct given optional buffer size
UPstream(const commsTypes commsType)
:
commsType_(commsType)
{}
// Member functions
//- Add the valid option this type of communications library
// adds/requires on the command line
static void addValidParOptions(HashTable<string>& validParOptions);
//- Initialisation function called from main
// Spawns slave processes and initialises inter-communication
static bool init(int& argc, char**& argv);
//- Non-blocking comms: wait until all have finished.
static void waitRequests();
//- Non-blocking comms: has request i finished?
static bool finishedRequest(const label i);
//- Is this a parallel run?
static bool parRun()
{
return parRun_;
}
//- Number of processes in parallel run
static label nProcs()
{
return procIDs_.size();
}
//- Am I the master process
static bool master()
{
return myProcNo_ == 0;
}
//- Process index of the master
static int masterNo()
{
return 0;
}
//- Number of this process (starting from masterNo() = 0)
static int myProcNo()
{
return myProcNo_;
}
//- Process IDs
static const List<int>& procIDs()
{
return procIDs_;
}
//- Process ID of given process index
static int procID(int procNo)
{
return procIDs_[procNo];
}
//- Process index of first slave
static int firstSlave()
{
return 1;
}
//- Process index of last slave
static int lastSlave()
{
return nProcs() - 1;
}
//- Communication schedule for linear all-to-master (proc 0)
static const List<commsStruct>& linearCommunication()
{
return linearCommunication_;
}
//- Communication schedule for tree all-to-master (proc 0)
static const List<commsStruct>& treeCommunication()
{
return treeCommunication_;
}
//- Message tag of standard messages
static int& msgType()
{
return msgType_;
}
//- Get the communications type of the stream
commsTypes commsType() const
{
return commsType_;
}
//- Set the communications type of the stream
commsTypes commsType(const commsTypes ct)
{
commsTypes oldCommsType = commsType_;
commsType_ = ct;
return oldCommsType;
}
//- Exit program
static void exit(int errnum = 1);
//- Abort program
static void abort();
};
Ostream& operator<<(Ostream&, const UPstream::commsStruct&);
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //

View File

@ -24,12 +24,12 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "UPstream.H"
#include "boolList.H"
// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
Foam::Pstream::commsStruct::commsStruct()
Foam::UPstream::commsStruct::commsStruct()
:
above_(-1),
below_(0),
@ -38,7 +38,7 @@ Foam::Pstream::commsStruct::commsStruct()
{}
Foam::Pstream::commsStruct::commsStruct
Foam::UPstream::commsStruct::commsStruct
(
const label above,
const labelList& below,
@ -53,7 +53,7 @@ Foam::Pstream::commsStruct::commsStruct
{}
Foam::Pstream::commsStruct::commsStruct
Foam::UPstream::commsStruct::commsStruct
(
const label nProcs,
const label myProcID,
@ -91,7 +91,7 @@ Foam::Pstream::commsStruct::commsStruct
// * * * * * * * * * * * * * * * Member Operators * * * * * * * * * * * * * //
bool Foam::Pstream::commsStruct::operator==(const commsStruct& comm) const
bool Foam::UPstream::commsStruct::operator==(const commsStruct& comm) const
{
return
(
@ -103,7 +103,7 @@ bool Foam::Pstream::commsStruct::operator==(const commsStruct& comm) const
}
bool Foam::Pstream::commsStruct::operator!=(const commsStruct& comm) const
bool Foam::UPstream::commsStruct::operator!=(const commsStruct& comm) const
{
return !operator==(comm);
}
@ -111,7 +111,7 @@ bool Foam::Pstream::commsStruct::operator!=(const commsStruct& comm) const
// * * * * * * * * * * * * * * * Ostream Operator * * * * * * * * * * * * * //
Foam::Ostream& Foam::operator<<(Ostream& os, const Pstream::commsStruct& comm)
Foam::Ostream& Foam::operator<<(Ostream& os, const UPstream::commsStruct& comm)
{
os << comm.above_ << token::SPACE
<< comm.below_ << token::SPACE

View File

@ -49,15 +49,15 @@ namespace Foam
template <class T, class CombineOp>
void Pstream::combineGather
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
T& Value,
const CombineOp& cop
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -67,9 +67,9 @@ void Pstream::combineGather
if (contiguous<T>())
{
T value;
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(&value),
sizeof(T)
@ -85,7 +85,7 @@ void Pstream::combineGather
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID);
T value(fromBelow);
if (debug & 2)
@ -109,9 +109,9 @@ void Pstream::combineGather
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
@ -119,7 +119,7 @@ void Pstream::combineGather
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above());
toAbove << Value;
}
}
@ -130,33 +130,37 @@ void Pstream::combineGather
template <class T, class CombineOp>
void Pstream::combineGather(T& Value, const CombineOp& cop)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
combineGather(Pstream::linearCommunication(), Value, cop);
combineGather(UPstream::linearCommunication(), Value, cop);
}
else
{
combineGather(Pstream::treeCommunication(), Value, cop);
combineGather(UPstream::treeCommunication(), Value, cop);
}
}
template <class T>
void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
void Pstream::combineScatter
(
const List<UPstream::commsStruct>& comms,
T& Value
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
// Reveive from up
if (myComm.above() != -1)
{
if (contiguous<T>())
{
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
@ -164,7 +168,7 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above());
Value = T(fromAbove);
}
@ -187,9 +191,9 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(&Value),
sizeof(T)
@ -197,7 +201,7 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID);
toBelow << Value;
}
}
@ -208,13 +212,13 @@ void Pstream::combineScatter(const List<Pstream::commsStruct>& comms, T& Value)
template <class T>
void Pstream::combineScatter(T& Value)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
combineScatter(Pstream::linearCommunication(), Value);
combineScatter(UPstream::linearCommunication(), Value);
}
else
{
combineScatter(Pstream::treeCommunication(), Value);
combineScatter(UPstream::treeCommunication(), Value);
}
}
@ -226,15 +230,15 @@ void Pstream::combineScatter(T& Value)
template <class T, class CombineOp>
void Pstream::listCombineGather
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
List<T>& Values,
const CombineOp& cop
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -245,9 +249,9 @@ void Pstream::listCombineGather
{
List<T> receivedValues(Values.size());
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
@ -266,7 +270,7 @@ void Pstream::listCombineGather
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID);
List<T> receivedValues(fromBelow);
if (debug & 2)
@ -293,9 +297,9 @@ void Pstream::listCombineGather
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
@ -303,7 +307,7 @@ void Pstream::listCombineGather
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above());
toAbove << Values;
}
}
@ -314,13 +318,13 @@ void Pstream::listCombineGather
template <class T, class CombineOp>
void Pstream::listCombineGather(List<T>& Values, const CombineOp& cop)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
listCombineGather(Pstream::linearCommunication(), Values, cop);
listCombineGather(UPstream::linearCommunication(), Values, cop);
}
else
{
listCombineGather(Pstream::treeCommunication(), Values, cop);
listCombineGather(UPstream::treeCommunication(), Values, cop);
}
}
@ -328,23 +332,23 @@ void Pstream::listCombineGather(List<T>& Values, const CombineOp& cop)
template <class T>
void Pstream::listCombineScatter
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
List<T>& Values
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
// Reveive from up
if (myComm.above() != -1)
{
if (contiguous<T>())
{
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(Values.begin()),
Values.byteSize()
@ -352,7 +356,7 @@ void Pstream::listCombineScatter
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above());
fromAbove >> Values;
}
@ -375,9 +379,9 @@ void Pstream::listCombineScatter
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(Values.begin()),
Values.byteSize()
@ -385,7 +389,7 @@ void Pstream::listCombineScatter
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID);
toBelow << Values;
}
}
@ -396,13 +400,13 @@ void Pstream::listCombineScatter
template <class T>
void Pstream::listCombineScatter(List<T>& Values)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
listCombineScatter(Pstream::linearCommunication(), Values);
listCombineScatter(UPstream::linearCommunication(), Values);
}
else
{
listCombineScatter(Pstream::treeCommunication(), Values);
listCombineScatter(UPstream::treeCommunication(), Values);
}
}
@ -416,22 +420,22 @@ void Pstream::listCombineScatter(List<T>& Values)
template <class Container, class CombineOp>
void Pstream::mapCombineGather
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
Container& Values,
const CombineOp& cop
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
{
label belowID = myComm.below()[belowI];
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID);
Container receivedValues(fromBelow);
if (debug & 2)
@ -471,7 +475,7 @@ void Pstream::mapCombineGather
<< " data:" << Values << endl;
}
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above());
toAbove << Values;
}
}
@ -481,13 +485,13 @@ void Pstream::mapCombineGather
template <class Container, class CombineOp>
void Pstream::mapCombineGather(Container& Values, const CombineOp& cop)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
mapCombineGather(Pstream::linearCommunication(), Values, cop);
mapCombineGather(UPstream::linearCommunication(), Values, cop);
}
else
{
mapCombineGather(Pstream::treeCommunication(), Values, cop);
mapCombineGather(UPstream::treeCommunication(), Values, cop);
}
}
@ -495,19 +499,19 @@ void Pstream::mapCombineGather(Container& Values, const CombineOp& cop)
template <class Container>
void Pstream::mapCombineScatter
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
Container& Values
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()];
const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()];
// Reveive from up
if (myComm.above() != -1)
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above());
fromAbove >> Values;
if (debug & 2)
@ -527,7 +531,7 @@ void Pstream::mapCombineScatter
Pout<< " sending to " << belowID << " data:" << Values << endl;
}
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID);
toBelow << Values;
}
}
@ -537,19 +541,17 @@ void Pstream::mapCombineScatter
template <class Container>
void Pstream::mapCombineScatter(Container& Values)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
mapCombineScatter(Pstream::linearCommunication(), Values);
mapCombineScatter(UPstream::linearCommunication(), Values);
}
else
{
mapCombineScatter(Pstream::treeCommunication(), Values);
mapCombineScatter(UPstream::treeCommunication(), Values);
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam

View File

@ -0,0 +1,158 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM; if not, write to the Free Software Foundation,
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Exchange data.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "contiguous.H"
#include "PstreamCombineReduceOps.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
//template <template<class> class ListType, class T>
template <class Container, class T>
void Pstream::exchange
(
const List<Container >& sendBufs,
List<Container >& recvBufs,
labelListList& sizes,
const label tag
)
{
if (UPstream::parRun())
{
if (!contiguous<T>())
{
FatalErrorIn
(
"Pstream::exchange(..)"
) << "Continuous data only." << Foam::abort(FatalError);
}
if (sendBufs.size() != UPstream::nProcs())
{
FatalErrorIn
(
"Pstream::exchange(..)"
) << "Size of list:" << sendBufs.size()
<< " does not equal the number of processors:"
<< UPstream::nProcs()
<< Foam::abort(FatalError);
}
sizes.setSize(UPstream::nProcs());
labelList& nsTransPs = sizes[UPstream::myProcNo()];
nsTransPs.setSize(UPstream::nProcs());
forAll(sendBufs, procI)
{
nsTransPs[procI] = sendBufs[procI].size();
}
// Send sizes across.
label oldTag = UPstream::msgType();
UPstream::msgType() = tag;
combineReduce(sizes, UPstream::listEq());
UPstream::msgType() = oldTag;
// Set up receives
// ~~~~~~~~~~~~~~~
recvBufs.setSize(sendBufs.size());
forAll(sizes, procI)
{
label nRecv = sizes[procI][UPstream::myProcNo()];
if (nRecv > 0)
{
recvBufs[procI].setSize(nRecv);
label oldTag = UPstream::msgType();
UPstream::msgType() = tag;
UIPstream::read
(
UPstream::nonBlocking,
procI,
reinterpret_cast<char*>(recvBufs[procI].begin()),
nRecv*sizeof(T)
);
UPstream::msgType() = oldTag;
}
}
// Set up sends
// ~~~~~~~~~~~~
forAll(sendBufs, procI)
{
if (sendBufs[procI].size() > 0)
{
label oldTag = UPstream::msgType();
UPstream::msgType() = tag;
if
(
!UOPstream::write
(
UPstream::nonBlocking,
procI,
reinterpret_cast<const char*>(sendBufs[procI].begin()),
sendBufs[procI].size()*sizeof(T)
)
)
{
FatalErrorIn("Pstream::exchange(..)")
<< "Cannot send outgoing message. "
<< "to:" << procI << " nBytes:"
<< label(sendBufs[procI].size()*sizeof(T))
<< Foam::abort(FatalError);
}
UPstream::msgType() = oldTag;
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
Pstream::waitRequests();
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -30,7 +30,9 @@ Description
\*---------------------------------------------------------------------------*/
#include "UOPstream.H"
#include "OPstream.H"
#include "UIPstream.H"
#include "IPstream.H"
#include "contiguous.H"
@ -44,15 +46,15 @@ namespace Foam
template <class T, class BinaryOp>
void Pstream::gather
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
T& Value,
const BinaryOp& bop
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -61,9 +63,9 @@ void Pstream::gather
if (contiguous<T>())
{
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<char*>(&value),
sizeof(T)
@ -71,7 +73,7 @@ void Pstream::gather
}
else
{
IPstream fromBelow(Pstream::scheduled, myComm.below()[belowI]);
IPstream fromBelow(UPstream::scheduled, myComm.below()[belowI]);
fromBelow >> value;
}
@ -83,9 +85,9 @@ void Pstream::gather
{
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(&Value),
sizeof(T)
@ -93,7 +95,7 @@ void Pstream::gather
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
OPstream toAbove(UPstream::scheduled, myComm.above());
toAbove << Value;
}
}
@ -104,33 +106,33 @@ void Pstream::gather
template <class T, class BinaryOp>
void Pstream::gather(T& Value, const BinaryOp& bop)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
gather(Pstream::linearCommunication(), Value, bop);
gather(UPstream::linearCommunication(), Value, bop);
}
else
{
gather(Pstream::treeCommunication(), Value, bop);
gather(UPstream::treeCommunication(), Value, bop);
}
}
template <class T>
void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
void Pstream::scatter(const List<UPstream::commsStruct>& comms, T& Value)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Reveive from up
if (myComm.above() != -1)
{
if (contiguous<T>())
{
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(&Value),
sizeof(T)
@ -138,7 +140,7 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above());
fromAbove >> Value;
}
}
@ -148,9 +150,9 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
{
if (contiguous<T>())
{
OPstream::write
UOPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
myComm.below()[belowI],
reinterpret_cast<const char*>(&Value),
sizeof(T)
@ -158,7 +160,7 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
}
else
{
OPstream toBelow(Pstream::scheduled,myComm.below()[belowI]);
OPstream toBelow(UPstream::scheduled,myComm.below()[belowI]);
toBelow << Value;
}
}
@ -169,13 +171,13 @@ void Pstream::scatter(const List<Pstream::commsStruct>& comms, T& Value)
template <class T>
void Pstream::scatter(T& Value)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
scatter(Pstream::linearCommunication(), Value);
scatter(UPstream::linearCommunication(), Value);
}
else
{
scatter(Pstream::treeCommunication(), Value);
scatter(UPstream::treeCommunication(), Value);
}
}

View File

@ -27,7 +27,7 @@ Description
communication schedule (usually linear-to-master or tree-to-master).
The gathered data will be a list with element procID the data from processor
procID. Before calling every processor should insert its value into
Values[Pstream::myProcNo()].
Values[UPstream::myProcNo()].
Note: after gather every processor only knows its own data and that of the
processors below it. Only the 'master' of the communication schedule holds
a fully filled List. Use scatter to distribute the data.
@ -48,26 +48,26 @@ namespace Foam
template <class T>
void Pstream::gatherList
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
List<T>& Values
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
if (Values.size() != Pstream::nProcs())
if (Values.size() != UPstream::nProcs())
{
FatalErrorIn
(
"Pstream::gatherList(const List<Pstream::commsStruct>&"
"UPstream::gatherList(const List<UPstream::commsStruct>&"
", List<T>)"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< Pstream::nProcs()
<< UPstream::nProcs()
<< Foam::abort(FatalError);
}
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Receive from my downstairs neighbours
forAll(myComm.below(), belowI)
@ -79,9 +79,9 @@ void Pstream::gatherList
{
List<T> receivedValues(belowLeaves.size() + 1);
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
@ -96,7 +96,7 @@ void Pstream::gatherList
}
else
{
IPstream fromBelow(Pstream::scheduled, belowID);
IPstream fromBelow(UPstream::scheduled, belowID);
fromBelow >> Values[belowID];
if (debug & 2)
@ -132,14 +132,14 @@ void Pstream::gatherList
if (debug & 2)
{
Pout<< " sending to " << myComm.above()
<< " data from me:" << Pstream::myProcNo()
<< " data:" << Values[Pstream::myProcNo()] << endl;
<< " data from me:" << UPstream::myProcNo()
<< " data:" << Values[UPstream::myProcNo()] << endl;
}
if (contiguous<T>())
{
List<T> sendingValues(belowLeaves.size() + 1);
sendingValues[0] = Values[Pstream::myProcNo()];
sendingValues[0] = Values[UPstream::myProcNo()];
forAll(belowLeaves, leafI)
{
@ -148,7 +148,7 @@ void Pstream::gatherList
OPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
@ -156,8 +156,8 @@ void Pstream::gatherList
}
else
{
OPstream toAbove(Pstream::scheduled, myComm.above());
toAbove << Values[Pstream::myProcNo()];
OPstream toAbove(UPstream::scheduled, myComm.above());
toAbove << Values[UPstream::myProcNo()];
forAll(belowLeaves, leafI)
{
@ -180,13 +180,13 @@ void Pstream::gatherList
template <class T>
void Pstream::gatherList(List<T>& Values)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
gatherList(Pstream::linearCommunication(), Values);
gatherList(UPstream::linearCommunication(), Values);
}
else
{
gatherList(Pstream::treeCommunication(), Values);
gatherList(UPstream::treeCommunication(), Values);
}
}
@ -194,26 +194,26 @@ void Pstream::gatherList(List<T>& Values)
template <class T>
void Pstream::scatterList
(
const List<Pstream::commsStruct>& comms,
const List<UPstream::commsStruct>& comms,
List<T>& Values
)
{
if (Pstream::parRun())
if (UPstream::parRun())
{
if (Values.size() != Pstream::nProcs())
if (Values.size() != UPstream::nProcs())
{
FatalErrorIn
(
"Pstream::scatterList(const List<Pstream::commsStruct>&"
"UPstream::scatterList(const List<UPstream::commsStruct>&"
", List<T>)"
) << "Size of list:" << Values.size()
<< " does not equal the number of processors:"
<< Pstream::nProcs()
<< UPstream::nProcs()
<< Foam::abort(FatalError);
}
// Get my communication order
const commsStruct& myComm = comms[Pstream::myProcNo()];
const commsStruct& myComm = comms[UPstream::myProcNo()];
// Reveive from up
if (myComm.above() != -1)
@ -224,9 +224,9 @@ void Pstream::scatterList
{
List<T> receivedValues(notBelowLeaves.size());
IPstream::read
UIPstream::read
(
Pstream::scheduled,
UPstream::scheduled,
myComm.above(),
reinterpret_cast<char*>(receivedValues.begin()),
receivedValues.byteSize()
@ -239,7 +239,7 @@ void Pstream::scatterList
}
else
{
IPstream fromAbove(Pstream::scheduled, myComm.above());
IPstream fromAbove(UPstream::scheduled, myComm.above());
forAll(notBelowLeaves, leafI)
{
@ -273,7 +273,7 @@ void Pstream::scatterList
OPstream::write
(
Pstream::scheduled,
UPstream::scheduled,
belowID,
reinterpret_cast<const char*>(sendingValues.begin()),
sendingValues.byteSize()
@ -281,7 +281,7 @@ void Pstream::scatterList
}
else
{
OPstream toBelow(Pstream::scheduled, belowID);
OPstream toBelow(UPstream::scheduled, belowID);
// Send data destined for all other processors below belowID
forAll(notBelowLeaves, leafI)
@ -305,13 +305,13 @@ void Pstream::scatterList
template <class T>
void Pstream::scatterList(List<T>& Values)
{
if (Pstream::nProcs() < Pstream::nProcsSimpleSum)
if (UPstream::nProcs() < UPstream::nProcsSimpleSum)
{
scatterList(Pstream::linearCommunication(), Values);
scatterList(UPstream::linearCommunication(), Values);
}
else
{
scatterList(Pstream::treeCommunication(), Values);
scatterList(UPstream::treeCommunication(), Values);
}
}

View File

@ -229,11 +229,7 @@ void processorPointPatch::initPatchPatchPoints()
// Send the patchPatchPoints to the neighbouring processor
OPstream toNeighbProc
(
Pstream::blocking,
neighbProcNo()
);
OPstream toNeighbProc(Pstream::blocking, neighbProcNo());
toNeighbProc
<< ppmp.size() // number of points for checking
@ -252,11 +248,7 @@ void processorPointPatch::initPatchPatchPoints()
void Foam::processorPointPatch::calcPatchPatchPoints()
{
// Get the patchPatchPoints from the neighbouring processor
IPstream fromNeighbProc
(
Pstream::blocking,
neighbProcNo()
);
IPstream fromNeighbProc(Pstream::blocking, neighbProcNo());
label nbrNPoints(readLabel(fromNeighbProc));
labelListList patchPatchPoints(fromNeighbProc);

View File

@ -83,28 +83,6 @@ class mapDistribute
public:
// Public classes
//- combineReduce operator for lists. Used for counting.
class listEq
{
public:
template<class T>
void operator()(T& x, const T& y) const
{
forAll(y, i)
{
if (y[i].size())
{
x[i] = y[i];
}
}
}
};
// Constructors
//- Construct from components

View File

@ -25,6 +25,7 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamBuffers.H"
#include "PstreamCombineReduceOps.H"
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
@ -185,17 +186,9 @@ void Foam::mapDistribute::distribute
{
if (!contiguous<T>())
{
// 1. convert to contiguous buffer
// 2. send buffer
// 3. receive buffer
// 4. read from buffer into List<T>
PstreamBuffers pBuffs(Pstream::nonBlocking);
List<List<char> > sendFields(Pstream::nProcs());
labelListList allNTrans(Pstream::nProcs());
labelList& nsTransPs = allNTrans[Pstream::myProcNo()];
nsTransPs.setSize(Pstream::nProcs(), 0);
// Stream data into sendField buffers
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
@ -203,66 +196,13 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
// Put data into send buffer
OPstream toDomain(Pstream::nonBlocking, domain);
UOPstream toDomain(domain, pBuffs);
toDomain << UIndirectList<T>(field, map);
// Store the size
nsTransPs[domain] = toDomain.bufPosition();
// Transfer buffer out
sendFields[domain].transfer(toDomain.buf());
toDomain.bufPosition() = 0;
}
}
// Send sizes across
combineReduce(allNTrans, listEq());
// Start sending buffers
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
OPstream::write
(
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>
(
sendFields[domain].begin()
),
nsTransPs[domain]
);
}
}
// Set up receives from neighbours
PtrList<IPstream> fromSlave(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = constructMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
// Start receiving
fromSlave.set
(
domain,
new IPstream
(
Pstream::nonBlocking,
domain,
allNTrans[domain][Pstream::myProcNo()]
)
);
}
}
// Start receiving
pBuffs.finishedSends();
{
// Set up 'send' to myself
@ -285,10 +225,6 @@ void Foam::mapDistribute::distribute
}
}
// Wait till all finished
Pstream::waitRequests();
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
@ -296,7 +232,8 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
List<T> recvField(fromSlave[domain]);
UIPstream str(domain, pBuffs);
List<T> recvField(str);
if (recvField.size() != map.size())
{
@ -322,9 +259,6 @@ void Foam::mapDistribute::distribute
{
field[map[i]] = recvField[i];
}
// Delete receive buffer
fromSlave.set(domain, NULL);
}
}
}
@ -618,17 +552,10 @@ void Foam::mapDistribute::distribute
{
if (!contiguous<T>())
{
// 1. convert to contiguous buffer
// 2. send buffer
// 3. receive buffer
// 4. read from buffer into List<T>
//XXXXXX
PstreamBuffers pBuffs(Pstream::nonBlocking);
List<List<char> > sendFields(Pstream::nProcs());
labelListList allNTrans(Pstream::nProcs());
labelList& nsTransPs = allNTrans[Pstream::myProcNo()];
nsTransPs.setSize(Pstream::nProcs());
// Stream data into sendField buffers
// Stream data into buffer
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
@ -636,65 +563,13 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
// Put data into send buffer
OPstream toDomain(Pstream::nonBlocking, domain);
UOPstream toDomain(domain, pBuffs);
toDomain << UIndirectList<T>(field, map);
// Store the size
nsTransPs[domain] = toDomain.bufPosition();
// Transfer buffer out
sendFields[domain].transfer(toDomain.buf());
toDomain.bufPosition() = 0;
}
}
// Send sizes across
combineReduce(allNTrans, listEq());
// Start sending buffers
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = subMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
OPstream::write
(
Pstream::nonBlocking,
domain,
reinterpret_cast<const char*>
(
sendFields[domain].begin()
),
nsTransPs[domain]
);
}
}
// Set up receives from neighbours
PtrList<IPstream> fromSlave(Pstream::nProcs());
for (label domain = 0; domain < Pstream::nProcs(); domain++)
{
const labelList& map = constructMap[domain];
if (domain != Pstream::myProcNo() && map.size())
{
// Start receiving
fromSlave.set
(
domain,
new IPstream
(
Pstream::nonBlocking,
domain,
allNTrans[domain][Pstream::myProcNo()]
)
);
}
}
// Start receiving
pBuffs.finishedSends();
{
// Set up 'send' to myself
@ -715,7 +590,7 @@ void Foam::mapDistribute::distribute
// Wait till all finished
Pstream::waitRequests();
UPstream::waitRequests();
// Consume
for (label domain = 0; domain < Pstream::nProcs(); domain++)
@ -724,7 +599,8 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
List<T> recvField(fromSlave[domain]);
UIPstream str(domain, pBuffs);
List<T> recvField(str);
if (recvField.size() != map.size())
{
@ -750,9 +626,6 @@ void Foam::mapDistribute::distribute
{
cop(field[map[i]], recvField[i]);
}
// Delete receive buffer
fromSlave.set(domain, NULL);
}
}
}
@ -796,7 +669,7 @@ void Foam::mapDistribute::distribute
if (domain != Pstream::myProcNo() && map.size())
{
recvFields[domain].setSize(map.size());
IPstream::read
UIPstream::read
(
Pstream::nonBlocking,
domain,

View File

@ -402,11 +402,7 @@ void Foam::syncTools::syncPointMap
{
// Send to master
{
OPstream toMaster
(
Pstream::blocking,
Pstream::masterNo()
);
OPstream toMaster(Pstream::blocking, Pstream::masterNo());
toMaster << sharedPointValues;
}
// Receive merged values

View File

@ -1,5 +1,5 @@
Pstream.C
IPread.C
OPwrite.C
UPstream.C
UIPread.C
UOPwrite.C
LIB = $(FOAM_LIBBIN)/dummy/libPstream

View File

@ -27,34 +27,38 @@ Description
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "IPstream.H"
#include "UIPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::IPstream::IPstream
Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
DynamicList<char>& externalBuf,
const label tag,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
UPstream(commsType),
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
tag_(tag),
messageSize_(0)
{
notImplemented
(
"IPsream::IPstream"
"UIPstream::UIPstream"
"("
"const commsTypes,"
"const int fromProcNo,"
"const label bufSize,"
"const int fromProcNo,"
"DynamicList<char>&,"
"const label tag,"
"streamFormat, versionNumber"
")"
);
@ -63,7 +67,7 @@ Foam::IPstream::IPstream
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
int Foam::IPstream::read
int Foam::UIPstream::read
(
const commsTypes commsType,
const int fromProcNo,
@ -73,7 +77,7 @@ int Foam::IPstream::read
{
notImplemented
(
"IPstream::read"
"UIPstream::read"
"("
"const commsTypes,"
"const int fromProcNo,"

View File

@ -27,22 +27,13 @@ Description
\*---------------------------------------------------------------------------*/
#include "error.H"
#include "OPstream.H"
#include "UOPstream.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::OPstream::~OPstream()
{
notImplemented("OPstream::~OPstream()");
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::OPstream::write
bool Foam::UOPstream::write
(
const commsTypes commsType,
const int toProcNo,
@ -52,7 +43,7 @@ bool Foam::OPstream::write
{
notImplemented
(
"IPstream::write"
"UOPstream::write"
"("
"const commsTypes commsType,"
"const int fromProcNo,"

View File

@ -24,20 +24,18 @@ License
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "UPstream.H"
#include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{}
bool Foam::Pstream::init(int& argc, char**& argv)
bool Foam::UPstream::init(int& argc, char**& argv)
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "Trying to use the dummy Pstream library." << nl
<< "This dummy library cannot be used in parallel mode"
<< Foam::exit(FatalError);
@ -46,15 +44,15 @@ bool Foam::Pstream::init(int& argc, char**& argv)
}
void Foam::Pstream::exit(int errnum)
void Foam::UPstream::exit(int errnum)
{
notImplemented("Pstream::exit(int errnum)");
notImplemented("UPstream::exit(int errnum)");
}
void Foam::Pstream::abort()
void Foam::UPstream::abort()
{
notImplemented("Pstream::abort()");
notImplemented("UPstream::abort()");
}
@ -63,13 +61,13 @@ void Foam::reduce(scalar&, const sumOp<scalar>&)
void Foam::Pstream::waitRequests()
void Foam::UPstream::waitRequests()
{}
bool Foam::Pstream::finishedRequest(const label i)
bool Foam::UPstream::finishedRequest(const label i)
{
notImplemented("Pstream::finishedRequest()");
notImplemented("UPstream::finishedRequest()");
return false;
}

View File

@ -1,6 +1,6 @@
OPwrite.C
IPread.C
Pstream.C
UOPwrite.C
UIPread.C
UPstream.C
PstreamGlobals.C
LIB = $(FOAM_MPI_LIBBIN)/libPstream

View File

@ -23,86 +23,177 @@ License
Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
Description
Read token and binary block from IPstream
Read from UIPstream
\*---------------------------------------------------------------------------*/
#include "mpi.h"
#include "IPstream.H"
#include "UIPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Outstanding non-blocking operations.
//! @cond fileScope
//Foam::DynamicList<MPI_Request> IPstream_outstandingRequests_;
//! @endcond fileScope
#include "IOstreams.H"
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
Foam::IPstream::IPstream
Foam::UIPstream::UIPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
DynamicList<char>& externalBuf,
const label tag,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
UPstream(commsType),
Istream(format, version),
fromProcNo_(fromProcNo),
externalBuf_(externalBuf),
externalBufPosition_(0),
tag_(tag),
messageSize_(0)
{
setOpened();
setGood();
MPI_Status status;
// Cannot use buf_.size() since appends a few bytes extra
label realBufSize = bufSize;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!bufSize)
if (commsType == UPstream::nonBlocking)
{
if (commsType == nonBlocking)
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
Pout<< "UIPstream::UIPstream() starting receive from " << fromProcNo_
<< " with tag:" << msgType()
<< Foam::endl;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
messageSize_ = UIPstream::read
(
commsType,
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
if (!messageSize_)
{
FatalErrorIn
(
"IPstream::IPstream(const commsTypes, const int, "
"const label, streamFormat, versionNumber)"
) << "Can use nonBlocking mode only with pre-allocated buffers"
"UIPstream::UIPstream(const commsTypes, const int, "
"DynamicList<char>&, streamFormat, versionNumber)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
buf_.setSize(messageSize_);
realBufSize = buf_.size();
Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
:
UPstream(buffers.commsType_),
Istream(buffers.format_, buffers.version_),
fromProcNo_(fromProcNo),
externalBuf_(buffers.recvBuf_[fromProcNo]),
externalBufPosition_(0),
tag_(buffers.tag_),
messageSize_(0)
{
if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
{
FatalErrorIn("UIPstream::UIPstream(const int, PstreamBuffers&)")
<< "PstreamBuffers::finishedSends() never called." << endl
<< "Please call PstreamBuffers::finishedSends() after doing"
<< " all your sends (using UOPstream) and before doing any"
<< " receives (using UIPstream)" << Foam::exit(FatalError);
}
messageSize_ = read(commsType, fromProcNo_, buf_.begin(), realBufSize);
setOpened();
setGood();
if (!messageSize_)
if (commsType() == UPstream::nonBlocking)
{
FatalErrorIn
// Message is already received into externalBuf
}
else
{
MPI_Status status;
label wantedSize = externalBuf_.capacity();
label oldTag = UPstream::msgType();
UPstream::msgType() = tag_;
Pout<< "UIPstream::UIPstream() starting receive from " << fromProcNo_
<< " with tag:" << msgType()
<< Foam::endl;
// If the buffer size is not specified, probe the incomming message
// and set it
if (!wantedSize)
{
MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_BYTE, &messageSize_);
externalBuf_.setCapacity(messageSize_);
wantedSize = messageSize_;
}
Pout<< "UIPstream::UIPstream() starting read from " << fromProcNo_
<< " of size:" << wantedSize << endl;
messageSize_ = UIPstream::read
(
"IPstream::IPstream(const commsTypes, const int, "
"const label, streamFormat, versionNumber)"
) << "read failed"
<< Foam::abort(FatalError);
commsType(),
fromProcNo_,
externalBuf_.begin(),
wantedSize
);
UPstream::msgType() = oldTag;
// Set addressed size. Leave actual allocated memory intact.
externalBuf_.setSize(messageSize_);
Pout<< "UIPstream::UIPstream()received from " << fromProcNo_
<< " size:" << externalBuf_.size() << endl;
if (!messageSize_)
{
FatalErrorIn
(
"UIPstream::UIPstream(const int, PstreamBuffers&)"
) << "read failed"
<< Foam::abort(FatalError);
}
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
Foam::label Foam::IPstream::read
Foam::label Foam::UIPstream::read
(
const commsTypes commsType,
const int fromProcNo,
@ -130,7 +221,7 @@ Foam::label Foam::IPstream::read
{
FatalErrorIn
(
"IPstream::read"
"UIPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot receive incomming message"
<< Foam::abort(FatalError);
@ -148,7 +239,7 @@ Foam::label Foam::IPstream::read
{
FatalErrorIn
(
"IPstream::read"
"UIPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "buffer (" << label(bufSize)
<< ") not large enough for incomming message ("
@ -178,7 +269,7 @@ Foam::label Foam::IPstream::read
{
FatalErrorIn
(
"IPstream::read"
"UIPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "MPI_Recv cannot start non-blocking receive"
<< Foam::abort(FatalError);
@ -195,7 +286,7 @@ Foam::label Foam::IPstream::read
{
FatalErrorIn
(
"IPstream::read"
"UIPstream::read"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "Unsupported communications type " << commsType
<< Foam::abort(FatalError);
@ -205,6 +296,4 @@ Foam::label Foam::IPstream::read
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View File

@ -29,49 +29,12 @@ Description
#include "mpi.h"
#include "OPstream.H"
#include "UOPstream.H"
#include "PstreamGlobals.H"
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::OPstream::~OPstream()
{
if (commsType_ == nonBlocking)
{
// alloc nonBlocking only if empty buffer. This denotes the buffer
// having been transfered out.
if (bufPosition_ > 0)
{
FatalErrorIn("OPstream::~OPstream()")
<< "OPstream contains buffer so cannot be used with nonBlocking"
<< " since destructor would destroy buffer whilst possibly"
<< " still sending." << Foam::abort(FatalError);
}
}
else
{
if
(
!write
(
commsType_,
toProcNo_,
buf_.begin(),
bufPosition_
)
)
{
FatalErrorIn("OPstream::~OPstream()")
<< "MPI cannot send outgoing message"
<< Foam::abort(FatalError);
}
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool Foam::OPstream::write
bool Foam::UOPstream::write
(
const commsTypes commsType,
const int toProcNo,
@ -126,7 +89,7 @@ bool Foam::OPstream::write
{
FatalErrorIn
(
"OPstream::write"
"UOPstream::write"
"(const int fromProcNo, char* buf, std::streamsize bufSize)"
) << "Unsupported communications type " << commsType
<< Foam::abort(FatalError);

View File

@ -26,7 +26,7 @@ License
#include "mpi.h"
#include "Pstream.H"
#include "UPstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
@ -50,7 +50,7 @@ License
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
@ -62,7 +62,7 @@ void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
}
bool Foam::Pstream::init(int& argc, char**& argv)
bool Foam::UPstream::init(int& argc, char**& argv)
{
MPI_Init(&argc, &argv);
@ -72,8 +72,8 @@ bool Foam::Pstream::init(int& argc, char**& argv)
if (numprocs <= 1)
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "bool Pstream::init(int& argc, char**& argv) : "
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "bool IPstream::init(int& argc, char**& argv) : "
"attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
@ -101,8 +101,8 @@ bool Foam::Pstream::init(int& argc, char**& argv)
}
else
{
FatalErrorIn("Pstream::init(int& argc, char**& argv)")
<< "Pstream::init(int& argc, char**& argv) : "
FatalErrorIn("UPstream::init(int& argc, char**& argv)")
<< "UPstream::init(int& argc, char**& argv) : "
<< "environment variable MPI_BUFFER_SIZE not defined"
<< Foam::abort(FatalError);
}
@ -122,7 +122,7 @@ bool Foam::Pstream::init(int& argc, char**& argv)
}
void Foam::Pstream::exit(int errnum)
void Foam::UPstream::exit(int errnum)
{
# ifndef SGIMPI
int size;
@ -136,10 +136,10 @@ void Foam::Pstream::exit(int errnum)
label n = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.clear();
WarningIn("Pstream::exit(int)")
WarningIn("UPstream::exit(int)")
<< "There are still " << n << " outstanding MPI_Requests." << endl
<< "This means that your code exited before doing a"
<< " Pstream::waitRequests()." << endl
<< " UPstream::waitRequests()." << endl
<< "This should not happen for a normal code exit."
<< endl;
}
@ -156,7 +156,7 @@ void Foam::Pstream::exit(int errnum)
}
void Foam::Pstream::abort()
void Foam::UPstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
@ -164,19 +164,19 @@ void Foam::Pstream::abort()
void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
{
if (!Pstream::parRun())
if (!UPstream::parRun())
{
return;
}
if (Pstream::nProcs() <= Pstream::nProcsSimpleSum)
if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
{
if (Pstream::master())
if (UPstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave++
)
{
@ -189,8 +189,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
UPstream::procID(slave),
UPstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -215,8 +215,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
UPstream::procID(UPstream::masterNo()),
UPstream::msgType(),
MPI_COMM_WORLD
)
)
@ -230,12 +230,12 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
}
if (Pstream::master())
if (UPstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
int slave=UPstream::firstSlave();
slave<=UPstream::lastSlave();
slave++
)
{
@ -246,8 +246,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(slave),
Pstream::msgType(),
UPstream::procID(slave),
UPstream::msgType(),
MPI_COMM_WORLD
)
)
@ -269,8 +269,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(Pstream::masterNo()),
Pstream::msgType(),
UPstream::procID(UPstream::masterNo()),
UPstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -291,8 +291,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
Value = sum;
/*
int myProcNo = Pstream::myProcNo();
int nProcs = Pstream::nProcs();
int myProcNo = UPstream::myProcNo();
int nProcs = UPstream::nProcs();
//
// receive from children
@ -321,8 +321,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
UPstream::procID(childProcId),
UPstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -346,7 +346,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
//
// send and receive from parent
//
if (!Pstream::master())
if (!UPstream::master())
{
int parentId = myProcNo - (myProcNo % thisLevelOffset);
@ -357,8 +357,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
UPstream::procID(parentId),
UPstream::msgType(),
MPI_COMM_WORLD
)
)
@ -377,8 +377,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(parentId),
Pstream::msgType(),
UPstream::procID(parentId),
UPstream::msgType(),
MPI_COMM_WORLD,
MPI_STATUS_IGNORE
)
@ -413,8 +413,8 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
&Value,
1,
MPI_SCALAR,
Pstream::procID(childProcId),
Pstream::msgType(),
UPstream::procID(childProcId),
UPstream::msgType(),
MPI_COMM_WORLD
)
)
@ -436,7 +436,7 @@ void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
}
void Foam::Pstream::waitRequests()
void Foam::UPstream::waitRequests()
{
if (PstreamGlobals::outstandingRequests_.size())
{
@ -452,7 +452,7 @@ void Foam::Pstream::waitRequests()
{
FatalErrorIn
(
"Pstream::waitRequests()"
"UPstream::waitRequests()"
) << "MPI_Waitall returned with error" << Foam::endl;
}
@ -461,13 +461,13 @@ void Foam::Pstream::waitRequests()
}
bool Foam::Pstream::finishedRequest(const label i)
bool Foam::UPstream::finishedRequest(const label i)
{
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorIn
(
"Pstream::finishedRequest(const label)"
"UPstream::finishedRequest(const label)"
) << "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl

View File

@ -26,8 +26,6 @@ License
#include "processorFvPatchField.H"
#include "processorFvPatch.H"
#include "IPstream.H"
#include "OPstream.H"
#include "demandDrivenData.H"
#include "transformField.H"

View File

@ -27,9 +27,6 @@ Description
list of selected cells, it creates the mesh consisting only of the
desired cells, with the mapping list for points, faces, and cells.
MJ 23/03/05 on coupled faces change the patch of the face to the
oldInternalFaces patch.
\*---------------------------------------------------------------------------*/
#include "fvMeshSubset.H"

View File

@ -98,29 +98,6 @@ void Foam::Cloud<ParticleType>::deleteParticle(ParticleType& p)
}
namespace Foam
{
class combineNsTransPs
{
public:
void operator()(labelListList& x, const labelListList& y) const
{
forAll(y, i)
{
if (y[i].size())
{
x[i] = y[i];
}
}
}
};
} // End namespace Foam
template<class ParticleType>
template<class TrackingData>
void Foam::Cloud<ParticleType>::move(TrackingData& td)
@ -196,7 +173,7 @@ void Foam::Cloud<ParticleType>::move(TrackingData& td)
// processor patches for all the processors
labelListList allNTrans(Pstream::nProcs());
allNTrans[Pstream::myProcNo()] = nsTransPs;
combineReduce(allNTrans, combineNsTransPs());
combineReduce(allNTrans, UPstream::listEq());
transfered = false;