I have a REST API written in Django, with and endpoint that queues a celery task when posting to it. The response contains the task id which I'd like to use to test that the task is created and get the result. So, I'd like to do something like:
def test_async_job():
response = self.client.post("/api/jobs/", some_test_data, format="json")
task_id = response.data['task_id']
result = my_task.AsyncResult(task_id).get()
self.assertEquals(result, ...)
I obviously don't want to have to run a celery worker to run the unit tests, I expect to mock it somehow. I can't use CELERY_ALWAYS_EAGER because that seems to bypass the broker altogether, preventing me to use AsyncResult to get the task by its id (as stated here).
Going through celery and kombu docs, I've found that there is an in-memory transport for unit tests, that would do what I'm looking for. I tried overriding the BROKER_URL
setting to use it on the tests:
@override_settings(BROKER_URL='memory://')
def test_async_job():
But the behavior is the same as with the ampq broker: it blocks the test waiting for the result. Any Idea how am I supposed to configure this broker to get it working in the tests?
You can specify the broker_backend in your settings :
if 'test' in sys.argv[1:]:
BROKER_BACKEND = 'memory'
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
or you can override the settings with a decorator directly in your test
import unittest
from django.test.utils import override_settings
class MyTestCase(unittest.TestCase):
@override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
CELERY_TASK_ALWAYS_EAGER=True,
BROKER_BACKEND='memory')
def test_mytask(self):
...