I'm trying out Airflow with the BigQueryOperator. I thought I would use Google Cloud Composer later on, but I want it running locally first. I have Airflow up and running, and a BashOperator works fine. I can also run airflow test <dag> <task>, where task is the BigQuery task I want to run, but when I trigger the DAG from the UI the BigQuery task is never queued. Instead, it ends up in the REMOVED state and nothing happens.
My DAG definition looks like this:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'tutorial', default_args=default_args) as dag:
    # operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date')

    template_sql = '''
        SELECT 'TOMAS' name, '{{ params.my_value }}' value, "{{ params.my_value2 }}" tables
    '''

    sampleBigQuery1 = BigQueryOperator(
        task_id='bq_simple_sql1',
        bql=template_sql,
        use_legacy_sql=False,
        destination_dataset_table='temp_tomas.airflow_1',
        allow_large_results=True,
        params={'my_value': (datetime.now()).strftime("%D %T"),
                'my_value2': "yolo"},  # getTables()},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE'
    )

    t1 >> sampleBigQuery1
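For reference, here is a minimal standard-library sketch of what the yesterday expression in the DAG evaluates to. The helper name midnight_yesterday and the two sample timestamps are hypothetical and only there for illustration. It shows that the value depends on when the file is parsed, so start_date moves forward by a day each day. This may or may not be related to the REMOVED state, but it is worth being aware of when comparing airflow test with scheduled runs.

```python
from datetime import datetime, timedelta


def midnight_yesterday(now):
    """Same expression as the DAG's `yesterday`, with `now` passed in explicitly."""
    # datetime.combine ignores the time part of its first argument,
    # so the result is midnight at the start of the previous day.
    return datetime.combine(now - timedelta(1), datetime.min.time())


# Two hypothetical parse times, exactly one day apart.
parse_1 = datetime(2018, 3, 12, 9, 30)
parse_2 = datetime(2018, 3, 13, 9, 30)

start_1 = midnight_yesterday(parse_1)
start_2 = midnight_yesterday(parse_2)

print(start_1)  # 2018-03-11 00:00:00
print(start_2)  # 2018-03-12 00:00:00
```

The same reasoning applies to the datetime.now() call inside params: it is evaluated when the DAG file is parsed, not when the task runs.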
So how do I debug a case where airflow test ... works, but the task does not run when it is triggered by the scheduler or the UI? Does anything look wrong with what I have here?
Locally I'm using a standard install of Airflow with SQLite, but I don't think that should have any impact. I am running everything in one Python env, so it should be pretty self-contained.
If this is your first Airflow setup, you might want to check the points in this question first: Airflow 1.9.0 is queuing but not launching tasks
Additionally, I'd especially recommend the last step listed there.
It might give you a better idea of why the task is not being scheduled.
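Another way to see what the scheduler recorded is to look at the metadata database directly. The following is only a sketch, assuming the SQLite backend mentioned in the question and a task_instance table with task_id, dag_id, execution_date and state columns (which is what Airflow 1.x uses as far as I know; other versions may differ). The helper task_states is a hypothetical name, and the in-memory table with made-up rows is a stand-in so the example runs without an Airflow install.

```python
import sqlite3


def task_states(conn, dag_id):
    """Return (task_id, execution_date, state) rows for one DAG, newest run first."""
    cur = conn.execute(
        "SELECT task_id, execution_date, state FROM task_instance "
        "WHERE dag_id = ? ORDER BY execution_date DESC, task_id",
        (dag_id,),
    )
    return cur.fetchall()


# Stand-in for the real metadata DB. Against a real install you would
# connect to your own airflow.db file instead (its location is an assumption
# that depends on your AIRFLOW_HOME and configuration).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)"
)
# Made-up sample rows, for illustration only.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("print_date", "tutorial", "2018-03-12 00:00:00", "success"),
        ("bq_simple_sql1", "tutorial", "2018-03-12 00:00:00", "removed"),
    ],
)

for row in task_states(conn, "tutorial"):
    print(row)
```

Comparing the rows for a run triggered from the UI with the task IDs you expect from the DAG file can show whether the scheduler is picking up the same DAG definition that airflow test uses.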