This is a sample word-count use case: counting how many times each word occurs.
Below is the implementation using the Kafka Streams library.
    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        // Read raw messages from the input topic.
        KStream<String, String> messageStream = streamsBuilder
            .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        // Lowercase, split into words, re-key by word, and count into the "word-counts" state store.
        KTable<String, Long> wordCounts = messageStream
            .mapValues((ValueMapper<String, String>) String::toLowerCase)
            .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
            .count(Materialized.as("word-counts"));

        wordCounts.toStream().to("output-topic");
    }

    @GetMapping("/count/{word}")
    public Long getWordCount(@PathVariable String word) {
        // Interactive query: look up the current count directly in the local state store.
        KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
        ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams
            .store(StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));
        return counts.get(word);
    }
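For reference, the per-message logic of the topology above (lowercase, split on non-word characters, count per word) can be sketched in plain Java without Kafka. This is only an illustration of what the pipeline computes for a single message; the class name `WordCountSketch` and the method `countWords` are hypothetical and not part of my application.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch; it mirrors the mapValues/flatMapValues/groupBy/count
// steps for one message and does not use Kafka Streams at all.
final class WordCountSketch {

    // Lowercases the message, splits it on runs of non-word characters,
    // and counts how often each resulting token occurs.
    // Note: as in the topology, a message that starts with punctuation
    // yields one empty-string token, because split() keeps a leading empty element.
    static Map<String, Long> countWords(String message) {
        return Arrays.stream(message.toLowerCase().split("\\W+"))
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints the per-word counts for a single sample message.
        System.out.println(countWords("Hello hello, world!"));
    }
}
```

In the real application the counts accumulate across all messages in the "word-counts" state store, which is what the REST endpoint reads.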
Below is the same use case written in ksqlDB.
    CREATE STREAM message_stream (message VARCHAR)
      WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='JSON', PARTITIONS=3, REPLICAS=2);

    CREATE STREAM words
      WITH (KAFKA_TOPIC='words')
      AS
      SELECT EXPLODE(REGEXP_SPLIT_TO_ARRAY(LCASE(message), '\W+')) AS word
      FROM message_stream;

    CREATE TABLE word_counts
      WITH (KAFKA_TOPIC='word-counts')
      AS
      SELECT word, COUNT(*) AS count
      FROM words
      GROUP BY word;

    CREATE TABLE output_stream
      WITH (KAFKA_TOPIC='output-topic')
      AS
      SELECT word, count
      FROM word_counts
      EMIT CHANGES;
I want to query 'word-counts' as a materialized table in ksqlDB running in headless mode. In other words, I want to read the word counts from the RocksDB state store behind that table, the same way the Kafka Streams version does with its interactive query.
How can I do this? (Note: the ksqlDB REST API is disabled in headless mode.)
I tested both examples by running the code above as-is.