sending blocks of 2D array in C using MPI

Sundevil · Feb 14, 2012 · Viewed 40.5k times

How do you send blocks of a 2D array to different processors? Suppose the 2D array size is 400x400 and I want to send blocks of size 100x100 to different processors. The idea is that each processor will perform computation on its separate block and send its result back to the first processor for the final result.
I am using MPI in C programs.

Answer

Jonathan Dursi · Feb 14, 2012

Let me start by saying that you generally don't really want to do this - scatter and gather huge chunks of data from some "master" process. Normally you want each task to be chugging away at its own piece of the puzzle, and you should aim to never have one processor need a "global view" of the whole data; as soon as you require that, you limit scalability and the problem size. If you're doing this for I/O - one process reads the data, then scatters it, then gathers it back for writing, you'll want eventually to look into MPI-IO.

Getting to your question, though, MPI has very nice ways of pulling arbitrary data out of memory, and scatter/gathering it to and from a set of processors. Unfortunately that requires a fair number of MPI concepts - MPI Types, extents, and collective operations. A lot of the basic ideas are discussed in the answer to this question -- MPI_Type_create_subarray and MPI_Gather.

Update - In the cold light of day, this is a lot of code and not a lot of explanation. So let me expand a little bit.

Consider a 1d integer global array that task 0 has that you want to distribute to a number of MPI tasks, so that they each get a piece in their local array. Say you have 4 tasks, and the global array is [01234567]. You could have task 0 send four messages (including one to itself) to distribute this, and when it's time to re-assemble, receive four messages to bundle it back together; but that obviously gets very time consuming at large numbers of processes. There are optimized routines for these sorts of operations - scatter/gather operations. So in this 1d case you'd do something like this:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<8; i++) global[i] = i;   /* fill all 8 entries */
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

After this, the processors' data would look like

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

That is, the scatter operation takes the global array and sends contiguous 2-int chunks to all the processors.

To re-assemble the array, we use the MPI_Gather() operation, which works exactly the same but in reverse:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints from each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

and now the data looks like

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather brings all the data back, and here a is 10 because I didn't think my formatting through carefully enough upon starting this example.
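
If you want to check the bookkeeping in the two tables above without launching MPI, here is a minimal plain-C sketch that imitates the scatter / add-rank / gather round trip with memcpy. The helpers fake_scatter, fake_gather and round_trip are hypothetical names made up for this illustration; they are not MPI calls and only mimic where the data ends up.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper: copy the chunk owned by `rank` out of `global`,
   mimicking what MPI_Scatter delivers to that rank. */
void fake_scatter(const int *global, int chunk, int rank, int *local) {
    assert(chunk > 0 && rank >= 0);
    memcpy(local, global + rank * chunk, (size_t)chunk * sizeof(int));
}

/* Hypothetical helper: copy `local` back into the slot owned by `rank`,
   mimicking what MPI_Gather assembles on the root. */
void fake_gather(const int *local, int chunk, int rank, int *global) {
    assert(chunk > 0 && rank >= 0);
    memcpy(global + rank * chunk, local, (size_t)chunk * sizeof(int));
}

/* Simulate every task in turn: take its chunk, add its rank to each
   element, and put the chunk back where it came from. */
void round_trip(int *global, int chunk, int ntasks) {
    assert(chunk <= 16);              /* size of the scratch buffer below */
    for (int rank = 0; rank < ntasks; rank++) {
        int local[16];
        fake_scatter(global, chunk, rank, local);
        for (int i = 0; i < chunk; i++)
            local[i] += rank;
        fake_gather(local, chunk, rank, global);
    }
}
```

Calling round_trip on {0,1,2,3,4,5,6,7} with chunk 2 and 4 tasks leaves {0,1,3,4,6,7,9,10} in the array, which is the [0134679a] row above.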

What happens if the number of processes doesn't evenly divide the number of data points, and we need to send different numbers of items to each process? Then you need a generalized version of scatter, MPI_Scatterv(), which lets you specify the counts for each processor, and displacements -- where in the global array that piece of data starts. So let's say you had an array of characters [abcdefghi] with 9 characters, and you were going to assign every process two characters except the last, which gets three. Then you'd need

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<9; i++) global[i] = 'a'+i;   /* fill all 9 characters */
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_CHAR,
            local, mynum, MPI_CHAR,  /* I'm receiving mynum MPI_CHARs into local */
            root, MPI_COMM_WORLD);

Now the data looks like

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

You've now used scatterv to distribute the irregular amounts of data. The displacement in each case is 2*rank from the start of the array (measured in characters; the displacement is in units of the type being sent for a scatter or received for a gather, not generally in bytes), and the counts are {2,2,2,3}. If it had been the first processor we wanted to have 3 characters, we would have set counts={3,2,2,2} and displacements would have been {0,3,5,7}. Gatherv again works exactly the same but in reverse; the counts and displs arrays would remain the same.
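
In real code you rarely hard-code those arrays. A small sketch of how they might be filled in, assuming the same policy as the example (everyone gets n/p items, the last process also takes the remainder); split_counts is a hypothetical helper name, not part of MPI:

```c
#include <assert.h>

/* Hypothetical helper: split n items over p processes, giving every
   process n/p items and handing the remainder to the last one.
   counts[] and displs[] must each have room for p entries and are in
   units of elements, as MPI_Scatterv expects. */
void split_counts(int n, int p, int *counts, int *displs) {
    assert(n >= 0 && p > 0);
    int base = n / p;
    for (int r = 0; r < p; r++) {
        counts[r] = base;        /* everyone gets the same base amount */
        displs[r] = r * base;    /* chunks are laid out back to back   */
    }
    counts[p - 1] += n % p;      /* leftovers go to the last process   */
}
```

With n=9 and p=4 this gives counts {2,2,2,3} and displs {0,2,4,6}, the values used above. Other policies (such as spreading the remainder over the first few ranks) are just as valid; only the displacement arithmetic changes.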

Now, for 2D, this is a bit trickier. If we want to send 2d subblocks of a 2d array, the data we're sending is no longer contiguous. If we're sending (say) 3x3 subblocks of a 6x6 array to 4 processors, the data we're sending has holes in it:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(Note that all high-performance computing comes down to understanding the layout of data in memory.)

If we want to send the data that is marked "1" to task 1, we need to skip three values, send three values, skip three values, send three values, skip three values, send three values. A second complication is where the subregions stop and start; note that region "1" doesn't start where region "0" stops; after the last element of region "0", the next location in memory is partway through region "1".
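
To make the holes concrete, here is a short plain-C sketch (no MPI involved) that lists the row-major offsets of one block. block_offsets is a hypothetical helper written only for this illustration, and it assumes a square array that divides evenly into square blocks.

```c
#include <assert.h>

/* Hypothetical helper: list the flat (row-major) offsets of the elements
   of block (bi, bj) in an n x n array split into b x b blocks.
   `out` must have room for b*b entries. */
void block_offsets(int n, int b, int bi, int bj, int *out) {
    assert(b > 0 && n % b == 0);
    int k = 0;
    for (int i = 0; i < b; i++)          /* rows inside the block    */
        for (int j = 0; j < b; j++)      /* columns inside the block */
            out[k++] = (bi * b + i) * n + (bj * b + j);
}
```

For the 6x6 example, block "1" is (bi, bj) = (0, 1) and its offsets come out as 3,4,5, 9,10,11, 15,16,17: three values, a gap of three, three values, and so on. Block "0" ends at offset 14, and offset 15 already belongs to block "1".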

Let's tackle the first layout problem first - how to pull out just the data we want to send. We could always just copy out all the "0" region data to another, contiguous array, and send that; if we planned it out carefully enough, we could even do that in such a way that we could call MPI_Scatter on the results. But we'd rather not have to transpose our entire main data structure that way.
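
For comparison, the copy-it-out approach would look roughly like this in plain C. This is only a sketch of the manual alternative, not what the rest of this answer does; pack_block is a hypothetical helper name, and the array is assumed to be stored row-major in one contiguous buffer.

```c
#include <assert.h>

/* Hypothetical helper: copy the b x b block whose top-left corner is
   (row0, col0) out of a row-major n x n array into a contiguous buffer
   of b*b ints, which could then be sent as plain MPI_INTs. */
void pack_block(const int *global, int n, int b,
                int row0, int col0, int *packed) {
    assert(row0 >= 0 && col0 >= 0 && row0 + b <= n && col0 + b <= n);
    for (int i = 0; i < b; i++)
        for (int j = 0; j < b; j++)
            packed[i * b + j] = global[(row0 + i) * n + (col0 + j)];
}
```

The cost is an extra copy and an extra buffer for every block on the sending side, and the mirror-image unpacking on the way back, which is exactly the work the derived datatypes let MPI do for you.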

So far, all the MPI data types we've used are simple ones - MPI_INT specifies (say) 4 bytes in a row. However, MPI lets you create your own data types that describe arbitrarily complex data layouts in memory. And this case -- rectangular subregions of an array -- is common enough that there's a specific call for that. For the 2-dimensional case we're describing above,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

This creates a type which picks out just the region "0" from the global array; we could send just that piece of data now to another processor

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

and the receiving process could receive it into a local array. Note that the receiving process, if it's only receiving it into a 3x3 array, can not describe what it's receiving as a type of newtype; that no longer describes the memory layout. Instead, it's just receiving a block of 3*3 = 9 integers:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

Note that we could do this for other sub-regions, too, either by creating a different type (with different start array) for the other blocks, or just by sending at the starting point of the particular block:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

Finally, note that we require global and local to be contiguous chunks of memory here; that is, &(global[0][0]) and &(local[0][0]) (or, equivalently, *global and *local) point to contiguous 6*6 and 3*3 chunks of memory; that isn't guaranteed by the usual way of allocating dynamic multi-d arrays. How to do this is shown below.

Now that we understand how to specify subregions, there's only one more thing to discuss before using scatter/gather operations, and that's the "size" of these types. We couldn't just use MPI_Scatter() (or even scatterv) with these types yet, because a subarray type has the extent of the full array it was cut from, here 36 integers; that is, MPI considers the "next" element of this type to start 36 integers after this one -- and that doesn't line up with where the next block actually begins, so we can't just use scatter - it would pick the wrong place to start sending data to the next processor.

Of course, we could use MPI_Scatterv() and specify the displacements ourselves, and that's what we'll do - except the displacements are in units of the send-type extent, and that doesn't help us either; the blocks start at offsets of (0,3,18,21) integers from the start of the global array, and with an extent of 36 integers none of the nonzero offsets is an integer multiple of it.

To deal with this, MPI lets you set the extent of the type for the purposes of these calculations. It doesn't truncate the type; it's just used for figuring out where the next element starts given the last element. For types like these with holes in them, it's frequently handy to set the extent to be something smaller than the distance in memory to the actual end of the type.

We can set the extent to be anything that's convenient to us. We could just make the extent 1 integer, and then set the displacements in units of integers. In this case, though, I like to set the extent to be 3 integers - the size of a sub-row - that way, block "1" starts immediately after block "0", and block "3" starts immediately after block "2". Unfortunately, it doesn't quite work as nicely when jumping from block "1" to block "2", but that can't be helped.

So to scatter the subblocks in this case, we'd do the following:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

Here we've created the same block type as before, but we've resized it; we haven't changed where the type "starts" (the 0) but we've changed where it "ends" (3 ints). We didn't mention this before, but the MPI_Type_commit is required to be able to use the type; but you only need to commit the final type you actually use, not any intermediate steps. You use MPI_Type_free to free the type when you're done.

So now, finally, we can scatterv the blocks: the data manipulations above are a little complicated, but once it's done, the scatterv looks just like before:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT,   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);
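
The displacements {0,1,6,7} don't have to be worked out by hand. A minimal plain-C sketch of the arithmetic, assuming a square n x n array, a square pgrid x pgrid process grid, and an extent resized to one block row (n/pgrid elements) as above; block_displs is a hypothetical helper name, not an MPI routine:

```c
#include <assert.h>

/* Hypothetical helper: displacements for scattering a row-major n x n
   array as a pgrid x pgrid grid of blocks, measured in units of one
   block row (n/pgrid elements), i.e. the resized extent.
   displs[] must have room for pgrid*pgrid entries. */
void block_displs(int n, int pgrid, int *displs) {
    assert(pgrid > 0 && n % pgrid == 0);
    int b = n / pgrid;                        /* block edge, in elements */
    for (int bi = 0; bi < pgrid; bi++) {
        for (int bj = 0; bj < pgrid; bj++) {
            int start = bi * b * n + bj * b;  /* block start, in elements */
            displs[bi * pgrid + bj] = start / b;   /* same, in extents   */
        }
    }
}
```

For n=6 and pgrid=2 this yields {0,1,6,7}: blocks start at 0, 3, 18 and 21 integers, and dividing by the 3-integer extent gives the values passed to MPI_Scatterv. For a 10x10 array on the same 2x2 grid it yields {0,1,10,11}.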

And now we're done, after a little tour of scatter, gather, and MPI derived types.

An example code which shows both the gather and the scatter operation, with character arrays, follows. Running the program:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

and the code follows.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize*procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}