Why is a task stuck and not executed in airflow?

Tomas Jansson · May 4, 2018 · Viewed 19.4k times

I'm trying out Airflow with the BigQueryOperator. I thought I would use Google Composer later on, but I want it running locally first. I have Airflow up and running, and a BashOperator works fine. I can also run airflow test <dag> <task>, where the task is the BigQuery task I want to run, but when I trigger the DAG from the UI the BigQuery task is never queued. Instead it ends up in the REMOVED state and nothing happens.

My DAG definition looks like this:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'tutorial', default_args=default_args) as dag:

    # operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date')

    template_sql = '''
            SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
        '''

    sampleBigQuery1 = BigQueryOperator(
        task_id='bq_simple_sql1',
        bql=template_sql,
        use_legacy_sql=False,
        destination_dataset_table='temp_tomas.airflow_1',
        allow_large_results=True,
        params={'my_value': (datetime.now()).strftime("%D %T"),
                'my_value2': "yolo"},  # getTables()},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )

    t1 >> sampleBigQuery1

So how do I debug a case where airflow test ... works, but the task never runs when the DAG is triggered by the scheduler or the UI? Is there something obviously wrong with what I have here?

Locally I'm using a standard install of Airflow with SQLite, but that shouldn't have any impact, I think. I'm running everything in one Python env, so it should be pretty contained.

Answer

tobi6 · May 4, 2018

If this is your first Airflow setup, you might want to check those things first: Airflow 1.9.0 is queuing but not launching tasks

From that list, I'd especially recommend the last step:

  • If nothing else works, you can use the web ui to click on the dag, then on Graph View. Now select the first task and click on Task Instance. In the paragraph Task Instance Details you will see why a DAG is waiting or not running.

This might give you more of an idea why the task is not being scheduled.
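
If you prefer to inspect the same information without the web UI, here is a minimal sketch that queries the metadata database directly. It assumes Airflow 1.x, where airflow.settings.Session and airflow.models.TaskInstance are importable; it simply prints the recorded state of every task instance of the tutorial DAG.

from airflow import settings
from airflow.models import TaskInstance

# Open a session against the metadata DB (SQLite in a default local install).
session = settings.Session()
try:
    # Fetch all task instances recorded for the 'tutorial' DAG, newest first.
    instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == 'tutorial')
        .order_by(TaskInstance.execution_date.desc())
        .all()
    )
    for ti in instances:
        # state will be e.g. 'removed', 'queued', 'success', 'failed' or None.
        print(ti.task_id, ti.execution_date, ti.state)
finally:
    session.close()

Whatever state shows up here should match what the Task Instance Details page in the UI reports.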