Airflow default on_failure_callback

Pierre CORBEL · Jul 25, 2017 · Viewed 17k times

In my DAG file, I have defined an on_failure_callback() function to post a message to Slack in case of failure.

It works well if I specify it for each operator in my DAG: on_failure_callback=on_failure_callback (passing the function itself, without calling it)

Is there a way to automate (via default_args for instance, or via my DAG object) the dispatch to all of my operators?

Answer

Pierre CORBEL · Sep 19, 2017

I finally found a way to do that.

You can pass your on_failure_callback in default_args; every operator in the DAG then inherits it unless it sets its own:

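# Import paths assume Airflow 1.x (current when this answer was written);
# in Airflow 2+ SlackAPIPostOperator lives in the Slack provider package.
from airflow.models import Variable
from airflow.operators.slack_operator import SlackAPIPostOperator


class Foo:
    @staticmethod
    def get_default_args():
        """
        Return default args
        :return: default_args
        """
        # Pass the function itself (no parentheses) so Airflow can call it later
        default_args = {
            'on_failure_callback': Foo.on_failure_callback
        }

        return default_args

    @staticmethod
    def on_failure_callback(context):
        """
        Define the callback to post on Slack if a failure is detected in the Workflow
        :return: operator.execute
        """
        # Build a one-off operator and run it directly inside the callback
        operator = SlackAPIPostOperator(
            task_id='failure',
            text=str(context['task_instance']),
            token=Variable.get("slack_access_token"),
            channel=Variable.get("slack_channel")
        )

        return operator.execute(context=context)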
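
To make the mechanism a bit more concrete, here is a minimal pure-Python sketch of how DAG-level default_args end up on each task. It is only a simplified model, not Airflow's actual code: MiniDag, MiniTask, notify_failure and custom_handler are hypothetical names made up for illustration, and the callback just builds a string instead of posting to Slack.

```python
# Simplified, hypothetical model of default_args propagation.
# MiniDag and MiniTask are made-up stand-ins, NOT Airflow classes.


def notify_failure(context):
    """Stand-in for the Slack callback: only builds the message text."""
    return "Task failed: {}".format(context["task_instance"])


def custom_handler(context):
    """A task-specific callback used to show that explicit args win."""
    return "custom handler for {}".format(context["task_instance"])


class MiniDag:
    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = dict(default_args or {})
        self.tasks = []


class MiniTask:
    def __init__(self, task_id, dag, **kwargs):
        # Start from the DAG-level defaults, then let explicit
        # per-task keyword arguments override them.
        params = dict(dag.default_args)
        params.update(kwargs)
        self.task_id = task_id
        self.on_failure_callback = params.get("on_failure_callback")
        dag.tasks.append(self)

    def fail(self):
        """Simulate a failure and invoke the callback, if one is set."""
        if self.on_failure_callback is None:
            return None
        return self.on_failure_callback({"task_instance": self.task_id})


dag = MiniDag("example", default_args={"on_failure_callback": notify_failure})
extract = MiniTask("extract", dag)  # inherits the default callback
load = MiniTask("load", dag, on_failure_callback=custom_handler)  # overrides it

print(extract.fail())  # Task failed: extract
print(load.fail())     # custom handler for load
```

With the real classes, the equivalent wiring would be to pass default_args=Foo.get_default_args() when constructing the DAG, so that operators attached to that DAG pick up the callback without repeating it on each one.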