How to reject messages in aio_pika with dead letter exchange?


I am using aio_pika, and my RabbitMQ subscriber worker misbehaves when it rejects a message from the queue. I reject the message, and it is routed through the dead letter exchange to the dead letter queue as expected. However, the message does not disappear from the original queue, so the worker never stops and the dead letter queue fills up with messages.

I want the message to be dropped from the original queue when it is rejected.

This is what I have tried. Could you please help?

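import json

from aio_pika.abc import AbstractIncomingMessage

# `log` is the logger used in my project; its import is not shown here.

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    # ignore_processed=True allows a manual reject inside the context manager
    async with message.process(ignore_processed=True):
        parsed = json.loads(message.body)
        success = False
        if False:  # placeholder for the real failure check
            log.msg("[x] Reject")
            await message.reject(requeue=False)
        else:
            log.msg("[x] Updated!")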

sergeusprecious On

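I restructured the callback to make it a little clearer: handle the valid case first and return early, then reject everything else.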

import json

from aio_pika.abc import AbstractIncomingMessage

# `log` is the same logger object as in the question; its import is not shown.

def some_validation(parsed) -> bool:
    # some logic
    return True

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    # ignore_processed=True allows a manual reject inside the context manager
    async with message.process(ignore_processed=True):
        parsed = json.loads(message.body)
        if some_validation(parsed):
            log.msg("[x] Updated!")
            # some logic
            return
        # Validation failed: reject without requeueing
        log.msg("[x] Reject")
        await message.reject(requeue=False)
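
For completeness, here is a rough sketch of how the callback above might be wired to a queue that has a dead letter exchange, since the question does not show that part. Everything named here is an assumption for illustration: the URL, the queue and exchange names, the fanout exchange type, the `is_valid` rule, and the use of the standard `logging` module instead of `log.msg`. It is not necessarily how the asker's broker is set up. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so `x-dead-letter-exchange` has to match whatever the queue was originally created with.

```python
import asyncio
import json
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed for the type hint below
    from aio_pika.abc import AbstractIncomingMessage

log = logging.getLogger(__name__)

# Hypothetical names, not taken from the question.
AMQP_URL = "amqp://guest:guest@localhost/"
WORK_QUEUE = "tasks"
DEAD_LETTER_EXCHANGE = "tasks.dlx"
DEAD_LETTER_QUEUE = "tasks.dead"


def is_valid(body: bytes) -> bool:
    """Placeholder rule: accept JSON objects that carry an "id" key."""
    try:
        payload = json.loads(body)
    except ValueError:  # covers malformed JSON and undecodable bytes
        return False
    return isinstance(payload, dict) and "id" in payload


async def callback(message: "AbstractIncomingMessage") -> None:
    # ignore_processed=True lets us reject by hand inside the block;
    # a message we do not touch is acked when the block exits cleanly.
    async with message.process(ignore_processed=True):
        if not is_valid(message.body):
            log.info("[x] Reject")
            # requeue=False hands the message to the dead letter exchange
            await message.reject(requeue=False)
            return
        log.info("[x] Updated!")


async def main() -> None:
    # Imported here so the helpers above can be loaded without the library.
    import aio_pika

    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()

        # Exchange and queue that receive rejected messages.
        dlx = await channel.declare_exchange(
            DEAD_LETTER_EXCHANGE, aio_pika.ExchangeType.FANOUT, durable=True
        )
        dead_queue = await channel.declare_queue(DEAD_LETTER_QUEUE, durable=True)
        await dead_queue.bind(dlx)

        # Work queue whose rejected messages are routed to the exchange above.
        work_queue = await channel.declare_queue(
            WORK_QUEUE,
            durable=True,
            arguments={"x-dead-letter-exchange": DEAD_LETTER_EXCHANGE},
        )
        await work_queue.consume(callback)
        await asyncio.Future()  # keep consuming until cancelled
```

To try it against a local broker, run `asyncio.run(main())`. With this layout the worker consumes only from the work queue, so a rejected message should leave that queue once and end up in the dead letter queue. If messages keep coming back, it may be worth checking that nothing routes the dead letter exchange back to the original queue.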