The method of getting a BashOperator
or SqlOperator
to pick up an external file for its template is somewhat clearly documented, but looking at the PythonOperator
my test of what I understand from the docs is not working. I am not sure how the templates_exts
and templates_dict
parameters would correctly interact to pick up a file.
In my dags folder I've created: pyoptemplate.sql
and pyoptemplate.t
as well as test_python_operator_template.py
:
SELECT * FROM {{params.table}};
SELECT * FROM {{params.table}};
# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pprint
pp = pprint.PrettyPrinter(indent=4)
def templated_function(ds, **kwargs):
"""This function will try to use templates loaded from external files"""
pp.pprint(ds)
pp.pprint(kwargs)
# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
default_args={"owner": "lamblin",
"start_date": datetime.now()},
template_searchpath=['/Users/daniellamblin/airflow/dags'],
schedule_interval='@once')
# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
provide_context=True,
python_callable=templated_function,
templates_dict={
'pyoptemplate': '',
'pyoptemplate.sql': '',
'sql': 'pyoptemplate',
'file1':'pyoptemplate.sql',
'file2':'pyoptemplate.t',
'table': '{{params.table}}'},
templates_exts=['.sql','.t'],
params={'condition_param': True,
'message': 'Hello World',
'table': 'TEMP_TABLE'},
dag=dag)
The result from a run shows that table
was templated correctly as a string, but the others did not pull in any files for templating.
dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{ u'END_DATE': '2017-01-18',
u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
u'dag': <DAG: test_python_operator_template_dag>,
u'dag_run': None,
u'ds_nodash': u'20170118',
u'end_date': '2017-01-18',
u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
u'latest_date': '2017-01-18',
u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
u'params': { 'condition_param': True,
'message': 'Hello World',
'table': 'TEMP_TABLE'},
u'run_id': None,
u'tables': None,
u'task': <Task(PythonOperator): test_python_operator_template>,
u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
'templates_dict': { 'file1': u'pyoptemplate.sql',
'file2': u'pyoptemplate.t',
'pyoptemplate': u'',
'pyoptemplate.sql': u'',
'sql': u'pyoptemplate',
'table': u'TEMP_TABLE'},
u'test_mode': True,
u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'tomorrow_ds': '2017-01-19',
u'tomorrow_ds_nodash': u'20170119',
u'ts': '2017-01-18T00:00:00',
u'ts_nodash': u'20170118T000000',
u'yesterday_ds': '2017-01-17',
u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None
As of Airflow 1.8, the way the PythonOperator replaces its template_ext
field in __init__
doesn't work. Tasks only check template_ext
on the __class__
. To create a PythonOperator that picks up SQL template files you only need to do the following:
class SQLTemplatedPythonOperator(PythonOperator):
template_ext = ('.sql',)
And then to access the SQL from your task when it runs:
SQLTemplatedPythonOperator(
templates_dict={'query': 'my_template.sql'},
params={'my_var': 'my_value'},
python_callable=my_func,
provide_context=True,
)
def my_func(**context):
context['templates_dict']['query']