AWS Glue to Redshift: Is it possible to replace, update or delete data?

krchun · Sep 14, 2017 · Viewed 22.2k times

Here are some bullet points in terms of how I have things setup:

  • I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
  • I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also in charge of mapping the columns and creating the Redshift table.

By re-running a job, I am getting duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partitions set up in Glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

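# Read the table created by the Glue crawler from the Data Catalog; transformation_ctx labels this step for job bookmarks.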
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource1")

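# Apply the column mapping, resolve any ambiguous column types, and drop fields that are entirely null.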
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
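# Convert to a Spark DataFrame to add a constant 'platform' column, then convert back to a DynamicFrame.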
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

Answer

Matthijs · Sep 20, 2017

Job bookmarks are the key. Just edit the job and enable "Job bookmarks", and it won't process already-processed data. Note that the job has to run once more after enabling them before it detects that it no longer needs to reprocess the old data.
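Bookmarks can be switched on in the console when editing the job, or programmatically through the job's default arguments. Below is a minimal boto3 sketch, assuming a hypothetical job name "table01-load" (not taken from the question); it sets the --job-bookmark-option argument to job-bookmark-enable. Bookmarks only track sources that are given a transformation_ctx, like the create_dynamic_frame.from_catalog call in the question.

import boto3

glue = boto3.client("glue")

# Fetch the current job definition (hypothetical job name, used for illustration only).
job = glue.get_job(JobName="table01-load")["Job"]

# Add the bookmark option to whatever default arguments the job already has.
default_args = dict(job.get("DefaultArguments", {}))
default_args["--job-bookmark-option"] = "job-bookmark-enable"

# Re-submit the job definition with the updated default arguments;
# Role and Command are carried over from the existing definition.
glue.update_job(
    JobName="table01-load",
    JobUpdate={
        "Role": job["Role"],
        "Command": job["Command"],
        "DefaultArguments": default_args,
    },
)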

For more info see: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

The name "bookmark" is a bit far fetched in my opinion. I would have never looked at it if I did not coincidentally stumble upon it during my search.