Extremely slow S3 write times from EMR/Spark

jspooner · Mar 16, 2017 · Viewed 20.8k times

Does anyone know how to speed up S3 write times from Spark running in EMR?

My Spark job takes over 4 hours to complete; however, the cluster is only under load during the first 1.5 hours.


I was curious what Spark was doing all this time. I looked at the logs and found many s3 mv commands, one for each file. Taking a look directly at S3, I saw that all my files were sitting in a _temporary directory.

Secondly, I'm concerned about cluster cost: it appears I need about 2 hours of compute for this specific task, but I end up buying 5 hours. I'm curious whether EMR AutoScaling can help with cost in this situation.

Some articles discuss changing the file output committer algorithm, but I've had little success with that:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Writing to the local HDFS is quick. Would it be faster to write to HDFS and then issue a hadoop command to copy the data up to S3?


Answer

Tal Joffe · Mar 16, 2017

What you are seeing is a problem with the output committer and S3. The job-commit phase applies fs.rename to the _temporary folder, and since S3 does not support rename, a single process ends up copying and then deleting every file to move it from _temporary to its final destination.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") only works with Hadoop 2.7 or later. What it does is move each file out of _temporary during each task's commit rather than at job commit, so the work is distributed across the executors and runs pretty fast.
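
For illustration, here is a minimal Spark 2.x sketch of setting this before the write (the app name and the input/output paths are placeholders):

    import org.apache.spark.sql.SparkSession

    object S3WriteV2 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("s3-write-v2").getOrCreate()

        // v2 commit algorithm (Hadoop 2.7+): files are moved out of _temporary
        // as each task commits, instead of serially at job commit.
        spark.sparkContext.hadoopConfiguration
          .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

        spark.read.parquet("hdfs:///input/events")       // placeholder input
          .write.parquet("s3://my-bucket/output/events") // placeholder output

        spark.stop()
      }
    }

The same setting can also be passed at submit time with --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2; either way it has to be in place before the write starts.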

If you are using an older version of Hadoop, I would use Spark 1.6 and set:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*Note that it does not work with speculation turned on or when writing in append mode.

**Also note that it is deprecated in Spark 2.0 (replaced by algorithm.version=2).

By the way, on my team we actually write with Spark to HDFS and then use DistCp jobs (specifically s3-dist-cp) in production to copy the files to S3, but that is done for several other reasons (consistency, fault tolerance), so it is not strictly necessary; you can write to S3 pretty fast using what I suggested.
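
In case it helps, a rough sketch of that two-step flow (the paths and bucket are placeholders, and step 2 runs as a separate EMR step rather than inside the Spark job):

    import org.apache.spark.sql.SparkSession

    object HdfsThenS3DistCp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("hdfs-then-s3distcp").getOrCreate()

        // Step 1: write to HDFS, where rename is a cheap metadata operation,
        // so the job commit finishes quickly.
        spark.read.parquet("hdfs:///input/events")  // placeholder input
          .write.parquet("hdfs:///staging/events")  // placeholder staging dir

        spark.stop()

        // Step 2 (separate EMR step): copy the staged files to S3, e.g.
        //   s3-dist-cp --src hdfs:///staging/events --dest s3://my-bucket/output/events
      }
    }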