Quarkus MutinyEmitter<T> timeout waiting for the signal

28 Views Asked by At

we are using Quarkus/Smallrye reactive messaging to produce messages to the Kafka topic. In most cases this works well. However - from time to time - we are facing timeouts - meaning the emitter.sendMessage(message) does not produce either Item or Failure signal and the downstream part of the pipeline timeouts. What is strange - messages are happily written in the Kafka topic. We have implemented a retry wrapper over the sendMessage call, but the results are just duplicate messages in the target topic. Code sample here:

return Uni.createFrom().deferred(() -> emitter.sendMessage(message).replaceWith(message))
                .ifNoItem().after(timeout).failWith(() -> {
                    Log.warnf("Timeout emitting %s %s to %s Kafka channel", event.getClass().getSimpleName(), event.getId(), channelName);
                    return new TimeoutException();
                })
                .onFailure(TimeoutException.class).retry().atMost(numberOfAttempts)
                .onFailure(th -> {
                    Log.errorf(th, "Error emitting %s %s to %s Kafka channel", event.getClass().getSimpleName(), event.getId(), channelName);
                    return th instanceof IllegalStateException;
                }).retry().withBackOff(timeout, timeout).atMost(numberOfAttempts);

Is there a way to detect what's going on behind the scenes?

Jan

0

There are 0 best solutions below