Scalable delayed task execution with Redis

Alexander Gladysh · Jun 3, 2012

I need to design a Redis-driven scalable task scheduling system.

Requirements:

  • Multiple worker processes.
  • Many tasks, but long periods of idleness are possible.
  • Reasonable timing precision.
  • Minimal resource waste when idle.
  • Should use the synchronous Redis API.
  • Should work with Redis 2.4 (i.e. no features from the upcoming 2.6).
  • Should not use any means of RPC other than Redis.

Pseudo-API: schedule_task(timestamp, task_data). Timestamp is in integer seconds.

Basic idea:

  • Listen for incoming tasks on a list.
  • Put tasks into buckets, one per timestamp.
  • Sleep until the closest timestamp.
  • If a new task appears with a timestamp earlier than the closest one, wake up.
  • Process all tasks with timestamp ≤ now, in batches (assuming that task execution is fast).
  • Make sure that concurrent workers don't process the same tasks. At the same time, make sure that no tasks are lost if we crash while processing them.

So far I can't figure out how to fit this in Redis primitives...

Any clues?

Note that there is a similar old question: Delayed execution / scheduling with Redis? In this new question I introduce more details (most importantly, many workers). So far I was not able to figure out how to apply old answers here — thus, a new question.

Answer

Dan Benamy · Feb 22, 2013

Here's another solution that builds on a couple of others [1]. It uses the Redis WATCH command to remove the race condition without relying on Lua scripting, which requires Redis 2.6.

The basic scheme is:

  • Use a Redis zset for scheduled tasks and Redis lists as queues for ready-to-run tasks.
  • Have a dispatcher poll the zset and move tasks that are ready to run into the queues. You may want more than one dispatcher for redundancy, but you probably don't need or want many.
  • Have as many workers as you want, each doing blocking pops on the queues.

I haven't tested it :-)

The foo job creator would do:

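import json
import time

def schedule_task(queue, data, delay_secs):
    # This calculation for run_at uses the wall clock, so it won't deal well
    # with clock adjustments, leap seconds, and other time anomalies.
    # Improvements welcome :-)
    run_at = time.time() + delay_secs

    # zset members must be strings, so serialize the task to JSON.
    member = json.dumps({'queue': queue, 'data': data})

    # `redis` is a connected redis-py client. redis-py 3.0+ takes a
    # {member: score} mapping; older StrictRedis took (score, member) and
    # the legacy Redis class took (member, score).
    redis.zadd(SCHEDULED_ZSET_KEY, {member: run_at})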

schedule_task('foo_queue', foo_data, 60)
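
One caveat worth illustrating: zset members are unique, so scheduling the same queue and data twice leaves a single entry whose score is overwritten by the second call. Below is a minimal sketch of one way around this, assuming tasks are serialized as JSON and tagged with a random ID. The `encode_task` and `decode_task` helpers are hypothetical names, not part of the original answer.

```python
import json
import uuid


def encode_task(queue, data):
    """Serialize a task to a string usable as a zset member.

    The random 'id' field keeps otherwise identical tasks distinct.
    """
    return json.dumps({'id': uuid.uuid4().hex, 'queue': queue, 'data': data})


def decode_task(member):
    """Parse a member produced by encode_task back into (queue, data)."""
    task = json.loads(member)
    return task['queue'], task['data']


# Two logically identical tasks produce two different members.
first = encode_task('foo_queue', {'n': 1})
second = encode_task('foo_queue', {'n': 1})
```

The dispatcher would then call `decode_task` on each member it reads and still pass the raw member string to ZREM, so the removal matches exactly what was stored.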

The dispatcher(s) would look like:

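import json
import time
from redis.exceptions import WatchError

# `redis` is a connected redis-py client; members are assumed to be JSON strings.
while working:
    with redis.pipeline() as pipe:
        try:
            # EXEC aborts with WatchError if the zset changes after this point.
            pipe.watch(SCHEDULED_ZSET_KEY)
            # After watch() the pipeline runs commands immediately.
            results = pipe.zrangebyscore(
                SCHEDULED_ZSET_KEY, 0, time.time(), start=0, num=1)
            if not results:
                pipe.unwatch()
                time.sleep(1)
                continue
            task = json.loads(results[0])
            pipe.multi()
            pipe.rpush(task['queue'], json.dumps(task['data']))
            pipe.zrem(SCHEDULED_ZSET_KEY, results[0])
            pipe.execute()
        except WatchError:
            # Another client modified the zset first; retry.
            continue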
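
The fixed one-second sleep keeps the dispatcher simple, but it polls every second even when nothing is due for hours. A possible refinement, sketched here as an assumption rather than part of the original answer, is to sleep until the earliest score in the zset, capped so that a task scheduled in the meantime with an earlier time is still noticed reasonably soon. The `sleep_duration` helper and its `max_sleep` parameter are hypothetical; the earliest score could be read with ZRANGE key 0 0 WITHSCORES.

```python
def sleep_duration(now, next_run_at, max_sleep=1.0):
    """Return how long the dispatcher may sleep before polling again.

    now: current time in seconds.
    next_run_at: score of the earliest scheduled task, or None if the
        zset is empty.
    max_sleep: upper bound, so newly scheduled earlier tasks are noticed.
    """
    if next_run_at is None:
        return max_sleep
    # Never negative: an overdue task means "poll again immediately".
    return min(max_sleep, max(0.0, next_run_at - now))


# Illustrative calls with made-up timestamps.
idle = sleep_duration(100.0, None)
soon = sleep_duration(100.0, 100.25)
overdue = sleep_duration(100.0, 90.0)
capped = sleep_duration(100.0, 500.0, max_sleep=5.0)
```

A larger `max_sleep` wastes fewer wake-ups when idle but delays tasks that are scheduled with a short delay while the dispatcher is asleep, so the cap is a trade-off between idle cost and timing precision.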

The foo worker would look like:

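while working:
    # blpop returns a (queue_name, value) tuple, or None on timeout.
    popped = redis.blpop('foo_queue', POP_TIMEOUT)
    if popped:
        _queue_name, raw = popped
        # Assumes the dispatcher pushed JSON-encoded data.
        foo(json.loads(raw))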
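
To follow the overall flow without a Redis server, here is a small in-memory model of the scheme: a heap stands in for the zset and deques stand in for the queues. It is only an illustration under those assumptions; the `InMemoryScheduler` class is hypothetical, and it models neither multiple processes nor the WATCH-based retry.

```python
import heapq
from collections import defaultdict, deque


class InMemoryScheduler:
    """Toy stand-in for the zset + queues layout (single process only)."""

    def __init__(self):
        self._scheduled = []               # heap of (run_at, seq, queue, data)
        self._queues = defaultdict(deque)  # queue name -> ready tasks
        self._seq = 0                      # tie-breaker for equal run_at

    def schedule_task(self, queue, data, run_at):
        heapq.heappush(self._scheduled, (run_at, self._seq, queue, data))
        self._seq += 1

    def dispatch_due(self, now):
        """Move every task with run_at <= now to its ready queue."""
        moved = 0
        while self._scheduled and self._scheduled[0][0] <= now:
            _run_at, _seq, queue, data = heapq.heappop(self._scheduled)
            self._queues[queue].append(data)
            moved += 1
        return moved

    def pop(self, queue):
        """Non-blocking counterpart of BLPOP: next ready task or None."""
        ready = self._queues[queue]
        return ready.popleft() if ready else None


scheduler = InMemoryScheduler()
scheduler.schedule_task('foo_queue', 'late', run_at=20)
scheduler.schedule_task('foo_queue', 'early', run_at=10)
scheduler.dispatch_due(now=15)  # only 'early' is due at this point
```

After the call to `dispatch_due(now=15)`, only the task scheduled for time 10 is in `foo_queue`; the other stays in the heap until a later dispatch.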

[1] This solution is based on not_a_golfer's solution, the one at http://www.saltycrane.com/blog/2011/11/unique-python-redis-based-queue-delay/, and the Redis docs for transactions.