I have a Spark Streaming application that writes Parquet data from a stream.
sqlContext.sql(
"""
|select
|to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
|hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
|*
|from events
| where at >= 1473667200
""".stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)
This piece of code runs every hour, but over time the write to Parquet has slowed down. When we started, it took 15 minutes to write the data; now it takes 40 minutes. The time taken is proportional to the amount of data already existing in that path. I tried running the same application against a new location, and that runs fast.
I have disabled schemaMerge and summary metadata:
sparkConf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")
I am using Spark 2.0.
I've encountered this issue. The append mode is probably the culprit, in that finding the append location takes more and more time as the amount of Parquet data under the output path grows.
One workaround I've found that solves this is to change the output path regularly. Merging and reordering the data from all the output dataframes is then usually not an issue.
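// `time` (the streaming batch time) and `timeOrigin` (a fixed start timestamp in ms) are assumed to be defined elsewhere.
def appendix: String = ((time.milliseconds - timeOrigin) / (3600 * 1000)).toString
// Each hour gets its own output directory, so every append targets a small, fresh path.
df.write.mode(SaveMode.Append).format("parquet").save(s"${outputPath}-H$appendix")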
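To make the path rotation easier to reason about, here is a minimal sketch of the same hour-bucket computation as plain Scala, with no Spark dependency. The object name `HourlyOutputPath` and its methods `hourIndex`, `pathFor` and `pathsBetween` are hypothetical helpers invented for illustration, and the sketch assumes timestamps are epoch milliseconds with `nowMs >= originMs`.

```scala
object HourlyOutputPath {
  private val MillisPerHour: Long = 3600L * 1000L

  // Index of the hour bucket that contains nowMs, counted from originMs.
  // Assumes nowMs >= originMs (integer division truncates toward zero).
  def hourIndex(nowMs: Long, originMs: Long): Long =
    (nowMs - originMs) / MillisPerHour

  // Output path for the bucket containing nowMs, e.g. "<basePath>-H2".
  def pathFor(basePath: String, nowMs: Long, originMs: Long): String =
    s"$basePath-H${hourIndex(nowMs, originMs)}"

  // All bucket paths covering the inclusive range [fromMs, toMs],
  // useful when the hourly outputs have to be read back together.
  def pathsBetween(basePath: String, fromMs: Long, toMs: Long, originMs: Long): Seq[String] =
    (hourIndex(fromMs, originMs) to hourIndex(toMs, originMs)).map(i => s"$basePath-H$i")
}
```

The list returned by `pathsBetween` could then be handed to `spark.read.parquet(paths: _*)` to load the hourly outputs as one DataFrame, assuming every path in the range was actually written; a missing path would likely make the read fail, so you may need to filter the list first.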