I'm using spring cloud stream with Kafka (Spring Boot 3.0.11, Spring Cloud 2022.0.4), sending messages with a StreamBridge. Kafka broker requires using a client certificate. Since we already have dynamic client certificates available for any other components, like feign clients, I don't want to use dumb keystore-file approach. Client certificate is available inside spring application as a base64 encoded string. I've implemented a custom SSLContext, which changes whenever the certificate is renewed. I've also managed to use custom SSLContext with Kafka by implementing a custom SslEngineFactory. However, while other clients like feign, are using the updated SSLContext, Kafka keeps the old context. The only idea which came in my mind to force Kafka using the new certificate, would be to refresh the producer during runtime, which would result in being rebuild and using the updated SSLContext. Unfortunately, any information I found about doing so, won't work with spring cloud stream. I would highly appreciate any idea, how to either update SSL configuration or refresh producer. Here is a small configuration example:
spring:
kafka:
admin:
client-id: my-client
fail-fast: true
cloud:
stream:
default-binder: my-binder
default:
producer:
partition-count: 2
binders:
my-binder:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
brokers: my-broker:9092
bindings:
my-output:
binder: my-binder
destination: my-topic
contentType: application/json
producer:
partitionCount: 2
kafka:
binder:
configuration:
ssl.engine.factory.class: com.example.kafka.ssl.MySslEngineFactory
security.protocol: SSL
ssl.endpoint.identification.algorithm: https
ssl.client.auth: required
and here an example sending a message:
@Component
public class KafkaService {
@Autowired
StreamBridge streamBridge;
@Autowired
EventPublishingExecutor eventPublishingExecutor;
@Value("${spring.cloud.stream.bindings.my-output.destination}")
String topic;
public void publishMessage(String message) {
if (null == streamBridge)
throw new RuntimeException("Cannot send kafka event: streamBridge is null");
//Send to broker
final KafkaPublisherRunnable runnable = new KafkaPublisherRunnable(streamBridge, topic, message);
eventPublishingExecutor.execute(runnable);
}
}
The
DefaultKafkaProducerFactoryhas a propertymaxAgewhich causes the producer to be refreshed automatically after that period. The factory also has areset()method.The binder does not currently expose the property, or provide any mechanism to reset the producers for its internal producer facories.
I suggest you open a new feature request: https://github.com/spring-cloud/spring-cloud-stream/issues referencing this question/answer.