Celery does not see objects in the database

I know similar questions have been asked here before, but none of the answers I found fit my case. I hope I get lucky this time.

So, I have a Django application using Postgres, RabbitMQ and Celery. When I run the components individually, everything works fine; the problem only occurs when running everything through docker-compose.

For some reason, the Celery tasks cannot find the object in the database. The object does exist; moreover, it even stores the Celery task id (for example, so the task can later be revoked).

Here are the parts of the code relevant to the problem:

# docker-compose.yml

version: "3.7"


services:
  db:
    image: postgres:12.4
    container_name: "fbrq_db"
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env
    environment:
      - POSTGRES_HOST_AUTH_METHOD=trust
    networks:
      - default

  rabbitmq:
    restart: always
    container_name: "fbrq_rabbit"
    image: rabbitmq:3-management-alpine
    ports:
      - 5672:5672
      - 15672:15672
    networks:
      - default

  app:
    restart: always
    container_name: "fbrq_app"
    build: .
    volumes:
      - .:/code
      - ./static:/code/static
    command: gunicorn --bind 0.0.0.0:8000 fbrq_api.wsgi
    ports:
      - "8000:8000"
    networks:
      - default

  celery:
    restart: always
    container_name: "fbrq_celery"
    build: .
    command: celery -A fbrq_api worker -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
      - DJANGO_SETTINGS_MODULE=fbrq_api.settings
    networks:
      - default

  celery-beat:
    restart: always
    container_name: "fbrq_celery-beat"
    build: .
    command: celery -A fbrq_api beat -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
    networks:
      - default

volumes:
  postgres_data:
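
Both the web app and the Celery containers are built from the same image and are expected to read the same database settings from .env. For context, the relevant part of fbrq_api/settings.py looks roughly like this (a sketch; the exact environment variable names and how .env is loaded are my assumptions):

# settings.py (sketch)

import os

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": os.environ.get("POSTGRES_DB"),
        "USER": os.environ.get("POSTGRES_USER"),
        "PASSWORD": os.environ.get("POSTGRES_PASSWORD"),
        "HOST": os.environ.get("POSTGRES_HOST", "db"),  # the compose service name
        "PORT": os.environ.get("POSTGRES_PORT", "5432"),
    }
}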
# tasks.py

import logging

import requests
from celery import shared_task

from api.models import Message

# get_message_send_time, api_url and api_token come from project modules
# (omitted here for brevity)

logger = logging.getLogger(__name__)


class MessageTaskManager:
    @staticmethod
    def create_message_celery_task(message):
        message_send_time = get_message_send_time(
            message.mailing, message.client
        )
        task = send_mailing.apply_async(
            args=[message.id], eta=message_send_time
        )
        celery_task_id = task.id
        message.celery_task_id = celery_task_id
        message.save()


@shared_task(bind=True, acks_late=True, name="Mailing")
def send_mailing(self, message_id):
    try:
        message = Message.objects.get(id=message_id)
        message.status = Message.Status.NOT_DELIVERED
        try:
            api_data = {
                "id": message_id,
                "phone": message.client.phone,
                "text": message.mailing.text,
            }
            headers = {
                "Authorization": api_token,
            }
            response = requests.post(
                api_url + str(message_id), json=api_data, headers=headers
            )
            response.raise_for_status()
            message.status = Message.Status.DELIVERED
        except Exception:
            # covers Timeout, HTTPError, RequestException and anything else:
            # reschedule by creating a new message/task pair
            MessageTaskManager.create_new_message_and_send(message)
        finally:
            message.save()

    except Message.DoesNotExist as e:
        logger.error(f"Message_{message_id} doesn't exist yet")
        self.retry(exc=e, countdown=10)
# views.py

from functools import partial

from django.db import transaction
from rest_framework import serializers, status, viewsets
from rest_framework.response import Response

from api.models import Mailing, Message

# get_message_send_time, MailingSerializer, logger and message_task_manager
# (an instance of MessageTaskManager) come from project modules, omitted here

def get_mailing_messages(mailing, created=False):
    if created:
        clients = [
            client
            for client in mailing.get_filtered_clients()
            if get_message_send_time(mailing, client)
        ]
        mailing.set_clients_count(clients)
        return [
            Message.objects.create(mailing=mailing, client=client)
            for client in clients
        ]
    return list(Message.objects.filter(mailing=mailing))


class MailingViewSet(viewsets.ModelViewSet):
    queryset = Mailing.objects.all()
    serializer_class = MailingSerializer


    def create(self, request, *args, **kwargs):
        logger.info(f"Request from API to create mailing: {request.data}")
        serializer = self.get_serializer(data=request.data)
        try:
            serializer.is_valid(raise_exception=True)
            self.perform_create(serializer)
            headers = self.get_success_headers(serializer.data)
            mailing_id = serializer.data["id"]
            mailing = Mailing.objects.get(id=mailing_id)
            messages = get_mailing_messages(mailing, created=True)
            for message in messages:
                transaction.on_commit(
                    partial(
                        message_task_manager.create_message_celery_task,
                        message=message,
                    )
                )
                # message_task_manager.create_message_celery_task(message)
            return Response(
                serializer.data,
                status=status.HTTP_201_CREATED,
                headers=headers,
            )
        except serializers.ValidationError as e:
            logger.error(f"Failed to validate data. Errors: {e.detail}")
            return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)

You can see that initially I just called the celery task creation method:

for message in messages:
    message_task_manager.create_message_celery_task(message)

But after it failed, and after digging into the problem a little, I changed this part of the code:

for message in messages:
    transaction.on_commit(
        partial(message_task_manager.create_message_celery_task, message=message)
    )
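
From what I read, transaction.on_commit only runs the callback after the current transaction commits, and only if there actually is an open transaction (via ATOMIC_REQUESTS or an explicit atomic block); outside of a transaction the callback fires immediately. A minimal sketch of the explicit variant I had in mind (assuming the rest of create stays the same):

from functools import partial

from django.db import transaction

with transaction.atomic():
    messages = get_mailing_messages(mailing, created=True)
    for message in messages:
        transaction.on_commit(
            partial(message_task_manager.create_message_celery_task, message=message)
        )
# callbacks registered above run here, after the INSERTs have been committed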

The worker log messages look like this:

[2024-02-10 08:34:02,930: ERROR/ForkPoolWorker-8] Message_1207 doesn't exist yet
[2024-02-10 08:34:02,932: INFO/MainProcess] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] received
[2024-02-10 08:34:02,935: INFO/ForkPoolWorker-8] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] retry: Retry in 10s: DoesNotExist('Message matching query does not exist.')

But, on the other hand, in the Django shell the object is clearly there:

>>> from api.models import Message
>>> msg = Message.objects.get(id=1207)
>>> msg.celery_task_id
'08226fa1-aa4a-4ba3-adb3-da5125295cc7'

Tricks like adding a time.sleep() don't help either.
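
To rule out the worker simply talking to a different database, a throwaway task like this should show what the worker process actually sees (debug_db is just a name I made up for the check):

from celery import shared_task
from django.db import connection

from api.models import Message


@shared_task(name="debug_db")  # hypothetical diagnostic task
def debug_db():
    # which database is this worker process connected to?
    print(connection.settings_dict["HOST"], connection.settings_dict["NAME"])
    # and how many Message rows can it see?
    print(Message.objects.count())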

P.S. At the same time, database access from inside the Celery Beat task works fine:

def send_daily_statistics_email():
    yesterday = timezone.now() - timezone.timedelta(days=1)
    messages = Message.objects.filter(datetime_send__date=yesterday)
    ...
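
The function is wired into beat through the schedule, roughly like this (the registered task name and the schedule value are sketches):

# settings.py (sketch)

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "daily-statistics-email": {
        "task": "send_daily_statistics_email",  # assumed task name
        "schedule": crontab(hour=6, minute=0),  # assumed time
    },
}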