mpi4py scatter and gather with large numpy arrays

218 · Mar 15, 2016 · Viewed 7.9k times

I am trying to parallelise some operations on a large numpy array using mpi4py. I am currently using numpy.array_split to divide the array into chunks, followed by comm.scatter to send the chunks to different cores and then comm.gather to collect the resulting arrays. A minimal (not) working example is below:

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.random.rand(411,48,52,40)
    test_chunks = np.array_split(test,size,axis=0)

else:
    test_chunks = None

test_chunk = comm.scatter(test_chunks,root=0)
output_chunk = np.zeros([np.shape(test_chunk)[0],128,128,128])

for i in range(0,np.shape(test_chunk)[0],1):
    print(i)
    output_chunk[i,0:48,0:52,0:40] = test_chunk[i]

outputData = comm.gather(output_chunk,root=0)


if rank == 0:
    outputData = np.concatenate(outputData,axis = 0)

Running this gives me the error:

  File "test_4d.py", line 23, in <module>
    outputData = comm.gather(output_chunk,root=0)
  File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
  File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
  File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
  File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
SystemError: Negative size passed to PyString_FromStringAndSize

This error seems to result from the large size of the numpy arrays being collected by gather; since scatter and gather send the arrays as a list of arrays, it appears easy to exceed the list size. One suggestion I have come across is to use comm.Scatter and comm.Gather. However, I am struggling to find clear documentation for these functions and so far have been unable to successfully implement them. For example:

replacing

outputData = comm.gather(output_chunk,root=0)

with the line

outputData = comm.Gather(sendbuf=[test_chunks,MPI.DOUBLE],recvbuf=[output_chunk,MPI.DOUBLE],root=0)

gives the error:

  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
  File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
  File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
TypeError: expected a readable buffer object

or with the line:

outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)

gives the error:

  File "test_4d_2.py", line 24, in <module>
    outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
TypeError: unhashable type: 'numpy.ndarray'

Furthermore, the input matrix, test, may also increase in size, which could cause similar problems for comm.scatter. Aside from the problems I already have with comm.Gather, I am not sure how to set up comm.Scatter: recvbuf has to be sized from test_chunk, which is itself the output of comm.scatter, so I can't specify recvbuf within comm.Scatter.

Answer

218 · Mar 18, 2016

The solution is to use comm.Scatterv and comm.Gatherv, which send and receive the data as a single block of memory rather than as a pickled list of numpy arrays, getting around the data size issue. comm.Scatterv and comm.Gatherv assume a block of data in C order (row-major) in memory, and it is necessary to specify two vectors, sendcounts and displacements. Sendcounts gives the number of elements to send to each core, while displacements gives the offset (index into the flattened data) at which each core's block starts. Hence it is possible to vary the amount of data sent to each core. More details can be found here: http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html
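To make the two vectors concrete, here is a minimal numpy-only sketch of how they could be computed for the array in the question when splitting along axis 0. The helper name counts_and_displacements and the choice of 4 ranks are assumptions for illustration; the row distribution is meant to mirror what np.array_split does.

```python
import numpy as np


def counts_and_displacements(shape, n_ranks):
    """Per-rank row counts, element counts and start offsets for an axis-0 split."""
    n_rows = shape[0]
    row_elems = int(np.prod(shape[1:]))  # elements in one slice along axis 0
    # Same distribution as np.array_split: the first (n_rows % n_ranks)
    # ranks receive one extra row.
    base, extra = divmod(n_rows, n_ranks)
    rows = np.array([base + (r < extra) for r in range(n_ranks)], dtype=np.int64)
    counts = rows * row_elems  # number of elements (not bytes) per rank
    # Each block starts where the previous one ends in the flattened array.
    displs = np.concatenate(([0], np.cumsum(counts)[:-1]))
    return rows, counts, displs


# Shape from the question, split over a hypothetical 4 ranks.
rows, sendcounts, displacements = counts_and_displacements((411, 48, 52, 40), 4)
print(rows)
print(sendcounts)
print(displacements)
```

Because the counts depend only on the array shape and the number of ranks, every rank can compute them locally and allocate its receive buffer before any data is sent.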

An example using comm.Scatterv and comm.Gatherv for a 2D matrix is given here: "Along what axis does mpi4py Scatterv function split a numpy array?"
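For the 4D case in the question, something along these lines should work. Treat it as a sketch rather than a tested drop-in: the names pad_chunk and main are my own, the array sizes are deliberately shrunk so it runs quickly (the sizes from the question are noted in a comment), and mpi4py is imported inside main only so that the padding helper can be used without an MPI installation.

```python
import numpy as np


def pad_chunk(chunk, out_shape):
    """Zero-pad each item of a (n, a, b, c) chunk into an (n, *out_shape) array."""
    out = np.zeros((chunk.shape[0],) + tuple(out_shape), dtype=chunk.dtype)
    a, b, c = chunk.shape[1:]
    out[:, :a, :b, :c] = chunk
    return out


def main():
    # Imported here so that pad_chunk can be used without an MPI runtime.
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    size, rank = comm.Get_size(), comm.Get_rank()

    # Small demo sizes (assumption). The question uses
    # n_rows = 411, in_shape = (48, 52, 40), out_shape = (128, 128, 128).
    n_rows, in_shape, out_shape = 11, (4, 5, 3), (8, 8, 8)

    # Every rank computes the split from the shapes alone, so receive
    # buffers can be allocated before any data arrives.
    base, extra = divmod(n_rows, size)
    rows = np.array([base + (r < extra) for r in range(size)])
    in_counts = rows * int(np.prod(in_shape))    # elements sent to each rank
    out_counts = rows * int(np.prod(out_shape))  # elements gathered from each rank
    in_displs = np.concatenate(([0], np.cumsum(in_counts)[:-1]))
    out_displs = np.concatenate(([0], np.cumsum(out_counts)[:-1]))

    # Only the root holds the full input; the send buffer is ignored elsewhere.
    test = np.random.rand(n_rows, *in_shape) if rank == 0 else None
    test_chunk = np.empty((rows[rank],) + in_shape, dtype=np.float64)
    comm.Scatterv([test, in_counts, in_displs, MPI.DOUBLE], test_chunk, root=0)

    output_chunk = pad_chunk(test_chunk, out_shape)

    # Only the root needs a buffer large enough for the gathered result.
    output = np.empty((n_rows,) + out_shape) if rank == 0 else None
    comm.Gatherv(output_chunk, [output, out_counts, out_displs, MPI.DOUBLE], root=0)

    if rank == 0:
        # The gathered array should contain the original data in its corner.
        assert np.array_equal(output[:, :4, :5, :3], test)
        print(output.shape)
```

To try it, add a call to main() at the end of the script and launch it under MPI, e.g. mpiexec -n 4 python script.py (the script name is a placeholder). Note that the counts and displacements are in units of array elements, not bytes, and that the arrays must be C-contiguous for an axis-0 split to correspond to contiguous blocks.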