I followed the Celery Django tutorial, and the example tasks (add, mul) work for me perfectly: I get the correct response when I do res = add.delay(1, 2); res.get().
But I get *** NotRegistered: u'pipeline.tasks.sayhello' when I try to execute another task of mine: res = sayhello.delay('trex').
If I do res = sayhello('trex'), then I can get the result just by typing res. But that way I execute the function ordinarily, without using Celery.
The task works only if I run it in the Django shell (./manage.py shell):
>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'
So the problem is that I can't execute the sayhello task from pipeline/views.py, even though I can execute the add and mul tasks from there.
Why is that? How do I run tasks correctly from views.py?
The full error message:
[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'
Django version:
1.9.7
Celery version:
celery==4.0.0
django-celery==3.1.17
Django project dir tree:
rocket
├── etl
│ ├── etl
│ │ ├── celery.py
│ │ ├── __init__.py
│ │ ├── settings
│ │ │ ├── base.py
│ │ │ ├── dev.py
│ │ │ ├── __init__.py
│ │ │ ├── production.py
│ │ │ └── test.py
│ │ ├── urls.py
│ │ ├── wsgi.py
│ ├── manage.py
│ ├── pipeline
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ ├── views.py
etl/pipeline/views.py
from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    result.get()
etl/pipeline/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name
I also tried this:
from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)
etl/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')

app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
etl/__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
etl/settings/base.py
...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )
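One way to see whether a running worker actually registered the task is to ask it (a sketch; this assumes the worker was started with -A etl from the directory containing manage.py):

```shell
# Lists the tasks each running worker has imported and registered;
# pipeline.tasks.sayhello should appear here once the worker sees it.
celery -A etl inspect registered
```
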
This might hopefully help someone: I had modified my code and neglected to restart the Celery worker.
Try restarting the Celery workers.
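This matters because a worker only registers the tasks it imported at startup; tasks added or renamed afterwards stay unknown to it until it is restarted. For example (assuming the worker is started from the directory containing manage.py):

```shell
# Stop the old worker (Ctrl+C, or kill its process), then start a fresh
# one so it re-imports pipeline/tasks.py and registers sayhello:
celery -A etl worker --loglevel=info
```
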