Some Celery tasks work, others are NotRegistered

srgbnd · Nov 11, 2016

I am following the Celery Django tutorial, and the tasks from its example (add, mul) work perfectly for me. I get the correct response when I do res = add.delay(1, 2); res.get().

But I get *** NotRegistered: u'pipeline.tasks.sayhello' when I try to execute another task of mine: res = sayhello.delay('trex').

If I do res = sayhello('trex'), I can get the result by just typing res. But that way I execute the function ordinarily, without using Celery.

The task works only if I run it in the Django shell (./manage.py shell):

>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'

So the problem is that I can't execute the sayhello task from pipeline/views.py, although I can execute the add and mul tasks from there.

Why is that? How do I run tasks correctly from views.py?

The full error message:

[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
  File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'

Django version:

1.9.7

Celery version:

celery==4.0.0
django-celery==3.1.17

Django project dir tree:

rocket
├── etl
│   ├── etl
│   │   ├── celery.py
│   │   ├── __init__.py
│   │   ├── settings
│   │   │   ├── base.py
│   │   │   ├── dev.py
│   │   │   ├── __init__.py
│   │   │   ├── production.py
│   │   │   └── test.py
│   │   ├── urls.py
│   │   ├── wsgi.py
│   ├── manage.py
│   ├── pipeline
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   ├── views.py

etl/pipeline/views.py

from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    result.get()

etl/pipeline/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name

I also tried this:

from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)

etl/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')
app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

etl/__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

etl/settings/base.py

...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )

Answer

Karuhanga · Feb 12, 2019

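This might help someone. I had modified my code and neglected to restart the Celery worker. The worker registers tasks when it starts, so a task added afterwards stays unregistered until the worker is restarted.

Try restarting the Celery workers.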
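One way to confirm that a stale worker is the cause is to ask the running workers which tasks they have registered. The following is a minimal sketch, not part of the original answer. The helper names workers_missing_task and check_running_workers are hypothetical, and the worker names in the sample data are made up. It assumes the workers answer remote-control commands, which app.control.inspect().registered() relies on.

```python
def workers_missing_task(registered, task_name):
    """Return the names of the workers that do not know `task_name`.

    `registered` is assumed to have the shape returned by
    app.control.inspect().registered(): a dict mapping each worker name
    to its list of registered task names, or None if no worker replied.
    """
    if not registered:
        return None  # no worker answered, so nothing can be concluded
    return sorted(
        worker for worker, tasks in registered.items()
        if task_name not in tasks
    )


def check_running_workers(app, task_name, timeout=2.0):
    """Query the live workers of a Celery `app` (hypothetical helper)."""
    registered = app.control.inspect(timeout=timeout).registered()
    return workers_missing_task(registered, task_name)


# Sample data in the assumed shape; the worker names are invented.
sample = {
    "celery@stale": ["pipeline.tasks.add", "pipeline.tasks.mul"],
    "celery@fresh": [
        "pipeline.tasks.add",
        "pipeline.tasks.mul",
        "pipeline.tasks.sayhello",
    ],
}
print(workers_missing_task(sample, "pipeline.tasks.sayhello"))  # ['celery@stale']
```

With the project from the question, this could be called from ./manage.py shell as check_running_workers(celery_app, 'pipeline.tasks.sayhello') after from etl import celery_app. A non-empty list would suggest that those workers were started before sayhello was added and need a restart.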