Celery chaining tasks sequentially

psychok7 · Mar 5, 2013

I need to download a file over FTP, modify it, and upload it back. I am using Celery for this, but I run into a problem when I try to chain the tasks. I get:

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

Also, can I use chains and be assured that the steps will run sequentially? If not, what is the alternative?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()

Tasks:

@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        else:
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)

    return "SUCCESS: "  

@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)

    return "SUCCESS: "

And is the following good or bad practice for my specific case?

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()

Answer

mpaf · Mar 6, 2013

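In a chain, the return value of each task is prepended to the arguments of the next task, which is why upload_ftp_image() receives 6 arguments instead of 5. If you don't want the return value of the previous task to be used as an argument, another option is to use 'immutability'.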

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Instead of defining your subtasks as:

download_ftp_image.s(...) and upload_ftp_image.s(...)

define them as:

download_ftp_image.si(...) and upload_ftp_image.si(...)

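You can now use the tasks in a chain with their usual number of arguments. The chain still runs them in order: each task starts only after the previous one has completed successfully.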