I need to consume data from two different topics, enrich it, and publish the results to two separate topics. Because both input topics should share the same consumer group (groupId), I set applicationId at the binder level. When I run the application, however, only one of the two input topics (raw-order-topic) is assigned; the other (src-textMsg-topic) never is. The configuration, logs, and build file for this scenario are below.
server.port: 5212
spring:
  cloud:
    stream:
      bindings:
        orderProcessor-in-0:
          destination: raw-order-topic
        orderProcessor-out-0:
          destination: sane-order-topic
        upperCaseProcessor-in-0:
          destination: src-textMsg-topic
        upperCaseProcessor-out-0:
          destination: out-textMsg-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            applicationId: ngcom_globallocal
    function:
      definition: orderProcessor;upperCaseProcessor
2023-10-11 21:00:11.674 INFO 13640 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401-StreamThread-1-consumer, groupId=ngcom_globallocal] Updating assignment with
	Assigned partitions: [raw-order-topic-0]
	Current owned partitions: [raw-order-topic-0]
2023-10-11 21:00:11.675 INFO 13640 --- [-StreamThread-1] o.a.k.s.processor.internals.TaskManager : stream-thread [ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401-StreamThread-1] Handle new assignment with:
	New active tasks: [0_0]
	New standby tasks: []
	Existing active tasks: [0_0]
	Existing standby tasks: []
2023-10-11 21:00:11.773 INFO 13640 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [ngcom_globallocal-feef07de-fee8-4ebf-90a2-6d2a4e539401] State transition from REBALANCING to RUNNING
ext {
    set('springCloudVersion', "2022.0.1")
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.projectlombok:lombok:1.18.24'
    annotationProcessor('org.projectlombok:lombok:1.18.24')
    testAnnotationProcessor('org.projectlombok:lombok:1.18.24')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
When I give each function its own applicationId (and therefore its own consumer group), both topics are assigned and everything works as expected. However, I'm looking for a way to use the same consumer group for both topics. Below is the working configuration:
server.port: 5212
spring:
  cloud:
    stream:
      bindings:
        orderProcessor-in-0:
          destination: raw-order-topic
        orderProcessor-out-0:
          destination: sane-order-topic
        upperCaseProcessor-in-0:
          destination: src-textMsg-topic
        upperCaseProcessor-out-0:
          destination: out-textMsg-topic
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            functions:
              orderProcessor:
                applicationId: 'OrderProcessorStream'
              upperCaseProcessor:
                applicationId: 'lowerToUpperCaseStreamProcessor'
    function:
      definition: orderProcessor;upperCaseProcessor
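
To make the difference between the two setups explicit, here are the relevant settings from both configurations written as flat property keys. This is only a restatement of the YAML above, not an additional or recommended configuration. The key paths assume the usual Kafka Streams binder prefix, spring.cloud.stream.kafka.streams.binder, and that the function definition sits under spring.cloud.function.

```properties
# Setup 1 (only raw-order-topic gets assigned):
# one binder-level applicationId shared by both functions
spring.cloud.stream.kafka.streams.binder.applicationId=ngcom_globallocal

# Setup 2 (works): one applicationId per function, so each function
# ends up in its own consumer group
spring.cloud.stream.kafka.streams.binder.functions.orderProcessor.applicationId=OrderProcessorStream
spring.cloud.stream.kafka.streams.binder.functions.upperCaseProcessor.applicationId=lowerToUpperCaseStreamProcessor

# Common to both setups
spring.cloud.function.definition=orderProcessor;upperCaseProcessor
```

Everything else (brokers, bindings, destinations) is identical in the two setups, so the applicationId placement is the only thing that changes between the failing and the working run.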