How to partition data by datetime in AWS Glue?

user2642287 · Aug 8, 2019 · Viewed 7.3k times

The current set-up:

  • S3 location with JSON files. All files are stored in the same location (no day/month/year structure).

  • A Glue Crawler reads the data into a catalog table.

  • A Glue ETL job transforms and stores the data into parquet tables in S3.

  • A Glue Crawler reads from the S3 parquet tables and stores the data into a new table that gets queried by Athena.

What I want to achieve is (1) the parquet tables partitioned by day and (2) all of the parquet data for one day written into a single file. Currently there is a separate parquet file for each JSON file.
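For reference, the target layout would look roughly like this (Hive-style partition folders; bucket and prefix are just placeholders):

s3://my-bucket/parquet-data/year=2019/month=8/day=8/part-00000.parquet
s3://my-bucket/parquet-data/year=2019/month=8/day=9/part-00000.parquet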

How would I go about it?

One thing to mention: there is a datetime column in the data, but it's a unix epoch timestamp. I would probably need to convert that to a 'year/month/day' format, otherwise I'm assuming it will create a partition for each file again.
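For illustration, an epoch value such as 1565222400 maps to 2019-08-08 (UTC), i.e. year=2019, month=8, day=8. A quick sanity check in plain Python (the value is just an example):

from datetime import datetime, timezone

datetime.fromtimestamp(1565222400, tz=timezone.utc)  # 2019-08-08 00:00:00+00:00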

Thanks a lot for your help!!

Answer

Yuriy Bondaruk · Aug 11, 2019

Convert Glue's DynamicFrame into a Spark DataFrame to add the year/month/day columns and repartition. Reducing the number of partitions to one ensures that only one file is written into each folder, but it may slow down job performance.

Here is the Python code:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, year, month, dayofmonth, to_date, from_unixtime

...

# Convert the Glue DynamicFrame to a Spark DataFrame
df = dynamicFrameSrc.toDF()

repartitioned_with_new_columns_df = (
    df
    # Derive a date from the unix epoch column, then extract the partition columns
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop(col("date_col"))
    # A single Spark partition means only one file per output folder,
    # at the cost of parallelism
    .repartition(1)
)

# Convert back to a DynamicFrame for the Glue sink
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = dyf, 
    connection_type = "s3", 
    connection_options = {
        "path": "s3://yourbucket/data”, 
        "partitionKeys": [“year”, “month”, “day”]
    }, 
    format = “parquet”, 
    transformation_ctx = "datasink"
)

Note that from pyspark.sql.functions import col can give a reference error; this shouldn't be a problem, as explained here.
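One possible tweak, if writing everything through a single partition turns out to be too slow: repartitioning by the partition columns instead of repartition(1) should still produce one file per day folder, while letting different days be written in parallel. A sketch, using the same column names as above:

repartitioned_with_new_columns_df = (
    df
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop("date_col")
    # One Spark partition per (year, month, day) combination, so each day's
    # folder still gets a single file, but days can be written in parallel.
    .repartition("year", "month", "day")
)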