DataFrame partitionBy to a single Parquet file (per partition)

Patrick McGloin · Jan 14, 2016 · Viewed 55.2k times

I would like to repartition / coalesce my data so that it is saved into one Parquet file per partition. I would also like to use the Spark SQL partitionBy API. So I could do that like this:

import org.apache.spark.sql.SaveMode

// coalesce(1) collapses the whole dataset into a single partition before writing
df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

I've tested this and it doesn't perform well, which is expected: after coalesce(1) there is only one partition left in the dataset, so all the partitioning, compression and saving of files has to be done by a single CPU core.

I could rewrite this to do the partitioning manually (using filter with the distinct partition values for example) before calling coalesce.
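
Roughly, I mean something like this (the loop over the distinct partition values is the manual part; the column names are the same ones as in the snippet above):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val partitionCols = Seq("entity", "year", "month", "day", "status")

df.select(partitionCols.map(col): _*).distinct().collect().foreach { row =>
  // Keep only the rows for this particular combination of partition values
  val subset = partitionCols.zipWithIndex.foldLeft(df) { case (acc, (c, i)) =>
    acc.filter(col(c) === row.get(i))
  }
  subset.coalesce(1)
    .write
    .partitionBy(partitionCols: _*)
    .mode(SaveMode.Append)
    .parquet(s"$location")
}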

But is there a better way to do this using the standard Spark SQL API?

Answer

mortada · Oct 23, 2016

I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, and it can be slow at best and error out at worst. Increasing that number doesn't help either -- if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:

import spark.implicits._

df.repartition($"entity", $"year", $"month", $"day", $"status")
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

Once I do that I get one Parquet file per output partition, instead of multiple files.

I tested this in Python, but I assume in Scala it should be the same.
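
To sanity-check the result you can list the part files under one of the output partition directories, with something like this (the partition values in the path are made up for illustration):

import org.apache.hadoop.fs.{FileSystem, Path}

// Count the Parquet part files in a single output partition directory
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partitionDir = new Path(s"$location/entity=foo/year=2016/month=1/day=14/status=done")
val parquetFiles = fs.listStatus(partitionDir).map(_.getPath.getName).filter(_.endsWith(".parquet"))
println(s"${parquetFiles.length} Parquet file(s): ${parquetFiles.mkString(", ")}")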