How to selectively delete messages from an AMQP (RabbitMQ) queue?

Wojciech Kaczmarek · Aug 8, 2010 · Viewed 31.8k times

I'd like to selectively delete messages from an AMQP queue without even reading them.

The scenario is as follows:

The sending side wants to expire messages of type X whenever new information of type X arrives. Because it is very likely that the subscriber has not yet consumed the latest message of type X, the publisher should simply delete the previous X-type messages and put the newest one into the queue. The whole operation should be transparent to the subscriber; in fact, the subscriber should be able to use something as simple as STOMP to get the messages.

How can I do this using AMQP? Or is it more convenient in another messaging protocol?

I'd like to avoid complicated infrastructure. The messaging needed is as simple as described above: one queue, one subscriber, one publisher, but the publisher must be able to delete messages ad hoc according to a given criterion.

The publisher client will use Ruby, but I can work with any language once I discover how to do it in the protocol.

Answer

Michael Dillon · Jun 1, 2011

You do not want a message queue; you want a key-value database. For instance, you could use Redis or Tokyo Tyrant to get a simple network-accessible key-value database. Or just use memcached.

Each message type is a key. When you write a new message with the same key, it overwrites the previous value, so the reader of this database can never get out-of-date information.
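To make the overwrite behaviour concrete, here is a minimal sketch. It uses a plain in-memory dict as a stand-in for Redis or another key-value server; the class name `LatestValueStore` and its methods are made up for illustration and are not any library's API.

```python
# Hypothetical in-memory stand-in for a key-value database such as Redis.
class LatestValueStore:
    def __init__(self):
        self._values = {}  # message type -> most recent payload

    def publish(self, message_type, payload):
        # Writing under an existing key replaces the older message of that type.
        self._values[message_type] = payload

    def read(self, message_type):
        # Returns None when nothing of this type has been published yet.
        return self._values.get(message_type)


store = LatestValueStore()
store.publish("X", "first X reading")
store.publish("Y", "only Y reading")
store.publish("X", "newer X reading")  # supersedes the first X message
```

After these calls the reader can only ever see the newer X value; the publisher never has to delete anything explicitly.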

At this point, you only need a message queue to establish the order in which keys should be read, if that is important. Otherwise, just continually scan the database. If you do continually scan the database, it is best to put the database near the readers to reduce network traffic.

I would probably do something like this:

key: typecode
value: lastUpdated, important data

Then I would send messages that contain typecode and lastUpdated. That way, the reader can compare the lastUpdated in the message with the one it last read from the database for that key, and skip the read if it is already up to date.
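A rough sketch of that scheme, assuming integer timestamps and using a dict and a deque as stand-ins for the key-value database and the message queue. The names `publish`, `Reader`, and `drain` are invented for this example.

```python
from collections import deque

database = {}            # typecode -> (last_updated, data); stand-in for the key-value store
notifications = deque()  # stand-in for the message queue; holds (typecode, last_updated)


def publish(typecode, last_updated, data):
    # Overwrite the stored value, then announce which key changed and when.
    database[typecode] = (last_updated, data)
    notifications.append((typecode, last_updated))


class Reader:
    def __init__(self):
        self.seen = {}      # typecode -> last_updated of the value already read
        self.received = []  # (typecode, data) pairs actually fetched

    def drain(self):
        while notifications:
            typecode, notified_at = notifications.popleft()
            # Skip the database read if we already hold this version or a newer one.
            if notified_at <= self.seen.get(typecode, float("-inf")):
                continue
            last_updated, data = database[typecode]
            self.seen[typecode] = last_updated
            self.received.append((typecode, data))


publish("X", 1, "old X")
publish("Y", 1, "Y data")
publish("X", 2, "new X")

reader = Reader()
reader.drain()
```

Here the first X notification already yields the newest X value from the database, so the second X notification is skipped and the reader never sees "old X".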

If you really need to do this with AMQP, then use RabbitMQ and a custom exchange type, specifically a Last Value Cache Exchange. Example code is here: https://github.com/squaremo/rabbitmq-lvc-plugin
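For intuition only, here is a toy in-memory model of what a last value cache exchange does, as I understand the plugin's description: it routes like a direct exchange, remembers the last message per routing key, and hands that message to any queue bound later. This is not the plugin's code or API; the class and method names are hypothetical and queues are modelled as plain lists.

```python
class LastValueCacheExchange:
    """Toy model of a last value cache exchange; not the RabbitMQ plugin's API."""

    def __init__(self):
        self._last = {}      # routing key -> last published message
        self._bindings = {}  # routing key -> list of bound queues (plain lists)

    def publish(self, routing_key, message):
        # Remember the message, then route it to queues bound with the same key.
        self._last[routing_key] = message
        for queue in self._bindings.get(routing_key, []):
            queue.append(message)

    def bind(self, queue, routing_key):
        self._bindings.setdefault(routing_key, []).append(queue)
        # A newly bound queue immediately receives the cached last value, if any.
        if routing_key in self._last:
            queue.append(self._last[routing_key])


exchange = LastValueCacheExchange()
exchange.publish("X", "old X")
exchange.publish("X", "new X")

late_queue = []
exchange.bind(late_queue, "X")     # receives only the cached "new X"
exchange.publish("X", "newest X")  # delivered normally to the bound queue
```

Note that in this model, messages already sitting in a bound queue are not removed when a newer one arrives; the cache only helps queues that bind after the fact.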