I have a question about Kafka Streams. If I consume KTable, it will automatically materialized as KeyValueStore. I want to change this to VersionedKeyValueStore. Also, if I set topology.optimization to all, then changelog for that materialized store is not created but original consuming topic is used as a changelog topic. How can I change type of materialized store from KeyValueStore to VersionedKeyValueStore?

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}

I have materialized KTable as VersionedKeyValueStore, but changelog topic has created although I've set topology.optimization to all.

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()") }
}

0

There are 0 best solutions below