Airflow Worker Configuration

Soundar Raj · Feb 20, 2018 · Viewed 11.1k times

I am a newbie to Airflow. I'm trying to set up Airflow in distributed mode using the Celery executor, following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html

Before getting into the details of the setup, I should mention that I've installed PostgreSQL on a separate instance.
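
For reference, a minimal sketch of preparing that database on the PostgreSQL host (the role name, password, and database name here are assumptions chosen to match the connection strings below):

sudo -u postgres psql -c "CREATE USER username WITH PASSWORD 'password';"
sudo -u postgres psql -c "CREATE DATABASE airflow OWNER username;"
# Also ensure listen_addresses in postgresql.conf and the pg_hba.conf rules
# allow connections from both Airflow machines.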

The specification of the setup is detailed below:

Airflow core/server computer

  • Python 3.5
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
    • psycopg2
  • RabbitMQ
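
A rough sketch of installing these (the extras are assumptions, and note that the PyPI package was renamed from airflow to apache-airflow as of 1.8.1):

export AIRFLOW_HOME=~/airflow
pip install "apache-airflow[celery,postgres]"  # the postgres extra pulls in psycopg2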

Configurations made in airflow.cfg:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
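
(For reference, in Airflow 1.x these keys live under two different sections of airflow.cfg; a sketch of the layout:)

[core]
sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor

[celery]
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow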

Tests performed:

  • RabbitMQ is running
  • Can connect to PostgreSQL and have confirmed that Airflow has created tables
  • Can start and view the webserver (including custom DAGs)
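
(Roughly, those checks correspond to commands like the following; hosts and ports are assumptions matching the config above:)

sudo rabbitmqctl status      # on the broker host: RabbitMQ is up
psql -h 192.168.1.2 -p 5432 -U username airflow -c '\dt'   # metadata tables exist
airflow initdb               # creates the metadata tables if they are missing
airflow webserver -p 8080    # serves the UI, including the custom DAGs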

Airflow worker computer

Has the following installed:

  • Python 3.5 with
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
    • psycopg2

Configurations made in airflow.cfg are exactly the same as on the server:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
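
With the config in place, the worker is started with the Airflow 1.x CLI (the queue name below is only an example):

export AIRFLOW_HOME=~/airflow
airflow worker                # starts a Celery worker consuming the default queue
# airflow worker -q my_queue  # or pin the worker to a specific queue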

Output from commands run on the worker machine:

When running airflow flower:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:[email protected]:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:[email protected]:5672//

I am deploying the DAG on the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAG will process to the same core machine.

My worker log shows:

raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

Now my questions are:

1) Should I also copy the DAG folder to the worker computer?

2) Right now, I have not copied the DAG folder to the worker computer, and the worker process does not pick up the task.

Please point out where I am going wrong and how to get the worker process to pick up the task.

Answer

Taylor Edmiston · May 31, 2018

Some of the biggest pain points with Airflow come up around deployment and keeping DAG files and plugins in sync across your Airflow scheduler, Airflow webserver, and Celery worker nodes.
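
A common low-tech workaround is to sync the DAG repository to every node on a schedule; the path and remote in this crontab sketch are placeholders:

# crontab entry on each scheduler, webserver, and worker node
* * * * * cd /home/airflow/airflow/dags && git pull --quiet origin master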

We've created an open source project called Astronomer Open that automates a Dockerized Airflow, Celery, and PostgreSQL setup with some other goodies baked in. The project was motivated by seeing so many people hit the same pain points while creating a very similar setup.

For example, here's the Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

And the docs: https://open.astronomer.io/

Full disclosure: This is a project I contribute to at work — we offer a paid enterprise edition as well that runs on Kubernetes (docs). That said, the Open Edition is totally free to use.