In my project, I need to connect to two different Kafka brokers.
My application.yaml looks somewhat like this:
spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
But it didn't work. When I ran the application, it logged the following error:
2024-03-05T23:35:48.473-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00 INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
I want to split the Kafka topics across two clusters:
- kafka-one containing the order-created and order-created-dlq topics
- kafka-two containing the order-processed and order-processed-dlq topics
I am using:
- Spring Boot 3.2.3
- Spring Cloud 2023.0.0
Both Kafka clusters are running fine in my development environment as Docker containers, one exposed on port 9092 and the other on port 9093.
How can I fix this?
The main problem was caused by an incorrect configuration of the Docker containers.
Inside the Docker network, all Kafka containers were listening on port 9092, which caused the unexpected behavior.
Old docker-compose.yaml:
The adjusted docker-compose.yaml:
All adjusted points are marked with a # modified here comment on the corresponding line of the file.
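As an illustration of the principle only (this is not the author's file), here is a minimal sketch of two independent single-broker clusters. The service names, the confluentinc images and tags, the ZooKeeper-per-cluster layout, and the internal port 29092 are all assumptions made for the example. The point it tries to show is that each broker advertises to host clients the same port that is published on the host, so a client that bootstraps against localhost:9093 is not redirected to localhost:9092 by the broker's metadata.

```yaml
# Hypothetical docker-compose sketch: names, images and ports other than
# 9092/9093 are assumptions, not taken from the original files.
services:
  zookeeper-one:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  zookeeper-two:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka-one:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper-one
    ports:
      - "9092:9092"   # host port == port advertised to host clients
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-one:2181
      # INTERNAL is used inside the Docker network, EXTERNAL by the host
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-one:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-two:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper-two
    ports:
      - "9093:9093"   # second cluster listens on and advertises 9093
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-two:2181
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-two:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

With a layout like this, the kafka-one and kafka-two binders in application.yaml can keep pointing at localhost:9092 and localhost:9093 respectively, because each address is both the bootstrap address and the address the broker returns in its metadata.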