Publisher instantly gets the ack without waiting for the receiver to send it

38 Views Asked by At

I'm trying to create a very simple test program on Java that uses RabbitMQ. (An example configuration class to work off of was provided to me). I'm new to this software.

My goal is to get basic message publishing and listening working, with manual ACKs as well. So far I only achieved the former. The listener does receive the message, but the publisher seems to obtain the ACK immediately after setting the ConfirmCallBack, and it doesn't react when the listener actually sends the ACK. I read a lot of documentation and could not figure out the issue.

These are the methods responsible for sending and receiving the message. publisherMethod() gets called after the program starts running:

public void publisherMethod() {
    Thing thing = new Thing("string");
    template.convertAndSend(exchange, key, thing); //"template" is a RabbitTemplate object
    System.out.println("Message sent");
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("Ack received");
        } else {
            System.out.println("Nack received");
        }
    });
    System.out.println("ConfirmCallback set");
}

@RabbitListener(queues = "${rabbitmq.queue.default}", ackMode = "MANUAL")
public void queueDefaultListener(Thing thing, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
        throws InterruptedException, IOException {
    System.out.println("Message received");
    Thread.sleep(5000);
    channel.basicAck(tag, false);
    System.out.println("Ack sent");
}

And these are the beans defined on the @Configuration class that I think may be relevant. (Not sure). The only thing I added myself was setting the PublisherConfirmType of the connectionFactory to CORRELATED.

private @Value("${rabbitmq.host.value}") String host;
private @Value("${rabbitmq.port.value}") Long port;
private @Value("${rabbitmq.username.value}") String username;
private @Value("${rabbitmq.password.value}") String password;

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setPort(port.intValue());
    connectionFactory.setPublisherConfirmType(ConfirmType.CORRELATED);
    return connectionFactory;
}

@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
    jsonConverter.getObjectMapper().registerModule(new ParameterNamesModule(JsonCreator.Mode.PROPERTIES));
    factory.setMessageConverter(jsonConverter);
    return factory;
}

@Bean
RabbitListenerConfigurer rabbitListenerConfigurer(MessageHandlerMethodFactory messageHandlerMethodFactory) {
    return c -> c.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}

I guess I'm missing some property I have to set to something, but I'm very lost.

0

There are 0 best solutions below