How to handle changing parquet schema in Apache Spark

V. Samma · Dec 2, 2016 · Viewed 22.3k times

I have run into a problem where I have Parquet data as daily chunks in S3 (in the form of s3://bucketName/prefix/YYYY/MM/DD/), but I cannot read data from different dates together in AWS EMR Spark because some column types do not match, and I get one of several exceptions, for example:

java.lang.ClassCastException: optional binary element (UTF8) is not a group

appears when some files contain an array column that holds values, while in other files the same column is null and is therefore inferred as a String type.

or

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)

I have raw data in S3 in JSON format, and my initial plan was to create an automatic job which starts an EMR cluster, reads in the previous day's JSON data, and simply writes it back to S3 as Parquet.

The JSON data is also divided by date, i.e. the keys have date prefixes. Reading the JSON works fine: the schema is inferred from the data, no matter how much data is read.
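For reference, a minimal sketch of that conversion job, assuming a Spark 2.x SparkSession named spark (the raw key prefix and the date value are illustrative, not from my actual setup):

val date = "2016/12/01" // previous day's prefix, hypothetical
// read the day's raw JSON; Spark infers the schema from the data
val json = spark.read.json(s"s3://bucketName/raw/$date/")
// write it back as Parquet under the layout described above
json.write.parquet(s"s3://bucketName/prefix/$date/")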

But the problem arises when the Parquet files are written. As I understand it, when I write Parquet with metadata files, these files contain the schema for all parts/partitions of the Parquet data, which, it seems to me, can differ between parts. When I disable writing metadata, Spark is said to infer the whole schema from the first file under the given Parquet path and assume it stays the same across the other files.
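To make that read-time behavior concrete: Spark exposes schema merging as a standard read option, so a read that reconciles all part-file schemas looks like this (the path follows the layout above):

val merged = spark.read
  .option("mergeSchema", "true") // reconcile the footers of all part files
  .parquet("s3://bucketName/prefix/")

This succeeds when the per-file schemas are compatible (e.g. new nullable columns), but fails with the "Failed to merge incompatible data types" error quoted above when the same column has conflicting types across files.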

When columns that should be of type double contain only integer values on a given day, reading them in from JSON (which stores these numbers as integers, without decimal points) makes Spark infer the column as type long. Even if I cast these columns to double before writing the Parquet files, that still isn't good enough, because the schema might change, new columns can be added, and tracking all of this is impossible.
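For illustration, the casting workaround just described would look roughly like this (the column name amount is hypothetical):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// force a column that JSON inference typed as long into double before
// writing, so every daily chunk agrees on the type
val df = spark.read.json("s3://bucketName/raw/2016/12/01/")
val normalized = df.withColumn("amount", col("amount").cast(DoubleType))
normalized.write.parquet("s3://bucketName/prefix/2016/12/01/")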

I have seen that other people have had the same problem, but I have yet to find a good enough solution.

What are the best practices or solutions for this?

Answer

stevel · Dec 2, 2016

These are the options I use for writing Parquet to S3; turning off schema merging boosts writeback performance, and it may also address your problem:

val PARQUET_OPTIONS = Map(
  // skip reconciling schemas across part files when reading back
  "spark.sql.parquet.mergeSchema" -> "false",
  // push filter predicates down into the Parquet reader
  "spark.sql.parquet.filterPushdown" -> "true")