How to skip tasks in Airflow?

Maayan · Sep 5, 2018

I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.

Let's say my DAG looks like this: task1 >> task2 >> task3 >> task4

And I would like to start my DAG manually from task3, what is the best way of doing that?

I've read about ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered.

Thanks!

Answer

Ben Gregory · Sep 5, 2018

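You can incorporate the SkipMixin, which the ShortCircuitOperator uses under the hood to skip downstream tasks, into your own operator. The operator below lets its downstream tasks run when condition is truthy and skips all of them otherwise.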

# Airflow 1.10-style imports; in Airflow 2, apply_defaults is deprecated and can be dropped.
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):
    """Skip every downstream task unless condition is truthy."""

    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

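        # Every task downstream of this one, direct and transitive
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)

        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            # SkipMixin.skip marks these task instances as skipped for this DAG run
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)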

        self.log.info("Done.")
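To make the skip logic concrete without an Airflow installation, here is a minimal pure-Python model of what the operator above does: collect every task reachable downstream (what get_flat_relatives(upstream=False) returns) and mark them all as skipped when the condition is falsy. This is a sketch, not Airflow's API; the dict-based graph, the state strings, and the names flat_downstream and run_skipping_task are all hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical stand-in for a DAG: task_id -> list of direct downstream task_ids.
# NOT the Airflow API; it only models the operator's skip logic.
Graph = Dict[str, List[str]]


def flat_downstream(graph: Graph, task_id: str) -> List[str]:
    """Return every task reachable downstream of task_id (transitive closure), sorted."""
    seen: Set[str] = set()
    stack = list(graph.get(task_id, []))
    while stack:
        current = stack.pop()
        if current in seen:
            continue  # already reached via another path (e.g. a diamond)
        seen.add(current)
        stack.extend(graph.get(current, []))
    return sorted(seen)


def run_skipping_task(graph: Graph, task_id: str, condition: bool) -> Dict[str, str]:
    """Mimic mySkippingOperator.execute: skip all downstream tasks when condition is falsy."""
    states = {t: "none" for t in graph}  # hypothetical "not yet run" state
    states[task_id] = "success"
    if condition:
        return states  # downstream tasks are left to run normally
    for downstream in flat_downstream(graph, task_id):
        states[downstream] = "skipped"
    return states


# The chain from the question: task1 >> task2 >> task3 >> task4
chain: Graph = {"task1": ["task2"], "task2": ["task3"], "task3": ["task4"], "task4": []}
print(flat_downstream(chain, "task2"))
print(run_skipping_task(chain, "task2", condition=False))
```

The iterative traversal with a seen set avoids recursion limits and visits a task only once even when it is reachable through several paths. Note that in the operator above, condition is passed to the constructor, so it is fixed when the DAG file is parsed; for a per-run decision, the value could instead be read inside execute(), for example from context['dag_run'].conf when the run is triggered manually with a JSON config.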