Kafka Streams binder with one shared applicationId assigns only one of two input topics


I need to consume data from two different topics, enrich it, and send the results to two separate output topics. Because I want both input topics to share the same group ID, I set applicationId at the binder level. When I run the application, however, only one of the input topics (raw-order-topic) is assigned; src-textMsg-topic never is. The configuration, startup logs, and build dependencies for this scenario are below.

server.port: 5212
spring:
  cloud:
    stream:
      bindings:
        orderProcessor-in-0:
          destination: raw-order-topic
        orderProcessor-out-0:
          destination: sane-order-topic
        upperCaseProcessor-in-0:
          destination: src-textMsg-topic
        upperCaseProcessor-out-0:
          destination: out-textMsg-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            applicationId: ngcom_globallocal
      function:
        definition: orderProcessor;upperCaseProcessor

Startup logs:
2023-10-11 21:00:11.674  INFO 13640 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401-StreamThread-1-consumer, groupId=ngcom_globallocal] Updating assignment with
    Assigned partitions:                       [raw-order-topic-0]
    Current owned partitions:                  [raw-order-topic-0]
    
2023-10-11 21:00:11.675  INFO 13640 --- [-StreamThread-1] o.a.k.s.processor.internals.TaskManager  : stream-thread [ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401-StreamThread-1] Handle new assignment with:
    New active tasks: [0_0]
    New standby tasks: []
    Existing active tasks: [0_0]
    Existing standby tasks: []
    
2023-10-11 21:00:11.773  INFO 13640 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401] State transition from REBALANCING to RUNNING

Gradle build file (excerpt):
ext {
    set('springCloudVersion', "2022.0.1")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.projectlombok:lombok:1.18.24'
    annotationProcessor('org.projectlombok:lombok:1.18.24')
    testAnnotationProcessor('org.projectlombok:lombok:1.18.24')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

When I give each function its own applicationId, and therefore its own consumer group, both topics are assigned and everything works as expected. However, I'm looking for a way to use the same consumer group for both topics. The working configuration is below:

server.port: 5212
spring:
  cloud:
    stream:
      bindings:
        orderProcessor-in-0:
          destination: raw-order-topic
        orderProcessor-out-0:
          destination: sane-order-topic
        upperCaseProcessor-in-0:
          destination: src-textMsg-topic
        upperCaseProcessor-out-0:
          destination: out-textMsg-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            functions:
              orderProcessor:
                applicationId: 'OrderProcessorStream'
              upperCaseProcessor:
                applicationId: 'lowerToUpperCaseStreamProcessor'
      function:
        definition: orderProcessor;upperCaseProcessor
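
The Java side of the two functions is not shown above. As a rough sketch of the "enrich" step only, the value-level logic could be kept in plain static methods so it can be tested without a broker. Everything here is an assumption made for illustration: the class name MessageTransforms, both method names, the String payload type, and the rules themselves (trim and drop blank orders, upper-case text messages). In the real application each method would presumably be applied to the record values inside the function bean registered as orderProcessor or upperCaseProcessor.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper: none of these names come from the original post.
final class MessageTransforms {

    private MessageTransforms() {
        // Utility class, not meant to be instantiated.
    }

    // Assumed rule for upperCaseProcessor: upper-case the text,
    // passing null values through unchanged.
    static String toUpperCase(String text) {
        return text == null ? null : text.toUpperCase(Locale.ROOT);
    }

    // Assumed rule for orderProcessor: strip surrounding whitespace and
    // return an empty Optional for null or blank payloads so the caller
    // can filter them out before writing to the output topic.
    static Optional<String> sanitizeOrder(String rawOrder) {
        if (rawOrder == null || rawOrder.isBlank()) {
            return Optional.empty();
        }
        return Optional.of(rawOrder.strip());
    }
}
```

Keeping this logic free of Kafka types means it behaves the same whether the two functions run under one applicationId or two; only the binder configuration differs between the two setups shown above.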