KeyError: 'ti' in Apache Airflow xcom

SANN3 picture SANN3 · Sep 27, 2018 · Viewed 7.6k times · Source

We are trying to run a simple DAG with 2 tasks which will communicate data via xcom.

DAG file:

from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    default_args=args)

value_1 = [1, 2, 3]


def push(**kwargs):
    # pushes an XCom without a specific target
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def puller(**kwargs):
    ti = kwargs['ti']

    v1 = ti.xcom_pull(key=None, task_ids='push')
    assert v1 == value_1

    v1 = ti.xcom_pull(key=None, task_ids=['push'])
    assert (v1) == (value_1)


push1 = PythonOperator(
    task_id='push', dag=dag, python_callable=push)

pull = BashOperator(
    task_id='also_run_this',
    bash_command='echo {{ ti.xcom_pull(task_ids="push_by_returning") }}',
    dag=dag)

pull.set_upstream(push1)

But while running the DAG in airflow we are getting the following exception.

[2018-09-27 16:55:33,431] {base_task_runner.py:98} INFO - Subtask: [2018-09-27 16:55:33,430] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/xcom.py
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-09-27 16:55:33,694] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-09-27 16:55:33,696] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-09-27 16:55:33,697] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-09-27 16:55:33,698] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-09-27 16:55:33,699] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-09-27 16:55:33,701] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-09-27 16:55:33,702] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-09-27 16:55:33,703] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-09-27 16:55:33,704] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/gcs/dags/xcom.py", line 22, in push
[2018-09-27 16:55:33,707] {base_task_runner.py:98} INFO - Subtask:     kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
[2018-09-27 16:55:33,708] {base_task_runner.py:98} INFO - Subtask: KeyError: 'ti'

We validated the DAG there is but no issue, Please help us to fix this issue.

Answer

kaxil picture kaxil · Sep 28, 2018

Add provide_context: True to default args. This is needed to define **kwargs.

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.