How to read file chunk by chunk from S3 using aws-java-sdk

Sky picture Sky · Jun 6, 2017 · Viewed 10.5k times · Source

I am trying to read large file into chunks from S3 without cutting any line for parallel processing.

Let me explain by example: There is file of size 1G on S3. I want to divide this file into chucks of 64 MB. It is easy I can do it like :

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64*1024*1024];

while (stream.read(content)  != -1) {

//process content here 

}

but problem with chunk is it may have 100 complete line and one incomplete. but I can not process incomplete line and don't want to discard it.

Is any way to handle this situations ? means all chucks have no partial line.

Answer

Stephen Harrison picture Stephen Harrison · Nov 19, 2017

My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here because the underlying S3ObjectInputStream times out eventually for huge files.

So I created a new class S3InputStream, which doesn't care how long it's open for and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] that will be reused. new byte[1 << 24] (16Mb) appears to work well.

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++];
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}