Refreshing a Kafka producer at runtime in a Spring application

I'm using Spring Cloud Stream with Kafka (Spring Boot 3.0.11, Spring Cloud 2022.0.4) and sending messages with a StreamBridge. The Kafka broker requires a client certificate. Since dynamic client certificates are already available to our other components, such as Feign clients, I don't want to fall back to a static keystore file; the client certificate is available inside the Spring application as a Base64-encoded string. I've implemented a custom SSLContext, which changes whenever the certificate is renewed, and I've managed to make Kafka use it by implementing a custom SslEngineFactory. However, while other clients such as Feign pick up the updated SSLContext, Kafka keeps the old one.

The only idea that came to my mind for forcing Kafka to use the new certificate is to refresh the producer at runtime, so that it is rebuilt and picks up the updated SSLContext. Unfortunately, everything I found about doing so doesn't work with Spring Cloud Stream. I would highly appreciate any idea on how to either update the SSL configuration or refresh the producer. Here is a small configuration example:

spring:
  kafka:
    admin:
      client-id: my-client
      fail-fast: true
  cloud:
    stream:
      default-binder: my-binder
      default:
        producer:
          partition-count: 2
      binders:
        my-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: my-broker:9092
      bindings:
        my-output:
          binder: my-binder
          destination: my-topic
          contentType: application/json
          producer:
            partitionCount: 2
      kafka:
        binder:
          configuration:
            ssl.engine.factory.class: com.example.kafka.ssl.MySslEngineFactory
            security.protocol: SSL
            ssl.endpoint.identification.algorithm: https
            ssl.client.auth: required
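
For illustration, the engine factory referenced in the configuration looks roughly like this (a simplified sketch; MySslContextHolder is a placeholder for whatever component exposes the renewed SSLContext):

package com.example.kafka.ssl;

import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import org.apache.kafka.common.security.auth.SslEngineFactory;

public class MySslEngineFactory implements SslEngineFactory {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure: the SSLContext is managed outside of Kafka
    }

    @Override
    public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
        // ask the holder for the current (possibly renewed) context for every new connection
        SSLContext context = MySslContextHolder.current();
        SSLEngine engine = context.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);
        if (endpointIdentification != null) {
            SSLParameters params = engine.getSSLParameters();
            params.setEndpointIdentificationAlgorithm(endpointIdentification);
            engine.setSSLParameters(params);
        }
        return engine;
    }

    @Override
    public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
        throw new UnsupportedOperationException("client-only factory");
    }

    @Override
    public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
        return false;
    }

    @Override
    public Set<String> reconfigurableConfigs() {
        return Set.of();
    }

    @Override
    public KeyStore keystore() {
        return null;
    }

    @Override
    public KeyStore truststore() {
        return null;
    }

    @Override
    public void close() throws IOException {
        // nothing to close
    }
}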

And here is an example of sending a message:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
public class KafkaService {

    @Autowired
    StreamBridge streamBridge;

    @Autowired
    EventPublishingExecutor eventPublishingExecutor;

    @Value("${spring.cloud.stream.bindings.my-output.destination}")
    String topic;

    public void publishMessage(String message) {
        if (null == streamBridge)
            throw new RuntimeException("Cannot send kafka event: streamBridge is null");

        // Send to broker asynchronously via the executor
        final KafkaPublisherRunnable runnable = new KafkaPublisherRunnable(streamBridge, topic, message);
        eventPublishingExecutor.execute(runnable);
    }
}
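
KafkaPublisherRunnable is essentially just a thin wrapper around the StreamBridge call; roughly:

import org.springframework.cloud.stream.function.StreamBridge;

// Simplified sketch of KafkaPublisherRunnable; only the constructor arguments are taken from the code above,
// the real class may do more (logging, retries, ...).
public class KafkaPublisherRunnable implements Runnable {

    private final StreamBridge streamBridge;
    private final String topic;
    private final String message;

    public KafkaPublisherRunnable(StreamBridge streamBridge, String topic, String message) {
        this.streamBridge = streamBridge;
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void run() {
        // StreamBridge resolves (or creates) an output binding for the given name and sends the payload
        streamBridge.send(topic, message);
    }
}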

There is 1 answer below.

Gary Russell

The DefaultKafkaProducerFactory has a maxAge property, which causes a producer to be closed and recreated once it has been in use for longer than that period. The factory also has a reset() method.
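
For reference, with a DefaultKafkaProducerFactory that you manage yourself (plain spring-kafka, outside the binder), those hooks look roughly like this:

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

public class ProducerFactoryRefreshSketch {

    public static void main(String[] args) {
        // A factory created directly; the binder builds and hides its own factories internally.
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));

        // Producers older than one hour are closed and recreated on the next send.
        factory.setMaxAge(Duration.ofHours(1));

        // Or close the cached producer(s) immediately, e.g. from a certificate-renewal listener;
        // the next send then builds a new producer and therefore a new SSLEngine.
        factory.reset();
    }
}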

The binder does not currently expose that property, nor does it provide any mechanism to reset its internal producer factories.

I suggest you open a new feature request at https://github.com/spring-cloud/spring-cloud-stream/issues, referencing this question/answer.