Using Spark to write a parquet file to s3 over s3a is very slow

Brutus35 · Apr 29, 2016 · Viewed 18.6k times

I'm trying to write a parquet file out to Amazon S3 using Spark 1.6.1. The small parquet file I'm generating is ~2GB once written, so it's not that much data. I'm trying to prove out Spark as a platform I can use.

Basically what I'm doing is setting up a star schema with dataframes, then writing those tables out to parquet. The data comes in from csv files provided by a vendor, and I'm using Spark as an ETL platform. I currently have a 3-node cluster in EC2 (r3.2xlarge), so 120GB of memory on the executors and 16 cores total.

The input files total about 22GB and I'm extracting about 2GB of that data for now. Eventually this will be many terabytes when I start loading the full dataset.

Here is my Spark/Scala pseudocode:

  import org.apache.spark.sql.{Row, SQLContext, SaveMode}
  import org.apache.spark.sql.types.{StructType, StructField, StringType}

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet", "false")
    val sqlCtx = new SQLContext(sc)

    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    // Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x => x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6)))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")

    // Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x => x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType, false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")

    println(results.count())

    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
  }

The count takes about 2 minutes for 465,884,512 rows. The write to parquet takes 38 minutes.

I understand that coalesce(1) collapses everything into a single partition, so one task ends up doing the whole write... but the amount of time it's taking makes me think I'm doing something seriously wrong. Without the coalesce this still takes 15 minutes, which IMO is still too long and gives me a ton of small parquet files. I'd like to have one large file per day of data. I have code to do the partitioning by a field value as well, and it is just as slow. I've also tried to output this to csv and that takes ~1 hour.
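For reference, the partitioned write I mentioned looks roughly like this (just a sketch; partitioning on rel_date here is only an example of the field I'd partition by, and the output path is a placeholder):

    // Sketch of the partitioned write: one output directory per rel_date value
    // instead of coalescing everything into a single file.
    results.write
      .partitionBy("rel_date")
      .mode(SaveMode.Overwrite)
      .parquet("s3a://my-bucket/a/joined_data.parquet")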

Also, I'm not really setting runtime props when I'm submitting my job (a sketch of what that could look like is below the stats). My console stats for one job are:

  • Alive Workers: 2
  • Cores in use: 16 Total, 16 Used
  • Memory in use: 117.5 GB Total, 107.5 GB Used
  • Applications: 1 Running, 5 Completed
  • Drivers: 0 Running, 0 Completed
  • Status: ALIVE
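If I did set runtime properties, I'd expect it to look something like the sketch below. This is just an illustration of how a few standard Spark properties could be set in code; the values are placeholders, not tuned recommendations.

    // Sketch only: setting a couple of runtime properties when building the context,
    // instead of relying on defaults at submit time. Values are placeholders.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("loadStage")
      .set("spark.executor.memory", "30g")   // per-executor heap
      .set("spark.cores.max", "16")          // total cores for the app (standalone mode)
    val sc = new SparkContext(conf)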

Answer

David · May 2, 2016

Spark defaults cause a large amount of (probably) unnecessary overhead during I/O operations, especially when writing to S3. This article discusses this more thoroughly, but there are 2 settings you'll want to consider changing.

  • Using the DirectParquetOutputCommitter. By default, Spark saves all of the data to a temporary folder and then moves those files afterwards. Using the DirectParquetOutputCommitter saves time by writing directly to the S3 output path.

    • No longer available in Spark 2.0+
      • As stated in the jira ticket, the current solution is to
        1. Switch your code to using s3a and Hadoop 2.7.2+; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard
        2. Use the Hadoop FileOutputCommitter and set mapreduce.fileoutputcommitter.algorithm.version to 2 (see the sketch after this list)

  • Turn off schema merging. Schema merging is turned off by default as of Spark 1.5. If schema merging is on, the driver node will scan all of the files to ensure a consistent schema. This is especially costly because it is not a distributed operation. Make sure it is turned off by doing:

    val file = sqlCtx.read.option("mergeSchema", "false").parquet(path)
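As a minimal sketch of the committer workaround referenced in the list above, the property from the JIRA discussion can be set on the Hadoop configuration before the write; the output path and variable names here are just placeholders matching the question's code:

    // Sketch: keep the standard FileOutputCommitter but use algorithm version 2,
    // which commits task output directly to the destination at task commit time
    // rather than doing a second round of renames at job commit.
    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    results.write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")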