Previously I was reading all the messages present in the queue, but now I have to return specific amount of message based of users choice(count).
I try to change the for loop accordingly but its reading all the message because of auto acknowledge. So I tried changing it to manual in config file.
In my program how to ack message manually after reading msg(currently i am using AmqpTemplate to receive and i don't have reference of channel)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
{
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
{
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
}
}
Any help is highly appreciable, Thanks in advance.
You cannot manually ack with the receive()
method - use a SimpleMessageListenerContainer
for an event-driven consumer with MANUAL acks and a ChannelAwareMessageListener
. Or, use the template's execute()
method which gives you access to the Channel
- but then you will be using the lower-level RabbitMQ API, not the Message
abstraction.
EDIT:
You need to learn the underlying RabbitMQ Java API to use execute, but something like this will work...
final int messageCount = 3;
boolean result = template.execute(new ChannelCallback<Boolean>() {
@Override
public Boolean doInRabbit(final Channel channel) throws Exception {
int n = messageCount;
channel.basicQos(messageCount); // prefetch
long deliveryTag = 0;
while (n > 0) {
GetResponse result = channel.basicGet("si.test.queue", false);
if (result != null) {
System.out.println(new String(result.getBody()));
deliveryTag = result.getEnvelope().getDeliveryTag();
n--;
}
else {
Thread.sleep(1000);
}
}
if (deliveryTag > 0) {
channel.basicAck(deliveryTag, true);
}
return true;
}
});