I found the following link:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
which explains how to use TriggerDagRunOperator to execute a separate Airflow DAG. The documentation uses Airflow's own example DAGs, but I have a hard time understanding them because they do not use any sensors.
Can somebody explain how to start a separate DAG using TriggerDagRunOperator and SqlSensor? I'm trying to start a separate DAG when my SQL Server job task has finished. I know how to check the status of the SQL Server job with SqlSensor, but I don't know how to pass the result to TriggerDagRunOperator to start the separate DAG.
I don't want to use the Airflow CLI or put both tasks in one DAG. Basically, I want this DAG to act only as a trigger.
Below is my current code, which is missing the crucial conditionally_trigger function:
# File Name: check-when-db1-sql-task-is-done
from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime, timedelta  # timedelta is needed for retry_delay

default_args = {
    'owner': 'airflow',
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
          description='Check-when-DB1-SQL-task-is-done',
          default_args=default_args,
          schedule_interval='@once',
          start_date=datetime.now(),
          )

# Returns 0 or 1 based on the job task status
sqlsensor = SqlSensor(
    task_id='sql-sensor',
    poke_interval=30,
    timeout=3200,
    sql="""select last_run_outcome
           from msdb.dbo.sysjobsteps
           where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450'
             and step_id = '1'
             and last_run_date = (select convert(varchar(24),getdate(),112)); """,
    conn_id='db1',  # SqlSensor takes conn_id rather than mssql_conn_id
    dag=dag,
)

# DAG to start
trigger = TriggerDagRunOperator(
    task_id='start-ssh-job',
    trigger_dag_id="qa-knime-ssh-task",
    python_callable=conditionally_trigger,
    params={'condition_param': True,
            'message': 'Hello World'},
    dag=dag)
My understanding is that TriggerDagRunOperator is for when you want to use a Python function to decide whether or not to trigger the target DAG. That function is called conditionally_trigger in your code and in the examples.
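For reference, here is a minimal sketch of what such a function could look like. It assumes the Airflow 1.x convention where the callable receives (context, dag_run_obj) and returns the object to trigger the run, or None to skip it. The SimpleNamespace object is only a hypothetical stand-in for what Airflow would pass in, so the function can be tried outside Airflow:

```python
from types import SimpleNamespace


def conditionally_trigger(context, dag_run_obj):
    """Decide whether the target DAG run should be created.

    Assumption: Airflow 1.x style callable. Returning the run object
    triggers the target DAG; returning None skips the trigger.
    """
    params = context["params"]
    if params.get("condition_param"):
        # Data attached as payload is handed to the triggered run.
        dag_run_obj.payload = {"message": params.get("message")}
        return dag_run_obj
    return None


# Hypothetical stand-in for the object Airflow passes in (dry run only).
order = SimpleNamespace(run_id="manual_test", payload=None)
result = conditionally_trigger(
    {"params": {"condition_param": True, "message": "Hello World"}}, order
)
print(result.payload)
```

With 'condition_param' set to False the function returns None, and in that case no run of the target DAG would be created.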
In your case you are using a sensor to control the flow, so you do not need a real condition. You could use a SubDagOperator instead of TriggerDagRunOperator, or pass a simple always-true function as the python_callable:
...
python_callable=lambda context, dag_run_obj: dag_run_obj,
...
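The same idea as a named function, plus one way the two tasks might be wired together. This is only a sketch assuming the Airflow 1.x API from the question; the name always_trigger is made up for illustration, and the wiring is shown in comments because it needs the dag, sqlsensor and trigger objects from your DAG file:

```python
def always_trigger(context, dag_run_obj):
    """Always-true callable: hand the run object straight back.

    Assumption: Airflow 1.x calls this with (context, dag_run_obj) and
    triggers the target DAG whenever the object is returned.
    """
    return dag_run_obj


# Hypothetical wiring inside the DAG file from the question (not run here):
#
#   trigger = TriggerDagRunOperator(
#       task_id='start-ssh-job',
#       trigger_dag_id='qa-knime-ssh-task',
#       python_callable=always_trigger,
#       dag=dag)
#
#   # Run the trigger only after the sensor task has succeeded.
#   trigger.set_upstream(sqlsensor)
```

Because the trigger task sits downstream of the sensor, the condition lives in the sensor's SQL rather than in the callable: the trigger should only be reached once the sensor's query returns a truthy value.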