Here is my Java code implementing some simple stream-processing logic:
package com.test.processstrings.consumer;
import com.test.processstrings.domain.EmailEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Service
@EnableKafkaStreams
public class KafkaStreamsService {

    @Autowired
    private StreamsBuilder streamsBuilder;

    @Bean
    public KafkaStreams kafkaStreams() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        Map<String, Object> serdeProps = new HashMap<>();
        final JsonSerde<EmailEvent> emailEventJsonSerde = new JsonSerde<>();
        emailEventJsonSerde.configure(serdeProps, false);

        KStream<String, EmailEvent> emailStream = streamsBuilder
                .stream("input-topic", Consumed.with(Serdes.String(), emailEventJsonSerde));

        // Count events per email (the record key)
        emailStream.groupByKey()
                .count()
                .toStream()
                .to("output-one", Produced.with(Serdes.String(), Serdes.Long()));

        // Count events per email domain
        emailStream.groupBy((key, value) -> value.getEmail().split("@")[1])
                .count()
                .toStream()
                .to("output-two", Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(streamsBuilder.build(), props);
    }
}
In my configuration I just declare three NewTopic beans, one for each topic. The topics are created correctly, since I can list them from the console (the app starts but then shuts down shortly after). EmailEvent has a single "email" String field plus getters, setters, an all-args constructor, and a no-args constructor.
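For context, EmailEvent is just a plain POJO, roughly as sketched below (in the real project it lives in com.test.processstrings.domain and the boilerplate could equally come from Lombok):

```java
// Plain POJO carrying a single email address.
public class EmailEvent {
    private String email;

    // No-args constructor, needed by JSON deserialization.
    public EmailEvent() {
    }

    public EmailEvent(String email) {
        this.email = email;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }
}
```

The three NewTopic beans for input-topic, output-one and output-two live in a separate @Configuration class, which is why the topics show up when listing them from the console.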
But I keep getting this error:
stream-client Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
It may be a silly mistake, but I have been unable to fix it. Has anyone encountered this, or can you see what I am doing wrong?
I also tried commenting out the output stream creation, but the error persisted.