Merge branch 'feature-chunkingComms' into 'develop'

Pstream: added maxCommsSize setting to do (unstructured) parallel transfers in blocks.

Tested:
- with maxCommsSize 0 produces exactly same result as plus.develop
- compiles with label64
- with maxCommsSize e.g. 3 produces exactly same result as plus.develop
- with maxCommsSize=0 exactly the same messages (with Pstream::debug = 1) as plus.develop

See merge request !85
This commit is contained in:
Andrew Heather
2016-12-14 15:18:42 +00:00
10 changed files with 318 additions and 54 deletions

View File

@ -48,7 +48,6 @@ Description
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "PstreamReduceOps.H"
#include "argList.H" #include "argList.H"
#include "Time.H" #include "Time.H"
#include "polyTopoChange.H" #include "polyTopoChange.H"
@ -62,6 +61,7 @@ Description
#include "motionSmoother.H" #include "motionSmoother.H"
#include "topoSet.H" #include "topoSet.H"
#include "processorMeshes.H" #include "processorMeshes.H"
#include "PstreamReduceOps.H"
using namespace Foam; using namespace Foam;

View File

@ -62,6 +62,15 @@ OptimisationSwitches
floatTransfer 0; floatTransfer 0;
nProcsSimpleSum 0; nProcsSimpleSum 0;
// Optional max size (bytes) for unstructured data exchanges. In some
// phases of OpenFOAM it can send over very large data chunks
// (e.g. in parallel load balancing) and some Pstream implementations have
// problems with this. Setting this variable > 0 indicates that the
// data exchange needs to be done in multiple passes, each of maxCommsSize.
// This is not switched on by default since it requires an additional
// global reduction, even if multi-pass is not needed)
maxCommsSize 0;
// Force dumping (at next timestep) upon signal (-1 to disable) // Force dumping (at next timestep) upon signal (-1 to disable)
writeNowSignal -1; // 10; writeNowSignal -1; // 10;

View File

@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -55,6 +55,37 @@ class Pstream
: :
public UPstream public UPstream
{ {
// Private Static Functions
//- Exchange contiguous data. Sends sendData, receives into
// recvData. If block=true will wait for all transfers to finish.
// Data provided and received as container.
template<class Container, class T>
static void exchangeContainer
(
const UList<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
const int tag,
const label comm,
const bool block
);
//- Exchange contiguous data. Sends sendData, receives into
// recvData. If block=true will wait for all transfers to finish.
// Data provided and received as pointers.
template<class T>
static void exchangeBuf
(
const labelUList& sendSizes, // number of T, not number of char
const UList<const char*>& sendBufs,
const labelUList& recvSizes, // number of T, not number of char
List<char*>& recvBufs,
const int tag,
const label comm,
const bool block
);
protected: protected:
@ -63,6 +94,7 @@ protected:
//- Transfer buffer //- Transfer buffer
DynamicList<char> buf_; DynamicList<char> buf_;
public: public:
// Declare name of the class and its debug switch // Declare name of the class and its debug switch

View File

@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -32,7 +32,6 @@ Description
#ifndef PstreamReduceOps_H #ifndef PstreamReduceOps_H
#define PstreamReduceOps_H #define PstreamReduceOps_H
#include "Pstream.H"
#include "ops.H" #include "ops.H"
#include "vector2D.H" #include "vector2D.H"

View File

@ -464,4 +464,16 @@ registerOptSwitch
); );
int Foam::UPstream::maxCommsSize
(
Foam::debug::optimisationSwitch("maxCommsSize", 0)
);
registerOptSwitch
(
"maxCommsSize",
int,
Foam::UPstream::maxCommsSize
);
// ************************************************************************* // // ************************************************************************* //

View File

@ -269,6 +269,9 @@ public:
//- Number of polling cycles in processor updates //- Number of polling cycles in processor updates
static int nPollProcInterfaces; static int nPollProcInterfaces;
//- Optional maximum message size (bytes)
static int maxCommsSize;
//- Default communicator (all processors) //- Default communicator (all processors)
static label worldComm; static label worldComm;

