How can I query a materialized view in ksqlDB running in headless mode? (Kafka, Kafka Streams)


This is a sample use case for counting the number of words.

Below is the code developed using the Kafka Streams library.

@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
    KStream<String, String> messageStream = streamsBuilder
        .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

    KTable<String, Long> wordCounts = messageStream
        .mapValues((ValueMapper<String, String>) String::toLowerCase)
        .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
        .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
        .count(Materialized.as("word-counts"));

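    // The counts are Long values, so the value serde is set explicitly
    // rather than relying on the configured default serde.
    wordCounts.toStream().to("output-topic", Produced.with(STRING_SERDE, Serdes.Long()));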
}

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
  KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
  ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams
      .store(StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));
  return counts.get(word);
}
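
To make the expected result concrete, here is a minimal plain-Java sketch of the computation that the pipeline above is meant to perform (lowercase, split on non-word characters, count per word). No Kafka is involved, and `WordCountSketch` and `countWords` are hypothetical names used only for illustration, not part of the application.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSketch {

    // Hypothetical helper that mirrors the topology's steps in memory:
    // mapValues(toLowerCase) -> flatMapValues(split("\\W+")) -> groupBy(word) -> count().
    static Map<String, Long> countWords(List<String> messages) {
        // TreeMap keeps the words sorted so the printed result is deterministic.
        Map<String, Long> counts = new TreeMap<>();
        for (String message : messages) {
            for (String word : message.toLowerCase().split("\\W+")) {
                // Add 1 to the running count for this word (starting at 1 if absent).
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hello=2, world=1}
        System.out.println(countWords(List.of("Hello, world! Hello")));
    }
}
```

With this input, `GET /count/hello` against the Kafka Streams application would be expected to return 2 once the message has been processed.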

Below is the same use case, written in ksqlDB.

CREATE STREAM message_stream (message VARCHAR)
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='JSON', PARTITIONS=3, REPLICAS=2);

CREATE STREAM words
WITH (KAFKA_TOPIC='words')
AS
SELECT EXPLODE(REGEXP_SPLIT_TO_ARRAY(LCASE(message), '\W+')) AS word
FROM message_stream;

CREATE TABLE word_counts
WITH (KAFKA_TOPIC='word-counts')
AS
SELECT word, COUNT(*) AS count
FROM words
GROUP BY word;

CREATE TABLE output_stream
WITH (KAFKA_TOPIC='output-topic')
AS
SELECT word, count
FROM word_counts
EMIT CHANGES;

I want to query 'word-counts' as a materialized table in ksqlDB running in headless mode. In other words, I want to read the 'word-counts' data from the RocksDB state store of a ksqlDB server that is running in headless mode.

How can I do this? (Note: the ksqlDB REST API is blocked in headless mode.)

I tested this by running the code above directly.
