Airflow S3KeySensor - How to make it continue running

Kyle Bridenstine · May 29, 2018

With the help of this Stack Overflow post, I wrote a program (the one shown in the post) in which a task in one of my running DAGs is triggered when a file is placed in an S3 bucket, and I then perform some work using the BashOperator. Once it finishes, though, the DAG is no longer in a running state; it goes into a success state, and if I want it to pick up another file I need to clear all the 'Past', 'Future', 'Upstream', and 'Downstream' activity. I would like this program to always be running, so that any time a new file is placed in the S3 bucket it kicks off the tasks.

Can I continue using the S3KeySensor to do this, or do I need to figure out a way of setting up an External Trigger to run my DAG? As it stands, my S3KeySensor is pretty pointless if it is only ever going to run once.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Airflow 1.x import path; the unused operator imports were dropped.
from airflow.operators.sensors import S3KeySensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval='@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)

I'm wondering whether this is impossible because it would no longer be a Directed Acyclic Graph; it would instead contain a loop that repeats sensor -> t1 -> t2 -> sensor -> t1 -> t2 -> sensor -> ... indefinitely.

Update:

My use case is pretty simple: any time a new file is placed inside a designated AWS S3 bucket, I want my DAG to be triggered and start my process of various tasks. The tasks will do things like instantiate a new AWS EMR cluster, extract the files from the AWS S3 bucket, perform some AWS EMR activities, then shut down the AWS EMR cluster. From there the DAG would go back into a waiting state, wait for new files to arrive in the AWS S3 bucket, and then repeat the process indefinitely.

Answer

Taylor Edmiston · May 29, 2018

Within Airflow, there isn't a concept that maps to an always-running DAG. You could have a DAG run very frequently, such as every 1 to 5 minutes, if that suits your use case.

The main thing here is that the S3KeySensor keeps checking until it detects that a first file exists at the key's wildcard path (or until it times out), and then the downstream tasks run. But when a second, third, or fourth file lands, the sensor will already have completed for that DAG run. It won't be scheduled again until the next DAG run. (The looping idea you described is roughly equivalent to what the scheduler does when it creates DAG runs, except not forever.)

An external trigger definitely sounds like the best approach for your use case, whether that trigger comes via the Airflow CLI's trigger_dag command ($ airflow trigger_dag ...):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222
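As a rough illustration of the CLI route, here is a minimal Python sketch that assembles and runs an `airflow trigger_dag` call. The helper names `build_trigger_command` and `trigger_dag_via_cli` are hypothetical, and the `-c` (conf) and `-r` (run id) flags are assumed to match the Airflow 1.x CLI; Airflow 2 renamed the command to `airflow dags trigger`, so adjust accordingly.

```python
import json
import subprocess


def build_trigger_command(dag_id, conf=None, run_id=None):
    """Assemble the argument list for `airflow trigger_dag` (Airflow 1.x CLI)."""
    cmd = ["airflow", "trigger_dag", dag_id]
    if conf is not None:
        # -c passes a JSON string that the run exposes as dag_run.conf
        cmd += ["-c", json.dumps(conf)]
    if run_id is not None:
        # -r sets an explicit run id (assumed optional here)
        cmd += ["-r", run_id]
    return cmd


def trigger_dag_via_cli(dag_id, conf=None, run_id=None):
    """Run the command; raises CalledProcessError if the CLI exits non-zero."""
    return subprocess.run(build_trigger_command(dag_id, conf, run_id), check=True)
```

Calling `trigger_dag_via_cli('s3_triggered_emr_cluster_dag', conf={'key': 'incoming/file.csv'})` would have to happen on a machine where the `airflow` executable and its configuration are available.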

Or via the REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

Both turn around and call the trigger_dag function in the common (experimental) API:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

You could, for instance, set up an AWS Lambda function, invoked when a file lands on S3, that makes the trigger DAG call.
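The Lambda idea might look roughly like the sketch below, which posts to the experimental REST endpoint once per uploaded object. Several things here are assumptions rather than anything from the question: the environment variables `AIRFLOW_URL` and `AIRFLOW_DAG_ID`, the helper names, an Airflow webserver reachable from the Lambda without authentication, and the `bucket`/`key` fields placed in `conf`. The `conf` value is sent as a JSON-encoded string because older 1.x releases parse it with `json.loads`.

```python
import json
import os
import urllib.parse
import urllib.request

# Hypothetical settings; supply real values through the Lambda environment.
AIRFLOW_URL = os.environ.get("AIRFLOW_URL", "http://localhost:8080")
DAG_ID = os.environ.get("AIRFLOW_DAG_ID", "s3_triggered_emr_cluster_dag")


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        # Object keys arrive URL-encoded in S3 notifications.
        key = urllib.parse.unquote_plus(s3["object"]["key"])
        objects.append((s3["bucket"]["name"], key))
    return objects


def build_trigger_request(base_url, dag_id, bucket, key):
    """Build a POST to the experimental dag_runs endpoint (Airflow 1.x)."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    payload = {"conf": json.dumps({"bucket": bucket, "key": key})}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def lambda_handler(event, context):
    """Lambda entry point: trigger one DAG run per uploaded object."""
    triggered = []
    for bucket, key in extract_s3_objects(event):
        request = build_trigger_request(AIRFLOW_URL, DAG_ID, bucket, key)
        with urllib.request.urlopen(request, timeout=10) as response:
            response.read()
        triggered.append(key)
    return {"triggered": triggered}
```

With this approach the DAG no longer needs the S3KeySensor: each upload creates its own DAG run, and the tasks can read the object location from `dag_run.conf`.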