I'm really struggling to the get the proper setup for Flask, SQLAlchemy and Celery. I have searched extensively and tried different approaches, nothing really seems to work. Either I missed the application context or can't run the workers or there are some other problems. The structure is very general so that I can build a bigger application.
I'm using: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, my current setup is the following:
app/__init__.py
#Empty
app/config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
@staticmethod
def init_app(app):
pass
class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
config = {
"local": LocalConfig}
app/exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
app/factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config
def create_before_request(app):
def before_request():
g.db = db
return before_request
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
celery.config_from_object(config)
# Register the blueprints
# Add the before request handler
app.before_request(create_before_request(app))
return app
app/manage.py
from factory import create_app
app = create_app("local")
from flask import render_template
from flask import request
@app.route('/test', methods=['POST'])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""
if __name__ == "__main__":
app.run()
app/models.py
from extensions import db
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)
app/tasks.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app
@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g
@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g
In the folder app:
python.exe manage.py
celery.exe worker -A tasks
I get an import error that doesn't make any sense to me. Should I structure the application differently? At the end I think I want a quite basic setup, e.g. Using Flask with the factory pattern, be able to use the Flask-SQLAlchmey extension and have some worker that needs to access the database.
Any help is highly appreciated.
Traceback is executed when starting the celery worker.
Traceback (most recent call last):
File "[PATH]\scripts\celery-script.py", line 9, in <module>
load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
main()
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'
UPDATE I change the code according to the suggestion in the comment. The worker starts now up but when test it with a get request to http://127.0.0.1:5000/test
. I get the following traceback:
Traceback (most recent call last):
File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
args=args, kwargs=kwargs)
File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
response = receiver(signal=self, sender=sender, \**named)
File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
with current_app.app_context():
File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)
File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
return self.__local()
File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))
UPDATE Based on the comments from Marteen, I changed the code. The current working version is under: https://gist.github.com/anonymous/fa47834db2f4f3b8b257. Any further improvements or suggestions are welcome.
I was off with the current_app advice.
Your celery object needs access to the application context. I found some information online about creating the Celery object with a factory function. Example below is tested without a message broker.
#factory.py
from celery import Celery
from config import config
def create_celery_app(app=None):
app = app or create_app(config)
celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
and in tasks.py:
#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g
celery = create_celery_app()
@task_prerun.connect
def celery_prerun(*args, **kwargs):
#print g
with celery.app.app_context():
# # use g.db
print g
@celery.task()
def do_some_stuff():
with celery.app.app_context():
# use g.db
g.user = "test"
print g.user
Some links:
Flask pattern for creating a Celery instance with factory function
Application using both application factory and celery