Celery stop execution of a chain

Salami picture Salami · Jul 4, 2013 · Viewed 8.7k times · Source

I have a check_orders task that's executed periodically. It makes a group of tasks so that I can time how long executing the tasks took, and perform something when they're all done (this is the purpose of res.join [1] and grouped_subs) The tasks that are grouped are pairs of chained tasks.

What I want is for when the first task doesn't meet a condition (fails) don't execute the second task in the chain. I can't figure this out for the life of me and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group). I've tried setting ignore_result to False as well for all these tasks but it still doesn't work.

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful'
    notify_user(args_sub_2)

Answer

AntonioMO picture AntonioMO · Jan 14, 2014

In my opinion this is a common use-case that doesn't get enough love in the documentation.

Assuming you want to abort a chain mid-way while still reporting SUCCESS as status of the completed tasks, and not sending any error log or whatnot (else you can just raise an exception) then a way to accomplish this is:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks = None
        return
    #Other stuff to do if end_chain is False

So in your example:

@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        self.request.callbacks = None

Will work. Note that instead of ignore_result=True and subtask() you can use the shortcut .si() as stated by @abbasov-alexander

Edited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.