/*---------------------------------------------------------------------------*\
========= |
\\ / F ield | OpenFOAM: The Open Source CFD Toolbox
\\ / O peration |
\\ / A nd | www.openfoam.com
\\/ M anipulation |
-------------------------------------------------------------------------------
Copyright (C) 2017-2018 OpenFOAM Foundation
Copyright (C) 2019-2023 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "masterUncollatedFileOperation.H"
#include "fileOperationInitialise.H"
#include "addToRunTimeSelectionTable.H"
#include "Pstream.H"
#include "Time.H"
#include "instant.H"
#include "IFstream.H"
#include "SpanStream.H"
#include "masterOFstream.H"
#include "decomposedBlockData.H"
#include "registerSwitch.H"
#include "dummyISstream.H"
#include "SubList.H"
/* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
// Static type information, run-time selection table entries and the
// optimisation switch belonging to masterUncollatedFileOperation.
namespace Foam
{
namespace fileOperations
{
// Define the type name and debug switch (macro argument: 0)
defineTypeNameAndDebug(masterUncollatedFileOperation, 0);
// Add this class to the fileOperation selection table named 'word'
addToRunTimeSelectionTable
(
fileOperation,
masterUncollatedFileOperation,
word
);
// Add this class to the fileOperation selection table named 'comm'
addToRunTimeSelectionTable
(
fileOperation,
masterUncollatedFileOperation,
comm
);
// Static member initialised from the optimisation switch
// "maxMasterFileBufferSize"; 1e9 is passed as the second argument
// (presumably the value used when the switch is not set - confirm)
float masterUncollatedFileOperation::maxMasterFileBufferSize
(
Foam::debug::floatOptimisationSwitch("maxMasterFileBufferSize", 1e9)
);
// Register the static member above as an optimisation switch under the
// same name
registerOptSwitch
(
"maxMasterFileBufferSize",
float,
masterUncollatedFileOperation::maxMasterFileBufferSize
);
// Threaded MPI: not required
// Hence the 'unthreaded' initialiser is registered in the
// fileOperationInitialise 'word' table under the name masterUncollated
addNamedToRunTimeSelectionTable
(
fileOperationInitialise,
fileOperationInitialise_unthreaded,
word,
masterUncollated
);
}
}
// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * //
namespace Foam
{

//- Read the complete contents of a file (compressed or uncompressed)
//- into a character buffer.
//
//  \param ifs  the input file stream to read from
//
//  \return the characters read. A failed read is not reported as an
//      error but treated as a normal end of data, so the result may be
//      shorter than expected (or empty).
static DynamicList<char> slurpFile(IFstream& ifs)
{
    // gcount() values that are regarded as a failed read
    const auto readFailed = [](const std::streamsize n)
    {
        return
        (
            n < 0
         || n == std::numeric_limits<std::streamsize>::max()
        );
    };

    DynamicList<char> buffer;

    auto& iss = ifs.stdStream();

    const auto inputSize = ifs.fileSize();

    if (IOstreamOption::COMPRESSED == ifs.compression())
    {
        // For compressed files, no idea how large the result will be.
        // So read chunk-wise.
        // The chunk size is twice the compressed file size, or 4096
        // when the reported size is at most 1024 (this also covers a
        // negative size).

        const uint64_t chunkSize =
        (
            (inputSize <= 1024)
          ? uint64_t(4096)
          : uint64_t(2*inputSize)
        );

        // Number of characters read so far
        uint64_t beg = 0;

        for (int iter = 1; iter < 100000; ++iter)
        {
            // Manual resizing to use incremental vs doubling
            buffer.setCapacity(label(iter * chunkSize));
            buffer.resize(buffer.capacity());

            ifs.readRaw(buffer.data() + beg, chunkSize);
            const std::streamsize nread = iss.gcount();

            if (readFailed(nread))
            {
                // Failed, but treat as normal 'done'
                break;
            }

            beg += uint64_t(nread);

            if (uint64_t(nread) < chunkSize)
            {
                // Short read: normal end of data
                break;
            }
        }

        // Trim to what was actually read
        buffer.resize(label(beg));
    }
    else if (inputSize >= 0)
    {
        // Uncompressed: the file size is the content size,
        // so a single read suffices
        buffer.setCapacity(label(inputSize));
        buffer.resize(buffer.capacity());

        ifs.readRaw(buffer.data(), buffer.size_bytes());
        const std::streamsize nread = iss.gcount();

        if (readFailed(nread))
        {
            // Failed, but treat as normal 'done'
            buffer.clear();
        }
        else
        {
            buffer.resize(label(nread));  // Safety
        }
    }

    return buffer;
}

} // End namespace Foam
// * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
Foam::word
Foam::fileOperations::masterUncollatedFileOperation::findInstancePath
(
    const instantList& timeDirs,
    const instant& t
)
{
    // Return the name of the last entry in timeDirs whose value equals
    // that of t, or an empty word if there is no such entry.
    //
    // Note:
    // - times will include constant (with value 0) as first element.
    //   For backwards compatibility make sure to find 0 in preference
    //   to constant, hence the traversal from back to front.
    // - list is sorted so could use binary search

    for (label timei = timeDirs.size()-1; timei >= 0; --timei)
    {
        const instant& candidate = timeDirs[timei];

        if (t.equal(candidate.value()))
        {
            return candidate.name();
        }
    }

    return word();
}
Foam::fileName
Foam::fileOperations::masterUncollatedFileOperation::filePathInfo
(
    const bool checkGlobal,
    const bool isFile,
    const IOobject& io,
    const dirIndexList& pDirs,
    const bool search,
    pathType& searchType,
    word& procsDir,
    word& newInstancePath
) const
{
    // Search for the file (or directory, depending on isFile) belonging
    // to io. Candidate locations are tried in a fixed order and the first
    // existing one is returned, with searchType describing where it was
    // found. procsDir and newInstancePath are cleared on entry and only
    // filled in by the searches that use them. If nothing exists the
    // result is an empty fileName and searchType is NOTFOUND.

    procsDir.clear();
    newInstancePath.clear();

    // Absolute instance: there is exactly one candidate
    if (io.instance().isAbsolute())
    {
        fileName absPath = io.instance()/io.name();

        if (!isFileOrDir(isFile, absPath))
        {
            searchType = fileOperation::NOTFOUND;
            return fileName();
        }

        searchType = fileOperation::ABSOLUTE;
        return absPath;
    }

    // 1. Check the writing fileName
    fileName writePath(objectPath(io, io.headerClassName()));

    if (isFileOrDir(isFile, writePath))
    {
        searchType = fileOperation::WRITEOBJECT;
        return writePath;
    }

    // 2. Check processors/
    if (io.time().processorCase())
    {
        for (const dirIndex& entry : pDirs)
        {
            const fileName& dirName = entry.first();

            fileName candidate =
                processorsPath(io, io.instance(), dirName)
               /io.name();

            // The write path has already been rejected above
            if (candidate == writePath || !isFileOrDir(isFile, candidate))
            {
                continue;
            }

            searchType = entry.second().first();
            procsDir = dirName;
            return candidate;
        }
    }

    // 3. Check local
    {
        fileName localPath = io.objectPath();

        if (localPath != writePath && isFileOrDir(isFile, localPath))
        {
            searchType = fileOperation::OBJECT;
            return localPath;
        }
    }

    // Any global checks: only for the system or constant instance of a
    // processor case
    const bool tryParent =
    (
        checkGlobal
     && io.time().processorCase()
     && (
            io.instance() == io.time().system()
         || io.instance() == io.time().constant()
        )
    );

    if (tryParent)
    {
        fileName parentPath =
            io.rootPath()/io.time().globalCaseName()
           /io.instance()/io.db().dbDir()/io.local()/io.name();

        if (isFileOrDir(isFile, parentPath))
        {
            searchType = fileOperation::PARENTOBJECT;
            return parentPath;
        }
    }

    // Check for approximately same time. E.g. if time = 1e-2 and
    // directory is 0.01 (due to different time formats)
    const auto timesIter = times_.cfind(io.time().path());

    if (search && timesIter.good())
    {
        newInstancePath = findInstancePath
        (
            *timesIter(),
            instant(io.instance())
        );

        if (!newInstancePath.empty() && newInstancePath != io.instance())
        {
            // 1. Try processors equivalent
            for (const dirIndex& entry : pDirs)
            {
                const fileName& dirName = entry.first();

                fileName procPath
                (
                    processorsPath(io, newInstancePath, dirName)
                   /io.name()
                );

                if (!isFileOrDir(isFile, procPath))
                {
                    continue;
                }

                // Map the processor directory type onto its 'instance'
                // counterpart. Other types leave searchType untouched.
                const pathType procType = entry.second().first();

                if (procType == fileOperation::PROCUNCOLLATED)
                {
                    searchType = fileOperation::PROCUNCOLLATEDINSTANCE;
                }
                else if (procType == fileOperation::PROCBASEOBJECT)
                {
                    searchType = fileOperation::PROCBASEINSTANCE;
                }
                else if (procType == fileOperation::PROCOBJECT)
                {
                    searchType = fileOperation::PROCINSTANCE;
                }

                procsDir = dirName;
                return procPath;
            }

            // 2. Check local
            fileName instPath
            (
                io.rootPath()/io.caseName()
               /newInstancePath/io.db().dbDir()/io.local()/io.name()
            );

            if (isFileOrDir(isFile, instPath))
            {
                searchType = fileOperation::FINDINSTANCE;
                return instPath;
            }
        }
    }

    // Nothing found
    searchType = fileOperation::NOTFOUND;
    return fileName();
}
Foam::fileName
Foam::fileOperations::masterUncollatedFileOperation::localObjectPath
(
    const IOobject& io,
    const pathType& searchType,
    const word& procDir,
    const word& instancePath
) const
{
    // Replacement for IOobject::objectPath()
    //
    // Assemble the object path that corresponds to the given searchType.
    // The result stays empty for NOTFOUND and for unhandled types (the
    // latter additionally trigger NotImplemented).

    fileName objPath;

    switch (searchType)
    {
        case fileOperation::ABSOLUTE:
            objPath = io.instance()/io.name();
            break;

        case fileOperation::OBJECT:
            objPath = io.path()/io.name();
            break;

        case fileOperation::WRITEOBJECT:
            objPath = objectPath(io, io.headerClassName());
            break;

        case fileOperation::PROCUNCOLLATED:
        {
            // Uncollated type, e.g. processor1
            const word procName
            (
                "processor" + Foam::name(Pstream::myProcNo(UPstream::worldComm))
            );

            // Parallel runs use the rank-specific directory name
            const word& dirName = (Pstream::parRun() ? procName : procDir);

            objPath = processorsPath(io, io.instance(), dirName)/io.name();
            break;
        }

        case fileOperation::PROCBASEOBJECT:
            // Collated, e.g. processors4
            objPath = processorsPath(io, io.instance(), procDir)/io.name();
            break;

        case fileOperation::PROCOBJECT:
            // Processors directory locally provided by the fileHandler itself
            objPath =
                processorsPath(io, io.instance(), processorsDir(io))
               /io.name();
            break;

        case fileOperation::PARENTOBJECT:
            objPath =
                io.rootPath()/io.time().globalCaseName()
               /io.instance()/io.db().dbDir()/io.local()/io.name();
            break;

        case fileOperation::FINDINSTANCE:
            objPath =
                io.rootPath()/io.caseName()
               /instancePath/io.db().dbDir()/io.local()/io.name();
            break;

        case fileOperation::PROCUNCOLLATEDINSTANCE:
        {
            // Uncollated type, e.g. processor1
            const word procName
            (
                "processor"
              + Foam::name(Pstream::myProcNo(UPstream::worldComm))
            );

            // Parallel runs use the rank-specific directory name
            const word& dirName = (Pstream::parRun() ? procName : procDir);

            objPath = processorsPath(io, instancePath, dirName)/io.name();
            break;
        }

        case fileOperation::PROCBASEINSTANCE:
            // Collated, e.g. processors4
            objPath = processorsPath(io, instancePath, procDir)/io.name();
            break;

        case fileOperation::PROCINSTANCE:
            // Processors directory locally provided by the fileHandler itself
            objPath =
                processorsPath(io, instancePath, processorsDir(io))
               /io.name();
            break;

        case fileOperation::NOTFOUND:
            // Leave the path empty
            break;

        default:
            NotImplemented;
            break;
    }

    return objPath;
}
void Foam::fileOperations::masterUncollatedFileOperation::readAndSend
(
const fileName& filePath,
const labelUList& recvProcs,
PstreamBuffers& pBufs
)
{
if (recvProcs.empty()) return;
IFstream ifs(filePath, IOstreamOption::BINARY);
if (!ifs.good())
{
FatalIOErrorInFunction(filePath)
<< "Cannot open file " << filePath
//<< " using communicator " << pBufs.comm()
//<< " ioRanks:" << UPstream::procID(pBufs.comm())
<< exit(FatalIOError);
}
if (debug)
{
Info<< "masterUncollatedFileOperation::readAndSend :"
<< " compressed:" << bool(ifs.compression()) << " "
<< filePath << endl;
}
// Read file contents (compressed or uncompressed) into a character buffer
DynamicList buf(slurpFile(ifs));
for (const label proci : recvProcs)
{
UOPstream os(proci, pBufs);
os.write(buf.cdata_bytes(), buf.size_bytes());
}
if (debug)
{
Info<< "masterUncollatedFileOperation::readStream :"
<< " From " << filePath << " sent " << buf.size()
<< " bytes" << endl;
}
}
Foam::autoPtr
Foam::fileOperations::masterUncollatedFileOperation::read
(
IOobject& io,
const label comm,
const bool uniform, // on comms master only
const fileNameList& filePaths, // on comms master and sub-ranks
const boolUList& readOnProcs // on comms master and sub-ranks
)
{
autoPtr isPtr;
PstreamBuffers pBufs(comm, UPstream::commsTypes::nonBlocking);
if (UPstream::master(comm))
{
if (uniform)
{
if (readOnProcs[0])
{
if (filePaths[0].empty())
{
FatalIOErrorInFunction(filePaths[0])
<< "Cannot find file " << io.objectPath()
<< " fileHandler : comm:" << comm
<< " ioRanks:" << UPstream::procID(comm)
<< exit(FatalIOError);
}
DynamicList