Make custom Airflow macros expand other macros

mxxk · Jul 1, 2017

Is there any way to make a user-defined macro in Airflow which is itself computed from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

The use case here is to back-port the new Airflow v1.8 next_execution_date macro to work in Airflow v1.7. Unfortunately, this template is rendered without macro expansion:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"

Answer

Géraud · Nov 29, 2017

Here are some solutions:

1. Override BashOperator to add some values to the context

from airflow.operators.bash_operator import BashOperator

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        # Add the computed value to the context before the field is rendered
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_template(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)

The good part with this approach: you can capture some repeated code in your custom operator.

The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.

2. Do your computation in a user defined macro

Macros are not necessarily values. They can be functions.

In your DAG:

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

The good part: you can define reusable functions that process values available at runtime (XCom values, job instance properties, task instance properties, etc.) and use their results when rendering a template.

The bad part (but not that annoying): you have to register the function as a user-defined macro in every DAG that needs it.
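One way to soften that drawback is to keep the macro functions in a shared module and pass the same dict to each DAG. This is only a sketch: the module name common_macros and the dict name COMMON_MACROS are hypothetical, not part of Airflow.

```python
# common_macros.py (hypothetical shared module, not part of Airflow)


def compute_next_execution_date(dag, execution_date):
    """Return the schedule date that follows execution_date for this DAG."""
    return dag.following_schedule(execution_date)


# One dict that every DAG can pass as user_defined_macros.
COMMON_MACROS = {
    'next_execution_date': compute_next_execution_date,
}

# Assumed usage in a DAG file:
#   from common_macros import COMMON_MACROS
#   dag = DAG('simple', schedule_interval='0 21 * * *',
#             user_defined_macros=COMMON_MACROS)
```

Each DAG still has to pass the dict explicitly, but the functions themselves are defined only once.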

3. Call your statement directly in your template

This solution is the simplest (as mentioned in Ardan's answer), and probably the right one in your case.

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

Ideal for simple calls like this one. Other objects are also directly available in templates (like task, task_instance, etc.), and even some standard modules are available (like macros.time).
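To see why the original string-valued macro was not expanded while solutions 2 and 3 work, here is a minimal sketch using Jinja2 directly, outside Airflow. The FakeDag class is a hypothetical stand-in that assumes a daily schedule; the real DAG object computes the following date from its schedule_interval. The point is only that Jinja renders a template in one pass: a string placed in the context is inserted as-is, whereas a callable is invoked during rendering.

```python
from datetime import datetime, timedelta

from jinja2 import Template


class FakeDag:
    """Hypothetical stand-in for an Airflow DAG with a daily schedule."""

    def following_schedule(self, dttm):
        return dttm + timedelta(days=1)


context = {
    'dag': FakeDag(),
    'execution_date': datetime(2017, 8, 9, 21, 0, 0),
}

# A string-valued macro is substituted verbatim; it is not rendered again.
as_string = Template('echo "{{ next_execution_date }}"').render(
    next_execution_date='{{ dag.following_schedule(execution_date) }}',
    **context
)

# A callable macro is invoked while the template is rendered (solution 2).
as_function = Template(
    'echo "{{ next_execution_date(dag, execution_date) }}"'
).render(
    next_execution_date=lambda dag, execution_date: dag.following_schedule(execution_date),
    **context
)

# Calling the method directly in the template gives the same result (solution 3).
direct = Template(
    'echo "{{ dag.following_schedule(execution_date) }}"'
).render(**context)

print(as_string)
print(as_function)
print(direct)
```

The first line printed still contains the unrendered braces, matching the airflow render output in the question; the other two contain the computed date.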