I am using Python's multiprocessing module to process large numpy arrays in parallel. The arrays are memory-mapped using numpy.load(mmap_mode='r') in the master process. After that, multiprocessing.Pool() forks the process (I presume).
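Roughly, the setup looks like this (a sketch only, not my actual code; the file name and worker function are placeholders):

import numpy as np
import multiprocessing

def process_chunk(chunk):
    # placeholder for the real per-chunk work
    return chunk.sum()

if __name__ == '__main__':
    # the array is memory-mapped in the master process...
    data = np.load('big_array.npy', mmap_mode='r')
    # ...and then Pool() forks workers that receive pieces of it
    pool = multiprocessing.Pool()
    results = pool.map(process_chunk,
                       [data[i:i + 1000] for i in range(0, data.shape[0], 1000)])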
Everything seems to work fine, except I am getting lines like:

AttributeError("'NoneType' object has no attribute 'tell'",) in <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> ignored

in the unittest logs. The tests pass fine, nevertheless.
Any idea what's going on there?
Using Python 2.7.2, OS X, NumPy 1.6.1.
UPDATE:
After some debugging, I traced the cause to a code path that was using a (small slice of) this memory-mapped numpy array as input to a Pool.imap call.
Apparently the "issue" is with the way multiprocessing.Pool.imap passes its input to the new processes: it uses pickle. This doesn't work with memory-mapped numpy arrays, and something inside it breaks, which leads to the error.
I found this reply by Robert Kern which seems to address the same issue. He suggests creating a special code path for when the imap input comes from a memory-mapped array: memory-mapping the same array manually in the spawned process, roughly along the lines of the sketch below.
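For illustration only (not my real code; file and helper names are made up), the idea would be that each worker re-opens the memory map itself and only slice indices travel through pickle:

import numpy as np
import multiprocessing

_data = None  # filled in per worker process

def _init_worker(filename):
    # each worker re-opens the memory map instead of receiving it via pickle
    global _data
    _data = np.load(filename, mmap_mode='r')

def _work(bounds):
    # only the (start, stop) indices are pickled and sent to the worker
    start, stop = bounds
    chunk = _data[start:stop]
    return chunk.mean()  # stand-in for the real calculation

if __name__ == '__main__':
    pool = multiprocessing.Pool(initializer=_init_worker,
                                initargs=('big_array.npy',))
    slices = [(i, i + 1000) for i in range(0, 10000, 1000)]
    results = pool.map(_work, slices)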
This would be so complicated and ugly that I'd rather live with the error and the extra memory copies. Is there any other way that would require fewer changes to the existing code?
My usual approach (if you can live with extra memory copies) is to do all I/O in one process and then send things out to a pool of worker processes. To load a slice of a memmapped array into memory, just do x = np.array(data[yourslice]). (data[yourslice].copy() doesn't actually do this, which can lead to some confusion.)
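For instance, using the 'data.dat' file generated below, you can see the difference by checking the types: np.array gives you a plain ndarray, while .copy() still hands back a memmap instance:

import numpy as np

data = np.memmap('data.dat', dtype=np.float, mode='r')  # the test file generated below

in_memory = np.array(data[:100])   # plain ndarray, data fully read into memory
still_mapped = data[:100].copy()   # still a numpy.memmap instance

print type(in_memory), type(still_mapped)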
First off, let's generate some test data:
import numpy as np
np.random.random(10000).tofile('data.dat')
You can reproduce your errors with something like this:
import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()
And if you just switch to yielding np.array(data[start:stop]) instead, you'll fix the problem:
import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()
Of course, this does make an extra in-memory copy of each chunk.
In the long run, you'll probably find that it's easier to switch away from memmapped files and move to something like HDF. This is especially true if your data is multidimensional. (I'd recommend h5py, but PyTables is nice if your data is "table-like".)
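If you go that route, the same chunked pattern carries over more or less directly. A rough sketch with h5py (made-up file and dataset names; slicing an h5py dataset already returns an in-memory ndarray, so the chunks pickle cleanly):

import numpy as np
import h5py
import multiprocessing

def calculation(chunk):
    """Dummy calculation, as above."""
    return chunk.mean() - chunk.std()

def chunks(dset, chunksize=100):
    """Yield in-memory copies of successive slices of an h5py dataset."""
    for start in range(0, dset.shape[0], chunksize):
        yield dset[start:start + chunksize]  # this slice is a plain ndarray

def main():
    with h5py.File('data.h5', 'r') as f:  # hypothetical file/dataset names
        dset = f['data']
        pool = multiprocessing.Pool()
        results = np.fromiter(pool.imap(calculation, chunks(dset)),
                              dtype=np.float)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()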
Good luck, at any rate!