How to create a conditional task in Airflow

Alexis.Rolland picture Alexis.Rolland · Apr 28, 2017 · Viewed 35.6k times · Source

I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:

  • Task 1 executes
  • If Task 1 succeed, then execute Task 2a
  • Else If Task 1 fails, then execute Task 2b
  • Finally execute Task 3

Conditional Task All tasks above are SSHExecuteOperator. I'm guessing I should be using the ShortCircuitOperator and / or XCom to manage the condition but I am not clear on how to implement that. Could you please describe the solution?

Answer

villasv picture villasv · Oct 8, 2017

Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.

The docs describe its use:

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

...

If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.

Code Example

def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task 
branch_task >> B_task

EDIT:

If you're installing an Airflow version >=1.10.3, you can also return a list of task ids, allowing you to skip multiple downstream paths in a single Operator and don't use a dummy task before joining.