/*---------------------------------------------------------------------------*\ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | \\ / A nd | Copyright (C) 2011-2013 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License This file is part of OpenFOAM. OpenFOAM is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OpenFOAM is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OpenFOAM. If not, see . \*---------------------------------------------------------------------------*/ #include "mpi.h" #include "UPstream.H" #include "PstreamReduceOps.H" #include "OSspecific.H" #include "PstreamGlobals.H" #include "SubList.H" #include "allReduce.H" #include #include #include #if defined(WM_SP) # define MPI_SCALAR MPI_FLOAT #elif defined(WM_DP) # define MPI_SCALAR MPI_DOUBLE #endif // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // NOTE: // valid parallel options vary between implementations, but flag common ones. // if they are not removed by MPI_Init(), the subsequent argument processing // will notice that they are wrong void Foam::UPstream::addValidParOptions(HashTable& validParOptions) { validParOptions.insert("np", ""); validParOptions.insert("p4pg", "PI file"); validParOptions.insert("p4wd", "directory"); validParOptions.insert("p4amslave", ""); validParOptions.insert("p4yourname", "hostname"); validParOptions.insert("machinefile", "machine file"); } bool Foam::UPstream::init(int& argc, char**& argv) { MPI_Init(&argc, &argv); int numprocs; MPI_Comm_size(MPI_COMM_WORLD, &numprocs); int myRank; MPI_Comm_rank(MPI_COMM_WORLD, &myRank); if (debug&1) { Pout<< "UPstream::init : initialised with numProcs:" << numprocs << " myRank:" << myRank << endl; } if (numprocs <= 1) { FatalErrorIn("UPstream::init(int& argc, char**& argv)") << "bool IPstream::init(int& argc, char**& argv) : " "attempt to run parallel on 1 processor" << Foam::abort(FatalError); } // Initialise parallel structure setParRun(numprocs); # ifndef SGIMPI string bufferSizeName = getEnv("MPI_BUFFER_SIZE"); if (bufferSizeName.size()) { int bufferSize = atoi(bufferSizeName.c_str()); if (bufferSize) { MPI_Buffer_attach(new char[bufferSize], bufferSize); } } else { FatalErrorIn("UPstream::init(int& argc, char**& argv)") << "UPstream::init(int& argc, char**& argv) : " << "environment variable MPI_BUFFER_SIZE not defined" << Foam::abort(FatalError); } # endif //int processorNameLen; //char processorName[MPI_MAX_PROCESSOR_NAME]; // //MPI_Get_processor_name(processorName, &processorNameLen); //processorName[processorNameLen] = '\0'; //Pout<< "Processor name:" << processorName << endl; return true; } void Foam::UPstream::exit(int errnum) { if (debug&1) { Pout<< "UPstream::exit." << endl; } # ifndef SGIMPI int size; char* buff; MPI_Buffer_detach(&buff, &size); delete[] buff; # endif if (PstreamGlobals::outstandingRequests_.size()) { label n = PstreamGlobals::outstandingRequests_.size(); PstreamGlobals::outstandingRequests_.clear(); WarningIn("UPstream::exit(int)") << "There are still " << n << " outstanding MPI_Requests." << endl << "This means that your code exited before doing a" << " UPstream::waitRequests()." << endl << "This should not happen for a normal code exit." << endl; } // Clean mpi communicators forAll(myProcNo_, communicator) { if (myProcNo_[communicator] != -1) { freePstreamCommunicator(communicator); } } if (errnum == 0) { MPI_Finalize(); ::exit(errnum); } else { MPI_Abort(MPI_COMM_WORLD, errnum); } } void Foam::UPstream::abort() { MPI_Abort(MPI_COMM_WORLD, 1); } void Foam::reduce ( scalar& Value, const sumOp& bop, const int tag, const label communicator ) { if (UPstream::warnComm != -1 && communicator != UPstream::warnComm) { Pout<< "** reducing:" << Value << " with comm:" << communicator << endl; error::printStack(Pout); } allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator); } void Foam::reduce ( scalar& Value, const minOp& bop, const int tag, const label communicator ) { if (UPstream::warnComm != -1 && communicator != UPstream::warnComm) { Pout<< "** reducing:" << Value << " with comm:" << communicator << endl; error::printStack(Pout); } allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator); } void Foam::reduce ( vector2D& Value, const sumOp& bop, const int tag, const label communicator ) { if (UPstream::warnComm != -1 && communicator != UPstream::warnComm) { Pout<< "** reducing:" << Value << " with comm:" << communicator << endl; error::printStack(Pout); } allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator); } void Foam::sumReduce ( scalar& Value, label& Count, const int tag, const label communicator ) { if (UPstream::warnComm != -1 && communicator != UPstream::warnComm) { Pout<< "** reducing:" << Value << " with comm:" << communicator << endl; error::printStack(Pout); } vector2D twoScalars(Value, scalar(Count)); reduce(twoScalars, sumOp(), tag, communicator); Value = twoScalars.x(); Count = twoScalars.y(); } void Foam::reduce ( scalar& Value, const sumOp& bop, const int tag, const label communicator, label& requestID ) { #ifdef MPIX_COMM_TYPE_SHARED // Assume mpich2 with non-blocking collectives extensions. Once mpi3 // is available this will change. MPI_Request request; scalar v = Value; MPIX_Ireduce ( &v, &Value, 1, MPI_SCALAR, MPI_SUM, 0, //root PstreamGlobals::MPICommunicators_[communicator], &request ); requestID = PstreamGlobals::outstandingRequests_.size(); PstreamGlobals::outstandingRequests_.append(request); #else // Non-blocking not yet implemented in mpi reduce(Value, bop, tag, communicator); requestID = -1; #endif } void Foam::UPstream::allocatePstreamCommunicator ( const label parentIndex, const label index ) { if (index == PstreamGlobals::MPIGroups_.size()) { // Extend storage with dummy values MPI_Group newGroup; PstreamGlobals::MPIGroups_.append(newGroup); MPI_Comm newComm; PstreamGlobals::MPICommunicators_.append(newComm); } else if (index > PstreamGlobals::MPIGroups_.size()) { FatalErrorIn ( "UPstream::allocatePstreamCommunicator\n" "(\n" " const label parentIndex,\n" " const labelList& subRanks\n" ")\n" ) << "PstreamGlobals out of sync with UPstream data. Problem." << Foam::exit(FatalError); } if (parentIndex == -1) { // Allocate world communicator //std::cout // << "MPI : Allocating world communicator at index " << index // << std::endl; if (index != UPstream::worldComm) { FatalErrorIn ( "UPstream::allocateCommunicator\n" "(\n" " const label parentIndex,\n" " const labelList& subRanks\n" ")\n" ) << "world communicator should always be index " << UPstream::worldComm << Foam::exit(FatalError); } PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD; MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]); MPI_Comm_rank ( PstreamGlobals::MPICommunicators_[index], &myProcNo_[index] ); // Set the number of processes to the actual number int numProcs; MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs); procIDs_[index] = identity(numProcs); } else { //std::cout // << "MPI : Allocating new communicator at index " << index // << " from parent " << parentIndex // << std::endl; // Create new group MPI_Group_incl ( PstreamGlobals::MPIGroups_[parentIndex], procIDs_[index].size(), procIDs_[index].begin(), &PstreamGlobals::MPIGroups_[index] ); //std::cout // << "MPI : New group " << long(PstreamGlobals::MPIGroups_[index]) // << std::endl; // Create new communicator MPI_Comm_create ( PstreamGlobals::MPICommunicators_[parentIndex], PstreamGlobals::MPIGroups_[index], &PstreamGlobals::MPICommunicators_[index] ); if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL) { //std::cout // << "MPI : NULL : not in group" // << std::endl; myProcNo_[index] = -1; } else { //std::cout // << "MPI : New comm " // << long(PstreamGlobals::MPICommunicators_[index]) // << std::endl; MPI_Comm_rank ( PstreamGlobals::MPICommunicators_[index], &myProcNo_[index] ); } } //std::cout<< "MPI : I am rank " << myProcNo_[index] << std::endl; } void Foam::UPstream::freePstreamCommunicator(const label communicator) { if (communicator != UPstream::worldComm) { if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL) { MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]); } MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]); } } Foam::label Foam::UPstream::nRequests() { return PstreamGlobals::outstandingRequests_.size(); } void Foam::UPstream::resetRequests(const label i) { if (i < PstreamGlobals::outstandingRequests_.size()) { PstreamGlobals::outstandingRequests_.setSize(i); } } void Foam::UPstream::waitRequests(const label start) { if (debug&1) { Pout<< "UPstream::waitRequests : starting wait for " << PstreamGlobals::outstandingRequests_.size()-start << " outstanding requests starting at " << start << endl; } if (PstreamGlobals::outstandingRequests_.size()) { SubList waitRequests ( PstreamGlobals::outstandingRequests_, PstreamGlobals::outstandingRequests_.size() - start, start ); if ( MPI_Waitall ( waitRequests.size(), waitRequests.begin(), MPI_STATUSES_IGNORE ) ) { FatalErrorIn ( "UPstream::waitRequests()" ) << "MPI_Waitall returned with error" << Foam::endl; } resetRequests(start); } if (debug&1) { Pout<< "UPstream::waitRequests : finished wait." << endl; } } void Foam::UPstream::waitRequest(const label i) { if (debug&1) { Pout<< "UPstream::waitRequest : starting wait for request:" << i << endl; } if (i >= PstreamGlobals::outstandingRequests_.size()) { FatalErrorIn ( "UPstream::waitRequest(const label)" ) << "There are " << PstreamGlobals::outstandingRequests_.size() << " outstanding send requests and you are asking for i=" << i << nl << "Maybe you are mixing blocking/non-blocking comms?" << Foam::abort(FatalError); } if ( MPI_Wait ( &PstreamGlobals::outstandingRequests_[i], MPI_STATUS_IGNORE ) ) { FatalErrorIn ( "UPstream::waitRequest()" ) << "MPI_Wait returned with error" << Foam::endl; } if (debug&1) { Pout<< "UPstream::waitRequest : finished wait for request:" << i << endl; } } bool Foam::UPstream::finishedRequest(const label i) { if (debug&1) { Pout<< "UPstream::waitRequests : checking finishedRequest:" << i << endl; } if (i >= PstreamGlobals::outstandingRequests_.size()) { FatalErrorIn ( "UPstream::finishedRequest(const label)" ) << "There are " << PstreamGlobals::outstandingRequests_.size() << " outstanding send requests and you are asking for i=" << i << nl << "Maybe you are mixing blocking/non-blocking comms?" << Foam::abort(FatalError); } int flag; MPI_Test ( &PstreamGlobals::outstandingRequests_[i], &flag, MPI_STATUS_IGNORE ); if (debug&1) { Pout<< "UPstream::waitRequests : finished finishedRequest:" << i << endl; } return flag != 0; } // ************************************************************************* //