Streaming in / chunking csv's from S3 to Python

Ajjit Narayanan picture Ajjit Narayanan · Jun 28, 2018 · Viewed 7.1k times · Source

I intend to perform some memory-intensive operations on a very large csv file stored in S3 using Python, with the intention of moving the script to AWS Lambda. I know I can read the whole csv into memory, but I will definitely run into Lambda's memory and storage limits with such a large file. Is there any way to stream in, or read in chunks of, a csv at a time into Python using boto3/botocore, ideally by specifying row numbers to read in?

Here are some things I've already tried:

1) Using the Range parameter in S3.get_object to specify the range of bytes to read in. Unfortunately this means the last row can get cut off in the middle, since there's no way to specify the number of rows to read. There are some messy workarounds, like scanning for the last newline character, recording its index, and then using that as the starting point of the next byte range, but I'd like to avoid this clunky solution if possible.
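
The "clunky" workaround can be sketched in a few lines of pure Python. This is a hedged illustration run against in-memory bytes rather than real S3 Range responses; the helper name and sample data are made up for the example.

    # Return (complete_rows, partial_tail) for a raw byte chunk.
    def split_complete_rows(chunk: bytes) -> tuple:
        last_newline = chunk.rfind(b'\n')
        if last_newline == -1:
            # no newline at all: everything is a partial row
            return b'', chunk
        return chunk[:last_newline + 1], chunk[last_newline + 1:]

    # simulate two byte ranges that cut a row in the middle
    range_1 = b'id,name\n1,alice\n2,bo'
    range_2 = b'b\n3,carol\n'

    rows, tail = split_complete_rows(range_1)
    # 'rows' holds only whole lines; the tail is prepended to the next range
    rows_2, tail_2 = split_complete_rows(tail + range_2)

Carrying the tail forward is exactly the bookkeeping the question wants to avoid, but it is only a few lines once factored out.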

2) Using S3 Select to write SQL queries that selectively retrieve data from S3 objects. Unfortunately the row_number SQL function isn't supported, and it doesn't look like there's a way to read in a subset of rows.
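
For reference, the shape of an S3 Select call looks roughly like the sketch below. S3 Select's SQL does support LIMIT, so the first N rows can be fetched, but with no OFFSET or row_number there is no way to target an arbitrary row range. The bucket, key, and helper name here are hypothetical.

    def build_select_request(bucket: str, key: str, limit: int) -> dict:
        return {
            'Bucket': bucket,
            'Key': key,
            'ExpressionType': 'SQL',
            'Expression': f'SELECT * FROM s3object s LIMIT {limit}',
            'InputSerialization': {'CSV': {'FileHeaderInfo': 'USE'}},
            'OutputSerialization': {'CSV': {}},
        }

    request = build_select_request('my-bucket', 'big.csv', 100)
    # response = boto3.client('s3').select_object_content(**request)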

Answer

Kirk Broadhurst picture Kirk Broadhurst · Jul 2, 2018

Assuming your file isn't compressed, this should involve reading from a stream and splitting on the newline character: read a chunk of data, find the last newline in the chunk, split there, process the complete rows, and carry the partial row over into the next chunk.

import boto3

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = b'\n'
partial_chunk = b''

while True:
    data = body.read(chunk_size)
    chunk = partial_chunk + data

    # nothing read and nothing carried over: we're done
    if chunk == b'':
        break

    if data == b'':
        # end of stream: the remainder is the final row (which may
        # lack a trailing newline), so process all of it
        last_newline = len(chunk) - 1
    else:
        last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[:last_newline + 1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline + 1:]

If you have gzipped files, then you need to use BytesIO and the GzipFile class inside the loop; it's a harder problem because you need to retain the gzip decompression state across chunks.
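
One way to keep that state is a zlib decompression object (an alternative to GzipFile that works naturally with chunked reads). This is a sketch under the assumption that the S3 body behaves like a file object; a BytesIO over locally compressed bytes stands in for it here.

    import gzip
    import io
    import zlib

    raw = b'id,name\n1,alice\n2,bob\n'
    body = io.BytesIO(gzip.compress(raw))   # stand-in for the S3 body

    # wbits = 16 + MAX_WBITS tells zlib to expect a gzip header
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    partial = b''
    rows = []

    while True:
        compressed = body.read(1024)
        if not compressed:
            break
        # decompress this chunk and split off complete rows, as before
        chunk = partial + decompressor.decompress(compressed)
        last_newline = chunk.rfind(b'\n')
        rows.append(chunk[:last_newline + 1])
        partial = chunk[last_newline + 1:]

    result = b''.join(rows) + partial

The decompressor object carries the gzip stream state between iterations, which is the detail that makes wrapping each chunk in a fresh GzipFile fail.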