collatedFileOperation: preferentially collect all data in the simulation thread

so that the write thread does not have to do any parallel communication. This avoids
bugs in OpenMPI's threading support.

Patch contributed by Mattijs Janssens
Resolves bug-report https://bugs.openfoam.org/view.php?id=2669
Henry Weller
2017-10-29 08:38:31 +00:00
parent 16416c79ec
commit 197d9d3bf2
3 changed files with 226 additions and 14 deletions
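The idea, in short: the simulation (master) thread gathers every processor's buffer itself, so the background write thread only performs local file I/O and never calls into MPI. The sketch below is a minimal standalone illustration of that pattern, not OpenFOAM code; the file name collated.dat, the text payload and the use of MPI_THREAD_FUNNELED are assumptions made for the example.

    // Hypothetical standalone sketch: gather in the compute thread,
    // write from a plain thread that never touches MPI.
    #include <mpi.h>
    #include <fstream>
    #include <string>
    #include <thread>
    #include <vector>

    int main(int argc, char** argv)
    {
        // The writer thread makes no MPI calls, so funneled support suffices
        int provided = 0;
        MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);

        int rank = 0, nProcs = 1;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &nProcs);

        // Per-rank payload (a short text chunk stands in for field data)
        std::string local = "data from rank " + std::to_string(rank) + "\n";
        int sendSize = static_cast<int>(local.size());

        // Master learns each rank's size and builds the offsets
        std::vector<int> recvSizes(nProcs, 0), recvOffsets(nProcs, 0);
        MPI_Gather(&sendSize, 1, MPI_INT, recvSizes.data(), 1, MPI_INT,
                   0, MPI_COMM_WORLD);

        std::vector<char> collated;
        if (rank == 0)
        {
            int total = 0;
            for (int i = 0; i < nProcs; ++i)
            {
                recvOffsets[i] = total;
                total += recvSizes[i];
            }
            collated.resize(total);
        }

        // All communication happens here, in the simulation thread
        MPI_Gatherv(&local[0], sendSize, MPI_BYTE,
                    collated.data(), recvSizes.data(), recvOffsets.data(),
                    MPI_BYTE, 0, MPI_COMM_WORLD);

        if (rank == 0)
        {
            // The writer thread only streams the collated buffer to disk
            std::thread writer([&collated]()
            {
                std::ofstream os("collated.dat", std::ios::binary);
                os.write(collated.data(),
                         static_cast<std::streamsize>(collated.size()));
            });
            writer.join();
        }

        MPI_Finalize();
        return 0;
    }

Built with an MPI compiler wrapper (e.g. mpicxx) and run under mpirun, each rank contributes one line and only the master writes the file. In OpenFOAM itself the collated handler is normally selected with the -fileHandler collated option; the sketch only mirrors the threading pattern, not the actual OFstreamCollator implementation.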

View File

@@ -2,7 +2,7 @@
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
@@ -92,6 +92,36 @@ void Foam::UPstream::allToAll
}
void Foam::UPstream::gather
(
    const char* sendData,
    int sendSize,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    memmove(recvData, sendData, sendSize);
}


void Foam::UPstream::scatter
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    memmove(recvData, sendData, recvSize);
}


void Foam::UPstream::allocatePstreamCommunicator
(
    const label,

View File

@@ -146,6 +146,4 @@ bool Foam::UOPstream::write
}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// ************************************************************************* //

View File

@@ -2,7 +2,7 @@
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
-    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
+    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
@@ -92,16 +92,7 @@ bool Foam::UPstream::init(int& argc, char**& argv)
    // Initialise parallel structure
-    setParRun(numprocs);
-
-    if (Pstream::master() && provided_thread_support != MPI_THREAD_MULTIPLE)
-    {
-        WarningInFunction
-            << "mpi does not seem to have thread support."
-            << " There might be issues with e.g. threaded IO"
-            << endl;
-    }
+    setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);

#ifndef SGIMPI
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
@@ -362,6 +353,199 @@ void Foam::UPstream::allToAll
}
void Foam::UPstream::allToAll
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        sendSizes.size() != np
     || sendOffsets.size() != np
     || recvSizes.size() != np
     || recvOffsets.size() != np
    )
    {
        FatalErrorInFunction
            << "Size of sendSize " << sendSizes.size()
            << ", sendOffsets " << sendOffsets.size()
            << ", recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        if (recvSizes[0] != sendSizes[0])
        {
            FatalErrorInFunction
                << "Bytes to send " << sendSizes[0]
                << " does not equal bytes to receive " << recvSizes[0]
                << Foam::abort(FatalError);
        }
        memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
    }
    else
    {
        if
        (
            MPI_Alltoallv
            (
                sendData,
                sendSizes.begin(),
                sendOffsets.begin(),
                MPI_BYTE,
                recvData,
                recvSizes.begin(),
                recvOffsets.begin(),
                MPI_BYTE,
                PstreamGlobals::MPICommunicators_[communicator]
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Alltoallv failed for sendSizes " << sendSizes
                << " recvSizes " << recvSizes
                << " communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}


void Foam::UPstream::gather
(
    const char* sendData,
    int sendSize,
    char* recvData,
    const UList<int>& recvSizes,
    const UList<int>& recvOffsets,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        UPstream::master(communicator)
     && (recvSizes.size() != np || recvOffsets.size() < np)
    )
    {
        // Note: allow recvOffsets to be e.g. 1 larger than np so we
        // can easily loop over the result
        FatalErrorInFunction
            << "Size of recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        memmove(recvData, sendData, sendSize);
    }
    else
    {
        if
        (
            MPI_Gatherv
            (
                sendData,
                sendSize,
                MPI_BYTE,
                recvData,
                recvSizes.begin(),
                recvOffsets.begin(),
                MPI_BYTE,
                0,
                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Gatherv failed for sendSize " << sendSize
                << " recvSizes " << recvSizes
                << " communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}


void Foam::UPstream::scatter
(
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,
    char* recvData,
    int recvSize,
    const label communicator
)
{
    label np = nProcs(communicator);

    if
    (
        UPstream::master(communicator)
     && (sendSizes.size() != np || sendOffsets.size() != np)
    )
    {
        FatalErrorInFunction
            << "Size of sendSizes " << sendSizes.size()
            << " or sendOffsets " << sendOffsets.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        memmove(recvData, sendData, recvSize);
    }
    else
    {
        if
        (
            MPI_Scatterv
            (
                sendData,
                sendSizes.begin(),
                sendOffsets.begin(),
                MPI_BYTE,
                recvData,
                recvSize,
                MPI_BYTE,
                0,
                MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Scatterv failed for sendSizes " << sendSizes
                << " sendOffsets " << sendOffsets
                << " communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}


void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,