Here's my setup:
In my settings.py file I have:
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
i.e. I'm just using the database to queue tasks.
Now on to my problem: I have a user-initiated task that could take a few minutes to complete. I want the task to run only once per user, and I cache its results in a temporary file so that if the user initiates the task again I just return the cached file. I have code that looks like this in my view function:
task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)
if result.state == celery.states.PENDING:
# The next line makes a duplicate task if the user rapidly refreshes the page
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
if cached_file_still_exists():
return get_cached_file()
else:
result.forget()
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
This code almost works. But I'm running into a problem when the user rapidly reloads the page. There's a 1-3 second delay between when the task is queued and when the task is finally pulled off the queue and given to a worker. During this time, the task's state remains PENDING which causes the view logic to kick off a duplicate task.
What I need is some way to tell whether the task has already been submitted to the queue so I don't end up submitting it twice. Is there a standard way of doing this in Celery?
I solved this with Redis. I just set a key in Redis for each task and then remove the key from Redis in the task's after_return method. Redis is lightweight and fast.
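Here is a minimal sketch of that approach, not my exact code: it assumes the redis-py client, a Celery app instance named app, and an illustrative helper called start_task_once; reusing the task_id as the lock key and the 10-minute expiry are just choices I made for the example.

import redis
from celery import Celery, Task

app = Celery("myapp")  # assumed Celery app instance for the example
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)

class LockedTask(Task):
    # Release the per-user lock whether the task succeeded or failed.
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        redis_client.delete(task_id)

@app.task(base=LockedTask)
def some_long_task():
    pass  # the slow, user-initiated work goes here

def start_task_once(user_id):
    task_id = "long-task-%d" % user_id
    # setnx is atomic: only the first request can claim the key, so rapid
    # refreshes see it already set and skip apply_async entirely.
    if redis_client.setnx(task_id, "queued"):
        redis_client.expire(task_id, 600)  # safety net if the worker never picks it up
        some_long_task.apply_async(task_id=task_id)
        return True   # this request queued the task
    return False      # a task with this id is already queued or running

The key point is that the check-and-set happens in Redis in a single atomic setnx call, rather than by inspecting the task's state, so two rapid requests can't both decide to queue the task.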