I recently switch to Celery 3.0. Before that I was using Flask-Celery in order to integrate Celery with Flask. Although it had many issues like hiding some powerful Celery functionalities but it allowed me to use the full context of Flask app and especially Flask-SQLAlchemy.
In my background tasks I am processing data and the SQLAlchemy ORM to store the data. The maintainer of Flask-Celery has dropped support of the plugin. The plugin was pickling the Flask instance in the task so I could have full access to SQLAlchemy.
I am trying to replicate this behavior in my tasks.py file but with no success. Do you have any hints on how to achieve this?
extensions.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
app.py
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
Once you've set up your app this way, you can run and use celery without having to explicitly run it from within an application context, as all your tasks will automatically be run in an application context if necessary, and you don't have to explicitly worry about post-task teardown, which is an important issue to manage (see other responses below).
Those who keep getting with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'
make sure to:
celery
import at the app.py
file level. Avoid:app.py
from flask import Flask
def create_app():
app = Flask()
initiliaze_extensions(app)
return app
def initiliaze_extensions(app):
from extensions import celery, db # DOOMED! Keep celery import at the FILE level
db.init_app(app)
celery.init_app(app)
flask run
and usecelery worker -A app:celery -l info -f celery.log
Note the app:celery
, i.e. loading from app.py
.
You can still import from extensions to decorate tasks, i.e. from extensions import celery
.
I prefer to run all of celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardowns. It also lends itself well to the flask 'application factory' pattern.
extensions.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
app.py
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()