How to monitor events from workers in a Celery-Django application?

Clara · Mar 1, 2013 · Viewed 13.8k times

According to the Celery tutorial on real-time monitoring of workers, one can also capture the events produced by the workers programmatically and act on them.

My question is: how can I integrate a monitor like the one in that example into a Celery-Django application?

EDIT: The code example in the tutorial looks like this:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        task_id = event['uuid']

        print('TASK FAILED: %s[%s] %s' % (
            event['name'], task_id, state[task_id].info(), ))

    with app.connection() as connection:
        # Only 'task-failed' is handled here; 'worker-heartbeat' could be
        # handled the same way with its own callback.
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    celery = Celery(broker='amqp://guest@localhost//')
    my_monitor(celery)

So I want to capture the task-failed event sent by the worker, get its task_id as the tutorial shows, fetch the result for that task from the result backend configured for my application, and process it further. My problem is that it is not obvious to me how to get hold of the application object, because in a django-celery project the instantiation of the Celery app is not transparent to me.

I am also open to any other idea as to how to process the results when a worker has finished executing a task.

Answer

Clara · Mar 19, 2013

OK, I found a way of doing this. I am not sure it is the right solution, but it works for me. The monitor function connects directly to the broker and listens for different types of events. My code looks like this:

import sys

from celery.events import EventReceiver
from kombu import Connection as BrokerConnection

def my_monitor():
    connection = BrokerConnection('amqp://guest:guest@localhost:5672//')

    def on_event(event):
        # Generic handler: just dump the raw event dict.
        print("EVENT HAPPENED: %s" % (event,))

    def on_task_failed(event):
        exception = event['exception']
        print("TASK FAILED! %s EXCEPTION: %s" % (event, exception))

    while True:
        try:
            with connection as conn:
                recv = EventReceiver(conn,
                                     handlers={'task-failed': on_task_failed,
                                               'task-succeeded': on_event,
                                               'task-sent': on_event,
                                               'task-received': on_event,
                                               'task-revoked': on_event,
                                               'task-started': on_event,
                                               # OR: '*': on_event
                                               })
                # capture() blocks, and must run while the connection is open.
                recv.capture(limit=None, timeout=None)
        except (KeyboardInterrupt, SystemExit):
            print("EXCEPTION KEYBOARD INTERRUPT")
            sys.exit()

That is all. I run this in a separate process from the normal application: my Celery application creates a child process that only runs this function. HTH
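
To get from the task-failed event to the stored result, the handler can look the task up through the application's result backend. The following is only a minimal sketch under assumptions, not a tested part of my setup: `make_failure_handler`, `run_monitor` and the `process_result` callback are hypothetical names, and `app` is assumed to be the Celery application object of the project (for example `celery.current_app`, or whatever app object your Django project defines).

```python
def make_failure_handler(app, process_result):
    """Build a 'task-failed' handler that fetches the task's stored result.

    `process_result` is a hypothetical callback taking (task_id, result);
    `app` is assumed to be the project's Celery application object.
    """
    def on_task_failed(event):
        task_id = event['uuid']
        # AsyncResult reads from the result backend configured on `app`;
        # its .state and .result can then be inspected by the callback.
        result = app.AsyncResult(task_id)
        process_result(task_id, result)
    return on_task_failed


def run_monitor(app, handlers):
    """Block and dispatch worker events to `handlers` until interrupted."""
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers=handlers)
        recv.capture(limit=None, timeout=None, wakeup=True)
```

Usage would then be something like `run_monitor(app, {'task-failed': make_failure_handler(app, my_callback)})`, started in its own process as described above, since `capture` blocks. The workers also have to be sending events (for example by starting them with `-E`), otherwise no handler fires.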