I can get this to run as a standalone application, but I am having trouble getting it to work in Django.
from celery import Celery
from celery.schedules import crontab
# Standalone Celery application named 'tasks'.
app = Celery('tasks')

# JSON-only messaging, US/Central local time, and a beat schedule that holds
# a single periodic entry.
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE={
        'test': {
            # Name of the task to send.
            'task': 'tasks.test',
            # crontab() with no arguments is due every minute.
            'schedule': crontab(),
        },
    },
)
@app.task
def test():
    """Append one greeting line to ``test.txt``.

    The path is relative, so the file is created in the working directory
    of the process that executes the task.
    """
    with open('test.txt', 'a') as f:
        f.write('Hello, World!\n')
It feeds the RabbitMQ server and writes to the file every minute. It works like a charm, but when I try to get it to work in Django I get this error:
Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'retries': 0, 'eta': None, 'kwargs': {}, 'taskset': None, 'timelimit': [None, None], 'callbacks': None, 'task': 'proj.test', 'args': [], 'expires': None, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': None, 'chord': None} (246b)
Traceback (most recent call last):
  File "/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py", line 456, in on_task_received
    strategies[name](message, body,
KeyError: 'proj.test'
[2016-06-16 01:16:00,051: INFO/Beat] Scheduler: Sending due task test (proj.test)
[2016-06-16 01:16:00,055: ERROR/MainProcess] Received unregistered task of type 'proj.test'.
# CELERY STUFF
# Only JSON payloads are accepted and produced.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'

# Periodic tasks sent by celery beat.
CELERYBEAT_SCHEDULE = {
    'test': {
        # Name under which the worker must have the task registered.
        'task': 'proj.test',
        # crontab() with no arguments is due every minute.
        'schedule': crontab(),
    },
}
celery.py
from __future__ import absolute_import
import os
from celery import Celery
# Point Django at the project's settings module unless the environment
# already names one.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
# Celery application for the 'proj' project.
app = Celery('proj')
# Load the Celery configuration from the Django settings object.
app.config_from_object('django.conf:settings')
# Search the installed Django apps for task modules; the lambda defers
# reading INSTALLED_APPS until autodiscovery actually runs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    """Print a representation of the request this task was invoked with."""
    request_line = 'Request: {0!r}'.format(self.request)
    print(request_line)
task.py
from __future__ import absolute_import
from celery import shared_task
# The beat schedule in the Django settings sends this task as 'proj.test', so
# register it under exactly that name. Without an explicit name Celery derives
# one from the module path, and the worker then rejects the message with
# "Received unregistered task of type 'proj.test'".
# NOTE: app.autodiscover_tasks() only imports modules named ``tasks`` by
# default, so this module has to be <app>/tasks.py for the worker to load it.
@shared_task(name='proj.test')
def test():
    """Overwrite ``test.txt`` with a single greeting line.

    The path is relative, so the file is written in the working directory
    of the worker process.
    """
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)
__init__.py
from __future__ import absolute_import
from .celery import app as celery_app
Any thoughts on this are most appreciated. Thanks.
Try the following and let me know whether it works for you; it does work for me.
In settings.py
# Periodic tasks sent by celery beat.
CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        # Must be the same string that is passed as ``name`` to the task
        # decorator.
        'task': 'run_scheduled_jobs',
        # crontab() with no arguments is due every minute.
        'schedule': crontab(),
    },
}
And in tasks.py..
from celery.task import task # notice the import of task and not shared task.
# The explicit name is the string the beat schedule refers to.
@task(name='run_scheduled_jobs')
def run_scheduled_jobs():
    """Placeholder periodic job; put the scheduled work here."""
    # do whatever stuff you do
    return True
But if you want to use shared_task instead:
# The explicit name gives Celery a fixed identifier for this task.
@shared_task(name='my_shared_task')
def my_shared_task():
    """Placeholder shared task; put the asynchronous work here."""
    # do what you want here..
    return True
I use shared tasks for async jobs, so I need to call them from a function like the following,
in views.py or any other module in your project app:
def some_function():
    """Queue ``my_shared_task`` to run after a delay and return True."""
    # NOTE(review): in_seconds is not defined in this snippet; presumably a
    # delay in seconds supplied by the surrounding module. Confirm.
    delay = in_seconds
    my_shared_task.apply_async(countdown=delay)
    return True
And, just in case you have forgotten, remember to include the app that contains your tasks in INSTALLED_APPS:
INSTALLED_APPS = [
    # ... the project's other apps go here ...
    'my_app',  # include app: the Django app whose tasks module defines the tasks
]
I'm sure this approach works fine.. Thanks