ENH: Improvements to the fileHandler and collated IO

Improvements to existing functionality
--------------------------------------
  - MPI is initialised without thread support if it is not needed e.g. uncollated
  - Use native c++11 threading; avoids problem with static destruction order.
  - etc/cellModels now only read if needed.
  - etc/controlDict can now be read from the environment variable FOAM_CONTROLDICT
  - Uniform files (e.g. '0/uniform/time') are now read once, on the master only
    (with the masterUncollated or collated file handlers)
  - collated format writes to 'processorsNNN' instead of 'processors'.  The file
    format is unchanged.
  - Thread buffer and file buffer size are no longer limited to 2Gb.

The global controlDict file contains parameters for file handling.  Under some
circumstances, e.g. running in parallel on a system without NFS, the user may
need to set some parameters, e.g. fileHandler, before the global controlDict
file is read from file.  To support this, OpenFOAM now allows the global
controlDict to be read as a string set to the FOAM_CONTROLDICT environment
variable.

The FOAM_CONTROLDICT environment variable can be set to the content of the global
controlDict file, e.g. from a sh/bash shell:

    export FOAM_CONTROLDICT=$(foamDictionary $FOAM_ETC/controlDict)

FOAM_CONTROLDICT can then be passed to mpirun using the -x option, e.g.:

    mpirun -np 2 -x FOAM_CONTROLDICT simpleFoam -parallel

Note that while this avoids the need for NFS to read the OpenFOAM configuration
the executable still needs to load shared libraries which must either be copied
locally or available via NFS or equivalent.

New: Multiple IO ranks
----------------------
The masterUncollated and collated fileHandlers can now use multiple ranks for
writing e.g.:

    mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)'

In this example ranks 0 ('processor0') and 3 ('processor3') now handle all the
I/O.  Rank 0 handles ranks 0,1,2 and rank 3 handles ranks 3,4,5.  The set of IO
ranks should always include 0 as the first element and be sorted in increasing
order.

The collated fileHandler uses the directory naming processorsNNN_XXX-YYY where
NNN is the total number of processors and XXX and YYY are first and last
processor in the rank, e.g. in above example the directories would be

    processors6_0-2
    processors6_3-5

and each of the collated files in these contains data of the local ranks
only. The same naming also applies when e.g. running decomposePar:

decomposePar -fileHandler collated -ioRanks '(0 3)'

New: Distributed data
---------------------

The individual root directories can be placed on different hosts with different
paths if necessary.  Previously it was necessary to specify the root per slave
process; this has been simplified with the option of specifying the root per
host with the -hostRoots command line option:

    mpirun -np 6 simpleFoam -parallel -ioRanks '(0 3)' \
        -hostRoots '("machineA" "/tmp/" "machineB" "/tmp")'

The hostRoots option is followed by a list of machine name + root directory, the
machine name can contain regular expressions.

New: hostCollated
-----------------

The new hostCollated fileHandler automatically sets the 'ioRanks' according to
the host name with the lowest rank e.g. to run simpleFoam on 6 processors with
ranks 0-2 on machineA and ranks 3-5 on machineB with the machines specified in
the hostfile:

    mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler hostCollated

This is equivalent to

    mpirun -np 6 --hostfile hostfile simpleFoam -parallel -fileHandler collated -ioRanks '(0 3)'

This example will write directories:

    processors6_0-2/
    processors6_3-5/

A typical example would use distributed data, e.g. on two nodes, machineA and
machineB, each with three processes:

    decomposePar -fileHandler collated -case cavity

    # Copy case (constant/*, system/*, processors6/) to master:
    rsync -a cavity machineA:/tmp/

    # Create root on slave:
    ssh machineB mkdir -p /tmp/cavity

    # Run
    mpirun --hostfile hostfile icoFoam \
        -case /tmp/cavity -parallel -fileHandler hostCollated \
        -hostRoots '("machineA" "/tmp" "machineB" "/tmp")'

