We are trying to run a simple DAG with two tasks that communicate data via XCom.
DAG file:
from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    default_args=args)

value_1 = [1, 2, 3]

def push(**kwargs):
    # pushes an XCom without a specific target
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key=None, task_ids='push')
    assert v1 == value_1
    v1 = ti.xcom_pull(key=None, task_ids=['push'])
    assert (v1) == (value_1)

push1 = PythonOperator(
    task_id='push', dag=dag, python_callable=push)

pull = BashOperator(
    task_id='also_run_this',
    bash_command='echo {{ ti.xcom_pull(task_ids="push_by_returning") }}',
    dag=dag)

pull.set_upstream(push1)
But when we run the DAG in Airflow, we get the following exception:
[2018-09-27 16:55:33,431] {base_task_runner.py:98} INFO - Subtask: [2018-09-27 16:55:33,430] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/xcom.py
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-09-27 16:55:33,696] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-09-27 16:55:33,698] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-09-27 16:55:33,702] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-09-27 16:55:33,704] {base_task_runner.py:98} INFO - Subtask: File "/home/airflow/gcs/dags/xcom.py", line 22, in push
[2018-09-27 16:55:33,707] {base_task_runner.py:98} INFO - Subtask: kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
[2018-09-27 16:55:33,708] {base_task_runner.py:98} INFO - Subtask: KeyError: 'ti'
We validated the DAG and did not find any issue with it. Please help us fix this.
Add 'provide_context': True to the default args. This is what makes Airflow pass the keyword arguments (including ti) into **kwargs:
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}
From the PythonOperator documentation: provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
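For reference, here is a minimal sketch of the corrected pusher, assuming Airflow 1.x (in Airflow 2+ the context is passed to the callable automatically and provide_context is no longer needed). Instead of putting it in default_args, the flag can also be set directly on the PythonOperator:

from airflow.operators.python_operator import PythonOperator

def push(**kwargs):
    # 'ti' is only present in kwargs when provide_context is enabled
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)

# equivalent to setting 'provide_context': True in default_args,
# but scoped to this one task
push1 = PythonOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
    provide_context=True)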