I'm struggling to understand how BranchPythonOperator in Airflow works. I know it's primarily used for branching, but the documentation leaves me confused about what to pass into the task and what I need to pass to, or expect from, the upstream task.
Given the simple example in the documentation on this page, what would the source code look like for the upstream task called run_this_first and the 2 downstream ones that are branched? How exactly does Airflow know to run branch_a instead of branch_b? Where does the upstream task's output get noticed/read?
Your BranchPythonOperator is created with a python_callable, which is a function. Based on your business logic, that function should return the task_id of one of the immediately downstream tasks that you have connected. You can connect anywhere from 1 to N tasks immediately downstream; the ones whose task_id is not returned are skipped. There is nothing that the downstream tasks HAVE to read from the branch task, although you could pass them metadata using XCom.
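    # Airflow 2.x import path; Airflow 1.x uses airflow.operators.python_operator
    from airflow.operators.python import BranchPythonOperator

    def decide_which_path():
        # `something` is a placeholder for your own condition.
        # The returned strings must match the task_ids of the downstream tasks.
        if something:
            return "branch_a"
        else:
            return "branch_b"

    # `dag`, `branch_a` and `branch_b` are assumed to be defined elsewhere.
    branch_task = BranchPythonOperator(
        task_id='run_this_first',
        python_callable=decide_which_path,
        trigger_rule="all_done",
        dag=dag)

    branch_task.set_downstream(branch_a)
    branch_task.set_downstream(branch_b)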
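To make the "how does Airflow know" part concrete, here is a rough sketch of the decision in plain Python. It is only a toy model under my own assumptions, not Airflow's actual implementation: resolve_branch is a hypothetical helper, and the "run"/"skipped" labels are simplified stand-ins for real task states. The point is that the only thing that gets read is the return value of the callable, which is compared against the task_ids directly downstream.

```python
from typing import Callable, Dict, List


def resolve_branch(
    python_callable: Callable[[], str],
    downstream_task_ids: List[str],
) -> Dict[str, str]:
    """Toy model: label each direct downstream task_id as 'run' or 'skipped'."""
    chosen = python_callable()  # the callable's return value is the whole "signal"
    if chosen not in downstream_task_ids:
        # A task_id that is not directly downstream cannot be followed.
        raise ValueError(f"{chosen!r} is not a direct downstream task_id")
    return {
        task_id: ("run" if task_id == chosen else "skipped")
        for task_id in downstream_task_ids
    }


def decide_which_path() -> str:
    # Stand-in for real business logic.
    return "branch_a"


states = resolve_branch(decide_which_path, ["branch_a", "branch_b"])
print(states)  # {'branch_a': 'run', 'branch_b': 'skipped'}
```

So neither branch_a nor branch_b has to inspect anything itself; the branch task makes the choice and the task that was not chosen is marked as skipped.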
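It's important to set the trigger_rule, as the default is all_success. On the branch task, all_done lets it run once its own upstream tasks have finished, whatever their state. The same applies to any task that joins the branches back together: with the default rule it is skipped along with the branch that was not chosen, and so is everything after it, so give that task a rule such as one_success.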
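As a rough illustration of why the default rule causes skipping after a branch, here is a small plain-Python sketch. Again this is a toy model, not Airflow code: join_state is a hypothetical helper, it only handles the two upstream states "success" and "skipped", and it only models the all_success and one_success rules.

```python
from typing import List


def join_state(upstream_states: List[str], trigger_rule: str) -> str:
    """Toy model of a task placed downstream of both branches."""
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; a skipped branch blocks the join.
        ok = all(state == "success" for state in upstream_states)
    elif trigger_rule == "one_success":
        # A single successful upstream task is enough.
        ok = any(state == "success" for state in upstream_states)
    else:
        raise ValueError(f"rule not modelled in this sketch: {trigger_rule!r}")
    return "run" if ok else "skipped"


# branch_a was chosen and succeeded, branch_b was skipped by the branch task.
after_branching = ["success", "skipped"]

print(join_state(after_branching, "all_success"))  # skipped
print(join_state(after_branching, "one_success"))  # run
```

With the default rule the joining task is skipped because one of its upstream tasks did not succeed, which is the cascade of skips described above.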