Macros in the Airflow Python operator

n1r44 · Jun 13, 2017

Can I use macros with the PythonOperator? I tried the following, but the macros were not rendered:

dag = DAG(
    'temp',
    default_args=default_args,
    description='temp dag',
    schedule_interval=timedelta(days=1))

def temp_def(a, b, **kwargs):
    print('{{ds}}')
    print('{{execution_date}}')
    print('a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)))

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm, ds],
    provide_context=False,
    dag=dag)

Answer

jhnclvr · Jun 14, 2017

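Macros are only processed in templated fields, that is, the attributes listed in an operator's template_fields. op_args is not one of them here, so to get Jinja to render it, extend the PythonOperator with your own subclass: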

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict', 'op_args')

I kept 'templates_dict' in template_fields because the PythonOperator itself already templates this field, and overriding template_fields in a subclass replaces the parent's tuple rather than extending it.

Now you should be able to use a macro within that field:

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm, ds],
    provide_context=False,
    dag=dag)
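
To see why the subclass makes a difference, here is a rough, standard-library-only sketch of the idea. It is not Airflow's actual implementation: ToyOperator, MyToyOperator, render and render_templates are hypothetical names made up for illustration, and the regex substitution only stands in for real Jinja rendering. The point it tries to show is that only attributes named in template_fields get rendered, so a '{{ ds }}' string in op_args stays literal until op_args is added to that tuple.

```python
import re


def render(value, context):
    """Toy stand-in for Jinja: recursively replace {{ name }} in strings, lists, tuples, dicts."""
    if isinstance(value, str):
        return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                      lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value


class ToyOperator(object):
    """Hypothetical operator: only 'templates_dict' is listed as templated."""
    template_fields = ('templates_dict',)

    def __init__(self, op_args=None, templates_dict=None):
        self.op_args = op_args or []
        self.templates_dict = templates_dict or {}

    def render_templates(self, context):
        # Only attributes named in template_fields are rendered.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class MyToyOperator(ToyOperator):
    # Overriding replaces the parent's tuple, so 'templates_dict' is repeated.
    template_fields = ('templates_dict', 'op_args')


context = {'ds': '2017-06-13'}  # assumed sample value, not real scheduler output

plain = ToyOperator(op_args=['{{ ds }}'])
plain.render_templates(context)

extended = MyToyOperator(op_args=['{{ ds }}'])
extended.render_templates(context)

print(plain.op_args)     # ['{{ ds }}']
print(extended.op_args)  # ['2017-06-13']
```

The same reasoning explains why the '{{ds}}' strings printed inside temp_def in the question stay literal: they live in the function body, which is never passed through the template renderer.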