Python+Celery: Chaining jobs?

David Wolever picture David Wolever · Oct 10, 2010 · Viewed 23.5k times · Source

The Celery documentation suggests that it's a bad idea to have tasks wait on the results of other tasks… But the suggested solution (see “good” heading) leaves a something to be desired. Specifically, there's no clear way of getting the subtask's result back to the caller (also, it's kind of ugly).

So, is there any way of “chaining” jobs, so the caller gets the result of the final job? Eg, to use the add example:

>>> add3 = add.subtask(args=(3, ))
>>> add.delay(1, 2, callback=add3).get()
6

Alternately, is it OK to return instances of Result? For example:

@task
def add(x, y, callback=None):
    result = x + y
    if callback:
        return subtask(callback).delay(result)
    return result

This would let the result of the “final” job in the chain could be retrived with a simple:

result = add(1, 2, callback=add3).delay()
while isinstance(result, Result):
    result = result.get()
print "result:", result

Answer

ax003d picture ax003d · Aug 20, 2012

You can do it with a celery chain. See https://celery.readthedocs.org/en/latest/userguide/canvas.html#chains

@task()
def add(a, b):
    time.sleep(5) # simulate long time processing
    return a + b

Chaining job:

# import chain from celery import chain
# the result of the first add job will be 
# the first argument of the second add job
ret = chain(add.s(1, 2), add.s(3)).apply_async()

# another way to express a chain using pipes
ret2 = (add.s(1, 2) | add.s(3)).apply_async()

...

# check ret status to get result
if ret.status == u'SUCCESS':
    print "result:", ret.get()