KafkaIO code in my Dataflow job:
PCollection<KafkaRecord<String, TopicMessage>> topicPCollection =
    p.apply("Read from topic",
        KafkaIO.<String, TopicMessage>read()
            .withBootstrapServers(options.getKafkaBroker().get())
            .withTopic("TOPIC_NAME")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(TopicMessageDeserializer.class)
            .withConsumerConfigUpdates(
                new ImmutableMap.Builder<String, Object>()
                    .put(ConsumerConfig.GROUP_ID_CONFIG, "groupId")
                    .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                    .put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "104857600")
                    .build()));
The topic has 50 partitions and the Dataflow job runs with 50 workers, but throughput has not improved: the job still cannot consume messages from the topic any faster.
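For context, here is a small sketch of what the fetch.min.bytes value in the snippet works out to. It is plain Java with no Kafka dependency. The class name FetchSettingsSketch and its helper methods are hypothetical and exist only for illustration. The default values are the ones listed in the Kafka consumer configuration documentation and may differ between client versions. Per that documentation, when less data than fetch.min.bytes is available the broker holds the fetch request for up to fetch.max.wait.ms before answering, so a very large minimum may be worth checking when throughput looks capped. This is an assumption about a possible cause, not a confirmed diagnosis.

```java
// Hypothetical helper class, for illustration only (not part of Beam or Kafka).
class FetchSettingsSketch {
    // Defaults as listed in the Kafka consumer configuration docs
    // (assumed here; verify against the client version in use).
    static final long DEFAULT_FETCH_MIN_BYTES = 1L;
    static final long DEFAULT_FETCH_MAX_WAIT_MS = 500L;
    static final long DEFAULT_FETCH_MAX_BYTES = 52_428_800L;

    // Value passed as FETCH_MIN_BYTES_CONFIG in the snippet above.
    static final long CONFIGURED_FETCH_MIN_BYTES = 104_857_600L;

    // Converts a byte count to mebibytes (1 MiB = 1024 * 1024 bytes).
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    // Returns true when the requested minimum fetch size is larger than
    // the maximum amount of data a single fetch is configured to return.
    static boolean minExceedsMax(long fetchMinBytes, long fetchMaxBytes) {
        return fetchMinBytes > fetchMaxBytes;
    }

    public static void main(String[] args) {
        System.out.println("fetch.min.bytes (configured): "
            + toMiB(CONFIGURED_FETCH_MIN_BYTES) + " MiB");
        System.out.println("fetch.min.bytes (default):    "
            + DEFAULT_FETCH_MIN_BYTES + " byte");
        System.out.println("fetch.max.bytes (default):    "
            + toMiB(DEFAULT_FETCH_MAX_BYTES) + " MiB");
        System.out.println("fetch.max.wait.ms (default):  "
            + DEFAULT_FETCH_MAX_WAIT_MS + " ms");
        System.out.println("configured min exceeds default max: "
            + minExceedsMax(CONFIGURED_FETCH_MIN_BYTES, DEFAULT_FETCH_MAX_BYTES));
    }
}
```

The sketch only does the arithmetic. Whether the setting actually limits this job would need to be confirmed by rerunning the pipeline with fetch.min.bytes left at its default and comparing the throughput.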