I have a question about Kafka Streams.
When I consume a topic as a KTable, it is automatically materialized as a KeyValueStore. I want to change this to a VersionedKeyValueStore.
Also, when I set topology.optimization to all, no changelog topic is created for that materialized store; the source topic is used as the changelog topic instead.
How can I change the type of the materialized store from KeyValueStore to VersionedKeyValueStore?
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}
I tried materializing the KTable as a VersionedKeyValueStore, but a changelog topic was still created even though I set topology.optimization to all.
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    // The store supplier must be declared before it is referenced below.
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    // Re-materialize the table into the versioned store by keeping the latest value per key.
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()}") }
}
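
For comparison, here is a minimal sketch of what I am trying to achieve, written against the plain StreamsBuilder API instead of the injected KTable. It assumes Kafka Streams 3.5 or later (where Stores.persistentVersionedKeyValueStore is available). The topic names, application id, and bootstrap server are placeholders, not my real configuration. The idea is to pass the versioned store supplier directly to builder.table(), so the table is materialized once at the source instead of going through toStream().groupByKey().aggregate(). The sketch only builds and prints the topology; it does not verify whether the source topic is then reused as the changelog.

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.Stores

fun main() {
    // Placeholder names (assumptions); replace with the real topics.
    val inputTopic = "input-topic"
    val tableTopic = "table-topic"

    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        // Equivalent to topology.optimization=all
        put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE)
    }

    val builder = StreamsBuilder()
    val stringSerde = Serdes.String()

    // Materialize the source table directly into a versioned store.
    val table = builder.table(
        tableTopic,
        Consumed.with(stringSerde, stringSerde),
        Materialized.`as`<String, String>(
            Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
        )
    )

    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .join(table) { value1, value2 -> Pair(value1, value2) }
        .peek { key, value -> println("$key: $value") }

    // Pass the properties to build() so the optimization setting is applied to the topology.
    val topology = builder.build(props)
    println(topology.describe())
}
```

With the functional binding above, the KTable is created for me before process() is called, so I do not see where an equivalent of the Materialized argument could be supplied.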