How to increase throughput in Kafka with a constant number of partitions and dynamic number of consumers


I have 6 partitions for a certain topic, and 4 consumers consuming from that topic. The producer produces in a round robin manner to the partitions. The 4 consumers belong in the same consumer group.

I can see with some load testing that 2 of the partitions are getting consumed very slowly, while the others are almost always empty. I would like to increase my throughput as much as possible.

  1. What is the default partition assignment strategy used by Kafka?
  2. If load increases, I would like to scale my consumers up to 6 (the same number as partitions, so there is a 1:1 consumer-to-partition mapping). In the 4-consumer scenario, should I limit my producer to only 4 of the partitions until I have increased the number of consumers, in order to achieve the best possible throughput?
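Regarding point 2: the producer can target a subset of partitions by passing an explicit partition number to each record (with the Java client, `new ProducerRecord<>(topic, partition, key, value)`). A minimal sketch of the rotation logic, with illustrative names (`ACTIVE_PARTITIONS`, `nextPartition` are not Kafka API):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: restrict a round-robin producer to the first N partitions by
// choosing the partition explicitly instead of letting the partitioner decide.
public class SubsetRoundRobin {
    static final int ACTIVE_PARTITIONS = 4;   // produce only to partitions 0..3
    static final AtomicLong counter = new AtomicLong();

    static int nextPartition() {
        // Rotate over the active subset; partitions 4 and 5 stay idle
        // until more consumers are added.
        return (int) (counter.getAndIncrement() % ACTIVE_PARTITIONS);
    }

    public static void main(String[] args) {
        // With the real client you would pass this partition explicitly:
        // new ProducerRecord<>(topic, nextPartition(), key, value)
        for (int i = 0; i < 8; i++) {
            System.out.println("record " + i + " -> partition " + nextPartition());
        }
    }
}
```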

2 Answers

Answer from abs

Which Kafka version are you using?

It seems your producers are not using an efficient partitioning method.

  • Check whether your producers are generating similar keys, or null keys.

You can write a custom partitioner with an efficient hash algorithm that distributes messages evenly across partitions, giving your consumers a fair chance to consume messages in parallel.
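A sketch of the partitioning logic such a custom partitioner could use. This is illustrative only: a real implementation would implement `org.apache.kafka.clients.producer.Partitioner` and be registered via the `partitioner.class` producer config, and the names here (`FairPartitioning`, `choosePartition`) are made up for the example:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of partitioning logic that spreads records evenly:
// keyed records hash deterministically, null keys round-robin.
public class FairPartitioning {
    static final AtomicInteger roundRobin = new AtomicInteger();

    static int choosePartition(String key, int numPartitions) {
        if (key == null) {
            // Null keys: rotate so records don't pile onto one partition.
            return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
        }
        // Keyed records: same key always maps to the same partition.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[]{"order-1", "order-2", null, null}) {
            System.out.println(key + " -> " + choosePartition(key, 6));
        }
    }
}
```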

Answer from ChristDist

Many factors contribute to the overall performance of the clients (producer/consumer) connected to a Kafka broker. First of all, it is not clear how you are running your consumer instances: 4 instances on 4 separate servers, or 4 instances launched from an IDE for the load test? It would help if you could clarify this. Also, what does your consumer implementation look like? Is it just reading from the topic and writing to a console, or is it performing full business logic against backend systems?

Default Partitioner:

If a key exists and the default partitioner is used, Kafka will hash the key and use the result to map the message to a specific partition. The mapping of keys to partitions is consistent only as long as the number of partitions in a topic does not change.

You can change this behaviour by implementing a custom Partitioner.
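To illustrate why the mapping is only stable while the partition count is fixed: Kafka's default partitioner hashes the serialized key (with murmur2) and takes it modulo the partition count. This sketch substitutes `String.hashCode` as a stand-in hash, so the exact partition numbers differ from a real cluster, but the modulo effect is the same:

```java
// Why key-to-partition mapping breaks when partitions are added:
// the same key modulo a different partition count can land elsewhere.
public class KeyMapping {
    static int partitionFor(String key, int numPartitions) {
        // Stand-in for Kafka's murmur2-based hashing of the serialized key.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "customer-42";
        // Same key, same partition count -> same partition every time.
        System.out.println("6 partitions: " + partitionFor(key, 6));
        // Adding partitions re-shuffles the mapping for existing keys.
        System.out.println("8 partitions: " + partitionFor(key, 8));
    }
}
```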

Dynamic consumers:

You can't increase the number of consumers dynamically based on throughput unless you have a multi-threaded consumer implemented. You can read more about the Java ExecutorService here: https://dzone.com/articles/kafka-consumer-and-multi-threading. Your consumer implementation would need something like the following: keep a counter of the number of records polled, and if it exceeds the threshold you are after, instantiate an ExecutorService to add more consumer instances.

private final List<ExecutorService> executors = new ArrayList<>();

@Override
public void run(String... args) throws Exception {
    // Start a fixed pool and hand each thread one consumer instance.
    int instances = <<number of instances>>;
    ExecutorService executor = Executors.newFixedThreadPool(instances);
    executors.add(executor);
    for (int i = 0; i < instances; i++) {
        executor.execute(<<Consumer implementation class>>);
    }

    // Shut the pools down cleanly when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        executors.forEach(exe -> {
            exe.shutdown();
            try {
                if (!exe.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
                    exe.shutdownNow();
                }
            } catch (InterruptedException e) {
                exe.shutdownNow();
            }
        });
    }));
}