diff --git a/applications/test/parallel-nonBlocking/Make/files b/applications/test/parallel-nonBlocking/Make/files new file mode 100644 index 0000000000..f35c700ebd --- /dev/null +++ b/applications/test/parallel-nonBlocking/Make/files @@ -0,0 +1,3 @@ +Test-parallel-nonBlocking.C + +EXE = $(FOAM_USER_APPBIN)/Test-parallel-nonBlocking diff --git a/applications/test/parallel-nonBlocking/Make/options b/applications/test/parallel-nonBlocking/Make/options new file mode 100644 index 0000000000..e69de29bb2 diff --git a/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C new file mode 100644 index 0000000000..5654db9658 --- /dev/null +++ b/applications/test/parallel-nonBlocking/Test-parallel-nonBlocking.C @@ -0,0 +1,207 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | Copyright (C) 2012 OpenFOAM Foundation + \\/ M anipulation | +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see . + +Application + Test-parallel-nonBlocking + +Description + Test for various non-blocking parallel routines. + +\*---------------------------------------------------------------------------*/ + +#include "List.H" +#include "mapDistribute.H" +#include "argList.H" +#include "Time.H" +#include "IPstream.H" +#include "OPstream.H" +#include "vector.H" +#include "IOstreams.H" +#include "Random.H" +#include "Tuple2.H" +#include "PstreamBuffers.H" + +using namespace Foam; + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +int main(int argc, char *argv[]) +{ + +# include "setRootCase.H" +# include "createTime.H" + + + // Test PstreamBuffers + // ~~~~~~~~~~~~~~~~~~~ + if (false) + { + Perr<< "\nStarting transfers\n" << endl; + + vector data + ( + Pstream::myProcNo(), + Pstream::myProcNo(), + Pstream::myProcNo() + ); + + PstreamBuffers pBufs(Pstream::nonBlocking); + + if (Pstream::myProcNo() != Pstream::masterNo()) + { + Perr<< "slave sending to master " + << Pstream::masterNo() << endl; + UOPstream toMaster(Pstream::masterNo(), pBufs); + toMaster << data; + } + + // Start sending and receiving and block + pBufs.finishedSends(); + + // Consume + DynamicList allData; + if (Pstream::myProcNo() == Pstream::masterNo()) + { + // Collect my own data + allData.append(data); + + for + ( + int slave=Pstream::firstSlave(); + slave<=Pstream::lastSlave(); + slave++ + ) + { + Perr << "master receiving from slave " << slave << endl; + UIPstream fromSlave(slave, pBufs); + allData.append(vector(fromSlave)); + } + } + + + // Send allData back + PstreamBuffers pBufs2(Pstream::nonBlocking); + if (Pstream::myProcNo() == Pstream::masterNo()) + { + for + ( + int slave=Pstream::firstSlave(); + slave<=Pstream::lastSlave(); + slave++ + ) + { + Perr << "master sending to slave " << slave << endl; + UOPstream toSlave(slave, pBufs2); + toSlave << allData; + } + } + + // Start sending and receiving and block + pBufs2.finishedSends(); + + // Consume + if (Pstream::myProcNo() != Pstream::masterNo()) + { + Perr<< "slave receiving from master " + << Pstream::masterNo() << endl; + UIPstream fromMaster(Pstream::masterNo(), pBufs2); + fromMaster >> allData; + Perr<< allData << endl; + } + } + + + // Test non-blocking reductions + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + scalar data1 = 1.0; + label request1 = -1; + { + Foam::reduce(data1, sumOp(), Pstream::msgType(), request1); + } + + scalar data2 = 0.1; + label request2 = -1; + { + Foam::reduce(data2, sumOp(), Pstream::msgType(), request2); + } + + + // Do a non-blocking send inbetween + { + PstreamBuffers pBufs(Pstream::nonBlocking); + + for (label procI = 0; procI < Pstream::nProcs(); procI++) + { + UOPstream toProc(procI, pBufs); + toProc << Pstream::myProcNo(); + } + + // Start sending and receiving and block + pBufs.finishedSends(); + + // Consume + for (label procI = 0; procI < Pstream::nProcs(); procI++) + { + UIPstream fromProc(procI, pBufs); + label data; + fromProc >> data; + + if (data != procI) + { + FatalErrorIn(args.executable()) + << "From processor " << procI << " received " << data + << " but expected " << procI + << exit(FatalError); + } + } + } + + + if (request1 != -1) + { + Pout<< "Waiting for non-blocking reduce with request " << request1 + << endl; + Pstream::waitRequest(request1); + } + Info<< "Reduced data1:" << data1 << endl; + + if (request2 != -1) + { + Pout<< "Waiting for non-blocking reduce with request " << request1 + << endl; + Pstream::waitRequest(request2); + } + Info<< "Reduced data2:" << data2 << endl; + + + // Clear any outstanding requests + Pstream::resetRequests(0); + + Info<< "End\n" << endl; + + return 0; +} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index 8a89c80097..b04e77a514 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -96,6 +96,20 @@ T returnReduce } +// Non-blocking version of reduce. Sets request. +template +void reduce +( + T& Value, + const BinaryOp& bop, + const int tag, + label& request +) +{ + notImplemented("reduce(T&, const BinaryOp&, const int, label&"); +} + + // Insist there is a specialisation for the sum reduction of scalar(s) void reduce ( @@ -111,6 +125,14 @@ void reduce const int tag = Pstream::msgType() ); +void reduce +( + scalar& Value, + const sumOp& bop, + const int tag, + label& request +); + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 687b606558..3c1bd813c6 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2012 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2012 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -63,6 +63,10 @@ void Foam::reduce(vector2D&, const sumOp&, const int) {} +void Foam::reduce(scalar&, const sumOp&, const int, label&) +{} + + Foam::label Foam::UPstream::nRequests() { return 0; diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index 3ab7e1c9f1..11d654b61f 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2012 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2012 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -455,6 +455,41 @@ void Foam::reduce(vector2D& Value, const sumOp& bop, const int tag) } +void Foam::reduce +( + scalar& Value, + const sumOp& bop, + const int tag, + label& requestID +) +{ +#ifdef MPIX_COMM_TYPE_SHARED + // Assume mpich2 with non-blocking collectives extensions. Once mpi3 + // is available this will change. + MPI_Request request; + scalar v = Value; + MPIX_Ireduce + ( + &v, + &Value, + 1, + MPI_SCALAR, + MPI_SUM, + 0, //root + MPI_COMM_WORLD, + &request + ); + + requestID = PstreamGlobals::outstandingRequests_.size(); + PstreamGlobals::outstandingRequests_.append(request); +#else + // Non-blocking not yet implemented in mpi + reduce(Value, bop, tag); + requestID = -1; +#endif +} + + Foam::label Foam::UPstream::nRequests() { return PstreamGlobals::outstandingRequests_.size();