I cant seem to find a decent example that shows how can I consume an AWS Kinesis stream via Python. Can someone please provide me with some examples I could look into?
Best
you should use boto.kinesis:
from boto import kinesis
After you created a stream:
step 1: connect to aws kinesis:
auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)
step 2: get the stream info (like how many shards, if it is active ..)
tries = 0
while tries < 10:
tries += 1
time.sleep(1)
try:
response = connection.describe_stream('stream_name')
if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
break
except :
logger.error('error while trying to describe kinesis stream : %s')
else:
raise TimeoutError('Stream is still not active, aborting...')
step 3 : get all shard ids, and for each shared id get the shard iterator:
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
stream_name = response['StreamDescription']['StreamName']
for shard_id in response['StreamDescription']['Shards']:
shard_id = shard_id['ShardId']
shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
step 4 : read the data for each shard
limit is the limit of records that you want to receive. (you can receive up to 10 MB) shard_iterator is the shared from previous step.
tries = 0
result = []
while tries < 100:
tries += 1
response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
shard_iterator = response['NextShardIterator']
if len(response['Records'])> 0:
for res in response['Records']:
result.append(res['Data'])
return result , shard_iterator
in your next call to get_records, you should use the shard_iterator that you received with the result of the previous get_records.
note: in one call to get_records, (limit = None) you can receive empty records. if calling to get_records with a limit, you will get the records that are in the same partition key (when you put data in to stream, you have to use partition key :
connection.put_record(stream_name, data, partition_key)