How to create an asynchronous camel-kafka consumer?


I have a Camel route that consumes from Kafka. It consumes and produces at about 2000 TPS while messages arrive at 18000 TPS, so the consumer lags on the topic. With max.poll.records = 500 I can reach 2000 TPS. With the producer setting requestRequiredAcks=0 I can reach 4000 TPS, but there is still consumer lag. As we understand it, a Camel route is complete only when from->to is complete, so a consumer reading from 2 partitions with a consumer count of 2 stays busy until the route completes.

Is there a way to make the camel-kafka consumer asynchronous? Any code example?


        from("kafka:{{consumer.topic}}?brokers={{kafka_dev.host}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount=2"
                + "&seekTo=latest" + "&groupId={{consumer.group}}"
                + "&keyDeserializer=" + KEYDESERIALIZER
                + "&valueDeserializer=" + VALUEDESERIALIZER + SSL)
            .routeId("route1")
            .doTry()
                // transform the consumed message, then publish it to the producer topic
                .process(new CamelProcessor())
                .to("kafka:{{producer.topic}}?brokers={{kafka_dev.host}}" + "&requestRequiredAcks=1")
            .doCatch(Exception.class)
            .end();

We have also observed that introducing threads in this route causes messages that were already processed and sent to be read again. Is this the camel-kafka behavior discussed in this link? https://stackoverflow.com/questions/56716812/how-to-commit-offsets-thread-safe-using-camel-kafka
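To make the re-read concern concrete: in Kafka, a committed offset means "resume from here", so when worker threads finish records out of order, only the offset just past the longest unbroken run of finished records is safe to commit. The following is a minimal plain-Java sketch of that bookkeeping for a single partition. It is not part of camel-kafka; the class `ContiguousOffsetTracker` and its methods are hypothetical names used only for illustration, and whether this is the actual cause of the re-reads in the route above is an assumption.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper (not a camel-kafka class): tracks out-of-order
 * completions for ONE partition and reports the next offset that is
 * safe to commit.
 */
public class ContiguousOffsetTracker {
    // Offsets finished by workers but not yet part of the contiguous run.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Kafka convention: the committed value is the NEXT offset to read.
    private long nextToCommit;

    public ContiguousOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Called by a worker thread once the record at this offset is fully processed. */
    public synchronized void markDone(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by an earlier contiguous run
        }
        completed.add(offset);
        // Advance past every offset that is now contiguously finished.
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    /** Offset that could be committed without skipping unfinished records. */
    public synchronized long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(100);
        tracker.markDone(102); // finished early by a fast worker
        tracker.markDone(101);
        System.out.println(tracker.committableOffset()); // 100 is still in flight
        tracker.markDone(100);
        System.out.println(tracker.committableOffset()); // run 100..102 is complete
    }
}
```

If a later offset were committed while an earlier record was still in flight, a restart or rebalance could skip that record; if commits lag behind processing, already-sent records get read again. A tracker like this trades some re-delivery for not losing messages.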
