I am unit testing celery tasks. I have chain tasks that also have groups, so a chord is resulted.
The test should look like:
I tried the following:
def wait_for_result(result):
result.get()
for child in result.children or list():
if isinstance(child, GroupResult):
# tried looping over task result in group
# until tasks are ready, but without success
pass
wait_for_result(child)
This creates a deadlock, chord_unlock being retried forever. I am not interested in task results. How can I wait for all the subtasks to finish?
Although this is an old question, I just wanted to share how I got rid of the deadlock issue, just in case it helps somebody.
Like the celery logs says, never use get()
inside a task. This indeed will create a deadlock.
I have a similar set of celery tasks which includes chain of group tasks, hence making it a chord. I'm calling these tasks using tornado, by making HTTP request. So what I did was something like this:
@task
def someFunction():
....
@task
def someTask():
....
@task
def celeryTask():
groupTask = group([someFunction.s(i) for i in range(10)])
job = (groupTask| someTask.s())
return job
When celeryTask()
is being called by tornado, the chain will start executing, & the UUID of someTask()
will be held in job
. It will look something like
AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c
This UUID is returned and the celeryTask()
exits before even the chain starts executing(ideally), hence leaving space for another process to run.
I then used the tornado layer to check the status of the task. Details on the tornado layer can be found in this stackoverflow question