I'm working on an application whose workflow is managed by passing messages in SQS, using boto.
My SQS queue is growing gradually, and I have no way to check how many elements it is supposed to contain.
Now I have a daemon that periodically polls the queue, and checks if i have a fixed-size set of elements. For example, consider the following "queue":
q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]
Now I want to check if I have "msg1_comp1", "msg2_comp1" and "msg3_comp1" in the queue together at some point in time, but I don't know the size of the queue.
After looking through the API, it seems you can either get only 1 element, or a fixed number of elements in the queue, but not all:
>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10
A suggestion proposed in the answers would be to get for example 10 messages in a loop until I get nothing back, but messages in SQS have a visibility timeout, meaning that if I poll elements from the queue, they won't be really removed, they will only be invisible for a short period of time.
Is there a simple way to get all messages in the queue, without knowing how many there are?
I've been working with AWS SQS queues to provide instant notifications, so I need to be processing all of the messages in real time. The following code will help you to efficiently dequeue (all) messages and handle any errors when removing.
Note: to remove messages off the queue you need to delete them. I'm using the updated boto3 AWS python SDK, json library, and the following default values:
import boto3
import json
region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
messages_to_delete = []
for message in queue.receive_messages(
MaxNumberOfMessages=max_queue_messages):
# process message body
body = json.loads(message.body)
message_bodies.append(body)
# add message to delete
messages_to_delete.append({
'Id': message.message_id,
'ReceiptHandle': message.receipt_handle
})
# if you don't receive any notifications the
# messages_to_delete list will be empty
if len(messages_to_delete) == 0:
break
# delete messages to remove them from SQS queue
# handle any errors
else:
delete_response = queue.delete_messages(
Entries=messages_to_delete)