Environment: Redis 2.8.17
We have implemented a reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.
However, we use BRPOPLPUSH for its blocking behavior, and LPUSH to preserve FIFO order.
Producers: multiple threads (from multiple servers) using LPUSH to push items.
Consumers: multiple threads (from multiple servers) using BRPOPLPUSH to process items.
BRPOPLPUSH q processing-q
As documented, Redis pops the item from queue 'q' and adds it to 'processing-q'.
Owing to the multi-threaded (async) nature of our application, we have no control over when the consumers will complete their processing.
So, if we use LREM (as per the documentation) to remove the processed element from processing-q, it will only remove the top element of processing-q, with no guarantee that it removed the element that the respective consumer actually processed.
And if we don't do anything, processing-q keeps growing (eating up memory), which is very bad IMHO.
Any suggestions or ideas?
You just need to include the job you want to delete in your call to LREM.
LREM takes the form:
LREM queue count "object"
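It removes up to count items equal to "object" from queue: a positive count scans from the head, a negative count scans from the tail, and 0 removes every match. So to remove the specific job your consumer thread is working on, you'd do something like this: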
LREM processing-q 1 "job_identifier"
For more see the documentation here: http://redis.io/commands/lrem
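To make the point concrete, here is a small pure-Python model of how LREM picks what to remove. It is only a sketch of the documented semantics, not Redis itself; the function name `lrem` and the sample job names are made up for illustration.

```python
def lrem(items, count, value):
    """Model of Redis LREM: remove up to `count` occurrences of `value`.

    count > 0 scans head to tail, count < 0 scans tail to head,
    count == 0 removes every occurrence. Returns the number removed.
    """
    indices = [i for i, item in enumerate(items) if item == value]
    if count > 0:
        indices = indices[:count]      # first `count` matches from the head
    elif count < 0:
        indices = indices[count:]      # last `|count|` matches from the tail
    for i in reversed(indices):        # delete back to front so indices stay valid
        del items[i]
    return len(indices)


# Three jobs are in flight; the head of the list is on the left.
processing_q = ["job-3", "job-2", "job-1"]

# The consumer working on job-2 finishes first and removes exactly its own job.
removed = lrem(processing_q, 1, "job-2")
print(removed, processing_q)
```

The element is matched by value, so it does not matter where in processing-q the finished job sits or in which order consumers complete.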
Then to handle crashed consumers and abandoned jobs you can use SETEX to create locks with an expiration and periodically check for jobs without locks.
So the whole process looks like this:
Producer
LPUSH q "job_identifier"
Consumer
SETEX lock:processing-q:job_identifier 60 "consumer_id" (Set lock first to avoid race condition; SETEX requires a value, and "consumer_id" is just a placeholder)
BRPOPLPUSH q processing-queue
LREM processing-queue 1 "job_identifier"
Expired Jobs Monitor
LRANGE processing-queue 0 -1
GET lock:processing-q:job_identifier
LREM processing-queue 1 "job_identifier"
RPUSH q "job_identifier"
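The monitor steps above could be sketched in Python roughly as follows. This assumes `r` is a redis-py style client created with `decode_responses=True` (so list entries come back as `str`); the function name `requeue_expired` and its parameters are hypothetical, and the key names simply mirror the commands above.

```python
def requeue_expired(r, queue="q", processing="processing-queue",
                    lock_prefix="lock:processing-q:"):
    """Run one sweep of the expired-jobs monitor; return the jobs requeued.

    `r` is assumed to expose redis-py style lrange/get/lrem/rpush methods.
    """
    requeued = []
    for job in r.lrange(processing, 0, -1):
        if r.get(lock_prefix + job) is not None:
            continue  # lock still alive: some consumer is working on this job
        # LREM returns how many entries it removed. Requeue only if this
        # sweep actually removed the job, so a job that was finished (or
        # requeued by another monitor) in the meantime is not duplicated.
        if r.lrem(processing, 1, job) == 1:
            r.rpush(queue, job)
            requeued.append(job)
    return requeued
```

Checking the return value of LREM before pushing is a design choice in this sketch, not something the commands above require; it keeps two monitors running at the same time from requeueing the same job twice.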
@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq