We have a huge DAG with many small, fast tasks and a few big, time-consuming ones.
We want to run just part of the DAG, and the easiest way we have found is to not add the tasks that we don't want to run. The problem is that our DAG has many interdependencies, so it has become a real challenge not to break the DAG when we want to skip some tasks.
Is there a way to set a status on a task by default (for every run)? Something like:
# get the skip list from an Airflow Variable
task_list = models.Variable.get('list_of_tasks_to_skip')
dag.skip(task_list)
or
for task in task_list:
    task.status = 'success'
As mentioned in the comments, you should use the BranchPythonOperator (or ShortCircuitOperator) to prevent the time-consuming tasks from executing. If you need the downstream operators of these time-consuming tasks to run, you can give them the TriggerRule.ALL_DONE trigger rule, but note that they will then run even when the upstream operators fail.
You can use Airflow Variables to affect these BranchPythonOperators without having to update the DAG, e.g.:
from airflow.models import Variable

def branch_python_operator_callable():
    # The Variable holds the task_id of the task that should run next.
    return Variable.get('time_consuming_operator_var')
and use branch_python_operator_callable as the Python callable for your BranchPythonOperator. The string it returns must be the task_id of the task to follow.
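To tie this back to the skip list in the question, here is a minimal sketch of the decision logic a branch callable could use. It is plain Python so that it can be tested without Airflow. The helper names parse_skip_list and choose_branch, the task ids big_task and skip_big_task, and the comma-separated format of the Variable are all assumptions made for illustration, not part of Airflow's API.

```python
def parse_skip_list(raw):
    """Split a comma-separated string of task ids into a set, ignoring blanks."""
    return {item.strip() for item in raw.split(",") if item.strip()}


def choose_branch(raw_skip_list, heavy_task_id, bypass_task_id):
    """Return the task_id that a branch operator should follow.

    raw_skip_list  -- the raw string stored in the Variable (assumed format: "a, b, c")
    heavy_task_id  -- task_id of the time-consuming task (hypothetical name)
    bypass_task_id -- task_id of a cheap placeholder task to run instead (hypothetical name)
    """
    if heavy_task_id in parse_skip_list(raw_skip_list):
        return bypass_task_id
    return heavy_task_id


# Example: the heavy task is listed, so the branch goes to the bypass task.
next_task = choose_branch("big_task, other_task", "big_task", "skip_big_task")
```

In a DAG, the callable passed to the BranchPythonOperator would read the raw string with Variable.get('list_of_tasks_to_skip') and hand it to choose_branch. Both big_task and skip_big_task would have to be direct downstream tasks of the branch operator, and any task that joins the two branches needs a trigger rule that tolerates a skipped upstream, such as the TriggerRule.ALL_DONE mentioned above.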