Redis - Better way of cleaning the processing queue (reliable) while using BRPOPLPUSH

aspdeepak · Jan 16, 2015 · Viewed 7.6k times

Our Current Design

Env: Redis 2.8.17

We have implemented our reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.

However, we are using BRPOPLPUSH for its blocking behavior, and LPUSH to ensure FIFO order.

Producers: multiple threads (from multiple servers) using LPUSH to push the items.

Consumers: multiple threads (from multiple servers) using BRPOPLPUSH to process the items.

BRPOPLPUSH q processing-q

As documented, Redis pops the item from queue 'q' and adds it to 'processing-q'.

Problem

Owing to the multi-threaded (async) nature of our application, we have no control over when the consumers will complete their processing.

So if we use LREM (as per the documentation) to remove the processed element from processing-q, it will only remove the top element of processing-q, with no guarantee that this is the element the respective consumer actually processed.

So if we don't do anything, processing-q keeps growing (eating up memory), which is very bad IMHO.

Any suggestions or ideas?

Answer

perilandmishap · Jan 12, 2016

You just need to include the job you want to delete in your call to LREM.

LREM takes the form:

LREM queue count "object"

It removes the first count occurrences of elements equal to "object" from queue, scanning from the head when count is positive. So to remove the specific job your consumer thread is working on, you'd do something like this:

LREM processing-q 1 "job_identifier"

For more see the documentation here: http://redis.io/commands/lrem
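
To make the by-value behaviour concrete, here is a small pure-Python model of LREM. It is only an illustration of the documented count semantics, not a Redis client; the function name lrem and the job names are made up for the example.

```python
def lrem(items, count, value):
    """Pure-Python model of Redis LREM on a list (head = index 0).

    count > 0: remove up to `count` matches, scanning head to tail.
    count < 0: remove up to |count| matches, scanning tail to head.
    count == 0: remove every match.
    Returns (new_list, number_removed). Illustration only.
    """
    limit = abs(count) if count != 0 else len(items)
    if count >= 0:
        indices = range(len(items))
    else:
        indices = range(len(items) - 1, -1, -1)
    to_drop = set()
    for i in indices:
        if len(to_drop) == limit:
            break
        if items[i] == value:
            to_drop.add(i)
    kept = [x for i, x in enumerate(items) if i not in to_drop]
    return kept, len(to_drop)


# Three consumers have jobs in flight; the one holding "job-b" finishes first.
processing_q = ["job-c", "job-b", "job-a"]
processing_q, removed = lrem(processing_q, 1, "job-b")
print(processing_q, removed)  # ['job-c', 'job-a'] 1
```

The element removed is the one whose value matches, wherever it sits in the list, so consumers can finish in any order as long as each job value is unique.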

Then, to handle crashed consumers and abandoned jobs, you can use SETEX to create locks with an expiration and periodically check for jobs without locks.

So the whole process looks like this:

Producer

  1. LPUSH q "job_identifier"

Consumer

  1. SETEX lock:processing-q:job_identifier 60 1 (Set lock first to avoid race condition)
  2. BRPOPLPUSH q processing-q
  3. Process job
  4. LREM processing-q 1 "job_identifier"
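
As a rough Python sketch of the producer and consumer steps, the functions below take any client object r that exposes lpush, brpoplpush, setex and lrem with the same argument order as redis-py. The names produce, consume_one and lock_key are hypothetical. Two assumptions differ from the list above and are worth stating: the producer uses LPUSH, as in the question, so that BRPOPLPUSH yields FIFO order, and the lock is set right after the pop, because the job identifier is only known at that point. That leaves a short window in which a monitor could see the job without a lock.

```python
QUEUE = "q"
PROCESSING = "processing-q"
LOCK_TTL_SECONDS = 60  # same expiry as in the SETEX step


def lock_key(job):
    """Build the lock key for a job (hypothetical naming helper)."""
    return "lock:" + PROCESSING + ":" + job


def produce(r, job):
    """Push a job at the head; consumers pop from the tail, giving FIFO."""
    r.lpush(QUEUE, job)


def consume_one(r, handler, timeout=5):
    """Move one job to the processing list, run handler, then acknowledge it.

    Returns the job, or None if nothing arrived within `timeout` seconds.
    """
    job = r.brpoplpush(QUEUE, PROCESSING, timeout=timeout)
    if job is None:
        return None
    if isinstance(job, bytes):  # clients may return bytes by default
        job = job.decode()
    # Assumption: lock taken after the pop, since the id is only known here.
    r.setex(lock_key(job), LOCK_TTL_SECONDS, "1")
    handler(job)
    # Acknowledge by value: removes this job, not whatever is at the head.
    r.lrem(PROCESSING, 1, job)
    return job
```

With redis-py installed, this could be called as consume_one(redis.Redis(decode_responses=True), handler). If handler raises, the job stays in processing-q and its lock eventually expires, which is what the monitor relies on.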

Expired Jobs Monitor

  1. jobs = LRANGE processing-q 0 -1
  2. foreach job in jobs : lock = GET lock:processing-q:job_identifier
  3. if lock is null, this job timed out, so remove it from processing-q: LREM processing-q 1 "job_identifier"
  4. and retry with RPUSH q "job_identifier"
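
The monitor steps above might look something like this in Python. Again, r is assumed to be any client exposing lrange, get, lrem and rpush in the redis-py argument order, and requeue_expired is a hypothetical name. One small addition not in the steps: the job is only requeued when LREM reports that it actually removed something, so a job acknowledged by its consumer in the meantime is not pushed back.

```python
QUEUE = "q"
PROCESSING = "processing-q"


def requeue_expired(r):
    """Requeue every job in the processing list whose lock key has expired.

    Returns the list of jobs that were moved back to the main queue.
    """
    requeued = []
    for job in r.lrange(PROCESSING, 0, -1):
        name = job.decode() if isinstance(job, bytes) else job
        if r.get("lock:" + PROCESSING + ":" + name) is not None:
            continue  # lock still alive: a consumer is working on this job
        # Requeue only if LREM really removed the job from the processing list.
        if r.lrem(PROCESSING, 1, job) == 1:
            # RPUSH puts the job at the tail, so a BRPOPLPUSH consumer
            # picks it up next.
            r.rpush(QUEUE, job)
            requeued.append(name)
    return requeued
```

The LREM and RPUSH pair here is not atomic, so a monitor crash between the two calls could lose a job; wrapping them in MULTI/EXEC is one way to close that gap.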

@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq