Airflow - Python file NOT in the same DAG folder

p.magalhaes · Nov 3, 2015 · Viewed 24.7k times

I am trying to use Airflow to execute a simple Python task.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

If I try, for example:

airflow test python_test print 2015-01-01

It works!

Now I want to put my def print_context(ds, **kwargs) function in another Python file. So I create another file called simple_test.py and change:

import simple_test  # simple_test.py lives in the same DAG folder

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
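
Here simple_test.py sits in the same DAG folder and, presumably, looks something like this (the same callable as before, just moved into its own file):

from __future__ import print_function
from pprint import pprint


def print_context(ds, **kwargs):
    # Print the task context passed by Airflow and the execution date.
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'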

Now I try to run again:

airflow test python_test print 2015-01-01

And OK! It still works!

But if I create a module, for example a worker module with the file SimplePython.py, import it (from worker import SimplePython), and try:

airflow test python_test print 2015-01-01

It gives the message:

ImportError: No module named worker
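
For context, the layout being attempted in the DAG folder looks roughly like this (the exact file names are assumptions based on the description above):

python_test.py
worker/__init__.py
worker/SimplePython.py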

The questions:

  1. Is it possible to import a module inside a DAG definition?
  2. How is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?

Answer

ImDarrenG · Sep 28, 2016

You can package the dependencies of your DAG as described here:

https://airflow.apache.org/concepts.html#packaged-dags

To allow this, you can create a zip file that contains the DAG(s) in the root of the zip file and has the extra modules unpacked in directories. For instance, you can create a zip file that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories, as these are considered to be potential packages.
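
Inside the zip, a DAG file can then import from the bundled package like any other module. A minimal sketch of my_dag1.py, assuming package1/functions.py defines the print_context callable from the question:

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

# Resolved from within the zip, since package1/ is bundled next to the DAG file.
from package1.functions import print_context

dag = DAG(dag_id='packaged_dag_example',
          default_args={'owner': 'airflow',
                        'start_date': datetime(2016, 9, 1)})

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

The zip file itself just gets dropped into the DAGS_FOLDER alongside any regular .py DAG files.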

When using the CeleryExecutor, you need to manually sync the DAG directories; Airflow doesn't take care of that for you:

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means.