added rendezvous via all2all

Steve Plimpton
2019-01-23 14:49:52 -07:00
committed by Axel Kohlmeyer
parent 981f12ebeb
commit fc002e30d3
7 changed files with 566 additions and 105 deletions
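
For context, a minimal sketch of how a caller might drive the new grouped communication plan (not part of this diff; lmp, ndatum, nbytes, procs, and sendbuf are assumed to be set up by the caller, and destroy_data() is the usual cleanup paired with exchange_data() elsewhere in LAMMPS):

    // illustrative only: variable names are assumptions, not part of the commit
    // procs[i] = # of datums this rank sends to rank i, self included
    // sendbuf = those ndatum datums, nbytes each, grouped by destination
    //           rank in ascending rank order
    Irregular *irregular = new Irregular(lmp);
    int nrecv = irregular->create_data_grouped(ndatum,procs,0);

    char *recvbuf = new char[nrecv*nbytes];
    irregular->exchange_data(sendbuf,nbytes,recvbuf);
    irregular->destroy_data();
    delete irregular;
    delete [] recvbuf;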


@@ -622,6 +622,7 @@ int Irregular::create_data(int n, int *proclist, int sortflag)
num_send = new int[nsend_proc];
index_send = new int[n-work1[me]];
index_self = new int[work1[me]];
maxindex = n;
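// maxindex = combined size of index_send + index_self, used by the memory tally in exchange_data()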
// proc_send = procs I send to
// num_send = # of datums I send to each proc
@@ -679,8 +680,182 @@ int Irregular::create_data(int n, int *proclist, int sortflag)
// receive incoming messages
// proc_recv = procs I recv from
// num_recv = # of datums each proc sends me
// nrecvdatum = total # of datums I recv
int nrecvdatum = 0;
for (i = 0; i < nrecv_proc; i++) {
MPI_Recv(&num_recv[i],1,MPI_INT,MPI_ANY_SOURCE,0,world,status);
proc_recv[i] = status->MPI_SOURCE;
nrecvdatum += num_recv[i];
}
nrecvdatum += num_self;
// sort proc_recv and num_recv by proc ID if requested
// useful for debugging to ensure reproducible ordering of received datums
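// (the MPI_ANY_SOURCE recvs above complete in arrival order, which can differ from run to run)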
if (sortflag) {
int *order = new int[nrecv_proc];
int *proc_recv_ordered = new int[nrecv_proc];
int *num_recv_ordered = new int[nrecv_proc];
for (i = 0; i < nrecv_proc; i++) order[i] = i;
#if defined(LMP_QSORT)
proc_recv_copy = proc_recv;
qsort(order,nrecv_proc,sizeof(int),compare_standalone);
#else
merge_sort(order,nrecv_proc,(void *)proc_recv,compare_standalone);
#endif
int j;
for (i = 0; i < nrecv_proc; i++) {
j = order[i];
proc_recv_ordered[i] = proc_recv[j];
num_recv_ordered[i] = num_recv[j];
}
memcpy(proc_recv,proc_recv_ordered,nrecv_proc*sizeof(int));
memcpy(num_recv,num_recv_ordered,nrecv_proc*sizeof(int));
delete [] order;
delete [] proc_recv_ordered;
delete [] num_recv_ordered;
}
// barrier to ensure all MPI_ANY_SOURCE messages are received
// else another proc could proceed to exchange_data() and send to me
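// (such a data message could be matched by one of the MPI_ANY_SOURCE recvs above)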
MPI_Barrier(world);
// return # of datums I will receive
return nrecvdatum;
}
/* ----------------------------------------------------------------------
create communication plan based on list of datums of uniform size
n = # of datums to send
procs = how many datums to send to each proc, must include self
sortflag = flag for sorting order of received messages by proc ID
return total # of datums I will recv, including any to self
------------------------------------------------------------------------- */
int Irregular::create_data_grouped(int n, int *procs, int sortflag)
{
int i,j,k,m;
// setup for collective comm
// work1 = # of datums I send to each proc, set self to 0
// work2 = 1 for all procs, used for ReduceScatter
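// (work2 is the recvcounts argument: each proc receives one element of the summed result)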
for (i = 0; i < nprocs; i++) {
work1[i] = procs[i];
work2[i] = 1;
}
work1[me] = 0;
// nrecv_proc = # of procs I receive messages from, not including self
// options for performing ReduceScatter operation
// some are more efficient on some machines at big sizes
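// all variants compute the element-wise sum of work1 across procs; this proc keeps element me of the sum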
#ifdef LAMMPS_RS_ALLREDUCE_INPLACE
MPI_Allreduce(MPI_IN_PLACE,work1,nprocs,MPI_INT,MPI_SUM,world);
nrecv_proc = work1[me];
#else
#ifdef LAMMPS_RS_ALLREDUCE
MPI_Allreduce(work1,work2,nprocs,MPI_INT,MPI_SUM,world);
nrecv_proc = work2[me];
#else
MPI_Reduce_scatter(work1,&nrecv_proc,work2,MPI_INT,MPI_SUM,world);
#endif
#endif
// allocate receive arrays
proc_recv = new int[nrecv_proc];
num_recv = new int[nrecv_proc];
request = new MPI_Request[nrecv_proc];
status = new MPI_Status[nrecv_proc];
// work1 = # of datums I send to each proc, including self
// nsend_proc = # of procs I send messages to, not including self
for (i = 0; i < nprocs; i++) work1[i] = procs[i];
nsend_proc = 0;
for (i = 0; i < nprocs; i++)
if (work1[i]) nsend_proc++;
if (work1[me]) nsend_proc--;
// allocate send and self arrays
proc_send = new int[nsend_proc];
num_send = new int[nsend_proc];
index_send = new int[n-work1[me]];
index_self = new int[work1[me]];
maxindex = n;
// proc_send = procs I send to
// num_send = # of datums I send to each proc
// num_self = # of datums I copy to self
// to balance pattern of send messages:
// each proc begins with iproc > me, continues until iproc = me
// reset work1 to store which send message each proc corresponds to
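// e.g. with 4 procs, proc 1 scans destinations in the order 2,3,0 and handles itself last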
int iproc = me;
int isend = 0;
for (i = 0; i < nprocs; i++) {
iproc++;
if (iproc == nprocs) iproc = 0;
if (iproc == me) {
num_self = work1[iproc];
work1[iproc] = 0;
} else if (work1[iproc] > 0) {
proc_send[isend] = iproc;
num_send[isend] = work1[iproc];
work1[iproc] = isend;
isend++;
}
}
// work2 = offsets into index_send for each proc I send to
// m = ptr into index_self
// index_send = list of which datums to send to each proc
// 1st N1 values are datum indices for 1st proc,
// next N2 values are datum indices for 2nd proc, etc
// index_self = list of which datums to copy to self
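// e.g. if num_send = {3,2}, then work2 = {0,3}:
// datums for proc_send[0] fill index_send[0..2], those for proc_send[1] fill index_send[3..4]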
work2[0] = 0;
for (i = 1; i < nsend_proc; i++) work2[i] = work2[i-1] + num_send[i-1];
m = 0;
i = 0;
for (iproc = 0; iproc < nprocs; iproc++) {
k = procs[iproc];
for (j = 0; j < k; j++) {
if (iproc == me) index_self[m++] = i++;
else {
isend = work1[iproc];
index_send[work2[isend]++] = i++;
}
}
}
// tell receivers how much data I send
// sendmax_proc = largest # of datums I send in a single message
sendmax_proc = 0;
for (i = 0; i < nsend_proc; i++) {
MPI_Request tmpReq; // Use non-blocking send to avoid possible deadlock
MPI_Isend(&num_send[i],1,MPI_INT,proc_send[i],0,world,&tmpReq);
MPI_Request_free(&tmpReq); // the MPI_Barrier below marks completion
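// freeing an active request does not cancel the send; MPI still completes it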
sendmax_proc = MAX(sendmax_proc,num_send[i]);
}
// receive incoming messages
// proc_recv = procs I recv from
// num_recv = # of datums each proc sends me
// nrecvdatum = total # of datums I recv
int nrecvdatum = 0;
for (i = 0; i < nrecv_proc; i++) {
@@ -789,6 +964,12 @@ void Irregular::exchange_data(char *sendbuf, int nbytes, char *recvbuf)
// wait on all incoming messages
if (nrecv_proc) MPI_Waitall(nrecv_proc,request,status);
// approximate memory tally
bigint irregular_bytes = 2*nprocs*sizeof(int);
irregular_bytes += maxindex*sizeof(int);
irregular_bytes += maxbuf;
}
/* ----------------------------------------------------------------------