MPI configuration: Added support for SYSTEMMPI

Provided by Bruno Santos
Also some general cleaning and update of comment.
Resolves bug-report http://www.openfoam.org/mantisbt/view.php?id=1167
This commit is contained in:
Henry
2015-03-08 21:04:38 +00:00
parent b6e5ecc45f
commit ad7053d560
12 changed files with 170 additions and 1190 deletions

View File

@ -1,135 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2014 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Read token and binary block from IPstream
\*---------------------------------------------------------------------------*/
#include "IPstream.H"
#include "PstreamGlobals.H"
extern "C"
{
#include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
IPstream::IPstream
(
const commsTypes commsType,
const int fromProcNo,
const label bufSize,
streamFormat format,
versionNumber version
)
:
Pstream(commsType, bufSize),
Istream(format, version),
fromProcNo_(fromProcNo),
messageSize_(0)
{
// Blocking read.
setOpened();
setGood();
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : Starting receive from " << fromProcNo_
<< " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo_]
<< Foam::endl;
}
PstreamGlobals::gammaWait(fromProcNo_);
label ready = PstreamGlobals::consumeIndex[fromProcNo_];
messageSize_ = PstreamGlobals::recvBufLen[ready][fromProcNo_];
if (!bufSize)
{
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : sizing buffer to " << messageSize_
<< endl;
}
buf_.setSize(messageSize_);
}
PstreamGlobals::copyReceive(fromProcNo_, buf_.begin(), buf_.size());
if (Pstream::debug)
{
Pout<< "IPstream::IPstream : Received " << messageSize_
<< " from " << fromProcNo_
<< Foam::endl;
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
label IPstream::read
(
const commsTypes commsType,
const int fromProcNo,
char* buf,
const std::streamsize bufSize
)
{
// Blocking read.
label messageSize;
if (Pstream::debug)
{
Pout<< "IPstream::read : Starting receive from " << fromProcNo
<< " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo]
<< Foam::endl;
}
PstreamGlobals::gammaWait(fromProcNo);
messageSize = PstreamGlobals::copyReceive(fromProcNo, buf, bufSize);
if (Pstream::debug)
{
Pout<< "IPstream::read : Received " << messageSize
<< " from " << fromProcNo
<< Foam::endl;
}
return messageSize;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -1,6 +0,0 @@
PstreamGlobals.C
Pstream.C
OPwrite.C
IPread.C
LIB = $(FOAM_LIBBIN)/$(FOAM_MPI)/libPstream

View File

@ -1,5 +0,0 @@
sinclude $(GENERAL_RULES)/mplib$(WM_MPLIB)
sinclude $(RULES)/mplib$(WM_MPLIB)
EXE_INC = $(PFLAGS) $(PINC)
LIB_LIBS = $(PLIBS)

View File

@ -1,183 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2014 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Write primitive and binary block from OPstream gamma-mpi
\*---------------------------------------------------------------------------*/
#include "OPstream.H"
#include "PstreamGlobals.H"
extern "C"
{
#include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// Largest message sent so far. This tracks the size of the receive
// buffer on the receiving end. Done so we only send out resize messages
// if necessary
//! \cond fileScope
labelList maxSendSize;
//! \endcond
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
OPstream::~OPstream()
{
if (Pstream::debug)
{
Pout<< "OPstream::~OPstream() to processor " << toProcNo_
<< Foam::endl;
}
if
(
!write
(
commsType_,
toProcNo_,
buf_.begin(),
bufPosition_
)
)
{
FatalErrorIn("OPstream::~OPstream()")
<< "GAMMA cannot send outgoing message"
<< Foam::abort(FatalError);
}
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
bool OPstream::write
(
const commsTypes commsType,
const int toProcNo,
const char* buf,
const std::streamsize bufSize
)
{
if (PstreamGlobals::getSizeFromHeader(buf, bufSize) != -1)
{
FatalErrorIn("OPstream::write")
<< "Problem: Trying to send message of size " << bufSize
<< " that corresponds to the special resizeMessage."
<< Foam::abort(FatalError);
}
if (maxSendSize.empty())
{
// Intialize maxSendSize to the initial size of the receive buffers.
maxSendSize.setSize(Pstream::nProcs());
maxSendSize = PstreamGlobals::initialBufferLen;
maxSendSize[Pstream::myProcNo()] = 0;
if (Pstream::debug)
{
forAll(maxSendSize, procNo)
{
Pout<< "OPstream::write() : for toProcNo:" << procNo
<< " set maxSendSize to " << maxSendSize[procNo]
<< Foam::endl;
}
}
}
if (Pstream::debug)
{
Pout<< "OPstream::write() : proc:" << toProcNo
<< " maxSendSize:" << maxSendSize[toProcNo]
<< Foam::endl;
}
if (bufSize > maxSendSize[toProcNo])
{
// Send resize message.
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sending resize message to proc "
<< toProcNo
<< " for size:" << bufSize
<< Foam::endl;
}
PstreamGlobals::setResizeMessage(bufSize);
gamma_send_flowctl
(
toProcNo,
reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
);
maxSendSize[toProcNo] = bufSize;
}
// Do normal send
// ~~~~~~~~~~~~~~
// Note: could be put into allocation of buf.
//gamma_mlock(const_cast<char*>(buf), bufSize);
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sending to proc " << toProcNo
<< " bytes:" << bufSize << Foam::endl;
}
gamma_send_flowctl
(
toProcNo,
const_cast<char*>(buf),
bufSize
);
//gamma_munlock(const_cast<char*>(buf), bufSize);
if (Pstream::debug)
{
Pout<< "OPstream::write() : Sent " << bufSize
<< " to proc " << toProcNo
<< Foam::endl;
}
return true;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -1,474 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011-2012 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Description
Pstream for GAMMA
GAMMA has a (polling) receive handler which gets called every time a
received message is complete. Ours stores the length of the currently
received message and sets up the next buffer to store the next message
in.
Note that the pattern between two processors can be
- send
- receive
- receive
- send
since the first swap might belong to a local exchange and the second to
a reduce. Since gamma has to have the receive buffers already set up we
have to allocate them big enough. To prevent excessive amounts needed we
dynamically resize them (never shrink) by sending special 'resize' messages
before sending a largish message.
Because of this we actually need four receive buffers:
- send
- receive resize message
- receive normal message
- receive resize message
- receive normal message
- send
The special resize message is a message with a special header which
(hopefully) should never appear in normal exchanges (it actually checks
for this in the OPstream::send)
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include <cstring>
#include <cstdlib>
#include <csignal>
extern "C"
{
# include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// Receive handler to copy out received message length and switch buffers.
static void handler(void)
{
label current = PstreamGlobals::recvIndex[gamma_active_port];
List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
if (bufLen != -1)
{
FatalErrorIn("Pstream::handler(void)")
<< "Buffer length not reset : "
<< bufLen
<< " when receiving message of size " << gamma_msglen
<< " from processor " << gamma_active_port << endl
<< "This means that the existing data has not been consumed yet"
<< " (by IPstream::read) and means your communication pattern"
<< " is probably not balanced (a receive for every send)"
<< endl
<< "This can happen if you have e.g. gather without scatter."
<< endl
<< "A workaround is to increase the depth of the circular"
<< " receive buffers in PstreamGlobals.H"
<< abort(FatalError);
}
// Some checks
if
(
gamma_msglen < 0
|| gamma_msglen > buf.size()
)
{
FatalErrorIn("Pstream::handler(void)")
<< "Received message of size " << gamma_msglen
<< " from processor " << gamma_active_port
<< Foam::endl
<< "but global receive buffer is only of size "
<< buf.size()
<< abort(FatalError);
}
// Check for resize message
label resizeLen = PstreamGlobals::getSizeFromHeader
(
buf.begin(),
gamma_msglen
);
if (resizeLen != -1)
{
if (Pstream::debug)
{
Pout<< "Pstream::handler : Resize message:" << resizeLen
<< " from proc " << gamma_active_port
<< " current size:"
<< PstreamGlobals::getMaxBufSize(gamma_active_port)
<< Foam::endl;
}
// Saved current buffer.
List<char> savedBuf;
if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
{
if (Pstream::debug)
{
Pout<< "Pstream::handler :"
<< " resizing receive buffer for processor "
<< gamma_active_port
<< " from "
<< PstreamGlobals::getMaxBufSize(gamma_active_port)
<< " to " << resizeLen << Foam::endl;
}
// Save the pointer (that gamma knows about) so we can safely
// gamma_switch_to_buffer with a valid pointer.
// Not sure if necessary but do anyway.
savedBuf.transfer(buf);
// Resize all the buffers
forAll(PstreamGlobals::recvBuf, i)
{
List<char>& chars =
PstreamGlobals::recvBuf[i][gamma_active_port];
// gamma_munlock(chars.begin(), chars.size());
chars.setSize(resizeLen);
// gamma_mlock(chars.begin(), chars.size());
}
}
// Update length with special value to denote resize was done.
PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
}
else
{
// Update length with actual message length
PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
}
// Go to next buffer.
label next = PstreamGlobals::recvBuf.fcIndex(current);
PstreamGlobals::recvIndex[gamma_active_port] = next;
// gamma_switch_to_buffer
gamma_post_recv
(
gamma_active_port,
PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
PstreamGlobals::recvBuf[next][gamma_active_port].size()
);
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
void Pstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
validParOptions.insert("GAMMANP", "numProcs");
validParOptions.insert("GAMMAHOME", "gamma cwd");
validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
}
bool Pstream::init(int& argc, char**& argv)
{
int numprocs = 0;
string npString("-GAMMANP");
for (label i = 0; i < argc; i++)
{
if (argv[i] == npString)
{
if (i+1 < argc)
{
numprocs = atoi(argv[i+1]);
break;
}
}
}
// Initialize GAMMA
unsigned char smallNumprocs = numprocs;
gamma_init(smallNumprocs, argc, argv);
myProcNo_ = gamma_my_node();
// Make sure printing with prefix.
setParRun();
procIDs_.setSize(numprocs);
forAll(procIDs_, procNo)
{
procIDs_[procNo] = procNo;
}
// Allocate receive buffers.
// ~~~~~~~~~~~~~~~~~~~~~~~~~
// Make sure each receive buffer is at least large enough to receive
// the resize message.
// Current active buffer
PstreamGlobals::recvIndex.setSize(numprocs);
PstreamGlobals::recvIndex = 0;
PstreamGlobals::consumeIndex.setSize(numprocs);
PstreamGlobals::consumeIndex = 0;
forAll(PstreamGlobals::recvBuf, i)
{
PstreamGlobals::recvBufLen[i].setSize(numprocs);
PstreamGlobals::recvBufLen[i] = -1;
List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
buffers.setSize(numprocs);
forAll(buffers, procNo)
{
if (procNo != myProcNo_)
{
buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
// Acc. to gamma sources all buffers need to be in memory.
// Either locked or "write touched".
// gamma_mlock
// (
// buffers[procNo].begin(),
// buffers[procNo].size()
// );
}
}
}
// Lock the special resize message
// gamma_mlock
// (
// reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
// PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
// );
// Attach current receive buffers
forAll(procIDs_, procNo)
{
if (procNo != myProcNo_)
{
// Buffer index (always 0 at this point)
label current = PstreamGlobals::recvIndex[procNo];
// Current buffer for this processor.
List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
gamma_set_active_port
(
procNo, //unsigned short port,
procNo, //unsigned short dest_node,
gamma_my_par_pid(), //unsigned char dest_par_pid,
myProcNo_, //unsigned short dest_port,
handler, //callback
procNo, //unsigned short semaphore,
GO_BACK, //unsigned char buffer_kind,
buf.begin(),
buf.size()
);
}
}
// Make sure all have allocated the ports (so set the receive buffers)
gamma_sync();
Info<< "GAMMA Pstream initialized with:" << nl
<< " floatTransfer : " << floatTransfer << nl
<< " nProcsSimpleSum : " << nProcsSimpleSum << nl
<< " scheduledTransfer : " << Pstream::scheduledTransfer << nl
<< Foam::endl;
// Now that nprocs is known construct communication tables.
initCommunicationSchedule();
return true;
}
void Pstream::exit(int errnum)
{
// gamma_munlockall();
gamma_exit();
//gamma_abort();
}
void Pstream::abort()
{
Pout<< "**Pstream::abort()**" << endl;
// gamma_munlockall();
gamma_abort();
}
void reduce(scalar& Value, const sumOp<scalar>& bop)
{
if (!Pstream::parRun())
{
return;
}
if (Pstream::debug)
{
Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
scalar value;
if
(
!IPstream::read
(
slave,
reinterpret_cast<char*>(&value), // buf
sizeof(Value) // bufSize
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "IPstream::read failed"
<< Foam::abort(FatalError);
}
Value = bop(Value, value);
}
}
else
{
if
(
!OPstream::write
(
Pstream::masterNo(),
reinterpret_cast<const char*>(&Value), // buf
sizeof(Value), // bufSize
false // non-buffered
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "OPstream::write failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::master())
{
for
(
int slave=Pstream::firstSlave();
slave<=Pstream::lastSlave();
slave++
)
{
if
(
!OPstream::write
(
slave,
reinterpret_cast<const char*>(&Value), // buf
sizeof(Value), // bufSize,
false // non-buffered
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "OPstream::write failed"
<< Foam::abort(FatalError);
}
}
}
else
{
if
(
!IPstream::read
(
Pstream::masterNo(),
reinterpret_cast<char*>(&Value), // buf
sizeof(Value) // bufSize
)
)
{
FatalErrorIn
(
"reduce(scalar& Value, const sumOp<scalar>& sumOp)"
) << "IPstream::read failed"
<< Foam::abort(FatalError);
}
}
if (Pstream::debug)
{
Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
}
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -1,206 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "PstreamGlobals.H"
#include "IOstreams.H"
#include "Pstream.H"
extern "C" {
#include <linux/gamma/libgamma.h>
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// Receive buffers
FixedList<List<List<char> >, 4> PstreamGlobals::recvBuf;
// Length of receive buffers
FixedList<labelList, 4> PstreamGlobals::recvBufLen;
labelList PstreamGlobals::recvIndex;
labelList PstreamGlobals::consumeIndex;
// These are all signalling nans and probably different from the ones that
// the fpu might ever generate.
uint64_t PstreamGlobals::resizeMessage[PstreamGlobals::resizeMessageLen] =
{
0x7ff7ffffffffffABllu,
0x7ff7ffffffffffCDllu,
0x7ff7ffffffffff12llu,
0x7ff7ffffffffff30llu,
0x7ff7ffffffffff19llu,
0x0000000000000000llu // this word gets overwritten with the length.
};
// Wrapper around gamma_wait
void PstreamGlobals::gammaWait(const label procNo)
{
// Last request. Block.
gamma_wait(procNo, 1);
// Currently unconsumed received message
label ready = PstreamGlobals::consumeIndex[procNo];
// Check received length
if (PstreamGlobals::recvBufLen[ready][procNo] == -2)
{
// Was resize message. Consume and rewait (is always followed by
// real message)
if (Pstream::debug)
{
Pout<< "PstreamGlobals::gammaWait : "
<< "Resize event. consumeIndex:" << ready
<< " Restarting receive from " << procNo << endl;
}
// Consume resize message
PstreamGlobals::recvBufLen[ready][procNo] = -1;
PstreamGlobals::consumeIndex[procNo] =
PstreamGlobals::recvBuf.fcIndex(ready);
// And rewait
gamma_wait(procNo, 1);
}
}
// Copies data from global receive buffer into buf.
label PstreamGlobals::copyReceive
(
const label procNo,
char* buf,
const label bufSize
)
{
// Get the ready buffer
label ready = consumeIndex[procNo];
// Actually received
label receivedLen = recvBufLen[ready][procNo];
if (Pstream::debug)
{
Pout<< "copyReceive : for proc " << procNo
<< " copying " << receivedLen << " bytes out of buffer " << ready
<< endl;
}
if (receivedLen < 0)
{
FatalErrorIn
(
"Pstream::copyReceive(const label, char*, const label)"
) << "Illegal message length "
<< receivedLen
<< " received from proc " << procNo << " into buffer " << ready
<< endl
<< "This is probably caused by receiving more than is actually"
<< " sent (e.g. gather without scatter)." << endl
<< abort(FatalError);
}
if (receivedLen > bufSize)
{
FatalErrorIn
(
"Pstream::copyReceive(const label, char*, const label)"
) << "buffer ("
<< bufSize
<< ") not large enough for incomming message ("
<< receivedLen << ')'
<< " received from proc " << procNo << " into buffer " << ready
<< abort(FatalError);
}
// Copy out of receive buffer
memcpy
(
buf,
recvBuf[ready][procNo].begin(),
receivedLen
);
// Release receive buffer
recvBufLen[ready][procNo] = -1;
// Go to next buffer to consume
consumeIndex[procNo] = recvBuf.fcIndex(ready);
return receivedLen;
}
// Checks whether an incoming message is a resize message. If not returns -1,
// otherwise returns size read from header.
label PstreamGlobals::getSizeFromHeader(const char* buf, const label len)
{
if (len != resizeMessageLen*sizeof(uint64_t))
{
return -1;
}
const uint64_t* dPtr = reinterpret_cast<const uint64_t*>(buf);
// Check all but the last word
for (label i = 0; i < resizeMessageLen-1; i++)
{
if (*dPtr++ != resizeMessage[i])
{
return -1;
}
}
return *reinterpret_cast<const label*>(dPtr);
}
void PstreamGlobals::setResizeMessage(const label len)
{
reinterpret_cast<label&>(resizeMessage[resizeMessageLen-1]) = len;
}
label PstreamGlobals::getMaxBufSize(const int procNo)
{
label maxSz = 0;
forAll(recvBuf, i)
{
maxSz = max(maxSz, recvBuf[i][procNo].size());
}
return maxSz;
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// ************************************************************************* //

View File

@ -1,105 +0,0 @@
/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
Namespace
Foam::PstreamGlobals
Description
Global functions and variables for working with parallel streams,
but principally for gamma/mpi
SourceFiles
PstreamGlobals.C
\*---------------------------------------------------------------------------*/
#ifndef PstreamGlobals_H
#define PstreamGlobals_H
#include "FixedList.H"
#include "labelList.H"
#include "DynamicList.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
namespace Foam
{
/*---------------------------------------------------------------------------*\
Class PstreamGlobals Declaration
\*---------------------------------------------------------------------------*/
namespace PstreamGlobals
{
//- Block wait for message on port procNo
void gammaWait(const label procNo);
//- Helper routine to copy out newly received data
label copyReceive
(
const label procNo,
char* buf,
const label bufSize
);
//- Receive buffers
extern FixedList<List<List<char> >, 4> recvBuf;
//- Length of receive buffers
extern FixedList<labelList, 4> recvBufLen;
//- Currently active buffer in receiving
extern labelList recvIndex;
//- Receive buffer that has to be consumed
extern labelList consumeIndex;
//- Special message to signal resizing
const int resizeMessageLen = 6;
extern uint64_t resizeMessage[];
//- Initial buffer length. Should be able to contain the message comfortably.
const int initialBufferLen = 2*resizeMessageLen*sizeof(uint64_t);
//- Helper routine to check if a message is a resize message.
// Returns -1 if not or the new size.
label getSizeFromHeader(const char* buf, const label len);
//- Change the resize message to contain the new length
void setResizeMessage(const label len);
//- Get max size of all receive buffers to procNo
label getMaxBufSize(const int procNo);
};
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
} // End namespace Foam
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
#endif
// ************************************************************************* //