Using provided topics for changelog and repartition while aggregating in Kafka Stream Processing

I am using Kafka Streams with Spring Boot to aggregate data from a source object.

@Bean
public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
    Serde<SourceObject> sourceObjectSerde = new JsonSerde<>(SourceObject.class);
    Serde<AgrregatedObject> aggregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
    // Re-key each record by its "uniques" value; changing the key is what requires a repartition topic.
    return input -> input.map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
            .groupByKey(Grouped.with(Serdes.String(), sourceObjectSerde))
            // Fold each record into the per-key aggregate; the state store is backed by a changelog topic.
            .aggregate(AgrregatedObject::new, (uniques, sourceObject,
                    destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
                    Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots")).withKeySerde(Serdes.String()).withValueSerde(aggregatedObjectSerde))
            // process(...) is a placeholder for the downstream handling, which the original post elides.
            .toStream().foreach((foo, bar) -> process(foo, bar));
}

When I run this application, in addition to the input topic provided to processSourceObject, it auto-creates two more topics:

  1. processSourceObject-applicationId-data-snapshots-changelog
  2. processSourceObject-applicationId-data-snapshots-repartition

For various reasons, I want to use existing topics instead of these two. Where do I configure the names of the predefined topics that my application should use for changelog and repartition data?

Answer by Matthias J. Sax:

It depends on the version you are using. As of Apache Kafka 2.4, the Streams API lets you name all operators/processors, and those names are used for the repartition and changelog topics.

However, all internal topics are always prefixed with <application.id>- and suffixed with -repartition or -changelog -- so you can only set part of the topic names.

For example, you can use Grouped.as("myName") to set a name for the repartition topic.
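To predict which topic names a given configuration produces (for instance, to pre-create them), the naming rule above can be sketched in plain Java. This is only an illustration: InternalTopicNames and its methods are hypothetical helpers, not part of the Kafka Streams API, and "my-app", "myName", and "my-store" are placeholder values. It assumes the changelog topic takes the name of the materialized state store, and the repartition topic takes the name passed to Grouped.as(...).

```java
// Hypothetical helper, not part of the Kafka Streams API: it only mirrors
// the naming rule described above so the resulting names can be predicted.
public class InternalTopicNames {

    // Repartition topic: <application.id>-<name>-repartition,
    // where <name> is e.g. the value passed to Grouped.as("myName").
    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Changelog topic: <application.id>-<storeName>-changelog,
    // where <storeName> is the name of the materialized state store.
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String applicationId = "my-app"; // placeholder application.id
        System.out.println(repartitionTopic(applicationId, "myName"));
        System.out.println(changelogTopic(applicationId, "my-store"));
    }
}
```

Because the prefix and suffix are fixed, an existing topic can only be reused if its name already fits this pattern for the chosen application.id.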