Consuming a Kinesis stream in Python

aliirz · Feb 28, 2014 · Viewed 16.5k times

I can't seem to find a decent example that shows how to consume an AWS Kinesis stream from Python. Can someone please point me to some examples I could look into?

Best

Answer

Eyal Ch · Mar 14, 2014

You should use boto.kinesis:

from boto import kinesis

After you have created a stream:

Step 1: connect to AWS Kinesis:

auth = {"aws_access_key_id": "id", "aws_secret_access_key": "key"}
connection = kinesis.connect_to_region('us-east-1', **auth)

Step 2: get the stream info (such as how many shards it has and whether it is active):

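import logging
import time

logger = logging.getLogger(__name__)

tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    try:
        response = connection.describe_stream('stream_name')
        if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break
    except Exception as e:
        # log the actual error instead of swallowing it
        logger.error('error while trying to describe kinesis stream: %s', e)
else:
    # the loop finished without a break, so the stream never became ACTIVE
    # (TimeoutError is built in on Python 3; define your own exception on Python 2)
    raise TimeoutError('Stream is still not active, aborting...')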
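
The polling loop in step 2 can also be wrapped in a small helper. This is only a minimal sketch and not part of boto: the names wait_until_active, StreamNotActiveError and FakeStatusConnection are made up for illustration, and the helper assumes nothing more than a connection object with a describe_stream method that returns the response shape used above. The stand-in connection is there so the sketch runs without AWS credentials.

```python
import time


class StreamNotActiveError(Exception):
    """Raised when the stream does not become ACTIVE in time (hypothetical name)."""


def wait_until_active(connection, stream_name, max_tries=10, delay=1.0):
    """Poll describe_stream until the stream is ACTIVE, then return the response."""
    last_error = None
    for _ in range(max_tries):
        try:
            response = connection.describe_stream(stream_name)
            if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
                return response
        except Exception as exc:  # in real code, catch the specific boto exceptions
            last_error = exc
        time.sleep(delay)
    raise StreamNotActiveError(
        'stream %r not ACTIVE after %d tries (last error: %r)'
        % (stream_name, max_tries, last_error))


class FakeStatusConnection(object):
    """Stand-in for a boto Kinesis connection that replays a list of statuses."""

    def __init__(self, statuses):
        self.statuses = list(statuses)

    def describe_stream(self, stream_name):
        # consume statuses one by one, then keep repeating the last one
        status = self.statuses.pop(0) if len(self.statuses) > 1 else self.statuses[0]
        return {'StreamDescription': {'StreamName': stream_name,
                                      'StreamStatus': status,
                                      'Shards': []}}


if __name__ == '__main__':
    fake = FakeStatusConnection(['CREATING', 'CREATING', 'ACTIVE'])
    print(wait_until_active(fake, 'stream_name', delay=0))
```

With a real connection from step 1 you would call wait_until_active(connection, 'stream_name') and pass the returned response on to step 3.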

Step 3: get all shard IDs, and for each shard ID get a shard iterator:

# assumed starting position: 'TRIM_HORIZON' reads from the oldest record, 'LATEST' reads new records only
shard_iterator_type = 'TRIM_HORIZON'
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']
    for shard in response['StreamDescription']['Shards']:
        shard_id = shard['ShardId']
        # ask Kinesis for a starting position inside this shard
        shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
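
Step 3 can be sketched as a function that returns one iterator per shard. The function name get_shard_iterators and the FakeIteratorConnection class are hypothetical and exist only so the example runs without AWS. The default of 'TRIM_HORIZON' (start at the oldest record still in the shard) is an assumption; 'LATEST' would instead start just after the most recent record.

```python
def get_shard_iterators(connection, description, iterator_type='TRIM_HORIZON'):
    """Return {shard_id: shard_iterator} for each shard in a describe_stream response."""
    stream = description['StreamDescription']
    iterators = {}
    for shard in stream['Shards']:
        shard_id = shard['ShardId']
        reply = connection.get_shard_iterator(
            stream['StreamName'], shard_id, iterator_type)
        iterators[shard_id] = reply['ShardIterator']
    return iterators


class FakeIteratorConnection(object):
    """Stand-in for a boto Kinesis connection; builds a readable fake iterator."""

    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type):
        token = '%s/%s/%s' % (stream_name, shard_id, shard_iterator_type)
        return {'ShardIterator': token}


if __name__ == '__main__':
    description = {'StreamDescription': {
        'StreamName': 'stream_name',
        'Shards': [{'ShardId': 'shardId-000000000000'},
                   {'ShardId': 'shardId-000000000001'}]}}
    print(get_shard_iterators(FakeIteratorConnection(), description))
```

A dictionary keyed by shard ID makes it easy to replace a shard's iterator after each read, which step 4 requires.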

Step 4: read the data for each shard:

limit is the maximum number of records that you want to receive per call (a single call can return up to 10 MB). shard_iterator is the shard iterator from the previous step.

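def read_records(connection, shard_iterator, limit=None, max_tries=100):
    # poll one shard until some records arrive or max_tries is reached
    tries = 0
    result = []
    while tries < max_tries:
        tries += 1
        response = connection.get_records(shard_iterator=shard_iterator, limit=limit)
        # always move the iterator forward, even when no records came back
        shard_iterator = response['NextShardIterator']
        if len(response['Records']) > 0:
            for res in response['Records']:
                result.append(res['Data'])
            break
    return result, shard_iterator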

In your next call to get_records, use the shard_iterator that was returned with the result of the previous get_records call.
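
Carrying the iterator forward from one call to the next can be sketched as a generator. This is an illustration under stated assumptions, not boto code: iter_records, poll_delay, max_empty_polls and FakeRecordsConnection are hypothetical names, and the fake connection simply uses a counter as its shard iterator. The sketch also stops if NextShardIterator comes back empty, which Kinesis does once a closed shard has been fully read.

```python
import time


def iter_records(connection, shard_iterator, limit=None,
                 poll_delay=1.0, max_empty_polls=3):
    """Yield record payloads from one shard, reusing NextShardIterator each call."""
    empty_polls = 0
    while shard_iterator is not None and empty_polls < max_empty_polls:
        response = connection.get_records(shard_iterator, limit=limit)
        # the iterator from this response is the input to the next call
        shard_iterator = response.get('NextShardIterator')
        records = response['Records']
        if records:
            empty_polls = 0
            for record in records:
                yield record['Data']
        else:
            # an empty batch is normal; wait briefly before polling again
            empty_polls += 1
            time.sleep(poll_delay)


class FakeRecordsConnection(object):
    """Stand-in for a boto connection; serves canned batches indexed by iterator."""

    def __init__(self, batches):
        self.batches = batches

    def get_records(self, shard_iterator, limit=None):
        # limit is accepted but ignored by this fake
        index = int(shard_iterator)
        batch = self.batches[index] if index < len(self.batches) else []
        return {'Records': [{'Data': data} for data in batch],
                'NextShardIterator': str(index + 1)}


if __name__ == '__main__':
    fake = FakeRecordsConnection([['a', 'b'], [], ['c']])
    for payload in iter_records(fake, '0', poll_delay=0):
        print(payload)
```

The generator gives up after a few consecutive empty batches so the example terminates; a long-running consumer would more likely keep polling and persist its position somewhere.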

Note: a single call to get_records (even with limit = None) can return an empty list of records. Each call reads from one shard only, so the records you get back are the ones whose partition keys map to that shard (when you put data into the stream, you have to supply a partition key):

connection.put_record(stream_name, data, partition_key)
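
To see why records with the same partition key end up in the same shard, it helps to know that Kinesis takes the MD5 hash of the partition key as a 128-bit integer and routes the record to the shard whose hash key range contains it. The sketch below reproduces that mapping locally. The helper names hash_key and shard_for_key are hypothetical, the two-shard layout is invented for the example, and the code ignores resharding (closed parent shards whose ranges overlap their children).

```python
import hashlib

MAX_HASH_KEY = 2 ** 128 - 1


def hash_key(partition_key):
    """Map a partition key to the 128-bit integer used for shard routing."""
    digest = hashlib.md5(partition_key.encode('utf-8')).hexdigest()
    return int(digest, 16)


def shard_for_key(partition_key, shards):
    """Return the ShardId whose HashKeyRange contains the key's hash, or None."""
    value = hash_key(partition_key)
    for shard in shards:
        key_range = shard['HashKeyRange']
        start = int(key_range['StartingHashKey'])
        end = int(key_range['EndingHashKey'])
        if start <= value <= end:
            return shard['ShardId']
    return None


if __name__ == '__main__':
    # invented layout: two shards that split the hash space in half
    shards = [
        {'ShardId': 'shardId-000000000000',
         'HashKeyRange': {'StartingHashKey': '0',
                          'EndingHashKey': str(2 ** 127 - 1)}},
        {'ShardId': 'shardId-000000000001',
         'HashKeyRange': {'StartingHashKey': str(2 ** 127),
                          'EndingHashKey': str(MAX_HASH_KEY)}},
    ]
    for key in ('user-1', 'user-2', 'user-3'):
        print(key, shard_for_key(key, shards))
```

With a real stream, the shards list would come from response['StreamDescription']['Shards'] in step 2, where each shard carries its HashKeyRange as decimal strings.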