How can I set up Celery to call a custom initialization function before running my tasks?

xelk · Jan 25, 2010 · Viewed 10.2k times

I have a Django project and I'm trying to use Celery to submit tasks for background processing (http://ask.github.com/celery/introduction.html). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of data into memory before I start processing tasks, and I can't afford to call it for every task.

Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

Thanks

Answer

asksol · Jan 27, 2010

You can either write a custom loader or use signals.

Loaders have an on_task_init method, which is called when a task is about to be executed, and an on_worker_init method, which is called in the celeryd and celerybeat main processes.
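Here's a minimal sketch of the loader route. It assumes the celery.loaders.base.BaseLoader base class and the CELERY_LOADER setting/environment variable; a Django project would more likely subclass the Django loader shipped with your version, so treat the class path and registration step as assumptions to verify. load_expensive_data is a hypothetical stand-in for your own setup function:

from celery.loaders.base import BaseLoader

class ExpensiveInitLoader(BaseLoader):
    """Loader that performs one-time, memory-heavy setup for the worker."""

    def on_worker_init(self):
        # Called once by the main worker process at startup.
        load_expensive_data()  # hypothetical: your own expensive setup

    def on_task_init(self, task_id, task):
        # Called right before each task executes; keep this cheap.
        pass

You would then point the worker at it, e.g. by setting CELERY_LOADER to "myproject.loaders.ExpensiveInitLoader" before starting celeryd (exact mechanism depends on your version).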

Using signals is probably the easiest approach; the signals available are:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Dispatched when a task is about to be executed by the worker (or locally if using apply, or if CELERY_ALWAYS_EAGER has been set).

  • task_postrun(task_id, task, args, kwargs, retval)

    Dispatched after a task has been executed, under the same conditions as above.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Dispatched when a task is applied (not a good place for long-running operations).

Additional signals available in 0.9.x (current master branch on github):

  • worker_init()

    Called when celeryd has started, before the task pool processes are initialized, so on a system that supports fork any memory loaded here is copied to the child worker processes (see the sketch after this list).

  • worker_ready()

    Called when celeryd is able to receive tasks.

  • worker_shutdown()

    Called when celeryd is shutting down.
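If you're on 0.9.x, the worker_init signal is the most direct fit for the original question: load the data once in the main process, before the pool forks. Here's a minimal sketch, assuming celery.signals.worker_init; build_expensive_table is a hypothetical stand-in for the expensive function:

from celery.signals import worker_init

_heavy_data = {}

def _load_heavy_data(**kwargs):
    # Runs once in the main celeryd process at startup; on platforms that
    # fork, the populated dict is inherited by the child worker processes.
    _heavy_data.update(build_expensive_table())  # hypothetical expensive call

worker_init.connect(_load_heavy_data)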

Here's an example that precalculates something the first time a task is run in the process:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        # Return the precalculated value if available, otherwise compute it.
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2

tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table:  # it's empty, so it hasn't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# The sender argument has to be the registered task instance.
task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])

If you want the function to be run for all tasks, just skip the sender argument.
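For example, reusing the handler above:

# Without a sender the handler runs before every task, not just PowersOfTwo.
task_prerun.connect(_precalc_numbers)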