View File

@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -28,13 +28,12 @@ Description
#include "Pstream.H" #include "Pstream.H"
#include "contiguous.H" #include "contiguous.H"
#include "PstreamCombineReduceOps.H" #include "PstreamReduceOps.H"
#include "UPstream.H"
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
template<class Container, class T> template<class Container, class T>
void Foam::Pstream::exchange void Foam::Pstream::exchangeContainer
( (
const UList<Container>& sendBufs, const UList<Container>& sendBufs,
const labelUList& recvSizes, const labelUList& recvSizes,
@ -44,25 +43,6 @@ void Foam::Pstream::exchange
const bool block const bool block
) )
{ {
if (!contiguous<T>())
{
FatalErrorInFunction
<< "Continuous data only." << sizeof(T) << Foam::abort(FatalError);
}
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of list " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
recvBufs.setSize(sendBufs.size());
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
label startOfRequests = Pstream::nRequests(); label startOfRequests = Pstream::nRequests();
// Set up receives // Set up receives
@ -70,17 +50,14 @@ void Foam::Pstream::exchange
forAll(recvSizes, proci) forAll(recvSizes, proci)
{ {
label nRecv = recvSizes[proci]; if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
if (proci != Pstream::myProcNo(comm) && nRecv > 0)
{ {
recvBufs[proci].setSize(nRecv);
UIPstream::read UIPstream::read
( (
UPstream::nonBlocking, UPstream::nonBlocking,
proci, proci,
reinterpret_cast<char*>(recvBufs[proci].begin()), reinterpret_cast<char*>(recvBufs[proci].begin()),
nRecv*sizeof(T), recvSizes[proci]*sizeof(T),
tag, tag,
comm comm
); );
@ -125,6 +102,237 @@ void Foam::Pstream::exchange
{ {
Pstream::waitRequests(startOfRequests); Pstream::waitRequests(startOfRequests);
} }
}
template<class T>
void Foam::Pstream::exchangeBuf
(
const labelUList& sendSizes,
const UList<const char*>& sendBufs,
const labelUList& recvSizes,
List<char*>& recvBufs,
const int tag,
const label comm,
const bool block
)
{
label startOfRequests = Pstream::nRequests();
// Set up receives
// ~~~~~~~~~~~~~~~
forAll(recvSizes, proci)
{
if (proci != Pstream::myProcNo(comm) && recvSizes[proci] > 0)
{
UIPstream::read
(
UPstream::nonBlocking,
proci,
recvBufs[proci],
recvSizes[proci]*sizeof(T),
tag,
comm
);
}
}
// Set up sends
// ~~~~~~~~~~~~
forAll(sendBufs, proci)
{
if (proci != Pstream::myProcNo(comm) && sendSizes[proci] > 0)
{
if
(
!UOPstream::write
(
UPstream::nonBlocking,
proci,
sendBufs[proci],
sendSizes[proci]*sizeof(T),
tag,
comm
)
)
{
FatalErrorInFunction
<< "Cannot send outgoing message. "
<< "to:" << proci << " nBytes:"
<< label(sendSizes[proci]*sizeof(T))
<< Foam::abort(FatalError);
}
}
}
// Wait for all to finish
// ~~~~~~~~~~~~~~~~~~~~~~
if (block)
{
Pstream::waitRequests(startOfRequests);
}
}
template<class Container, class T>
void Foam::Pstream::exchange
(
const UList<Container>& sendBufs,
const labelUList& recvSizes,
List<Container>& recvBufs,
const int tag,
const label comm,
const bool block
)
{
if (!contiguous<T>())
{
FatalErrorInFunction
<< "Continuous data only." << sizeof(T) << Foam::abort(FatalError);
}
if (sendBufs.size() != UPstream::nProcs(comm))
{
FatalErrorInFunction
<< "Size of list " << sendBufs.size()
<< " does not equal the number of processors "
<< UPstream::nProcs(comm)
<< Foam::abort(FatalError);
}
recvBufs.setSize(sendBufs.size());
if (UPstream::parRun() && UPstream::nProcs(comm) > 1)
{
// Presize all receive buffers
forAll(recvSizes, proci)
{
label nRecv = recvSizes[proci];
if (proci != Pstream::myProcNo(comm) && nRecv > 0)
{
recvBufs[proci].setSize(nRecv);
}
}
if (Pstream::maxCommsSize <= 0)
{
// Do the exchanging in one go
exchangeContainer<Container, T>
(
sendBufs,
recvSizes,
recvBufs,
tag,
comm,
block
);
}
else
{
// Determine the number of chunks to send. Note that we
// only have to look at the sending data since we are
// guaranteed that some processor's sending size is some other
// processor's receive size. Also we can ignore any local comms.
label maxNSend = 0;
forAll(sendBufs, proci)
{
if (proci != Pstream::myProcNo(comm))
{
maxNSend = max(maxNSend, sendBufs[proci].size());
}
}
const label maxNBytes = sizeof(T)*maxNSend;
// We need to send maxNBytes bytes so the number of iterations:
// maxNBytes iterations
// --------- ----------
// 0 0
// 1..maxCommsSize 1
// maxCommsSize+1..2*maxCommsSize 2
// etc.
label nIter;
if (maxNBytes == 0)
{
nIter = 0;
}
else
{
nIter = (maxNBytes-1)/Pstream::maxCommsSize+1;
}
reduce(nIter, maxOp<label>(), tag, comm);
List<const char*> charSendBufs(sendBufs.size());
List<char*> charRecvBufs(sendBufs.size());
labelList nRecv(sendBufs.size());
labelList startRecv(sendBufs.size(), 0);
labelList nSend(sendBufs.size());
labelList startSend(sendBufs.size(), 0);
for (label iter = 0; iter < nIter; iter++)
{
forAll(sendBufs, proci)
{
nSend[proci] = min
(
Pstream::maxCommsSize,
sendBufs[proci].size()-startSend[proci]
);
charSendBufs[proci] =
(
nSend[proci] > 0
? reinterpret_cast<const char*>
(
&(sendBufs[proci][startSend[proci]])
)
: nullptr
);
nRecv[proci] = min
(
Pstream::maxCommsSize,
recvBufs[proci].size()-startRecv[proci]
);
charRecvBufs[proci] =
(
nRecv[proci] > 0
? reinterpret_cast<char*>
(
&(recvBufs[proci][startRecv[proci]])
)
: nullptr
);
}
exchangeBuf<T>
(
nSend,
charSendBufs,
nRecv,
charRecvBufs,
tag,
comm,
block
);
forAll(nSend, proci)
{
startSend[proci] += nSend[proci];
startRecv[proci] += nRecv[proci];
}
}
}
} }
// Do myself // Do myself

View File

@ -28,6 +28,7 @@ License
#include "dynamicCode.H" #include "dynamicCode.H"
#include "dynamicCodeContext.H" #include "dynamicCodeContext.H"
#include "dlLibraryTable.H" #include "dlLibraryTable.H"
#include "Pstream.H"
#include "PstreamReduceOps.H" #include "PstreamReduceOps.H"
#include "OSspecific.H" #include "OSspecific.H"
#include "Ostream.H" #include "Ostream.H"

View File

@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -23,7 +23,7 @@ License
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UPstream.H" #include "Pstream.H"
#include "PstreamReduceOps.H" #include "PstreamReduceOps.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

View File

@ -3,7 +3,7 @@
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration | \\ / O peration |
\\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
\\/ M anipulation | \\/ M anipulation | Copyright (C) 2016 OpenCFD Ltd.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
License License
This file is part of OpenFOAM. This file is part of OpenFOAM.
@ -23,7 +23,7 @@ License
\*---------------------------------------------------------------------------*/ \*---------------------------------------------------------------------------*/
#include "UPstream.H" #include "Pstream.H"
#include "PstreamReduceOps.H" #include "PstreamReduceOps.H"
#include "OSspecific.H" #include "OSspecific.H"
#include "PstreamGlobals.H" #include "PstreamGlobals.H"