I just read RabbitMQ's Java API docs, and found it very informative and straight-forward. The example for how to set up a simple Channel
for publishing/consuming is very easy to follow and understand. But it's a very simple/basic example, and it left me with an important question: How can I set up 1+ Channels
to publish/consume to and from multiple queues?
Let's say I have a RabbitMQ server with 3 queues on it: logging
, security_events
and customer_orders
. So we'd either need a single Channel
to have the ability to publish/consume to all 3 queues, or more likely, have 3 separate Channels
, each dedicated to a single queue.
On top of this, RabbitMQ's best practices dictate that we set up 1 Channel
per consumer thread. For this example, let's say security_events
is fine with only 1 consumer thread, but logging
and customer_order
both need 5 threads to handle the volume. So, if I understand correctly, does that mean we need:
Channel
and 1 consumer thread for publishing/consuming to and from security_events
; andChannels
and 5 consumer threads for publishing/consuming to and from logging
; andChannels
and 5 consumer threads for publishing/consuming to and from customer_orders
?If my understanding is misguided here, please begin by correcting me. Either way, could some battle-weary RabbitMQ veteran help me "connect the dots" with a decent code example for setting up publishers/consumers that meet my requirements here? Thanks in advance!
I think you have several issues with initial understanding. Frankly, I'm a bit surprised to see the following: both need 5 threads to handle the volume
. How did you identify you need that exact number? Do you have any guarantees 5 threads will be enough?
RabbitMQ is tuned and time tested, so it is all about proper design and efficient message processing.
Let's try to review the problem and find a proper solution. BTW, message queue itself will not provide any guarantees you have really good solution. You have to understand what you are doing and also do some additional testing.
As you definitely know there are many layouts possible:
I will use layout B
as the simplest way to illustrate 1
producer N
consumers problem. Since you are so worried about the throughput. BTW, as you might expect RabbitMQ behaves quite well (source). Pay attention to prefetchCount
, I'll address it later:
So it is likely message processing logic is a right place to make sure you'll have enough throughput. Naturally you can span a new thread every time you need to process a message, but eventually such approach will kill your system. Basically, more threads you have bigger latency you'll get (you can check Amdahl's law if you want).
(see Amdahl’s law illustrated)
Tip #1: Be careful with threads, use ThreadPools (details)
A thread pool can be described as a collection of Runnable objects (work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new Runnable object to the work queue.
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
Tip #2: Be careful with message processing overhead
I would say this is obvious optimization technique. It is likely you'll send small and easy to process messages. The whole approach is about smaller messages to be continuously set and processed. Big messages eventually will play a bad joke, so it is better to avoid that.
So it is better to send tiny pieces of information, but what about processing? There is an overhead every time you submit a job. Batch processing can be very helpful in case of high incoming message rate.
For example, let's say we have simple message processing logic and we do not want to have thread specific overheads every time message is being processed. In order to optimize that very simple CompositeRunnable can be introduced
:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
Or do the same in a slightly different way, by collecting messages to be processed:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
In such a way you can process messages more effectively.
Tip #3: Optimize message processing
Despite the fact you know can process messages in parallel (Tip #1
) and reduce processing overhead (Tip #2
) you have to do everything fast. Redundant processing steps, heavy loops and so on might affect performance a lot. Please see interesting case-study:
Improving Message Queue Throughput tenfold by choosing the right XML Parser
Tip #4: Connection and Channel Management
(source)
Please note, all tips are perfectly work together. Feel free to let me know if you need additional details.
Complete consumer example (source)
Please note the following:
prefetchCount
might be very useful:
This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).
Example:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
You can also check the following: