How do I trigger an Airflow DAG using TriggerDagRunOperator?

Mika Heino · Aug 8, 2017

I have found the following link:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

which explains how to use TriggerDagRunOperator to execute a separate Airflow DAG. The documentation uses Airflow's own example DAGs, but I have a hard time understanding those because they do not use any sensors.

Can somebody explain how to start a separate DAG using TriggerDagRunOperator and SqlSensor? I'm trying to start a separate DAG when my SQL Server job task has finished. I know how to check the status of the SQL Server job using SqlSensor, but I don't know how to connect its result to TriggerDagRunOperator to start the separate DAG.

I don't want to use the Airflow CLI or run both tasks in one DAG. Basically, I want this to be a trigger-only DAG.

Below is my current code, which is missing the crucial conditionally_trigger function:

```python
# File Name: check-when-db1-sql-task-is-done

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime, timedelta  # timedelta is used in default_args


default_args = {
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor(
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps
               where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450'
                 and step_id = '1'
                 and last_run_date = (select convert(varchar(24), getdate(), 112));""",
        conn_id='db1',  # SqlSensor takes conn_id; mssql_conn_id belongs to MsSqlOperator
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator(
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,  # not defined yet: this is the missing piece
        params={'condition_param': True,
                'message': 'Hello World'},
        dag=dag)
```

Answer

7yl4r · Jan 8, 2018

My understanding is that TriggerDagRunOperator is for when you want to use a Python function to decide whether or not to trigger the target DAG. That function is called conditionally_trigger in your code and in the examples.
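For reference, a conditionally_trigger function in the style of Airflow 1.x's bundled trigger-controller example might look like the sketch below. It assumes the Airflow 1.x contract, where python_callable is called as `python_callable(context, dag_run_obj)` and the target DAG is triggered only if a truthy object is returned; the returned object's `payload` becomes the triggered run's `conf`. `StubDagRunOrder` is a hypothetical stand-in for Airflow's `DagRunOrder`, used only so the snippet runs without Airflow installed.

```python
class StubDagRunOrder:
    """Hypothetical stand-in for Airflow 1.x's DagRunOrder (run_id + payload)."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def conditionally_trigger(context, dag_run_obj):
    """Decide whether TriggerDagRunOperator should trigger the target DAG.

    Returning dag_run_obj triggers the DAG; returning None skips the trigger.
    """
    params = context["params"]
    if params["condition_param"]:
        # The payload is handed to the triggered DAG run as its conf.
        dag_run_obj.payload = {"message": params["message"]}
        return dag_run_obj
    return None


if __name__ == "__main__":
    # Same params as in the question's TriggerDagRunOperator.
    ctx = {"params": {"condition_param": True, "message": "Hello World"}}
    result = conditionally_trigger(ctx, StubDagRunOrder())
    print(result.payload)  # {'message': 'Hello World'}
```

With `condition_param` set to False the function returns None, so the operator would finish without triggering anything.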

In your case, you are using a sensor to control the flow, so the function does not need to make any decision. You could use a SubDagOperator instead of TriggerDagRunOperator, or pass a simple always-true function as the python_callable:

```python
...
python_callable=lambda context, dag_run_obj: dag_run_obj,
...
```
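To see why an always-true callable is enough when a sensor gates the trigger, the sketch below models both pieces in plain Python. It assumes Airflow 1.x behaviour: SqlSensor succeeds once the first cell of the first returned row is neither 0 nor empty, and TriggerDagRunOperator calls `python_callable(context, dag_run_obj)` with two positional arguments, which is why the lambda must take two parameters rather than the Python 2 tuple form `lambda(context, dag_run_obj)`. `sql_sensor_poke` and `run_trigger` are hypothetical helpers for this simplified model, not Airflow APIs.

```python
def sql_sensor_poke(records):
    """Model of Airflow 1.x SqlSensor's success rule (hypothetical helper).

    `records` mimics what the hook's get_records(sql) returns: a list of row tuples.
    """
    if not records:
        return False  # no row yet, e.g. the job step has not run today
    return str(records[0][0]) not in ("0", "")


def run_trigger(python_callable, context, dag_run_obj):
    """Simplified model of TriggerDagRunOperator's decision step (hypothetical helper).

    Returns True when the target DAG would be triggered.
    """
    dro = python_callable(context, dag_run_obj)  # called with two positional args
    return bool(dro)


# Two-argument lambda: passes dag_run_obj straight through, so it always triggers.
always_trigger = lambda context, dag_run_obj: dag_run_obj


if __name__ == "__main__":
    # Successive poke results: no row, last_run_outcome 0 (failed), then 1 (succeeded).
    pokes = [[], [(0,)], [(1,)]]
    for records in pokes:
        if sql_sensor_poke(records):
            # Only reached after the sensor succeeds, like a task downstream of the sensor.
            print(run_trigger(always_trigger, {}, object()))  # True
            break
```

In the actual DAG file, this ordering comes from making the sensor upstream of the trigger, for example with `sqlsensor >> trigger` (or `trigger.set_upstream(sqlsensor)`); without that dependency the two tasks run independently.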