Contributed by Mattijs Janssens
This commit is contained in:
Henry Weller
2018-03-21 12:42:22 +00:00
committed by mattijs
parent d469bbae4b
commit 8959b8e00a
37 changed files with 4093 additions and 1479 deletions

View File

@ -2,7 +2,7 @@
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | Copyright (C) 2017 OpenFOAM Foundation
\\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation
\\/ M anipulation |
-------------------------------------------------------------------------------
License
@ -681,14 +681,15 @@ void Foam::decomposedBlockData::gather
List<int> recvOffsets;
List<int> recvSizes;
if (UPstream::master())
if (UPstream::master(comm))
{
recvOffsets.setSize(nProcs);
forAll(recvOffsets, proci)
{
// Note: truncating long int to int since UPstream::gather limited
// to ints
recvOffsets[proci] =
reinterpret_cast<char*>(&datas[proci])
- data0Ptr;
int(reinterpret_cast<char*>(&datas[proci]) - data0Ptr);
}
recvSizes.setSize(nProcs, sizeof(label));
}
@ -748,7 +749,8 @@ void Foam::decomposedBlockData::gatherSlaveData
&& (UPstream::myProcNo(comm) < startProc+nProcs)
)
{
nSend = data.byteSize();
// Note: UPstream::gather limited to int
nSend = int(data.byteSize());
}
UPstream::gather
@ -764,6 +766,46 @@ void Foam::decomposedBlockData::gatherSlaveData
}
Foam::label Foam::decomposedBlockData::calcNumProcs
(
const label comm,
const off_t maxBufferSize,
const labelUList& recvSizes,
const label startProci
)
{
const label nProcs = UPstream::nProcs(comm);
label nSendProcs = -1;
if (UPstream::master(comm))
{
off_t totalSize = recvSizes[startProci];
label proci = startProci+1;
while (proci < nProcs && (totalSize+recvSizes[proci] < maxBufferSize))
{
totalSize += recvSizes[proci];
proci++;
}
nSendProcs = proci-startProci;
}
// Scatter nSendProcs
label n;
UPstream::scatter
(
reinterpret_cast<const char*>(&nSendProcs),
List<int>(nProcs, sizeof(nSendProcs)),
List<int>(nProcs, 0),
reinterpret_cast<char*>(&n),
sizeof(n),
comm
);
return n;
}
bool Foam::decomposedBlockData::writeBlocks
(
const label comm,
@ -772,8 +814,7 @@ bool Foam::decomposedBlockData::writeBlocks
const UList<char>& data,
const labelUList& recvSizes,
const bool haveSlaveData,
const List<char>& slaveData,
const PtrList<SubList<char>>& slaveData,
const UPstream::commsTypes commsType,
const bool syncReturnState
@ -784,17 +825,15 @@ bool Foam::decomposedBlockData::writeBlocks
Pout<< "decomposedBlockData::writeBlocks:"
<< " stream:" << (osPtr.valid() ? osPtr().name() : "invalid")
<< " data:" << data.size()
<< " haveSlaveData:" << haveSlaveData
<< " (master only) slaveData:" << slaveData.size()
<< " commsType:" << Pstream::commsTypeNames[commsType] << endl;
}
const label nProcs = UPstream::nProcs(comm);
bool ok = true;
if (haveSlaveData)
if (slaveData.size())
{
// Already have gathered the slave data. communicator only used to
// check who is the master
@ -821,8 +860,7 @@ bool Foam::decomposedBlockData::writeBlocks
os << nl << nl << "// Processor" << proci << nl;
start[proci] = os.stdStream().tellp();
os << SubList<char>(slaveData, recvSizes[proci], slaveOffset);
os << slaveData[proci];
slaveOffset += recvSizes[proci];
}
@ -897,44 +935,24 @@ bool Foam::decomposedBlockData::writeBlocks
// maxMasterFileBufferSize
// Starting slave processor and number of processors
labelPair startAndSize(1, nProcs-1);
label startProc = 1;
label nSendProcs = nProcs-1;
while (startAndSize[1] > 0)
while (nSendProcs > 0)
{
labelPair masterData(startAndSize);
if (UPstream::master(comm))
{
label totalSize = recvSizes[masterData[0]];
label proci = masterData[0]+1;
while
(
proci < nProcs
&& (
totalSize+recvSizes[proci]
< fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize
)
)
{
totalSize += recvSizes[proci];
++proci;
}
masterData[1] = proci-masterData[0];
}
// Scatter masterData
UPstream::scatter
nSendProcs = calcNumProcs
(
reinterpret_cast<const char*>(masterData.cdata()),
List<int>(nProcs, sizeof(masterData)),
List<int>(nProcs, 0),
reinterpret_cast<char*>(startAndSize.data()),
sizeof(startAndSize),
comm
comm,
off_t
(
fileOperations::masterUncollatedFileOperation::
maxMasterFileBufferSize
),
recvSizes,
startProc
);
if (startAndSize[0] == nProcs || startAndSize[1] == 0)
if (startProc == nProcs || nSendProcs == 0)
{
break;
}
@ -949,8 +967,8 @@ bool Foam::decomposedBlockData::writeBlocks
data,
recvSizes,
startAndSize[0], // startProc,
startAndSize[1], // nProcs,
startProc, // startProc,
nSendProcs, // nProcs,
sliceOffsets,
recvData
@ -963,9 +981,9 @@ bool Foam::decomposedBlockData::writeBlocks
// Write slaves
for
(
label proci = startAndSize[0];
proci < startAndSize[0]+startAndSize[1];
++proci
label proci = startProc;
proci < startProc+nSendProcs;
proci++
)
{
os << nl << nl << "// Processor" << proci << nl;
@ -981,7 +999,7 @@ bool Foam::decomposedBlockData::writeBlocks
}
}
startAndSize[0] += startAndSize[1];
startProc += nSendProcs;
}
if (UPstream::master(comm))
@ -1027,7 +1045,7 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
);
IOobject io(*this);
if (Pstream::master())
if (Pstream::master(comm_))
{
IStringStream is
(
@ -1043,7 +1061,7 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
// version
string versionString(os.version().str());
Pstream::scatter(versionString);
Pstream::scatter(versionString, Pstream::msgType(), comm_);
// stream
string formatString;
@ -1051,21 +1069,21 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
OStringStream os;
os << os.format();
formatString = os.str();
Pstream::scatter(formatString);
Pstream::scatter(formatString, Pstream::msgType(), comm_);
}
//word masterName(name());
//Pstream::scatter(masterName);
//Pstream::scatter(masterName, Pstream::msgType(), comm_);
Pstream::scatter(io.headerClassName());
Pstream::scatter(io.note());
Pstream::scatter(io.headerClassName(), Pstream::msgType(), comm_);
Pstream::scatter(io.note(), Pstream::msgType(), comm_);
//Pstream::scatter(io.instance(), Pstream::msgType(), comm);
//Pstream::scatter(io.local(), Pstream::msgType(), comm);
fileName masterLocation(instance()/db().dbDir()/local());
Pstream::scatter(masterLocation);
Pstream::scatter(masterLocation, Pstream::msgType(), comm_);
if (!Pstream::master())
if (!Pstream::master(comm_))
{
writeHeader
(
@ -1081,7 +1099,7 @@ bool Foam::decomposedBlockData::writeData(Ostream& os) const
os.writeQuoted(str, false);
if (!Pstream::master())
if (!Pstream::master(comm_))
{
IOobject::writeEndDivider(os);
}
@ -1108,10 +1126,10 @@ bool Foam::decomposedBlockData::writeObject
}
labelList recvSizes;
gather(comm_, this->byteSize(), recvSizes);
gather(comm_, label(this->byteSize()), recvSizes);
List<std::streamoff> start;
List<char> slaveData; // dummy already received slave data
PtrList<SubList<char>> slaveData; // dummy slave data
return writeBlocks
(
comm_,
@ -1119,7 +1137,6 @@ bool Foam::decomposedBlockData::writeObject
start,
*this,
recvSizes,
false, // don't have slave data
slaveData,
commsType_
);