How to send periodic tasks to a specific queue in Celery

Artem Mezhenin · Jun 6, 2013

By default, Celery sends all tasks to the 'celery' queue, but you can change this behavior by passing an extra parameter to the task decorator:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

Scheduler settings:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

Run worker:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

This setup doesn't work as expected: the worker sends periodic tasks to the 'celery' queue, not 'celery_periodic'. How can I fix that?

P.S. celery==3.0.16

Answer

abhi shukla · Jan 9, 2015

Periodic tasks are sent to queues by celerybeat, so the queue has to be set in the schedule entry. A schedule entry lets you do everything you can do through the Celery API. Here is the list of fields that a celerybeat schedule entry accepts:

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

In your case

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15,  # every 15 sec for test
        'options': {'queue': 'celery_periodic'},  # options are passed on to apply_async
    },
}
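
To double-check a schedule before starting the worker, something like the following sketch might help. It restates the entry above with an explicit timedelta and reports which queue each entry names. The helper queue_for_entry and the constant DEFAULT_QUEUE are hypothetical names made up for illustration, not part of Celery. The check only reads the 'options' key and falls back to the 'celery' default mentioned in the question, so it ignores any routing configured elsewhere.

```python
from datetime import timedelta

# Same entry as above, written with an explicit timedelta for the interval.
CELERYBEAT_SCHEDULE = {
    'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': timedelta(seconds=15),  # every 15 sec for test
        'options': {'queue': 'celery_periodic'},
    },
}

# Assumption: the default queue name is 'celery', as stated in the question.
DEFAULT_QUEUE = 'celery'


def queue_for_entry(entry):
    """Hypothetical helper (not a Celery API): return the queue named in a
    schedule entry's 'options', or the default queue if none is given."""
    return entry.get('options', {}).get('queue', DEFAULT_QUEUE)


if __name__ == '__main__':
    for name, entry in sorted(CELERYBEAT_SCHEDULE.items()):
        print('%s -> %s' % (name, queue_for_entry(entry)))
```

An entry without an 'options' key, like the one in the question, is reported as going to 'celery', which matches the behavior described there.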