Airflow, mark a task success or skip it before dag run

Pablo · Jun 19, 2018 · Viewed 8.7k times

We have a huge DAG, with many small and fast tasks and a few big and time-consuming tasks.

We want to run just a part of the DAG, and the easiest way we found is to not add the tasks we don't want to run. The problem is that our DAG has many co-dependencies, so it becomes a real challenge not to break the DAG when we want to skip some tasks.

Is there a way to set a status on a task by default (for every run)? Something like:

# get the skip list from an Airflow Variable
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

or

for task in task_list:
    task.status = 'success'

Answer

Trevor Edwards · Jun 20, 2018

As mentioned in the comments, you should use the BranchPythonOperator (or ShortCircuitOperator) to prevent the time-consuming tasks from executing. If you need downstream operators of these time-consuming tasks to run, you can use TriggerRule.ALL_DONE to have those operators run, but note that they will then run even when the upstream operators fail.
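
For illustration, here is a minimal sketch of that layout, using the classic Airflow 1.x import paths; the task ids branch, fast_task, time_consuming_task and downstream are just placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

def choose_branch():
    # Return the task_id of the branch to follow; the other branch is skipped.
    return 'fast_task'

with DAG('branch_example', start_date=datetime(2018, 6, 1), schedule_interval=None) as dag:
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)

    time_consuming_task = DummyOperator(task_id='time_consuming_task')
    fast_task = DummyOperator(task_id='fast_task')

    # ALL_DONE lets this task run even when one upstream branch was skipped,
    # but it will also run if an upstream task failed.
    downstream = DummyOperator(task_id='downstream', trigger_rule=TriggerRule.ALL_DONE)

    branch >> [time_consuming_task, fast_task] >> downstream

With this setup, whichever task_id choose_branch returns is executed, the other branch is skipped, and downstream still runs because of its trigger rule.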

You can use Airflow Variables to affect these BranchPythonOperators without having to update the DAG, e.g.:

from airflow.models import Variable

def branch_python_operator_callable():
    # Return the task_id (or list of task_ids) stored in the Variable;
    # only those branches will run, the rest are skipped.
    return Variable.get('time_consuming_operator_var')

and use branch_python_operator_callable as the Python callable for your BranchPythonOperator.
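
For example, a sketch that wires the callable into a branch task, assuming the Variable holds the task_id of the branch you want to run and reusing the hypothetical task names from above:

from airflow.operators.python_operator import BranchPythonOperator

decide = BranchPythonOperator(
    task_id='decide_which_branch_to_run',
    python_callable=branch_python_operator_callable,
    dag=dag,
)

decide >> [time_consuming_task, fast_task]

You can then change the Variable before a run from the UI (Admin -> Variables) or the CLI (in 1.x, something like airflow variables --set time_consuming_operator_var fast_task) to pick which branch executes, without touching the DAG code.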