diff --git a/applications/test/parallel-file-write1/Make/files b/applications/test/parallel-file-write1/Make/files new file mode 100644 index 0000000000..7026affab8 --- /dev/null +++ b/applications/test/parallel-file-write1/Make/files @@ -0,0 +1,3 @@ +Test-parallel-file-write1.cxx + +EXE = $(FOAM_USER_APPBIN)/Test-parallel-file-write1 diff --git a/applications/test/parallel-file-write1/Make/options b/applications/test/parallel-file-write1/Make/options new file mode 100644 index 0000000000..18e6fe47af --- /dev/null +++ b/applications/test/parallel-file-write1/Make/options @@ -0,0 +1,2 @@ +/* EXE_INC = */ +/* EXE_LIBS = */ diff --git a/applications/test/parallel-file-write1/Test-parallel-file-write1.cxx b/applications/test/parallel-file-write1/Test-parallel-file-write1.cxx new file mode 100644 index 0000000000..5d92636ae5 --- /dev/null +++ b/applications/test/parallel-file-write1/Test-parallel-file-write1.cxx @@ -0,0 +1,260 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2025 Mark Olesen +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +Application + Test-parallel-file-write1 + +Description + Simple test of writing with MPI/IO + +\*---------------------------------------------------------------------------*/ + +#include "argList.H" +#include "Time.H" +#include "Switch.H" +#include "UPstreamFile.H" +#include "SpanStream.H" + +using namespace Foam; + +template +void zeropadded(std::ostream& os, IntType val, char fillch = '0') +{ + // set fill char and width + os.setf(std::ios_base::right, std::ios_base::adjustfield); + fillch = os.fill(fillch); + os.width(std::numeric_limits::digits10+1); + os << val; + // restore fill char + os.fill(fillch); +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +int main(int argc, char *argv[]) +{ + argList::noCheckProcessorDirectories(); + argList::addVerboseOption(); + argList::addBoolOption("master-footer", "Write footer from master"); + + #include "setRootCase.H" + + if (!UPstream::parRun()) + { + Info<< "###############" << nl + << "Not running in parallel. Stopping now" << nl + << "###############" << endl; + return 1; + } + + const bool optMasterFooter = args.found("master-footer"); + + Info<< nl << "Write master-footer: " << Switch::name(optMasterFooter) + << nl << nl; + + Info<< "Create time (without controlDict)\n" << endl; + + auto runTimePtr = Time::New(); + auto& runTime = runTimePtr(); + + const auto myProc = UPstream::myProcNo(); + const auto nProcs = UPstream::nProcs(); + + // Some content + OCharStream charset; + for (int i = 0; i < 10; ++i) + { + charset<< char('A' + i); + } + + // Header/footer buffers - these can be separate or bundled into + // the first/last blocks + + OCharStream header; + OCharStream footer; + + // Content buffer + OCharStream os(IOstream::BINARY); + + { + const auto v = charset.view(); + + os << nl; + os.beginBlock(word("rank" + Foam::name(myProc))); + + for (int repeat = 0; repeat <= myProc; ++repeat) + { + os << indent << word("entry" + Foam::name(repeat)) + << ' ' << word("List"); + // os << nl; + os << ' '; + os << label(v.size()); + os.write(v.data(), v.size()); + // os << nl; + os.endEntry(); + } + + os.endBlock(); + } + + // Bundle the footer into the last block + if (!optMasterFooter && (myProc == nProcs-1)) + { + IOobject::writeEndDivider(os); + } + + + // All content now exists - commit to disk + const std::string_view blockData(os.view()); + const int64_t blockSize(blockData.size()); + + // Collect sizes + const List sizes + ( + UPstream::allGatherValues(blockSize, UPstream::worldComm) + ); + + + // Format header with size information + if (UPstream::master()) + { + header + << "Simple MPI/IO test with " << nProcs << " ranks" << nl << nl; + + ocharstream labelbuf; + labelbuf.reserve_exact(32); + + // Position before writing a label + auto labelBegin = header.tellp(); + + header.beginBlock("meta"); + { + header << indent << word("data.start") << ' '; + + labelBegin = header.tellp(); + + // Add the start value (placeholder) + { + labelbuf.rewind(); + zeropadded(labelbuf, label(0)); + header.append(labelbuf.view()); + } + + header.endEntry(); + + header << indent << word("data.sizes") << nl; + sizes.writeList(header); // flatOutput + header.endEntry(); + } + header.endBlock(); + + header << nl; + IOobject::writeDivider(header); + + // Now update with the correct size + { + labelbuf.rewind(); + zeropadded(labelbuf, label(header.view().size())); + header.overwrite(labelBegin, labelbuf.view()); + } + + // Bundled the footer into the last block or from master? + if (optMasterFooter) + { + IOobject::writeEndDivider(footer); + } + } + + // With additional header/footer + int64_t headerSize(header.view().size()); + int64_t footerSize(footer.view().size()); + + Pstream::broadcast(headerSize); + if (optMasterFooter) + { + Pstream::broadcast(footerSize); + } + + + int64_t totalSize(headerSize); + for (int i = 0; i < myProc; ++i) + { + totalSize += sizes[i]; + } + + const int64_t blockOffset(totalSize); + + for (int i = myProc; i < nProcs; ++i) + { + totalSize += sizes[i]; + } + const int64_t footerOffset(totalSize); + totalSize += footerSize; + + + Pout<< "write size=" << label(blockSize) + << " at=" << label(blockOffset) << " total=" << label(totalSize) << nl; + + { + UPstream::File file; + + bool ok = file.open_write + ( + UPstream::worldComm, + runTime.globalPath()/"mpiio-test1.txt", + IOstreamOption::ATOMIC + ); + + if (ok) + { + Info<< "writing: " << file.name() << nl; + + if (UPstream::master()) + { + // A no-op for empty buffer + ok = file.write_at(0, header.view()); + } + + ok = file.write_at_all(blockOffset, blockData); + + if (UPstream::master()) + { + // A no-op for empty buffer + ok = file.write_at(footerOffset, footer.view()); + } + } + + file.set_size(totalSize); + file.close(); + } + + Info<< "\nEnd\n" << endl; + + return 0; +} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.H new file mode 100644 index 0000000000..6c7db54231 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.H @@ -0,0 +1,267 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2025 Mark Olesen +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +Class + Foam::UPstream::File + +Description + An opaque wrapper for MPI_File methods + without any \c header dependency. + +Note + Not included as part of UPstream.H - only include locally as required + +SourceFiles + UPstreamFile.txx + +\*---------------------------------------------------------------------------*/ + +#ifndef Foam_UPstreamFile_H +#define Foam_UPstreamFile_H + +#include "fileName.H" +#include "UPstream.H" + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +namespace Foam +{ + +/*---------------------------------------------------------------------------*\ + Class UPstream::File Declaration +\*---------------------------------------------------------------------------*/ + +class UPstream::File +{ + // Forward Declaration + class Impl; + + + // Private Data + + //- Implementation wrapper of MPI_File etc + std::unique_ptr file_; + + +protected: + + // Protected Method Functions + + //- MPI_File_write [non-collective] : write data + bool write_data + ( + //! Source buffer + const void* buffer, + //! The data count - number of elements + std::streamsize count, + const UPstream::dataTypes dataTypeId + ); + + //- MPI_File_write_at [non-collective] : write data at specified offset + bool write_data_at + ( + //! The offset - number of 'view' elements (default: byte) + std::streamsize offset, + //! Source buffer + const void* buffer, + //! The data count - number of elements + std::streamsize count, + const UPstream::dataTypes dataTypeId + ); + + //- MPI_File_write_all [collective] : write data + bool write_data_all + ( + //! Source buffer + const void* buffer, + //! The data count - number of elements + std::streamsize count, + const UPstream::dataTypes dataTypeId + ); + + //- MPI_File_write_at_all [collective] : + //- write data at specified offset + bool write_data_at_all + ( + //! The offset - number of 'view' elements (default: byte) + std::streamsize offset, + //! Source buffer + const void* buffer, + //! The data count - number of elements + std::streamsize count, + const UPstream::dataTypes dataTypeId + ); + + +public: + + // Generated Methods + + //- No copy construct + File(const File&&) = delete; + + //- Move construct + File(File&&) noexcept; + + //- No copy assignment + File& operator=(const File&) = delete; + + //- Move assignment + File& operator=(File&&) noexcept; + + + // Constructors + + //- Default construct + File(); + + //- Destructor. Non-default in header (incomplete types) + ~File(); + + + // Static Member Functions + + //- True if MPI/IO appears to be supported + static bool supported(); + + + // Member Functions + + // Access + + //- The name of the open stream + const fileName& name() const; + + //- True if allocated and open has been called + bool is_open() const; + + + // Basics + + //- MPI_File_open [collective] : + //- open file in write-only mode, no-append + bool open_write + ( + //! The OpenFOAM communicator index + const int communicator, + //! Full file path (parent directory must exist before calling) + const fileName& pathname, + //! Simulated atomic file handling + IOstreamOption::atomicType = IOstreamOption::NON_ATOMIC + ); + + //- MPI_File_close [collective] + bool close(); + + //- Set the (output) file size [collective] + bool set_size(std::streamsize num_bytes); + + + // Writing + + //- MPI_File_write [non-collective] : write data. + // A no-op and return true if content is empty + inline bool write(std::string_view sv); + + //- MPI_File_write [non-collective] : write data. + // A no-op and return true if buffer is nullptr or count is zero + template + inline bool write + ( + //! The content + const Type* buffer, + //! The data count - number of elements + std::streamsize count + ); + + //- MPI_File_write_at [non-collective] : write data at specified offset. + // A no-op and return true if content is empty + inline bool write_at(std::streamsize offset, std::string_view sv); + + //- MPI_File_write_at [non-collective] : write data at specified offset. + // A no-op and return true if buffer is nullptr or count is zero + template + inline bool write_at + ( + //! The offset within the file - number of 'view' elements + std::streamsize offset, + //! The content + const Type* buffer, + //! The data count - number of elements + std::streamsize count + ); + + //- MPI_File_write_all [collective] : write data + inline bool write_all(std::string_view sv); + + //- MPI_File_write_all [collective] : write data + template + inline bool write_all + ( + //! The content + const Type* buffer, + //! The data count - number of elements + std::streamsize count + ); + + //- MPI_File_write_at_all [collective] : + //- write data at specified offset + inline bool write_at_all + ( + //! The offset within the file - number of 'view' elements + std::streamsize offset, + //! The content + std::string_view sv + ); + + //- MPI_File_write_at_all [collective] : + //- write data at specified offset + template + inline bool write_at_all + ( + //! The offset within the file - number of 'view' elements + std::streamsize offset, + //! The content + const Type* buffer, + //! The data count - number of elements + std::streamsize count + ); +}; + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +} // End namespace Foam + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +#include "UPstreamFile.txx" + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +#endif + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.txx b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.txx new file mode 100644 index 0000000000..d41f029c61 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamFile.txx @@ -0,0 +1,220 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2025 Mark Olesen +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +\*---------------------------------------------------------------------------*/ + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +inline bool Foam::UPstream::File::write +( + std::string_view sv +) +{ + if (sv.empty()) + { + // no-op for no content + return true; + } + + return this->write_data + ( + sv.data(), sv.size(), + UPstream::dataTypes::type_byte + ); +} + + +template +bool Foam::UPstream::File::write +( + const Type* buffer, + std::streamsize count +) +{ + if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Only contiguous data can be supported!" + << Foam::abort(FatalError); + return false; + } + else if (buffer && count > 1) + { + // Use element or component type (or byte-wise) for data type + return this->write_data + ( + buffer, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id + ); + } + else + { + // no-op for no content + return true; + } +} + + +inline bool Foam::UPstream::File::write_at +( + std::streamsize offset, + std::string_view sv +) +{ + if (sv.empty()) + { + // no-op for no content + return true; + } + + return this->write_data_at + ( + offset, + sv.data(), sv.size(), + UPstream::dataTypes::type_byte + ); +} + + +template +bool Foam::UPstream::File::write_at +( + std::streamsize offset, + const Type* buffer, + std::streamsize count +) +{ + if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Only contiguous data can be supported!" + << Foam::abort(FatalError); + return false; + } + else if (buffer && count > 1) + { + // Use element or component type (or byte-wise) for data type + return this->write_data_at + ( + offset, + buffer, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id + ); + } + else + { + // no-op for no content + return true; + } +} + + +inline bool Foam::UPstream::File::write_all +( + std::string_view sv +) +{ + return this->write_data_all + ( + sv.data(), sv.size(), + UPstream::dataTypes::type_byte + ); +} + + +template +bool Foam::UPstream::File::write_all +( + const Type* buffer, + std::streamsize count +) +{ + if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Only contiguous data can be supported!" + << Foam::abort(FatalError); + return false; + } + else + { + // Use element or component type (or byte-wise) for data type + return this->write_data_all + ( + buffer, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id + ); + } +} + + +inline bool Foam::UPstream::File::write_at_all +( + std::streamsize offset, + std::string_view sv +) +{ + return this->write_data_at_all + ( + offset, + sv.data(), sv.size(), + UPstream::dataTypes::type_byte + ); +} + + +template +bool Foam::UPstream::File::write_at_all +( + std::streamsize offset, + const Type* buffer, + std::streamsize count +) +{ + if constexpr (!is_contiguous_v) + { + FatalErrorInFunction + << "Only contiguous data can be supported!" + << Foam::abort(FatalError); + return false; + } + else + { + // Use element or component type (or byte-wise) for data type + return this->write_data_at_all + ( + offset, + buffer, // The data or cmpt pointer + UPstream_dataType::size(count), + UPstream_dataType::datatype_id + ); + } +} + + +// ************************************************************************* // diff --git a/src/Pstream/dummy/Make/files b/src/Pstream/dummy/Make/files index 6126bd6cf7..efa2cbbb60 100644 --- a/src/Pstream/dummy/Make/files +++ b/src/Pstream/dummy/Make/files @@ -5,6 +5,7 @@ UPstreamCommunicator.C UPstreamGatherScatter.C UPstreamReduce.C UPstreamRequest.C +UPstreamFile.C UPstreamWindow.C UIPstreamRead.C diff --git a/src/Pstream/dummy/UPstreamFile.C b/src/Pstream/dummy/UPstreamFile.C new file mode 100644 index 0000000000..c2f5d57b9d --- /dev/null +++ b/src/Pstream/dummy/UPstreamFile.C @@ -0,0 +1,170 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2025 Mark Olesen +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +\*---------------------------------------------------------------------------*/ + +#include "fileName.H" +#include "UPstreamFile.H" + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +namespace Foam +{ + +/*---------------------------------------------------------------------------*\ + Class UPstream::File::Impl Declaration +\*---------------------------------------------------------------------------*/ + +class UPstream::File::Impl {}; + +} // End namespace Foam + + +// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // + +bool Foam::UPstream::File::supported() +{ + return false; +} + + +// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // + +Foam::UPstream::File::File() {} + + +// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // + +Foam::UPstream::File::~File() +{} // Non-default in header (incomplete types) + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +const Foam::fileName& Foam::UPstream::File::name() const +{ + return fileName::null; +} + + +bool Foam::UPstream::File::is_open() const +{ + return false; +} + + +bool Foam::UPstream::File::close() +{ + return false; +} + + +// * * * * * * * * * * * * Member Functions (Reading) * * * * * * * * * * * // + +#if 0 +bool Foam::UPstream::File::open_read +( + const int communicator, + const fileName& pathname +) +{ + NotImplemented; + return false; +} +#endif + + +// * * * * * * * * * * * * Member Functions (Writing) * * * * * * * * * * * // + +bool Foam::UPstream::File::open_write +( + const int communicator, + const fileName& pathname, + IOstreamOption::atomicType +) +{ + NotImplemented; + return false; +} + + +bool Foam::UPstream::File::write_data +( + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + NotImplemented; + return false; +} + + +bool Foam::UPstream::File::write_data_at +( + std::streamsize offset, + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + NotImplemented; + return false; +} + + +bool Foam::UPstream::File::write_data_all +( + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + NotImplemented; + return false; +} + + +bool Foam::UPstream::File::write_data_at_all +( + std::streamsize offset, + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + NotImplemented; + return false; +} + + +bool Foam::UPstream::File::set_size(std::streamsize num_bytes) +{ + NotImplemented; + return false; +} + + +// ************************************************************************* // diff --git a/src/Pstream/mpi/Make/files b/src/Pstream/mpi/Make/files index e859a9209f..7752acf66e 100644 --- a/src/Pstream/mpi/Make/files +++ b/src/Pstream/mpi/Make/files @@ -6,6 +6,7 @@ UPstreamCommunicator.C UPstreamGatherScatter.C UPstreamReduce.C UPstreamRequest.C +UPstreamFile.C UPstreamWindow.C UIPstreamRead.C diff --git a/src/Pstream/mpi/UPstreamFile.C b/src/Pstream/mpi/UPstreamFile.C new file mode 100644 index 0000000000..c7ba9b4055 --- /dev/null +++ b/src/Pstream/mpi/UPstreamFile.C @@ -0,0 +1,687 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2025 Mark Olesen +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +\*---------------------------------------------------------------------------*/ + +#include "fileName.H" +#include "UPstreamFile.H" +#include "PstreamGlobals.H" +#include "OSspecific.H" + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +// Has _c() version? +#undef Foam_UPstream_largeCountFile + +#if defined(OMPI_MAJOR_VERSION) + #if (OMPI_MAJOR_VERSION >= 5) + #define Foam_UPstream_largeCountFile + #endif +#endif + +// Macros for calling versions with or without '_c' +#ifdef Foam_UPstream_largeCountFile +#define Foam_mpiCall(Function) Function##_c +#else +#define Foam_mpiCall(Function) Function +#endif + + +// * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * // + +namespace +{ + +inline bool checkCount(std::streamsize count, const char* what) +{ + #ifndef Foam_UPstream_largeCountFile + if (FOAM_UNLIKELY(count > std::streamsize(INT_MAX))) + { + using namespace Foam; + FatalErrorInFunction + << "Write size " << label(count) + << " exceeds INT_MAX bytes for '" << what << "'\n" + << Foam::abort(Foam::FatalError); + return false; + } + #endif + + return true; +} + +} // End anonymous namespace + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +namespace Foam +{ + +/*---------------------------------------------------------------------------*\ + Class UPstream::File::Impl Declaration +\*---------------------------------------------------------------------------*/ + +class UPstream::File::Impl +{ + //- The file-handle + MPI_File fh_; + + //- Path of the open file + fileName name_; + + //- The current state (open|read|write|closed etc) + int state_; + + //- The associated rank when openned + int rank_; + +public: + + //- The file states + enum states : int { CLOSED = 0, READ, WRITE, ATOMIC_WRITE }; + + // Constructors + + //- Default construct + Impl() + : + fh_(MPI_FILE_NULL), + state_(CLOSED), + rank_(0) + {} + + + // Member Functions + + // The file handle + const MPI_File& handle() const noexcept { return fh_; } + MPI_File& handle() noexcept { return fh_; } + + // Path to the open file + const fileName& name() const noexcept { return name_; } + fileName& name() noexcept { return name_; } + + // Change the file state, return the old value + int state(states val) noexcept + { + int old(state_); + state_ = val; + return old; + } + + //- Is rank 0 ? (master rank) + bool master() const noexcept { return (rank_ == 0); } + + //- Get the associated rank + int rank() const noexcept { return rank_; } + + //- Set the associated rank + void rank(int val) noexcept { rank_ = val; } + + + // Checks + + // The file state + bool is_open() const noexcept { return state_; } + + // The file read state + bool is_read() const noexcept + { + return (states::READ == state_); + } + + // The file write atomic state + bool is_atomic() const noexcept + { + return (states::ATOMIC_WRITE == state_); + } + + // The file write state (atomic or non-atomic) + bool is_write() const noexcept + { + return (states::ATOMIC_WRITE == state_ || states::WRITE == state_); + } + + //- Assert is_read() or FatalError + inline bool checkReadable(const char* what) const + { + if (FOAM_UNLIKELY(!is_read())) + { + FatalErrorInFunction + << "File handler not open for reading '" << what << "'\n" + << "name: " << name() << nl + << Foam::exit(Foam::FatalError); + return false; + } + return true; + } + + //- Assert is_write() or FatalError + inline bool checkWritable(const char* what) const + { + if (FOAM_UNLIKELY(!is_write())) + { + FatalErrorInFunction + << "File handler not open for writing'" << what << "'\n" + << "name: " << name() << nl + << Foam::exit(Foam::FatalError); + return false; + } + return true; + } +}; + +} // End namespace Foam + + +// * * * * * * * * * * * * * Static Member Functions * * * * * * * * * * * * // + +bool Foam::UPstream::File::supported() +{ + return true; +} + + +// * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * // + +Foam::UPstream::File::File() +: + file_(new UPstream::File::Impl) +{} + + +// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // + +Foam::UPstream::File::~File() +{ + if (FOAM_UNLIKELY(file_ && file_->is_open())) + { + WarningInFunction + << "Exited scope without close()" << nl + << " FIX YOUR CODE!!" << endl; + // Do not call close() since we don't know where that collective + // should have been called + } +} + + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +const Foam::fileName& Foam::UPstream::File::name() const +{ + return (file_ ? file_->name() : fileName::null); +} + + +bool Foam::UPstream::File::is_open() const +{ + return bool(file_ && file_->is_open()); +} + + +bool Foam::UPstream::File::close() +{ + if (FOAM_UNLIKELY(!file_->is_open())) + { + WarningInFunction + << "Called without an open file handler !" << endl; + return false; + } + + MPI_File_close(&(file_->handle())); + + // Atomic rename of file (master only) + const fileName& pathname = file_->name(); + + if (file_->master() && file_->is_atomic() && !pathname.empty()) + { + std::rename + ( + (pathname + "~tmp~").c_str(), + pathname.c_str() + ); + } + + file_->state(Impl::CLOSED); + file_->name() = ""; + file_->rank(0); + + return true; +} + + +// * * * * * * * * * * * * Member Functions (Reading) * * * * * * * * * * * // + +#if 0 +bool Foam::UPstream::File::open_read +( + const int communicator, + const fileName& pathname +) +{ + //Needed? PstreamGlobals::checkCommunicator(communicator, 0); + + if (FOAM_UNLIKELY(file_->is_open())) + { + WarningInFunction + << "Previous use of file handler did not call close()" << nl + << " FIX YOUR CODE!!" << endl; + // Do not call close() since we don't know where that collective + // should have been called + } + file_->state(Impl::CLOSED); + file_->name() = pathname; // <- set now for external error messages + file_->rank(0); + + int returnCode = MPI_File_open + ( + PstreamGlobals::MPICommunicators_[communicator], + pathname.c_str(), + (MPI_MODE_RDONLY), + MPI_INFO_NULL, + &(file_->handle()) + ); + + if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode)) + { + FatalErrorInFunction + << "Error encounted in MPI_File_open() : " + << pathname << nl + << Foam::exit(Foam::FatalError); + + return false; + } + + file_->state(Impl::READ); + file_->name() = pathname; + file_->rank(UPstream::myProcNo(communicator)); + + return true; // ie, is_read() +} +#endif + + +// * * * * * * * * * * * * Non-Collective Reading * * * * * * * * * * * * * // + +#if 0 +bool Foam::UPstream::File::get_header(DynamicList& content) +{ + std::streamsize headerSize(4096); + + // constexpr const char* const func = "MPI_File_read_at"; + file_->checkReadable("MPI_File_read_at"); + + if (off_t fileLen = Foam::fileSize(this->name()); fileLen >= 0) + { + std::streamsize size = std::streamsize(fileLen); + if (headerSize > size) + { + headerSize = size; + } + } + else + { + content.clear(); + return false; + } + + // Get the first header content: + content.resize_nocopy(headerSize); + + int returnCode = Foam_mpiCall(MPI_File_read_at) + ( + file_->handle(), + 0, // offset + content.data(), + content.size(), + MPI_BYTE, + MPI_STATUS_IGNORE + ); + + // Wrap as ISpanStream headerStream(content); + if (MPI_SUCCESS == returnCode) + { + ISpanStream is(content); + dictionary headerDict; + + // Read the regular "FoamFile" header + bool ok = io.readHeader(headerDict, is); + + // Probably collated - extract class from "data.class" + if + ( + decomposedBlockData::isCollatedType(io) + && headerDict.readIfPresent("data.class", io.headerClassName()) + ) + { + return ok; + } + } + + return (MPI_SUCCESS == returnCode); +} +#endif + + +// * * * * * * * * * * * * Member Functions (Writing) * * * * * * * * * * * // + +bool Foam::UPstream::File::open_write +( + const int communicator, + const fileName& pathname, + IOstreamOption::atomicType atomicType +) +{ + //Needed? PstreamGlobals::checkCommunicator(communicator, 0); + + if (FOAM_UNLIKELY(file_->is_open())) + { + WarningInFunction + << "Previous use of file handler did not call close()" << nl + << " FIX YOUR CODE!!" << endl; + // Do not call close() since we don't know where that collective + // should have been called + } + file_->state(Impl::CLOSED); + file_->name() = pathname; // <- set now for external error messages + file_->rank(0); + + const bool atomic = (IOstreamOption::atomicType::ATOMIC == atomicType); + + // When opening new files, remove file variants out of the way. + // Eg, opening "file1" + // - remove old "file1.gz" (compressed) + // - also remove old "file1" if it is a symlink + + const fileName pathname_gz(pathname + ".gz"); + const fileName pathname_tmp(pathname + "~tmp~"); + + // File to open with MPI_File_open + const auto& target = (atomic ? pathname_tmp : pathname); + + // Remove old compressed version (if any) + if + ( + auto fType = Foam::type(pathname_gz, false); + (fType == fileName::SYMLINK || fType == fileName::FILE) + ) + { + Foam::rm(pathname_gz); + } + + // Avoid writing into symlinked files (non-append mode) + if + ( + auto fType = Foam::type(target, false); + fType == fileName::SYMLINK + ) + { + Foam::rm(target); + } + + int returnCode = MPI_File_open + ( + PstreamGlobals::MPICommunicators_[communicator], + target.c_str(), + (MPI_MODE_CREATE | MPI_MODE_WRONLY), + MPI_INFO_NULL, + &(file_->handle()) + ); + + if (FOAM_UNLIKELY(MPI_SUCCESS != returnCode)) + { + FatalErrorInFunction + << "Error encounted in MPI_File_open() : " + << target << nl + << Foam::exit(Foam::FatalError); + + return false; + } + + file_->state(atomic ? Impl::ATOMIC_WRITE : Impl::WRITE); + file_->name() = pathname; + file_->rank(UPstream::myProcNo(communicator)); + + return true; // ie, is_write() +} + + +bool Foam::UPstream::File::set_size(std::streamsize num_bytes) +{ + if (FOAM_UNLIKELY(!file_->is_open())) + { + WarningInFunction + << "Called without an open file handler !" << endl; + return false; + } + + int returnCode = MPI_File_set_size(file_->handle(), num_bytes); + + return (MPI_SUCCESS == returnCode); +} + + +// * * * * * * * * * * * * Non-Collective Writing * * * * * * * * * * * * * // + +bool Foam::UPstream::File::write_data +( + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); + + // constexpr const char* const func = "MPI_File_write"; + file_->checkWritable("MPI_File_write"); + checkCount(count, "MPI_File_write"); + + int returnCode = Foam_mpiCall(MPI_File_write) + ( + file_->handle(), + data, + count, + datatype, + MPI_STATUS_IGNORE + ); + + return (MPI_SUCCESS == returnCode); +} + + +bool Foam::UPstream::File::write_data_at +( + std::streamsize offset, + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); + + // constexpr const char* const func = "MPI_File_write_at"; + file_->checkWritable("MPI_File_write_at"); + checkCount(count, "MPI_File_write_at"); + + int returnCode = Foam_mpiCall(MPI_File_write_at) + ( + file_->handle(), + offset, + data, + count, + datatype, + MPI_STATUS_IGNORE + ); + + return (MPI_SUCCESS == returnCode); +} + + +// * * * * * * * * * * * * * Collective Writing * * * * * * * * * * * * * * // + +bool Foam::UPstream::File::write_data_all +( + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); + + // constexpr const char* const func = "MPI_File_write_all"; + file_->checkWritable("MPI_File_write_all"); + checkCount(count, "MPI_File_write_all"); + + int returnCode = Foam_mpiCall(MPI_File_write_all) + ( + file_->handle(), + data, + count, + datatype, + MPI_STATUS_IGNORE + ); + + return (MPI_SUCCESS == returnCode); +} + + +bool Foam::UPstream::File::write_data_at_all +( + std::streamsize offset, + const void* data, + std::streamsize count, + const UPstream::dataTypes dataTypeId +) +{ + MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); + + // constexpr const char* const func = "MPI_File_write_at_all"; + file_->checkWritable("MPI_File_write_at_all"); + checkCount(count, "MPI_File_write_at_all"); + + int returnCode = Foam_mpiCall(MPI_File_write_at_all) + ( + file_->handle(), + offset, + data, + count, + datatype, + MPI_STATUS_IGNORE + ); + + return (MPI_SUCCESS == returnCode); +} + + +// bool Foam::UPstream::File::write_data_all_begin +// ( +// const void* data, +// std::streamsize count, +// const UPstream::dataTypes dataTypeId +// ) +// { +// MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); +// +// // constexpr const char* const func = "MPI_File_write_all_begin"; +// file_->checkWritable("MPI_File_write_all_begin"); +// checkCount(count, "MPI_File_write_all_begin"); +// +// int returnCode = Foam_mpiCall(MPI_File_write_all_begin) +// ( +// file_->handle(), +// data, +// count, +// datatype, +// MPI_STATUS_IGNORE +// ); +// +// return (MPI_SUCCESS == returnCode); +// } + + +// bool Foam::UPstream::File::write_data_all_end +// ( +// const void* data +// ) +// { +// file_->checkWritable("MPI_File_write_all_end"); +// int returnCode = Foam_mpiCall(MPI_File_write_all_end) +// ( +// file_->handle(), +// data +// MPI_STATUS_IGNORE +// ); +// +// return (MPI_SUCCESS == returnCode); +// } + + +// bool Foam::UPstream::File::write_data_at_all_begin +// ( +// std::streamsize offset, +// const void* data, +// std::streamsize count, +// const UPstream::dataTypes dataTypeId +// ) +// { +// MPI_Datatype datatype = PstreamGlobals::getDataType(dataTypeId); +// +// // constexpr const char* const func = "MPI_File_write_at_all_begin"; +// file_->checkWritable("MPI_File_write_at_all_begin"); +// checkCount(count, "MPI_File_write_at_all_begin"); +// +// int returnCode = Foam_mpiCall(MPI_File_write_at_all_begin) +// ( +// file_->handle(), +// offset, +// data, +// count, +// datatype, +// MPI_STATUS_IGNORE +// ); +// +// return (MPI_SUCCESS == returnCode); +// } + + +// bool Foam::UPstream::File::write_data_at_all_end +// ( +// const void* data +// ) +// { +// file_->checkWritable("MPI_File_write_at_all_end"); +// +// int returnCode = Foam_mpiCall(MPI_File_write_at_all_end) +// ( +// file_->handle(), +// data +// MPI_STATUS_IGNORE +// ); +// +// return (MPI_SUCCESS == returnCode); +// } + + +// ************************************************************************* //