How to cancel a WebFlux Kafka subscription (and skip the acknowledgement) when the client disconnects?


I'm working on a project that exposes Server-Sent Events (SSE) using Spring WebFlux and Reactor Kafka. I need to expose an endpoint so that our third-party client can consume the messages.

I wrote some sample code based on this example (https://github.com/svgagan/kafka-summit-sse):

@GetMapping(value = "/message", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<JsonNode> getEvents(){
    Map<String, Object> propsMaps = this.kafkaConsumerConfigs();

    // Explicit type arguments avoid the raw-type (unchecked) construction.
    DefaultKafkaReceiver<String, JsonNode> kafkaReceiver =
            new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE,
                    ReceiverOptions.<String, JsonNode>create(propsMaps).subscription(Collections.singleton(TOPIC)));

    Flux<ReceiverRecord<String, JsonNode>> kafkaFlux = kafkaReceiver.receive();

    return kafkaFlux.checkpoint("Messages are started being consumed")
            .log()
            .doOnNext(r -> {
                // Runs before the value is mapped and written to the HTTP response.
                r.receiverOffset().acknowledge();
                log.info("Demo Flux Acknowledging Message");
            })
            .map(ReceiverRecord::value)
            .checkpoint("Messages are done consumed")
            .doOnCancel(() -> log.info("Demo Flux Cancelled"))
            .doFinally(signalType -> {
                if (signalType == SignalType.CANCEL) {
                    log.info("Flux Completed due to Cancellation");
                } else {
                    log.info("Flux Completed due to Completion or Error");
                }
            });
}

I am testing the subscription with curl:

curl --location --request GET 'http://localhost:8080/message' --header 'Content-Type: text/event-stream;charset=UTF-8' --header 'Accept: text/event-stream;charset=UTF-8'

Here is the issue. When I cancelled the curl connection in my terminal, I expected the subscription to be cancelled automatically and the doOnNext() action to be skipped. However, nothing happened and no logs showed up.

Then I published a new message to see what would happen. At that point, the logs showed the consumer being unregistered from the brokers. However, doOnNext() was still triggered even though the subscription was being cancelled.

Because doOnNext(), which acknowledges the Kafka message via r.receiverOffset().acknowledge(), was executed, the offset has been updated. But the client can no longer receive the message, since the curl connection has been terminated, and the same message won't be sent to other consumers in the same group because it has already been "acknowledged".

Is there any way to cancel the subscription explicitly, so that doOnNext() is not triggered after the client disconnects from the GET request?

Or is there any way to detect a cancellation event inside doOnNext()? If there is, I could at least add a simple if check to decide whether to acknowledge the message.
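
For illustration, the "simple if check" could look roughly like the sketch below. AckGuard is a hypothetical helper, not part of Reactor or Reactor Kafka: it only remembers whether a cancel signal has been seen and skips the acknowledge action afterwards. It assumes one guard per request and that doOnCancel() fires before the next record reaches doOnNext(); given the behaviour described above, where the disconnect only seems to be noticed once a new message is published, that assumption may not hold for the first record after the client goes away.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not a Reactor API): gates acknowledgements on a cancel flag. */
final class AckGuard {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Intended to be called from doOnCancel(...). */
    void markCancelled() {
        cancelled.set(true);
    }

    boolean isCancelled() {
        return cancelled.get();
    }

    /**
     * Intended to be called from doOnNext(...): runs the acknowledge action
     * only while no cancel signal has been seen. Returns true if it ran.
     */
    boolean ackIfActive(Runnable acknowledge) {
        if (cancelled.get()) {
            return false; // skip the ack for this record
        }
        acknowledge.run();
        return true;
    }

    // Wiring sketch for the endpoint above (one guard created inside getEvents()):
    //   kafkaFlux.doOnNext(r -> guard.ackIfActive(() -> r.receiverOffset().acknowledge()))
    //            .map(ReceiverRecord::value)
    //            .doOnCancel(guard::markCancelled);

    public static void main(String[] args) {
        AckGuard guard = new AckGuard();
        guard.ackIfActive(() -> System.out.println("acknowledged while active"));
        guard.markCancelled();
        boolean ran = guard.ackIfActive(() -> System.out.println("never printed"));
        System.out.println("ack ran after cancel: " + ran);
    }
}
```

The flag is an AtomicBoolean because the cancel signal and the record callbacks may arrive on different threads.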

Please advise. Thanks!
