Airflow: pass {{ ds }} as param to PostgresOperator

Omar14 · Jun 13, 2017 · Viewed 9.6k times

I would like to use the execution date as a parameter in my SQL file.

I tried:

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)

But it doesn't work.

Answer

jhnclvr · Jun 14, 2017

dt = '{{ ds }}'

This doesn't work because Jinja (the templating engine used within Airflow) does not process the entire DAG definition file.

For each operator, Jinja only processes the fields listed in its template_fields attribute, which is part of the operator's own definition.
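For instance, you can check which arguments an operator will render by printing its template_fields; a quick sketch, assuming the Airflow 1.x import path that was current at the time of this question:

from airflow.operators.postgres_operator import PostgresOperator

# On Airflow 1.x the stock PostgresOperator typically templates only 'sql',
# which is why a value passed via params/parameters is left untouched.
print(PostgresOperator.template_fields)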

In this case you can make that field templated (note that on PostgresOperator it is actually called parameters, not params, so make sure to change that too) by extending PostgresOperator like this:

class MyPostgresOperator(PostgresOperator):
    # Jinja will now render both the SQL file and the parameters dict
    template_fields = ('sql', 'parameters')

Now you should be able to do:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
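Inside s3_to_redshift.sql, the rendered value is then handed to the database driver as a bind parameter, referenced with psycopg2-style named placeholders (PostgresOperator forwards parameters to the cursor when the query runs). A hypothetical sketch of the file's contents, with a made-up table name:

-- The 'file' key from parameters is bound by the driver at execution time,
-- so the query receives the rendered execution date (e.g. '2017-06-13').
DELETE FROM staging.load_log WHERE load_date = %(file)s;

Alternatively, since sql is already in template_fields, you can skip parameters entirely and reference {{ ds }} directly inside s3_to_redshift.sql; Jinja will render it there as well.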