From 0a4007c55b556d6fd3525bd0666e7e3df401a70b Mon Sep 17 00:00:00 2001 From: Steve Plimpton Date: Fri, 26 Oct 2018 17:37:50 -0600 Subject: [PATCH] add parallel file read capability to ReadDump --- src/read_dump.cpp | 946 ++++++++++++++++++++++++++++++++-------------- src/read_dump.h | 47 ++- src/rerun.cpp | 2 +- 3 files changed, 689 insertions(+), 306 deletions(-) diff --git a/src/read_dump.cpp b/src/read_dump.cpp index 30934f123b..620e84e910 100644 --- a/src/read_dump.cpp +++ b/src/read_dump.cpp @@ -43,8 +43,7 @@ using namespace LAMMPS_NS; -#define CHUNK 1024 -#define EPSILON 1.0e-6 +#define CHUNK 16384 // also in reader_native.cpp @@ -65,17 +64,21 @@ ReadDump::ReadDump(LAMMPS *lmp) : Pointers(lmp) nfile = 0; files = NULL; + nnew = maxnew = 0; nfield = 0; fieldtype = NULL; fieldlabel = NULL; fields = NULL; + buf = NULL; int n = strlen("native") + 1; readerstyle = new char[n]; strcpy(readerstyle,"native"); - reader = NULL; - fp = NULL; + nreader = 0; + readers = NULL; + nsnapatoms = NULL; + clustercomm = MPI_COMM_NULL; } /* ---------------------------------------------------------------------- */ @@ -90,7 +93,13 @@ ReadDump::~ReadDump() delete [] readerstyle; memory->destroy(fields); - delete reader; + memory->destroy(buf); + + for (int i = 0; i < nreader; i++) delete readers[i]; + delete [] readers; + delete [] nsnapatoms; + + MPI_Comm_free(&clustercomm); } /* ---------------------------------------------------------------------- */ @@ -134,13 +143,20 @@ void ReadDump::command(int narg, char **arg) bigint natoms_prev = atom->natoms; atoms(); - if (me == 0) reader->close_file(); + if (filereader) + for (int i = 0; i < nreader; i++) + readers[i]->close_file(); // print out stats - bigint npurge_all,nreplace_all,ntrim_all,nadd_all; + bigint nsnap_all,npurge_all,nreplace_all,ntrim_all,nadd_all; + + bigint tmp = 0; + if (filereader) + for (int i = 0; i < nreader; i++) + tmp += nsnapatoms[i]; + MPI_Allreduce(&tmp,&nsnap_all,1,MPI_LMP_BIGINT,MPI_SUM,world); - bigint tmp; tmp = npurge; MPI_Allreduce(&tmp,&npurge_all,1,MPI_LMP_BIGINT,MPI_SUM,world); tmp = nreplace; @@ -155,7 +171,7 @@ void ReadDump::command(int narg, char **arg) if (me == 0) { if (screen) { fprintf(screen," " BIGINT_FORMAT " atoms before read\n",natoms_prev); - fprintf(screen," " BIGINT_FORMAT " atoms in snapshot\n",nsnapatoms); + fprintf(screen," " BIGINT_FORMAT " atoms in snapshot\n",nsnap_all); fprintf(screen," " BIGINT_FORMAT " atoms purged\n",npurge_all); fprintf(screen," " BIGINT_FORMAT " atoms replaced\n",nreplace_all); fprintf(screen," " BIGINT_FORMAT " atoms trimmed\n",ntrim_all); @@ -164,7 +180,7 @@ void ReadDump::command(int narg, char **arg) } if (logfile) { fprintf(logfile," " BIGINT_FORMAT " atoms before read\n",natoms_prev); - fprintf(logfile," " BIGINT_FORMAT " atoms in snapshot\n",nsnapatoms); + fprintf(logfile," " BIGINT_FORMAT " atoms in snapshot\n",nsnap_all); fprintf(logfile," " BIGINT_FORMAT " atoms purged\n",npurge_all); fprintf(logfile," " BIGINT_FORMAT " atoms replaced\n",nreplace_all); fprintf(logfile," " BIGINT_FORMAT " atoms trimmed\n",ntrim_all); @@ -181,10 +197,22 @@ void ReadDump::store_files(int nstr, char **str) nfile = nstr; files = new char*[nfile]; + // either all or none of files must have '%' wild-card + for (int i = 0; i < nfile; i++) { int n = strlen(str[i]) + 1; files[i] = new char[n]; strcpy(files[i],str[i]); + + if (i == 0) { + if (strchr(files[i],'%')) multiproc = 1; + else multiproc = 0; + } else { + if (multiproc && !strchr(files[i],'%')) + error->all(FLERR,"All read_dump files must be serial or parallel"); + if (!multiproc && strchr(files[i],'%')) + error->all(FLERR,"All read_dump files must be serial or parallel"); + } } } @@ -192,18 +220,47 @@ void ReadDump::store_files(int nstr, char **str) void ReadDump::setup_reader(int narg, char **arg) { - // allocate snapshot field buffer + // setup serial or parallel file reading + // multiproc = 0: only one file to read from, only proc 0 is a reader + // multiproc_nfile >= nprocs: every proc reads one or more files + // multiproc_nfile < nprocs: multiproc_nfile readers, create clusters + // see read_dump.h for explanation of these variables - memory->create(fields,CHUNK,nfield,"read_dump:fields"); + if (multiproc == 0) { + nreader = 1; + firstfile = -1; + MPI_Comm_dup(world,&clustercomm); + } else if (multiproc_nfile >= nprocs) { + firstfile = static_cast ((bigint) me * multiproc_nfile/nprocs); + int lastfile = static_cast ((bigint) (me+1) * multiproc_nfile/nprocs); + nreader = lastfile - firstfile; + MPI_Comm_split(world,me,0,&clustercomm); + } else if (multiproc_nfile < nprocs) { + nreader = 1; + int icluster = static_cast ((bigint) me * multiproc_nfile/nprocs); + firstfile = icluster; + MPI_Comm_split(world,icluster,0,&clustercomm); + } - // create reader class + MPI_Comm_rank(clustercomm,&me_cluster); + MPI_Comm_size(clustercomm,&nprocs_cluster); + if (me_cluster == 0) filereader = 1; + else filereader = 0; + + readers = new Reader*[nreader]; + nsnapatoms = new bigint[nreader]; + + // create Nreader reader classes per reader // match readerstyle to options in style_reader.h if (0) return; // dummy line to enable else-if macro expansion #define READER_CLASS #define ReaderStyle(key,Class) \ - else if (strcmp(readerstyle,#key) == 0) reader = new Class(lmp); + else if (strcmp(readerstyle,#key) == 0) { \ + for (int i = 0; i < nreader; i++) \ + readers[i] = new Class(lmp); \ + } #include "style_reader.h" #undef READER_CLASS @@ -211,9 +268,11 @@ void ReadDump::setup_reader(int narg, char **arg) else error->all(FLERR,"Unknown dump reader style"); - // pass any arguments to reader + // pass any arguments to readers - if (narg > 0) reader->settings(narg,arg); + if (narg > 0 && filereader) + for (int i = 0; i < nreader; i++) + readers[i]->settings(narg,arg); } /* ---------------------------------------------------------------------- @@ -228,6 +287,8 @@ bigint ReadDump::seek(bigint nrequest, int exact) int ifile,eofflag; bigint ntimestep; + // proc 0 finds the timestep in its first reader + if (me == 0) { // exit file loop when dump timestep >= nrequest @@ -235,24 +296,75 @@ bigint ReadDump::seek(bigint nrequest, int exact) for (ifile = 0; ifile < nfile; ifile++) { ntimestep = -1; - reader->open_file(files[ifile]); + if (multiproc) { + char *ptr = strchr(files[ifile],'%'); + char *multiname = new char[strlen(files[ifile]) + 16]; + *ptr = '\0'; + sprintf(multiname,"%s%d%s",files[ifile],0,ptr+1); + *ptr = '%'; + readers[0]->open_file(multiname); + delete [] multiname; + } else readers[0]->open_file(files[ifile]); + while (1) { - eofflag = reader->read_time(ntimestep); + eofflag = readers[0]->read_time(ntimestep); if (eofflag) break; if (ntimestep >= nrequest) break; - reader->skip(); + readers[0]->skip(); } + if (ntimestep >= nrequest) break; - reader->close_file(); + readers[0]->close_file(); } currentfile = ifile; if (ntimestep < nrequest) ntimestep = -1; if (exact && ntimestep != nrequest) ntimestep = -1; - if (ntimestep < 0) reader->close_file(); } + // proc 0 broadcasts timestep and currentfile to all procs + MPI_Bcast(&ntimestep,1,MPI_LMP_BIGINT,0,world); + MPI_Bcast(¤tfile,1,MPI_INT,0,world); + + // if ntimestep < 0: + // all filereader procs close all their files and return + + if (ntimestep < 0) { + if (filereader) + for (int i = 0; i < nreader; i++) + readers[i]->close_file(); + return ntimestep; + } + + // for multiproc mode: + // all filereader procs search for same ntimestep in currentfile + + if (multiproc && filereader) { + for (int i = 0; i < nreader; i++) { + if (me == 0 && i == 0) continue; // proc 0, reader 0 already found it + char *ptr = strchr(files[currentfile],'%'); + char *multiname = new char[strlen(files[currentfile]) + 16]; + *ptr = '\0'; + sprintf(multiname,"%s%d%s",files[currentfile],firstfile+i,ptr+1); + *ptr = '%'; + readers[i]->open_file(multiname); + delete [] multiname; + + bigint step; + while (1) { + eofflag = readers[i]->read_time(step); + if (eofflag) break; + if (step == ntimestep) break; + readers[i]->skip(); + } + + if (eofflag) + error->one(FLERR,"Read dump parallel files " + "do not all have same timestep"); + } + } + return ntimestep; } @@ -270,6 +382,8 @@ bigint ReadDump::next(bigint ncurrent, bigint nlast, int nevery, int nskip) int ifile,eofflag; bigint ntimestep; + // proc 0 finds the timestep in its first reader + if (me == 0) { // exit file loop when dump timestep matches all criteria @@ -279,19 +393,34 @@ bigint ReadDump::next(bigint ncurrent, bigint nlast, int nevery, int nskip) for (ifile = currentfile; ifile < nfile; ifile++) { ntimestep = -1; - if (ifile != currentfile) reader->open_file(files[ifile]); + if (ifile != currentfile) { + if (multiproc) { + char *ptr = strchr(files[ifile],'%'); + char *multiname = new char[strlen(files[ifile]) + 16]; + *ptr = '\0'; + sprintf(multiname,"%s%d%s",files[ifile],0,ptr+1); + *ptr = '%'; + readers[0]->open_file(multiname); + delete [] multiname; + } else readers[0]->open_file(files[ifile]); + } + while (1) { - eofflag = reader->read_time(ntimestep); + eofflag = readers[0]->read_time(ntimestep); + if (eofflag) break; + if (ntimestep > nlast) break; + if (ntimestep <= ncurrent) { + readers[0]->skip(); + continue; + } if (iskip == nskip) iskip = 0; iskip++; - if (eofflag) break; - if (ntimestep <= ncurrent) break; - if (ntimestep > nlast) break; - if (nevery && ntimestep % nevery) reader->skip(); - else if (iskip < nskip) reader->skip(); + if (nevery && ntimestep % nevery) readers[0]->skip(); + else if (iskip < nskip) readers[0]->skip(); else break; } - if (eofflag) reader->close_file(); + + if (eofflag) readers[0]->close_file(); else break; } @@ -299,10 +428,50 @@ bigint ReadDump::next(bigint ncurrent, bigint nlast, int nevery, int nskip) if (eofflag) ntimestep = -1; if (ntimestep <= ncurrent) ntimestep = -1; if (ntimestep > nlast) ntimestep = -1; - if (ntimestep < 0) reader->close_file(); } + // proc 0 broadcasts timestep and currentfile to all procs + MPI_Bcast(&ntimestep,1,MPI_LMP_BIGINT,0,world); + MPI_Bcast(¤tfile,1,MPI_INT,0,world); + + // if ntimestep < 0: + // all filereader procs close all their files and return + + if (ntimestep < 0) { + for (int i = 0; i < nreader; i++) + readers[i]->close_file(); + return ntimestep; + } + + // for multiproc mode: + // all filereader procs search for same ntimestep in currentfile + + if (multiproc && filereader) { + for (int i = 0; i < nreader; i++) { + if (me == 0 && i == 0) continue; + char *ptr = strchr(files[currentfile],'%'); + char *multiname = new char[strlen(files[currentfile]) + 16]; + *ptr = '\0'; + sprintf(multiname,"%s%d%s",files[currentfile],firstfile+i,ptr+1); + *ptr = '%'; + readers[i]->open_file(multiname); + delete [] multiname; + + bigint step; + while (1) { + eofflag = readers[i]->read_time(step); + if (eofflag) break; + if (step == ntimestep) break; + readers[i]->skip(); + } + + if (eofflag) + error->one(FLERR,"Read dump parallel files " + "do not all have same timestep"); + } + } + return ntimestep; } @@ -316,15 +485,17 @@ void ReadDump::header(int fieldinfo) int triclinic_snap; int fieldflag,xflag,yflag,zflag; - if (me == 0) - nsnapatoms = reader->read_header(box,triclinic_snap, - fieldinfo,nfield,fieldtype,fieldlabel, - scaleflag,wrapflag,fieldflag, - xflag,yflag,zflag); + if (filereader) { + for (int i = 0; i < nreader; i++) + nsnapatoms[i] = readers[i]->read_header(box,triclinic_snap,fieldinfo, + nfield,fieldtype,fieldlabel, + scaleflag,wrapflag,fieldflag, + xflag,yflag,zflag); + } - MPI_Bcast(&nsnapatoms,1,MPI_LMP_BIGINT,0,world); - MPI_Bcast(&triclinic_snap,1,MPI_INT,0,world); - MPI_Bcast(&box[0][0],9,MPI_DOUBLE,0,world); + MPI_Bcast(nsnapatoms,nreader,MPI_LMP_BIGINT,0,clustercomm); + MPI_Bcast(&triclinic_snap,1,MPI_INT,0,clustercomm); + MPI_Bcast(&box[0][0],9,MPI_DOUBLE,0,clustercomm); // local copy of snapshot box parameters // used in xfield,yfield,zfield when converting dump atom to absolute coords @@ -358,10 +529,10 @@ void ReadDump::header(int fieldinfo) if (!fieldinfo) return; - MPI_Bcast(&fieldflag,1,MPI_INT,0,world); - MPI_Bcast(&xflag,1,MPI_INT,0,world); - MPI_Bcast(&yflag,1,MPI_INT,0,world); - MPI_Bcast(&zflag,1,MPI_INT,0,world); + MPI_Bcast(&fieldflag,1,MPI_INT,0,clustercomm); + MPI_Bcast(&xflag,1,MPI_INT,0,clustercomm); + MPI_Bcast(&yflag,1,MPI_INT,0,clustercomm); + MPI_Bcast(&zflag,1,MPI_INT,0,clustercomm); // error check on current vs new box and fields // triclinic_snap < 0 means no box info in file @@ -430,7 +601,9 @@ void ReadDump::header(int fieldinfo) } } -/* ---------------------------------------------------------------------- */ +/* ---------------------------------------------------------------------- + read and process one snapshot of atoms +------------------------------------------------------------------------- */ void ReadDump::atoms() { @@ -447,8 +620,23 @@ void ReadDump::atoms() atom->natoms = 0; } - // to match existing atoms to dump atoms: + // read all the snapshot atoms into fields + // each proc will own an arbitrary subset of atoms + + read_atoms(); + + // migrate old owned atoms to new procs based on atom IDs + // not necessary if purged all old atoms or if only 1 proc + + if (!purgeflag && nprocs > 1) migrate_old_atoms(); + + // migrate new snapshot atoms to same new procs based on atom IDs + // not necessary if purged all old atoms or if only 1 proc + + if (!purgeflag && nprocs > 1) migrate_new_atoms(); + // must build map if not a molecular system + // this will be needed to match new atoms to old atoms int mapflag = 0; if (atom->map_style == 0) { @@ -457,52 +645,14 @@ void ReadDump::atoms() atom->map_set(); } - // uflag[i] = 1 for each owned atom appearing in dump - // ucflag = similar flag for each chunk atom, used in process_atoms() + // each proc now owns both old and new info for same subset of atoms + // update each local atom with new info - int nlocal = atom->nlocal; - memory->create(uflag,nlocal,"read_dump:uflag"); - for (int i = 0; i < nlocal; i++) uflag[i] = 0; - memory->create(ucflag,CHUNK,"read_dump:ucflag"); - memory->create(ucflag_all,CHUNK,"read_dump:ucflag"); + process_atoms(); - // read, broadcast, and process atoms from snapshot in chunks + // check that atom IDs are valid - addproc = -1; - - int nchunk; - bigint nread = 0; - while (nread < nsnapatoms) { - nchunk = MIN(nsnapatoms-nread,CHUNK); - if (me == 0) reader->read_atoms(nchunk,nfield,fields); - MPI_Bcast(&fields[0][0],nchunk*nfield,MPI_DOUBLE,0,world); - process_atoms(nchunk); - nread += nchunk; - } - - // if addflag = YESADD, assign IDs to new snapshot atoms - - if (addflag == YESADD) { - bigint nblocal = atom->nlocal; - MPI_Allreduce(&nblocal,&atom->natoms,1,MPI_LMP_BIGINT,MPI_SUM,world); - if (atom->natoms < 0 || atom->natoms >= MAXBIGINT) - error->all(FLERR,"Too many total atoms"); - if (atom->tag_enable) atom->tag_extend(); - } - - // if trimflag set, delete atoms not replaced by snapshot atoms - - if (trimflag) { - delete_atoms(); - bigint nblocal = atom->nlocal; - MPI_Allreduce(&nblocal,&atom->natoms,1,MPI_LMP_BIGINT,MPI_SUM,world); - } - - // can now delete uflag arrays - - memory->destroy(uflag); - memory->destroy(ucflag); - memory->destroy(ucflag_all); + atom->tag_check(); // delete atom map if created it above // else reinitialize map for current atoms @@ -543,209 +693,152 @@ void ReadDump::atoms() domain->set_local_box(); } - // move atoms back inside simulation box and to new processors - // use remap() instead of pbc() in case atoms moved a long distance - // adjust image flags of all atoms (old and new) based on current box - // use irregular() in case atoms moved a long distance + // migrate atoms to their new owing proc, based on atom coords - double **x = atom->x; - imageint *image = atom->image; - nlocal = atom->nlocal; - for (int i = 0; i < nlocal; i++) domain->remap(x[i],image[i]); - - if (triclinic) domain->x2lamda(atom->nlocal); - domain->reset_box(); - Irregular *irregular = new Irregular(lmp); - irregular->migrate_atoms(1); - delete irregular; - if (triclinic) domain->lamda2x(atom->nlocal); - - // check that atom IDs are valid - - atom->tag_check(); + migrate_atoms_by_coords(); } /* ---------------------------------------------------------------------- - process arg list for dump file fields and optional keywords + read all the snapshot atoms into fields + done in different ways for multiproc no/yes and # of procs < or >= nprocs + nnew = # of snapshot atoms this proc stores ------------------------------------------------------------------------- */ -int ReadDump::fields_and_keywords(int narg, char **arg) +void ReadDump::read_atoms() { - // per-field vectors, leave space for ID and TYPE + int count,nread,nsend,nrecv,otherproc; + bigint nsnap,ntotal,ofirst,olast,rfirst,rlast,lo,hi; + MPI_Request request; + MPI_Status status; - fieldtype = new int[narg+2]; - fieldlabel = new char*[narg+2]; + // one reader per cluster of procs + // each reading proc reads one file and splits data across cluster + // cluster can be all procs or a subset - // add id and type fields as needed - // scan ahead to see if "add yes/keep" keyword/value is used - // requires extra "type" field from from dump file + if (!multiproc || multiproc_nfile < nprocs) { + nsnap = nsnapatoms[0]; - int iarg; - for (iarg = 0; iarg < narg; iarg++) - if (strcmp(arg[iarg],"add") == 0) - if (iarg < narg-1 && (strcmp(arg[iarg+1],"yes") == 0 || - strcmp(arg[iarg+1],"keep") == 0)) break; + if (filereader) { + if (!buf) memory->create(buf,CHUNK,nfield,"read_dump:buf"); - nfield = 0; - fieldtype[nfield++] = ID; - if (iarg < narg) fieldtype[nfield++] = TYPE; + otherproc = 0; + ofirst = (bigint) otherproc * nsnap/nprocs_cluster; + olast = (bigint) (otherproc+1) * nsnap/nprocs_cluster; + if (olast-ofirst > MAXSMALLINT) + error->one(FLERR,"Read dump snapshot is too large for a proc"); + nnew = static_cast (olast - ofirst); - // parse fields + if (nnew > maxnew || maxnew == 0) { + memory->destroy(fields); + maxnew = MAX(nnew,1); // avoid NULL ptr + memory->create(fields,maxnew,nfield,"read_dump:fields"); + } - iarg = 0; - while (iarg < narg) { - int type = whichtype(arg[iarg]); - if (type < 0) break; - if (type == Q && !atom->q_flag) - error->all(FLERR,"Read dump of atom property that isn't allocated"); - fieldtype[nfield++] = type; - iarg++; - } + ntotal = 0; + while (ntotal < nsnap) { + nread = MIN(CHUNK,nsnap-ntotal); + readers[0]->read_atoms(nread,nfield,buf); + rfirst = ntotal; + rlast = ntotal + nread; - // check for no fields + nsend = 0; + while (nsend < nread) { + lo = MAX(ofirst,rfirst); + hi = MIN(olast,rlast); + if (otherproc) // send to otherproc or copy to self + MPI_Send(&buf[nsend][0],(hi-lo)*nfield,MPI_DOUBLE, + otherproc,0,clustercomm); + else + memcpy(&fields[rfirst][0],&buf[nsend][0], + (hi-lo)*nfield*sizeof(double)); + nsend += hi-lo; + if (hi == olast) { + otherproc++; + ofirst = (bigint) otherproc * nsnap/nprocs_cluster; + olast = (bigint) (otherproc+1) * nsnap/nprocs_cluster; + } + } - if (fieldtype[nfield-1] == ID || fieldtype[nfield-1] == TYPE) - error->all(FLERR,"Illegal read_dump command"); + ntotal += nread; + } - if (dimension == 2) { - for (int i = 0; i < nfield; i++) - if (fieldtype[i] == Z || fieldtype[i] == VZ || - fieldtype[i] == IZ || fieldtype[i] == FZ) - error->all(FLERR,"Illegal read_dump command"); - } + } else { + ofirst = (bigint) me_cluster * nsnap/nprocs_cluster; + olast = (bigint) (me_cluster+1) * nsnap/nprocs_cluster; + if (olast-ofirst > MAXSMALLINT) + error->one(FLERR,"Read dump snapshot is too large for a proc"); + nnew = static_cast (olast - ofirst); + if (nnew > maxnew || maxnew == 0) { + memory->destroy(fields); + maxnew = MAX(nnew,1); // avoid NULL ptr + memory->create(fields,maxnew,nfield,"read_dump:fields"); + } - for (int i = 0; i < nfield; i++) - for (int j = i+1; j < nfield; j++) - if (fieldtype[i] == fieldtype[j]) - error->all(FLERR,"Duplicate fields in read_dump command"); + nrecv = 0; + while (nrecv < nnew) { + MPI_Irecv(&fields[nrecv][0],(nnew-nrecv)*nfield,MPI_DOUBLE,0,0, + clustercomm,&request); + MPI_Wait(&request,&status); + MPI_Get_count(&status,MPI_DOUBLE,&count); + nrecv += count/nfield; + } + } - // parse optional args + // every proc is a filereader, reads one or more files + // each proc keeps all data it reads, no communication required - boxflag = 1; - replaceflag = 1; - purgeflag = 0; - trimflag = 0; - addflag = NOADD; - for (int i = 0; i < nfield; i++) fieldlabel[i] = NULL; - scaleflag = 0; - wrapflag = 1; + } else if (multiproc_nfile >= nprocs) { + bigint sum = 0; + for (int i = 0; i < nreader; i++) + sum += nsnapatoms[i]; + if (sum > MAXSMALLINT) + error->one(FLERR,"Read dump snapshot is too large for a proc"); + nnew = static_cast (sum); + if (nnew > maxnew || maxnew == 0) { + memory->destroy(fields); + maxnew = MAX(nnew,1); // avoid NULL ptr + memory->create(fields,maxnew,nfield,"read_dump:fields"); + } - while (iarg < narg) { - if (strcmp(arg[iarg],"box") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) boxflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) boxflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"replace") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) replaceflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) replaceflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"purge") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) purgeflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) purgeflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"trim") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) trimflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) trimflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"add") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) addflag = YESADD; - else if (strcmp(arg[iarg+1],"no") == 0) addflag = NOADD; - else if (strcmp(arg[iarg+1],"keep") == 0) addflag = KEEPADD; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"label") == 0) { - if (iarg+3 > narg) error->all(FLERR,"Illegal read_dump command"); - int type = whichtype(arg[iarg+1]); - int i; - for (i = 0; i < nfield; i++) - if (type == fieldtype[i]) break; - if (i == nfield) error->all(FLERR,"Illegal read_dump command"); - int n = strlen(arg[iarg+2]) + 1; - fieldlabel[i] = new char[n]; - strcpy(fieldlabel[i],arg[iarg+2]); - iarg += 3; - } else if (strcmp(arg[iarg],"scaled") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) scaleflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) scaleflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"wrapped") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - if (strcmp(arg[iarg+1],"yes") == 0) wrapflag = 1; - else if (strcmp(arg[iarg+1],"no") == 0) wrapflag = 0; - else error->all(FLERR,"Illegal read_dump command"); - iarg += 2; - } else if (strcmp(arg[iarg],"format") == 0) { - if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); - delete [] readerstyle; - int n = strlen(arg[iarg+1]) + 1; - readerstyle = new char[n]; - strcpy(readerstyle,arg[iarg+1]); - iarg += 2; - break; - } else error->all(FLERR,"Illegal read_dump command"); - } - - if (purgeflag && (replaceflag || trimflag)) - error->all(FLERR,"If read_dump purges it cannot replace or trim"); - if (addflag == KEEPADD && atom->tag_enable == 0) - error->all(FLERR,"Read_dump cannot use 'add keep' without atom IDs"); - - return narg-iarg; + nnew = 0; + for (int i = 0; i < nreader; i++) { + nsnap = nsnapatoms[i]; + ntotal = 0; + while (ntotal < nsnap) { + nread = MIN(CHUNK,nsnap-ntotal); + readers[i]->read_atoms(nread,nfield,&fields[nnew+ntotal]); + ntotal += nread; + } + nnew += nsnap; + } + } } /* ---------------------------------------------------------------------- - check if str is a field argument - if yes, return index of which - if not, return -1 -------------------------------------------------------------------------- */ - -int ReadDump::whichtype(char *str) -{ - int type = -1; - if (strcmp(str,"id") == 0) type = ID; - else if (strcmp(str,"type") == 0) type = TYPE; - else if (strcmp(str,"x") == 0) type = X; - else if (strcmp(str,"y") == 0) type = Y; - else if (strcmp(str,"z") == 0) type = Z; - else if (strcmp(str,"vx") == 0) type = VX; - else if (strcmp(str,"vy") == 0) type = VY; - else if (strcmp(str,"vz") == 0) type = VZ; - else if (strcmp(str,"q") == 0) type = Q; - else if (strcmp(str,"ix") == 0) type = IX; - else if (strcmp(str,"iy") == 0) type = IY; - else if (strcmp(str,"iz") == 0) type = IZ; - else if (strcmp(str,"fx") == 0) type = FX; - else if (strcmp(str,"fy") == 0) type = FY; - else if (strcmp(str,"fz") == 0) type = FZ; - return type; -} - -/* ---------------------------------------------------------------------- - process each of N atoms in chunk read from dump file + update info for each old atom I own based on snapshot info if in replace mode and atom ID matches current atom, overwrite atom info with fields from dump file - if in add mode and atom ID does not match any current atom, - create new atom with dump file field values, - and assign to a proc in round-robin manner - use round-robin method, b/c atom coords may not be inside simulation box + if in add mode and atom ID does not match any old atom, + create new atom with dump file field values ------------------------------------------------------------------------- */ -void ReadDump::process_atoms(int n) +void ReadDump::process_atoms() { int i,m,ifield,itype; int xbox,ybox,zbox; tagint mtag; + int *updateflag,*newflag; + + // updateflag[i] = flag for old atoms, 1 if updated, else 0 + // newflag[i] = flag for new atoms, 0 if used to update old atom, else 1 + + int nlocal = atom->nlocal; + memory->create(updateflag,nlocal,"read_dump:updateflag"); + for (int i = 0; i < nlocal; i++) updateflag[i] = 0; + memory->create(newflag,nnew,"read_dump:newflag"); + for (int i = 0; i < nnew; i++) newflag[i] = 1; + + // loop over new atoms double **x = atom->x; double **v = atom->v; @@ -753,11 +846,9 @@ void ReadDump::process_atoms(int n) double **f = atom->f; tagint *tag = atom->tag; imageint *image = atom->image; - int nlocal = atom->nlocal; tagint map_tag_max = atom->map_tag_max; - for (i = 0; i < n; i++) { - ucflag[i] = 0; + for (i = 0; i < nnew; i++) { // check if new atom matches one I own // setting m = -1 forces new atom not to match @@ -769,8 +860,8 @@ void ReadDump::process_atoms(int n) else m = -1; if (m < 0 || m >= nlocal) continue; - ucflag[i] = 1; - uflag[m] = 1; + updateflag[m] = 1; + newflag[i] = 0; if (replaceflag) { nreplace++; @@ -838,12 +929,39 @@ void ReadDump::process_atoms(int n) } } - // create any atoms in chunk that no processor owned - // add atoms in round-robin sequence on processors - // cannot do it geometrically b/c dump coords may not be in simulation box - // check that dump file snapshot has atom type field + // if trimflag set, delete atoms not updated by snapshot atoms - if (addflag == NOADD) return; + if (trimflag) { + AtomVec *avec = atom->avec; + + int i = 0; + while (i < nlocal) { + if (!updateflag[i]) { + avec->copy(nlocal-1,i,1); + updateflag[i] = updateflag[nlocal-1]; + nlocal--; + ntrim++; + } else i++; + } + + atom->nlocal = nlocal; + bigint nblocal = atom->nlocal; + MPI_Allreduce(&nblocal,&atom->natoms,1,MPI_LMP_BIGINT,MPI_SUM,world); + } + + // done if cannot add new atoms + + if (addflag == NOADD) { + memory->destroy(updateflag); + memory->destroy(newflag); + return; + } + + // ---------------------------------------------------- + // create new atoms for dump file atoms with ID that matches no old atom + // ---------------------------------------------------- + + // first check that dump file snapshot has atom type field int tflag = 0; for (ifield = 0; ifield < nfield; ifield++) @@ -851,19 +969,11 @@ void ReadDump::process_atoms(int n) if (!tflag) error->all(FLERR,"Cannot add atoms if dump file does not store atom type"); - MPI_Allreduce(ucflag,ucflag_all,n,MPI_INT,MPI_SUM,world); - int nlocal_previous = atom->nlocal; double one[3]; - for (i = 0; i < n; i++) { - if (ucflag_all[i]) continue; - - // each processor adds every Pth atom - - addproc++; - if (addproc == nprocs) addproc = 0; - if (addproc != me) continue; + for (i = 0; i < nnew; i++) { + if (!newflag[i]) continue; // create type and coord fields from dump file // coord = 0.0 unless corresponding dump file field was specified @@ -932,7 +1042,7 @@ void ReadDump::process_atoms(int n) break; } - // replace image flag in case changed by ix,iy,iz fields + // reset image flag in case changed by ix,iy,iz fields image[m] = ((imageint) (xbox + IMGMAX) & IMGMASK) | (((imageint) (ybox + IMGMAX) & IMGMASK) << IMGBITS) | @@ -940,31 +1050,283 @@ void ReadDump::process_atoms(int n) } } + // if addflag = YESADD + // assign consistent IDs to new snapshot atoms across all procs + + if (addflag == YESADD) { + bigint nblocal = atom->nlocal; + MPI_Allreduce(&nblocal,&atom->natoms,1,MPI_LMP_BIGINT,MPI_SUM,world); + if (atom->natoms < 0 || atom->natoms >= MAXBIGINT) + error->all(FLERR,"Too many total atoms"); + if (atom->tag_enable) atom->tag_extend(); + } + // init per-atom fix/compute/variable values for created atoms atom->data_fix_compute_variable(nlocal_previous,atom->nlocal); + + // free allocated vectors + + memory->destroy(updateflag); + memory->destroy(newflag); + } /* ---------------------------------------------------------------------- - delete atoms not flagged as replaced by dump atoms + migrate old atoms to new procs based on atom IDs + use migrate_atoms() with explicit processor assignments ------------------------------------------------------------------------- */ -void ReadDump::delete_atoms() +void ReadDump::migrate_old_atoms() { - AtomVec *avec = atom->avec; + tagint *tag = atom->tag; int nlocal = atom->nlocal; - int i = 0; - while (i < nlocal) { - if (uflag[i] == 0) { - avec->copy(nlocal-1,i,1); - uflag[i] = uflag[nlocal-1]; - nlocal--; - ntrim++; - } else i++; + int *procassign; + memory->create(procassign,nlocal,"read_dump:procassign"); + for (int i = 0; i < nlocal; i++) + procassign[i] = tag[i] % nprocs; + + Irregular *irregular = new Irregular(lmp); + irregular->migrate_atoms(1,1,procassign); + delete irregular; + + memory->destroy(procassign); +} + +/* ---------------------------------------------------------------------- + migrate new atoms to same new procs based on atom IDs +------------------------------------------------------------------------- */ + +void ReadDump::migrate_new_atoms() +{ + tagint mtag; + int *procassign; + double **newfields; + + memory->create(procassign,nnew,"read_dump:procassign"); + for (int i = 0; i < nnew; i++) { + mtag = static_cast (fields[i][0]); + procassign[i] = mtag % nprocs; } - atom->nlocal = nlocal; + Irregular *irregular = new Irregular(lmp); + int nrecv = irregular->create_data(nnew,procassign,1); + int newmaxnew = MAX(nrecv,maxnew); + newmaxnew = MAX(newmaxnew,1); // avoid NULL ptr + memory->create(newfields,newmaxnew,nfield,"read_dump:newfields"); + irregular->exchange_data((char *) &fields[0][0],nfield*sizeof(double), + (char *) &newfields[0][0]); + irregular->destroy_data(); + delete irregular; + + memory->destroy(fields); + memory->destroy(procassign); + + // point fields at newfields + + fields = newfields; + maxnew = newmaxnew; + nnew = nrecv; +} + +/* ---------------------------------------------------------------------- + migrate final atoms to new procs based on atom coords + use migrate_atoms() with implicit processor assignments based on atom coords + move atoms back inside simulation box and to new processors + use remap() instead of pbc() in case atoms moved a long distance + adjust image flags of all atoms (old and new) based on current box +------------------------------------------------------------------------- */ + +void ReadDump::migrate_atoms_by_coords() +{ + double **x = atom->x; + imageint *image = atom->image; + int nlocal = atom->nlocal; + for (int i = 0; i < nlocal; i++) domain->remap(x[i],image[i]); + + if (triclinic) domain->x2lamda(atom->nlocal); + domain->reset_box(); + Irregular *irregular = new Irregular(lmp); + irregular->migrate_atoms(1); + delete irregular; + if (triclinic) domain->lamda2x(atom->nlocal); +} + +/* ---------------------------------------------------------------------- + process arg list for dump file fields and optional keywords +------------------------------------------------------------------------- */ + +int ReadDump::fields_and_keywords(int narg, char **arg) +{ + // per-field vectors, leave space for ID and TYPE + + fieldtype = new int[narg+2]; + fieldlabel = new char*[narg+2]; + + // add id and type fields as needed + // scan ahead to see if "add yes/keep" keyword/value is used + // requires extra "type" field from from dump file + + int iarg; + for (iarg = 0; iarg < narg; iarg++) + if (strcmp(arg[iarg],"add") == 0) + if (iarg < narg-1 && (strcmp(arg[iarg+1],"yes") == 0 || + strcmp(arg[iarg+1],"keep") == 0)) break; + + nfield = 0; + fieldtype[nfield++] = ID; + if (iarg < narg) fieldtype[nfield++] = TYPE; + + // parse fields + + iarg = 0; + while (iarg < narg) { + int type = whichtype(arg[iarg]); + if (type < 0) break; + if (type == Q && !atom->q_flag) + error->all(FLERR,"Read dump of atom property that isn't allocated"); + fieldtype[nfield++] = type; + iarg++; + } + + // check for no fields + + if (fieldtype[nfield-1] == ID || fieldtype[nfield-1] == TYPE) + error->all(FLERR,"Illegal read_dump command"); + + if (dimension == 2) { + for (int i = 0; i < nfield; i++) + if (fieldtype[i] == Z || fieldtype[i] == VZ || + fieldtype[i] == IZ || fieldtype[i] == FZ) + error->all(FLERR,"Illegal read_dump command"); + } + + for (int i = 0; i < nfield; i++) + for (int j = i+1; j < nfield; j++) + if (fieldtype[i] == fieldtype[j]) + error->all(FLERR,"Duplicate fields in read_dump command"); + + // parse optional args + + multiproc_nfile = 0; + boxflag = 1; + replaceflag = 1; + purgeflag = 0; + trimflag = 0; + addflag = NOADD; + for (int i = 0; i < nfield; i++) fieldlabel[i] = NULL; + scaleflag = 0; + wrapflag = 1; + + while (iarg < narg) { + if (strcmp(arg[iarg],"nfile") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + multiproc_nfile = force->inumeric(FLERR,arg[iarg+1]); + iarg += 2; + } else if (strcmp(arg[iarg],"box") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) boxflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) boxflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"replace") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) replaceflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) replaceflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"purge") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) purgeflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) purgeflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"trim") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) trimflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) trimflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"add") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) addflag = YESADD; + else if (strcmp(arg[iarg+1],"no") == 0) addflag = NOADD; + else if (strcmp(arg[iarg+1],"keep") == 0) addflag = KEEPADD; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"label") == 0) { + if (iarg+3 > narg) error->all(FLERR,"Illegal read_dump command"); + int type = whichtype(arg[iarg+1]); + int i; + for (i = 0; i < nfield; i++) + if (type == fieldtype[i]) break; + if (i == nfield) error->all(FLERR,"Illegal read_dump command"); + int n = strlen(arg[iarg+2]) + 1; + fieldlabel[i] = new char[n]; + strcpy(fieldlabel[i],arg[iarg+2]); + iarg += 3; + } else if (strcmp(arg[iarg],"scaled") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) scaleflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) scaleflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"wrapped") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + if (strcmp(arg[iarg+1],"yes") == 0) wrapflag = 1; + else if (strcmp(arg[iarg+1],"no") == 0) wrapflag = 0; + else error->all(FLERR,"Illegal read_dump command"); + iarg += 2; + } else if (strcmp(arg[iarg],"format") == 0) { + if (iarg+2 > narg) error->all(FLERR,"Illegal read_dump command"); + delete [] readerstyle; + int n = strlen(arg[iarg+1]) + 1; + readerstyle = new char[n]; + strcpy(readerstyle,arg[iarg+1]); + iarg += 2; + break; + } else error->all(FLERR,"Illegal read_dump command"); + } + + if (multiproc == 0 && multiproc_nfile) + error->all(FLERR,"Dump file is not a multi-proc file"); + if (multiproc && multiproc_nfile == 0) + error->all(FLERR,"Dump file is a multi-proc file"); + + if (purgeflag && (replaceflag || trimflag)) + error->all(FLERR,"If read_dump purges it cannot replace or trim"); + if (addflag == KEEPADD && atom->tag_enable == 0) + error->all(FLERR,"Read_dump cannot use 'add keep' without atom IDs"); + + return narg-iarg; +} + +/* ---------------------------------------------------------------------- + check if str is a field argument + if yes, return index of which + if not, return -1 +------------------------------------------------------------------------- */ + +int ReadDump::whichtype(char *str) +{ + int type = -1; + if (strcmp(str,"id") == 0) type = ID; + else if (strcmp(str,"type") == 0) type = TYPE; + else if (strcmp(str,"x") == 0) type = X; + else if (strcmp(str,"y") == 0) type = Y; + else if (strcmp(str,"z") == 0) type = Z; + else if (strcmp(str,"vx") == 0) type = VX; + else if (strcmp(str,"vy") == 0) type = VY; + else if (strcmp(str,"vz") == 0) type = VZ; + else if (strcmp(str,"q") == 0) type = Q; + else if (strcmp(str,"ix") == 0) type = IX; + else if (strcmp(str,"iy") == 0) type = IY; + else if (strcmp(str,"iz") == 0) type = IZ; + else if (strcmp(str,"fx") == 0) type = FX; + else if (strcmp(str,"fy") == 0) type = FY; + else if (strcmp(str,"fz") == 0) type = FZ; + return type; } /* ---------------------------------------------------------------------- diff --git a/src/read_dump.h b/src/read_dump.h index eb70b99808..73cb0a8995 100644 --- a/src/read_dump.h +++ b/src/read_dump.h @@ -22,6 +22,7 @@ CommandStyle(read_dump,ReadDump) #ifndef LMP_READ_DUMP_H #define LMP_READ_DUMP_H +#include #include #include "pointers.h" @@ -43,15 +44,26 @@ class ReadDump : protected Pointers { private: int me,nprocs; - FILE *fp; - int dimension; + char **files; // list of input dump files to process + int nfile; // # of dump files to process (each may be parallel) + int currentfile; // current open file (0 to nfile-1) + + MPI_Comm clustercomm; // comm for proc cluster that reads/shares a file + int me_cluster,nprocs_cluster; // proc ID and count for my read cluster + + int multiproc; // 0 = each dump file is a single file + // 1 = each dump file is parallel (multiple files) + int multiproc_nfile; // number of parallel files in one dump file + + int nreader; // # of parallel dump files read by my cluster + int firstfile; // index of 1st dump file my cluster reads + // (0 to multiproc_nfile-1) + int filereader; // 1 if this proc reads from a dump file(s) + + int dimension; // same as in Domain int triclinic; - int nfile; // # of dump files to process - char **files; // list of file names - int currentfile; // currently open file - int boxflag; // overwrite simulation with dump file box params int replaceflag,addflag; // flags for processing dump snapshot atoms int trimflag,purgeflag; @@ -59,10 +71,13 @@ private: int wrapflag; // user 0/1 if dump file coords are unwrapped/wrapped char *readerstyle; // style of dump files to read + int nnew; // # of dump file atoms this proc owns int nfield; // # of fields to extract from dump file int *fieldtype; // type of each field = X,VY,IZ,etc char **fieldlabel; // user specified label for field double **fields; // per-atom field values + int maxnew; // allocation size of fields array + double **buf; // read buffer int scaled; // 0/1 if dump file coords are unscaled/scaled int wrapped; // 0/1 if dump file coords are unwrapped/wrapped @@ -71,20 +86,26 @@ private: double xlo,xhi,ylo,yhi,zlo,zhi,xy,xz,yz; // dump snapshot box params double xprd,yprd,zprd; - bigint nsnapatoms; // # of atoms in dump file shapshot + bigint *nsnapatoms; // # of atoms in one snapshot from + // one (parallel) dump file + // nreader-length vector b/c a reader proc + // may read from multiple parallel dump files int npurge,nreplace,ntrim,nadd; // stats on processed atoms - int addproc; // proc that should add next atom int yindex,zindex; // field index for Y,Z coords - int *uflag; // set to 1 if snapshot atom matches owned atom - int *ucflag,*ucflag_all; // set to 1 if snapshot chunk atom was processed + class Reader **readers; // class that reads a dump file + // nreader-length list of readers if proc reads + // from multiple parallel dump files - class Reader *reader; // class that reads dump file + void read_atoms(); + void process_atoms(); + void migrate_old_atoms(); + void migrate_new_atoms(); + void migrate_atoms_by_coords(); + void setup_multiproc(); int whichtype(char *); - void process_atoms(int); - void delete_atoms(); double xfield(int, int); double yfield(int, int); diff --git a/src/rerun.cpp b/src/rerun.cpp index f8a37b5946..063ea882c2 100644 --- a/src/rerun.cpp +++ b/src/rerun.cpp @@ -57,7 +57,7 @@ void Rerun::command(int narg, char **arg) if (nfile == 0 || nfile == narg) error->all(FLERR,"Illegal rerun command"); // parse optional args up until "dump" - // user MAXBIGINT -1 so Output can add 1 to it and still be a big int + // use MAXBIGINT -1 so Output can add 1 to it and still be a big int bigint first = 0; bigint last = MAXBIGINT - 1;