Proper way to create dynamic workflows in Airflow

costrouc · Jan 7, 2017 · Viewed 35k times


Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until Task A completes? I have looked at SubDAGs, but they appear to work only with a static set of tasks that must be determined at DAG creation.

Would DAG triggers work? If so, could you please provide an example?

My problem is that it is impossible to know how many Task B.* instances are needed to calculate Task C until Task A has completed. Each Task B.* takes several hours to compute, and they cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea #1

I don't like this solution because I have to create a blocking ExternalTaskSensor, and the Task B.* tasks will take between 2 and 24 hours to complete, so I do not consider it viable. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.


Oleg Yamin · Jan 13, 2017

Here is how I did it with a similar request without any subdags:

First, create a method that returns whatever values you want:

def values_function():
    # `values` is a placeholder for whatever list you compute or load
    return values

Next, create a method that generates the jobs dynamically:

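def group(number, **kwargs):
    # Pull the values pushed by push_func, if the command needs them.
    # This is a Jinja template; Airflow renders it when the task runs.
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        # task_id format is an assumption; each task needs a unique id
        task_id='group_{}'.format(number),
        bash_command=' {} {}'.format(dyn_value, number),
        dag=dag)  # assumes a `dag` object defined elsewhere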

And then combine them:

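# Arguments are assumptions: task_id='push_func' matches the xcom_pull
# above, 'complete' is an illustrative name, and `dag` is defined elsewhere.
push_func = PythonOperator(
    task_id='push_func', python_callable=values_function, dag=dag)

complete = DummyOperator(task_id='complete', dag=dag)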

for i in values_function():
        push_func >> group(i) >> complete
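
To make the wiring in that loop easier to see, here is a minimal sketch that runs without Airflow. The Task class is a hypothetical stand-in, not an Airflow class; it only mimics the way `a >> b` marks b as downstream of a and returns b, which is what lets the chain fan out from push_func and fan back in to complete. The values [1, 2, 3] and the group_N task ids are assumptions for illustration. Note that the loop runs when the file is evaluated, so the number of middle tasks is fixed by whatever values_function() returns at that moment.

```python
# Stand-in for an Airflow operator: NOT the real class, just enough to show
# how `>>` records dependencies. All names here are hypothetical.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # `a >> b` marks b as downstream of a and returns b, so chains work
        self.downstream.append(other)
        return other


def values_function():
    # Placeholder values; the answer leaves the real source unspecified
    return [1, 2, 3]


push_func = Task("push_func")
complete = Task("complete")

# Same shape as the loop in the answer: one middle task per value
for i in values_function():
    push_func >> Task("group_{}".format(i)) >> complete

# Tasks directly downstream of push_func (the fan-out)
fan_out = [t.task_id for t in push_func.downstream]
# What each of those tasks feeds into (the fan-in)
fan_in = [t.downstream[0].task_id for t in push_func.downstream]
print(fan_out)
print(fan_in)
```

Each pass through the loop adds one branch between push_func and complete, which reproduces the Task A -> Task B.* -> Task C shape from the question for a list of values known when the file is loaded.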