I have a requirement for a Django app that publishes messages to multiple RabbitMQ brokers. The downstream distributed consumers receive messages from their respective brokers.
It works as expected, when the specific celery task is triggered the respective consumers start receiving the messages. I have a problem when using the periodic task feature provided in django-celery-beat. Django is not able to discover all the tasks within the project.
Registering the task path manually and then running the task throws task not found error.
celery.py
# celery.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "demo_project.settings")
demo_one_celery = Celery(
"demo_one",
broker="amqp://{}:{}@{}:{}/{}".format(
settings.DEFAULT_RMQ_USER,
settings.DEFAULT_RMQ_PASS,
settings.DEFAULT_RMQ_HOST,
settings.DEFAULT_RMQ_PORT,
settings.DEFAULT_RMQ_V_HOST,
),
)
demo_one_celery.config_from_object("django.conf:settings")
demo_one_celery.autodiscover_tasks(lambda: ['demo_one'])
demo_two_celery = Celery(
"demo_two",
broker="amqp://{}:{}@{}:{}/{}".format(
settings.TWO_RMQ_USER,
settings.TWO_RMQ_PASS,
settings.TWO_RMQ_HOST,
settings.TWO_RMQ_PORT,
settings.TWO_RMQ_V_HOST,
),
)
demo_two_celery.config_from_object("django.conf:settings")
demo_two_celery.autodiscover_tasks(lambda: ['demo_two'])
demo_one/tasks.py
# demo_one/tasks.py
from demo_project.celery import demo_one_celery
@demo_one_celery.task
def demo_one_task():
print("DEMO ONE task")
demo_two/tasks.py
# demo_two/tasks.py
from demo_project.celery import demo_two_celery
@demo_two_celery
def demo_two_task():
print("DEMO TWO task")
demo_project/init.py
# demo_project/__init__.py
from .celery import demo_one_celery, demo_two_celery
__all__ = ('demo_one_celery', 'demo_two_celery',)
Commands to run celery
$ celery -A demo_project.celery:demo_one_celery worker --loglevel=info
$ celery -A demo_project.celery:demo_two_celery worker --loglevel=info
After several attempts and exploring, It was suggested to me to use only one rabbitmq broker but isolation of tasks and it's execution can be handled with Exchanges (Direct & Topic) along with queues.
Routing tasks - https://docs.celeryq.dev/en/stable/userguide/routing.html