Python Airflow - Return result from PythonOperator

Teja · May 3, 2018 · Viewed 42.1k times

I have written a DAG with multiple PythonOperators:

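def Task1(**kwargs):
    # Read the 'file' entry from the conf passed when the DAG run was triggered.
    return kwargs['dag_run'].conf.get('file')

# The callable is defined above so that it exists when the operator references it.
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                             provide_context=True,
                             python_callable=Task1, dag=dag1)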

The PythonOperator calls the Task1 function, which returns a value that I need to pass to the next PythonOperator. How can I get the value from the task1 variable, or otherwise get the value returned by Task1?

Updated:

def Task1(**kwargs):
    # conf.get is a method call, and the key is the string 'file'.
    file_name = kwargs['dag_run'].conf.get('file')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='file', value=file_name)
    return file_name

t1 = PythonOperator(task_id='Task1', provide_context=True, python_callable=Task1, dag=dag)

# The outer string uses double quotes so the single quotes inside the template do not end it.
t2 = BashOperator(
    task_id='Moving_bucket',
    bash_command="python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1', key='file') }} ",
    dag=dag,
)

t2.set_upstream(t1)

Answer

tobi6 · May 3, 2018

You might want to check out Airflow's XCom: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

If the function called by a task returns a value, Airflow stores that value in XCom under the default key return_value. In your case, you could access it like so from other Python code:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')

or in a template like so:

{{ task_instance.xcom_pull(task_ids='Task1') }}

If you want to store a value under a specific key, you can push it to XCom explicitly from inside a task:

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

Then later on you can access it like so:

task_instance.xcom_pull(task_ids='my_task', key='the_key')
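
To make the difference between the implicit return-value key and an explicit key concrete, here is a minimal pure-Python stand-in for the XCom store. It is only a sketch and not Airflow code: FakeXComStore and its run method are hypothetical names invented for illustration, and the real storage lives in Airflow's metadata database. The one detail taken from Airflow is the default key name return_value.

```python
# Minimal in-memory stand-in for XCom. This is NOT Airflow's implementation;
# FakeXComStore is a hypothetical class used only for illustration.

RETURN_KEY = "return_value"  # the key Airflow uses for a callable's return value


class FakeXComStore:
    def __init__(self):
        self._values = {}  # maps (task_id, key) -> value

    def xcom_push(self, task_id, key, value):
        self._values[(task_id, key)] = value

    def xcom_pull(self, task_ids, key=RETURN_KEY):
        # Returns None when nothing was stored under (task_ids, key).
        return self._values.get((task_ids, key))

    def run(self, task_id, python_callable):
        # Mimics the operator storing a non-None return value automatically.
        result = python_callable()
        if result is not None:
            self.xcom_push(task_id, RETURN_KEY, result)
        return result


store = FakeXComStore()
store.run("Task1", lambda: "data.csv")           # implicit push via return
store.xcom_push("my_task", "the_key", "my_str")  # explicit push with a key

returned = store.xcom_pull(task_ids="Task1")                # pulls the return value
keyed = store.xcom_pull(task_ids="my_task", key="the_key")  # pulls the keyed value
missing = store.xcom_pull(task_ids="Task1", key="file")     # nothing under 'file' here
```

In this sketch, pulling without a key finds the returned value, while pulling with key='file' finds nothing unless the task also pushed under that key.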

EDIT 1

Follow-up question: Instead of using the value in another function, how can I pass it to another PythonOperator, for example t2 = BashOperator(task_id='Moving_bucket', bash_command='python /home/raw.py "%s" ' % file_name, dag=dag)? I want to access file_name, which is returned by Task1. How can this be achieved?

First of all, it seems to me that the value is, in fact, not being passed to another PythonOperator but to a BashOperator.

Secondly, this is already covered in my answer above. The field bash_command is templated (see template_fields in the source: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Hence, we can use the templated version:

BashOperator(
  task_id='Moving_bucket',
  # Double quotes outside, so the single quotes inside the template do not end the string.
  bash_command="python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ",
  dag=dag,
)

EDIT 2

Explanation: Airflow executes Task1, stores its result in XCom, and only then executes the next task. So for your example to work, Task1 must run first, with Moving_bucket set downstream of it.
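
The ordering requirement can be illustrated with a toy model in plain Python. This is a sketch under loud assumptions, not Airflow code: xcom_store, run_task1, and build_bash_command are hypothetical names, the dictionary stands in for XCom, and string formatting stands in for the template rendering that happens when Moving_bucket runs. The file name data.csv is made up for the example.

```python
# Toy model of the execution order. Not Airflow code: xcom_store, run_task1,
# and build_bash_command are hypothetical names used only for illustration.

xcom_store = {}  # maps (task_id, key) -> value


def run_task1(conf):
    """Stand-in for Task1: the return value is stored like an XCom."""
    file_name = conf.get("file")
    xcom_store[("Task1", "return_value")] = file_name
    return file_name


def build_bash_command():
    """Stand-in for rendering the templated bash_command at run time."""
    file_name = xcom_store.get(("Task1", "return_value"))
    return "python /home/raw.py {}".format(file_name)


# Rendering before Task1 has run finds nothing in the store.
too_early = build_bash_command()

# Running Task1 first populates the store, so the command gets the file name.
run_task1({"file": "data.csv"})
in_order = build_bash_command()

print(too_early)
print(in_order)
```

The first command is built with no value available, which is why the downstream task has to be ordered after Task1, for example with t2.set_upstream(t1).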

Since your function already returns the value, you could also omit key='file' from xcom_pull and skip the manual xcom_push in the function.