Celery Beat: Limit to single task instance at a time

erydo · Jan 3, 2014 · Viewed 13.5k times

I have celery beat and celery (four workers) to do some processing steps in bulk. One of those tasks is roughly along the lines of, "for each X that hasn't had a Y created, create a Y."

The task is run periodically at a semi-rapid rate (10sec). The task completes very quickly. There are other tasks going on as well.

I've repeatedly run into the issue where the beat tasks apparently become backlogged, so instances of the same task (from different beat runs) are executed simultaneously, causing incorrectly duplicated work. It also appears that the tasks are executed out of order.

  1. Is it possible to limit celery beat to ensure only one outstanding instance of a task at a time? Is setting something like rate_limit=5 on the task the "correct" way of doing this (see the sketch after this list)?

  2. Is it possible to ensure that beat tasks are executed in-order, e.g. instead of dispatching a task, beat adds it to a task chain?

  3. What's the best way of handling this, short of making those tasks themselves execute atomically and be safe to execute concurrently? That was not a restriction I would have expected of beat tasks…
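
For reference, the rate_limit idea from question 1 would look like the sketch below. rate_limit is Celery's per-worker throughput throttle (a bare number means tasks per second, '5/m' means five per minute), so as far as I can tell it does not by itself guarantee a single outstanding instance.

@periodic_task(run_every=timedelta(seconds=10), rate_limit='5/m')
def add_y_to_xs():
    # rate_limit only limits how often each worker may start this task;
    # two backlogged dispatches can still run at the same time.
    return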

The task itself is defined naïvely:

from datetime import timedelta
from celery.task import periodic_task


@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

Here's an actual (cleaned) log:

  • [00:00.000] foocorp.tasks.add_y_to_xs sent. id->#1
  • [00:00.001] Received task: foocorp.tasks.add_y_to_xs[#1]
  • [00:10.009] foocorp.tasks.add_y_to_xs sent. id->#2
  • [00:20.024] foocorp.tasks.add_y_to_xs sent. id->#3
  • [00:26.747] Received task: foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748] TaskPool: Apply #2
  • [00:26.752] Received task: foocorp.tasks.add_y_to_xs[#3]
  • [00:26.769] Task accepted: foocorp.tasks.add_y_to_xs[#2] pid:26528
  • [00:26.775] Task foocorp.tasks.add_y_to_xs[#2] succeeded in 0.0197986490093s: None
  • [00:26.806] TaskPool: Apply #1
  • [00:26.836] TaskPool: Apply #3
  • [01:30.020] Task accepted: foocorp.tasks.add_y_to_xs[#1] pid:26526
  • [01:30.053] Task accepted: foocorp.tasks.add_y_to_xs[#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs[#1]: Adding Y for X id #9725
  • [01:30.070] foocorp.tasks.add_y_to_xs[#3]: Adding Y for X id #9725
  • [01:30.074] Task foocorp.tasks.add_y_to_xs[#1] succeeded in 0.0594762689434s: None
  • [01:30.087] Task foocorp.tasks.add_y_to_xs[#3] succeeded in 0.0352867960464s: None

We're currently using Celery 3.1.4 with RabbitMQ as the transport.
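
One Celery-level mitigation for the backlog itself, separate from any locking, would be to give the beat entry an expires shorter than its period, so that backlogged duplicates are revoked by the worker instead of running late. A sketch using the standard CELERYBEAT_SCHEDULE options dict (the entry name is made up):

CELERYBEAT_SCHEDULE = {
    'add-y-to-xs-every-10s': {
        'task': 'foocorp.tasks.add_y_to_xs',
        'schedule': timedelta(seconds=10),
        # 'options' takes anything apply_async() accepts; expired tasks
        # are marked REVOKED by the worker rather than executed.
        'options': {'expires': 9},  # seconds
    },
}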

EDIT: Dan, here's what I ended up using:

from contextlib import contextmanager

from sqlalchemy import func, select, text
from sqlalchemy.exc import DBAPIError


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    # NOTE: hash() of a str is randomized per process in Python 3 (unless
    # PYTHONHASHSEED is fixed), so different workers can compute different
    # lock ids for the same name; a stable hash such as zlib.crc32 avoids that.
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield
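
Used on its own, with db_lock and DatabaseLockFailed from above in scope and a hypothetical engine, it looks like this:

from sqlalchemy import create_engine

engine = create_engine('postgresql:///foocorp')  # hypothetical DSN

# Non-blocking variant: if another transaction already holds the lock,
# DatabaseLockFailed is raised immediately and the work is skipped.
try:
    with db_lock(engine, 'celery:add_y_to_xs', block=False):
        pass  # do the protected work inside this transaction
except DatabaseLockFailed:
    pass  # another instance already holds the lock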

And the celery task decorator (used only for periodic tasks):

import logging
from functools import wraps

from preo.extensions import db

# db_lock and DatabaseLockFailed are defined in the snippet above.
logger = logging.getLogger(__name__)


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
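
With that in place, wiring it up to the periodic task from the question looks roughly like this (illustrative, reusing the imports from the earlier snippets):

@periodic_task(run_every=timedelta(seconds=10))
@locked()
def add_y_to_xs():
    # The advisory lock is tied to the transaction, so an overlapping
    # dispatch blocks for at most the 1s statement timeout and then
    # logs an error and returns None instead of duplicating work.
    return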

Answer

quickes · Feb 4, 2016
from functools import wraps
from celery import shared_task


def skip_if_running(f):
    task_name = f'{f.__module__}.{f.__name__}'

    @wraps(f)
    def wrapped(self, *args, **kwargs):
        # Ask every worker which tasks it is currently executing;
        # inspect() returns None if no worker replies in time.
        workers = self.app.control.inspect().active() or {}

        for worker, tasks in workers.items():
            for task in tasks:
                # If an identical invocation (same task name and arguments)
                # other than this very request is already running, skip it.
                if (task_name == task['name'] and
                        tuple(args) == tuple(task['args']) and
                        kwargs == task['kwargs'] and
                        self.request.id != task['id']):
                    print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')

                    return None

        return f(self, *args, **kwargs)

    return wrapped


@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
    pass


test_single_task.delay()
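
Note that this relies on bind=True so the wrapper receives self and can exclude its own self.request.id from the comparison. inspect().active() is a best-effort broadcast to the currently running workers, so there is still a small window between the check and the task actually starting in which two instances could slip through.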