Python celery - how to wait for all subtasks in chord

dynamicmindset · Jul 5, 2016 · Viewed 8.6k times

I am unit testing Celery tasks. My chains also contain groups, so Celery turns them into chords.

The test should look like:

  • run the Celery task (delay)
  • wait for task and all subtasks
  • assert

I tried the following:

from celery.result import GroupResult


def wait_for_result(result):
    result.get()
    for child in result.children or []:
        if isinstance(child, GroupResult):
            # tried looping over the task results in the group
            # until the tasks are ready, but without success
            pass
        wait_for_result(child)

This creates a deadlock: chord_unlock is retried forever. I am not interested in the task results. How can I wait for all the subtasks to finish?

Answer

Nitheesh A S · Oct 20, 2016

Although this is an old question, I just wanted to share how I got rid of the deadlock issue, just in case it helps somebody.

As the Celery logs say, never call get() inside a task. Doing so can indeed create a deadlock.
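For test code that only needs to know when everything has finished, one option is to poll ready() instead of blocking on get(). The following is a minimal sketch, not a definitive implementation: iter_results, wait_for_all and FakeResult are hypothetical names, and the code assumes result objects expose ready(), parent and children the way Celery's AsyncResult and GroupResult do. It also assumes a result backend is configured, since task states cannot be read without one.

```python
import time


def iter_results(result, seen=None):
    """Yield result and every result reachable through .parent / .children."""
    seen = {} if seen is None else seen
    if result is None or id(result) in seen:
        return
    seen[id(result)] = result  # keep a reference so ids stay unique
    yield result
    yield from iter_results(getattr(result, "parent", None), seen)
    for child in getattr(result, "children", None) or []:
        yield from iter_results(child, seen)


def wait_for_all(result, timeout=10.0, interval=0.1):
    """Poll ready() on the whole tree; return True if all finished in time."""
    deadline = time.monotonic() + timeout
    while True:
        # Re-walk on every pass: children may only appear once a task has run.
        if all(node.ready() for node in iter_results(result)):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakeResult:
    """Hypothetical stand-in for AsyncResult / GroupResult in this sketch."""

    def __init__(self, polls_until_ready=0, parent=None, children=None):
        self.polls_left = polls_until_ready
        self.parent = parent
        self.children = children

    def ready(self):
        # Report "not ready" for the configured number of polls, then "ready".
        if self.polls_left > 0:
            self.polls_left -= 1
            return False
        return True
```

In a real test, the value returned by delay() would take the place of FakeResult, and the assertions would run only after wait_for_all returns True.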

I have a similar set of Celery tasks that includes a chain of group tasks, which makes it a chord. I call these tasks from Tornado by making an HTTP request. What I did was something like this:

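from celery import group, shared_task


@shared_task
def someFunction(i):
    ...  # task body elided in the original


@shared_task
def someTask(results):
    ...  # task body elided in the original


@shared_task
def celeryTask():
    groupTask = group(someFunction.s(i) for i in range(10))

    # group | task is upgraded to a chord; apply_async() starts it and
    # returns the AsyncResult of someTask without waiting for it
    job = (groupTask | someTask.s()).apply_async()

    # return only the UUID, never call job.get() here
    return job.id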

When Tornado calls celeryTask(), the chain starts executing and job holds the result handle for someTask(), including its UUID. It will look something like

AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c

This UUID is returned, and celeryTask() exits (ideally) before the chain even starts executing, which frees the worker to run another task.

I then used the Tornado layer to check the status of the task. Details on the Tornado layer can be found in this Stack Overflow question.
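The status check itself can be a single non-blocking lookup by UUID, repeated on each HTTP request. Below is a minimal sketch under stated assumptions: task_status and StubResult are hypothetical names, and lookup is assumed to be something like celery.result.AsyncResult, which can be constructed from a task id and exposes ready() and state. The stub only mimics that interface so the example runs without a broker.

```python
def task_status(task_id, lookup):
    """Look the task up once and report its state without blocking."""
    result = lookup(task_id)
    return {"id": task_id, "state": result.state, "ready": result.ready()}


class StubResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    # Celery treats these states as final
    FINAL_STATES = ("SUCCESS", "FAILURE", "REVOKED")

    def __init__(self, task_id, state="PENDING"):
        self.id = task_id
        self.state = state

    def ready(self):
        return self.state in self.FINAL_STATES


# Usage: a request handler would call task_status with the UUID it was given.
known_states = {"765b29a8-7873-4b28-b05c-7e19c33e950c": "SUCCESS"}


def stub_lookup(task_id):
    return StubResult(task_id, known_states.get(task_id, "PENDING"))


status = task_status("765b29a8-7873-4b28-b05c-7e19c33e950c", stub_lookup)
```

Because nothing here waits on the task, the handler returns immediately and the client decides when to ask again.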