I am trying to come up with a testing methodology for our django-celery project. I have read the notes in the documentation, but they didn't give me a good idea of what to actually do. I am not worried about testing the tasks in the actual daemons, just the functionality of my code. Mainly I am wondering:

- How should we handle task.delay() during the test (I tried setting CELERY_ALWAYS_EAGER = True, but it made no difference)?
- Can we still use manage.py test, or do we have to use a custom runner?

Overall, any hints or tips for testing with celery would be very helpful.
I like to use the override_settings decorator on tests which need celery results to complete.
    from django.test import TestCase
    from django.test.utils import override_settings

    from myapp.tasks import mytask


    class AddTestCase(TestCase):

        @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                           CELERY_ALWAYS_EAGER=True,
                           BROKER_BACKEND='memory')
        def test_mytask(self):
            # With CELERY_ALWAYS_EAGER, delay() runs the task in-process,
            # so the result is already available on the next line.
            result = mytask.delay()
            self.assertTrue(result.successful())
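To make it clearer why those two eager settings matter, here is a toy model of the behaviour in plain Python. This is not Celery's implementation: `ToyTask`, `ToyResult`, and the flag names `always_eager` and `propagate_exceptions` are hypothetical stand-ins for what CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS do, as I understand them.

```python
class ToyResult:
    """Hypothetical stand-in for the object returned by delay()."""

    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

    def successful(self):
        return self.error is None


class ToyTask:
    """Hypothetical task wrapper mimicking the two eager settings."""

    def __init__(self, func, always_eager, propagate_exceptions):
        self.func = func
        self.always_eager = always_eager
        self.propagate_exceptions = propagate_exceptions

    def delay(self, *args, **kwargs):
        if not self.always_eager:
            # A real task would be sent to the broker and run by a worker later.
            raise RuntimeError("no worker available in this toy model")
        try:
            # Eager mode: run the function right now, in the calling process.
            return ToyResult(value=self.func(*args, **kwargs))
        except Exception as exc:
            if self.propagate_exceptions:
                raise  # the failure surfaces directly in the test
            return ToyResult(error=exc)  # the failure is only recorded


def add(x, y):
    return x + y


def broken():
    raise ValueError("boom")


eager_add = ToyTask(add, always_eager=True, propagate_exceptions=True)
result = eager_add.delay(2, 3)
```

In this model, `result.successful()` is true immediately after `delay()` returns, which is the property the test above relies on. With `propagate_exceptions=False`, a failing task would only show up as an unsuccessful result, so turning propagation on gives a test a real traceback instead.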
If you want to apply this to all tests, you can use the celery test runner described at http://docs.celeryproject.org/en/2.5/django/unit-testing.html, which sets essentially the same settings except for BROKER_BACKEND = 'memory'.
In settings:
    TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
Look at the source for CeleryTestSuiteRunner and it's pretty clear what's happening.