Airflow dynamic DAG and Task Ids

Dean Sha · Aug 24, 2016 · Viewed 22.5k times

I mostly see Airflow being used for ETL/Big Data jobs. I'm trying to use it for business workflows in which a user action triggers a set of dependent tasks in the future. Some of these tasks may need to be cleared (deleted) based on other user actions. I thought the best way to handle this would be dynamic task ids. I read that Airflow supports dynamic DAG ids, so I created a simple Python script that takes the DAG id and task id as command-line parameters. However, I'm having trouble making it work: it gives a "dag_id not found" error. Has anyone tried this? Here's the code for the script (call it tmp.py), which I run from the command line as python tmp.py 820 2016-08-24T22:50:00:

from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2  :
   dagid =  sys.argv[1]
   taskid = 'Activate' + sys.argv[1]
   execution = sys.argv[2]
else:
   dagid = 'DAGObjectId'
   taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
       default_args=default_args,
       schedule_interval='@once',
      )
globals()[dagid] = dag
task1 = BashOperator(
    task_id = taskid,
    bash_command='ls -l',
    dag=dag)

fakeTask = BashOperator(
    task_id = 'fakeTask',
    bash_command='sleep 5',
    retries = 3,
    dag=dag)
task1.set_upstream(fakeTask)

airflowcmd = "airflow run " + dagid + " " + taskid + "  " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)

Answer

Dean Sha · Sep 23, 2016

After much trial and error, I was able to figure this out. Hopefully it will help someone. Here's how it works: you need an iterator or an external source (a file or database table) to generate DAGs and tasks dynamically through a template. You can keep the DAG and task names static and just assign their ids dynamically to differentiate one DAG from another. Put this Python script in the dags folder. When you start the Airflow scheduler, it runs through this script on every heartbeat and writes the DAGs to the dag table in the database. If a DAG (unique DAG id) has already been written, it is simply skipped. The scheduler also looks at the schedule of each DAG to determine which ones are ready for execution. If a DAG is ready, the scheduler executes it and updates its status. Here's some sample code:

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time

dagid   = 'DA' + str(int(time.time()))
taskid  = 'TA' + str(int(time.time()))

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'

def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}
with open(input_file, 'r') as f:
    for line in f:
        args = line.strip().split(',')
        # Each line needs seven fields: id, year, month, day, hour, minute, second.
        if len(args) < 7:
            continue
        dagid = 'DAA' + args[0]
        taskid = 'TAA' + args[0]
        yyyy = int(args[1])
        mm = int(args[2])
        dd = int(args[3])
        hh = int(args[4])
        mins = int(args[5])
        ss = int(args[6])
        dag = DAG(
            dag_id=dagid, default_args=def_args,
            schedule_interval='@once', start_date=datetime(yyyy, mm, dd, hh, mins, ss)
        )
        # Expose each DAG as a module-level name so Airflow can discover it.
        globals()[dagid] = dag

        myBashTask = BashOperator(
            task_id=taskid,
            bash_command='python /home/directory/airflow/sendemail.py',
            dag=dag)

        task2id = taskid + '-X'

        task_sleep = PythonOperator(
            task_id=task2id,
            python_callable=my_sleeping_function,
            op_kwargs={'random_base': 10},
            dag=dag)

        # Run the sleep task only after the bash task has finished.
        task_sleep.set_upstream(myBashTask)
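
The parsing step can be tried on its own, without Airflow installed. The sketch below assumes each line of the input file has the form id,yyyy,mm,dd,hh,mins,ss, which is inferred from the indexes the sample code reads. The function name parse_dag_line and the sample lines are hypothetical, for illustration only.

```python
from datetime import datetime


def parse_dag_line(line):
    """Parse one 'id,yyyy,mm,dd,hh,mins,ss' line (format assumed, see above).

    Returns (dag_id, task_id, start_date), or None if the line has
    fewer than seven comma-separated fields.
    """
    fields = line.strip().split(',')
    if len(fields) < 7:
        return None
    year, month, day, hour, minute, second = (int(x) for x in fields[1:7])
    # Same 'DAA'/'TAA' prefixes as the sample code above.
    return ('DAA' + fields[0], 'TAA' + fields[0],
            datetime(year, month, day, hour, minute, second))


# Hypothetical sample lines: one complete, one too short, one blank.
sample_lines = ['820,2016,8,24,22,50,0\n', 'incomplete,2016,8\n', '\n']
parsed = [p for p in (parse_dag_line(l) for l in sample_lines) if p is not None]
print(parsed)
```

Keeping the parsing in a small function like this makes it easy to check the input file for malformed lines before the scheduler ever loads the DAG script.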