I am using Kafka Streams with Spring Boot to aggregate data from a source object.
    @Bean
    public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
        Serde<SourceObject> sourceObjectSerde = new JsonSerde<>(SourceObject.class);
        Serde<AgrregatedObject> agrregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
        return input -> input
            // Re-key each record by its uniques() value; this key change is what triggers the repartition topic.
            .map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
            .groupByKey(Grouped.with(Serdes.String(), sourceObjectSerde))
            .aggregate(AgrregatedObject::new,
                (uniques, sourceObject, destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
                // The state store is backed by a changelog topic.
                Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(agrregatedObjectSerde))
            .toStream()
            .foreach((foo, bar) -> process(foo, bar)); // process(...) stands in for the downstream handling
    }
When I run this application, it auto-creates two more topics in addition to the topic provided to processSourceObject:
- processSourceObject-applicationId-data-snapshots-changelog
- processSourceObject-applicationId-data-snapshots-repartition
For some reasons, I want to use existing topics instead of these two. Where do I configure the names of the predefined topics that my application should use for changelog and repartition data?
It depends on the version you are using. As of Apache Kafka 2.4, the Streams API allows you to name all operators/processors, and those names are used for the repartition and changelog topics.
However, all internal topics are always prefixed with <application.id>- and suffixed with -repartition or -changelog, so you can only set part of the topic name. For example, you can use Grouped.as("myName") to set a name for the repartition topic.
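To make the naming rule concrete, here is a minimal sketch in plain Java (no Kafka dependency) that only assembles the resulting topic names from their three parts. The class InternalTopicNames and its methods are hypothetical helpers written for illustration, not part of the Kafka Streams API. The application id is assumed from the topic names in the question, and "myName" is the example name from above.

```java
public class InternalTopicNames {

    // Repartition topic: <application.id>-<name given to Grouped>-repartition
    public static String repartitionTopic(String applicationId, String groupedName) {
        return applicationId + "-" + groupedName + "-repartition";
    }

    // Changelog topic: <application.id>-<state store name>-changelog
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Assumed application id, taken from the topic names shown in the question.
        String applicationId = "processSourceObject-applicationId";

        // Only the middle part is under your control; prefix and suffix are fixed.
        System.out.println(repartitionTopic(applicationId, "myName"));
        System.out.println(changelogTopic(applicationId, "custome-snapshots"));
    }
}
```

In the question's topology, the middle part would come from the name passed to Grouped (for instance through the Grouped.with(name, keySerde, valueSerde) overload) and from the store name passed to Materialized. An existing topic can therefore only be reused if its name already follows this prefix/suffix pattern.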