I am attempting to download a whole FTP directory in parallel.
#!/usr/bin/python
import sys
import datetime
import os
from multiprocessing import Process, Pool
from ftplib import FTP

curYear = ""
remotePath = ""
localPath = ""

def downloadFiles(remotePath, localPath):
    splitted = remotePath.split('/')
    host = splitted[2]
    path = '/' + '/'.join(splitted[3:])
    ftp = FTP(host)
    ftp.login()
    ftp.cwd(path)
    filenames = ftp.nlst()
    total = len(filenames)
    i = 0
    pool = Pool()
    for filename in filenames:
        local_filename = os.path.join(localPath, filename)
        pool.apply_async(downloadFile, (filename, local_filename, ftp))
        #downloadFile(filename, local_filename, ftp)
        i = i + 1
    pool.close()
    pool.join()
    ftp.close()

def downloadFile(filename, local_filename, ftp):
    file = open(local_filename, 'wb')
    ftp.retrbinary('RETR ' + filename, file.write)
    file.close()

def getYearFromArgs():
    if len(sys.argv) >= 2 and sys.argv[1] == "Y":
        year = sys.argv[2]
        del sys.argv[1:2]
    else:
        year = str(datetime.datetime.now().year)
    return year

def assignGlobals():
    global p
    global remotePath
    global localPath
    global URL
    global host
    global user
    global password
    global sqldb
    remotePath = 'ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/isd-lite/%s/' % (curYear)
    localPath = '/home/isd-lite/%s/' % (curYear)

def main():
    global curYear
    curYear = getYearFromArgs()
    assignGlobals()
    downloadFiles(remotePath, localPath)

if __name__ == "__main__":
    main()
But I get this exception:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
TypeError: expected string or Unicode object, NoneType found
If I comment out this line:
pool.apply_async(downloadFile, (filename, local_filename, ftp))
and remove the comment on this line:
downloadFile(filename, local_filename, ftp)
then it works just fine, but it is slow and not parallel.
Update, May 9, 2014:
I have determined the precise limitation. It is possible to send objects across process boundaries to worker processes as long as the objects can be pickled by Python's pickle facility. The problem which I described in my original answer occurred because I was trying to send a file handle to the workers. A quick experiment demonstrates why this doesn't work:
>>> f = open("/dev/null")
>>> import pickle
>>> pickle.dumps(f)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle file objects
Thus, if you're encountering the Python error which led you to find this Stack Overflow question, make sure all the things you're sending across process boundaries can be pickled.
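One quick way to find the offending argument before handing it to a Pool (a minimal sketch; check_picklable is just a throwaway helper I made up for illustration) is to try pickling each argument yourself, since pickle.dumps fails on the same values that trip up the pool's task handler:

import pickle

def check_picklable(args):
    # Try to pickle each argument the same way multiprocessing would.
    for i, arg in enumerate(args):
        try:
            pickle.dumps(arg)
        except Exception as e:
            print("argument %d is not picklable: %s" % (i, e))

# The file handle is the argument that fails here.
check_picklable(("somefile.txt", "/tmp/out.txt", open("/dev/null")))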
Original answer:
I'm a bit late to answering. However, I ran into the same error message as the original poster while trying to use Python's multiprocessing module. I'll record my findings so that anyone else who stumbles upon this thread has something to try.
In my case, the error occurred because of what I was trying to send to the pool of workers: I was trying to pass an array of file objects for the pool workers to chew on. That's apparently too much to send across process boundaries in Python. I solved the problem by sending the pool workers dictionaries which specified input and output filename strings.
So it seems that the iterable you supply to a function such as apply_async (I used map() and imap_unordered()) can contain numbers or strings, or even a fairly detailed dictionary data structure, as long as none of the values are unpicklable objects.
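Here's a rough sketch of the kind of thing that worked for me (the filenames and the convert_file function are just placeholders for illustration):

from multiprocessing import Pool

def convert_file(job):
    # job is a plain dict of strings, which pickles without trouble.
    src = open(job['input'], 'rb')
    dst = open(job['output'], 'wb')
    dst.write(src.read())
    src.close()
    dst.close()
    return job['output']

if __name__ == '__main__':
    jobs = [{'input': 'in%d.dat' % i, 'output': 'out%d.dat' % i}
            for i in range(10)]
    pool = Pool()
    for result in pool.imap_unordered(convert_file, jobs):
        print(result)
    pool.close()
    pool.join()

Each dictionary is trivially picklable, so the task handler has no trouble shipping it to the workers.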
In your case:
pool.apply_async(downloadFile, (filename, local_filename, ftp))
ftp is an object, which might be causing the problem. As a workaround, I would recommend sending plain parameters to the worker (it looks like host and path in this case) and letting the worker instantiate the FTP object itself and deal with the cleanup.
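Here is a rough sketch of what I mean, reusing the names from your script (untested against the NOAA server; each worker opens and closes its own FTP connection, and only strings cross the process boundary):

from ftplib import FTP
from multiprocessing import Pool
import os

def downloadFile(args):
    # Only strings arrive here; the FTP object is created inside the worker.
    host, path, filename, local_filename = args
    ftp = FTP(host)
    ftp.login()
    ftp.cwd(path)
    f = open(local_filename, 'wb')
    ftp.retrbinary('RETR ' + filename, f.write)
    f.close()
    ftp.close()

def downloadFiles(remotePath, localPath):
    splitted = remotePath.split('/')
    host = splitted[2]
    path = '/' + '/'.join(splitted[3:])
    # One connection in the parent just to list the directory.
    ftp = FTP(host)
    ftp.login()
    ftp.cwd(path)
    filenames = ftp.nlst()
    ftp.close()
    tasks = [(host, path, f, os.path.join(localPath, f)) for f in filenames]
    pool = Pool()
    pool.map(downloadFile, tasks)
    pool.close()
    pool.join()

On Windows you would also want the Pool creation kept behind an if __name__ == "__main__" guard, since the worker processes re-import the module.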