Kafka liveness with Quarkus SmallRye


I'm fairly new to Quarkus and I'm trying to send some data to Kafka. To do so, I'm using smallrye-kafka:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

To send data, I'm using an Emitter like this:

@Inject
@Channel("my-channel")
private Emitter<Record<String, Hotel>> emitter;

emitter.send(Record.of(hotel.getId(), hotel));

Is it possible to check whether the Kafka server is up and, if it is not, handle the error with an exception? Or is there any documentation that describes how to do so?

Answer by Ozan Günalp:

One way to do this is to rely on the HealthCenter that SmallRye Reactive Messaging provides. For the Kafka connector, the default health check looks at the Kafka client API metrics and checks the connection status. The documentation describes other options as well.

Unfortunately, the Kafka client APIs don't provide anything more helpful for checking the connection status. Here is some sample code:


    @Channel("quote-requests")
    Emitter<String> emitter;

    @Inject
    HealthCenter healthCenter;

    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
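        // Reject the request with a 503 unless the "quote-requests" channel reports ready.
        boolean channelReady = healthCenter.getReadiness().getChannels().stream()
                .anyMatch(c -> c.getChannel().equals("quote-requests") && c.isOk());
        if (!channelReady) {
            throw new WebApplicationException(503);
        }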
        UUID uuid = UUID.randomUUID();
        emitter.send(uuid.toString());
        return uuid.toString();
    }

Note that this is not optimal: it fetches the readiness of all reactive messaging channels, and it won't work if you disable the health checks for these channels. Another option is to use the KafkaClientService to access the KafkaProducer directly and read the metrics yourself. The connector checks the connection-count metric.
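To illustrate the metric-based option, here is a minimal, dependency-free sketch of the decision logic only. It assumes you have already copied the producer's metrics into a plain map keyed by metric name (for example from the producer you get through KafkaClientService; that wiring is not shown here). The class name KafkaConnectionCheck and both method names are hypothetical, and treating a missing metric as "not connected" is my assumption rather than documented connector behavior.

```java
import java.util.Map;

// Hypothetical helper (not part of SmallRye or Kafka): decides whether a
// producer looks connected from a snapshot of its metrics keyed by name.
final class KafkaConnectionCheck {

    // Name of the metric that, per the answer above, the connector checks.
    static final String CONNECTION_COUNT = "connection-count";

    private KafkaConnectionCheck() {
    }

    // Returns true only if the snapshot holds a numeric connection-count above zero.
    static boolean isConnected(Map<String, ?> metricsByName) {
        Object value = metricsByName.get(CONNECTION_COUNT);
        if (value instanceof Number) {
            return ((Number) value).doubleValue() > 0;
        }
        // Assumption: a missing or non-numeric metric is treated as "not connected".
        return false;
    }

    // Turns a failed check into an exception, so callers can fail fast before sending.
    static void requireConnected(Map<String, ?> metricsByName) {
        if (!isConnected(metricsByName)) {
            throw new IllegalStateException("No active Kafka broker connection");
        }
    }

    public static void main(String[] args) {
        // Example snapshots; in a real application these values would come from the producer.
        System.out.println(isConnected(Map.of(CONNECTION_COUNT, 2.0)));
        System.out.println(isConnected(Map.of(CONNECTION_COUNT, 0.0)));
    }
}
```

You could call requireConnected(...) right before emitter.send(...) and map the IllegalStateException to a 503, much like the HealthCenter example does. Keep in mind that a metric snapshot can be stale, so this only reduces the chance of sending while the broker is unreachable; it does not eliminate it.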

Hope this helps.