The following snippet "exposes" a Kafka topic, parameterized here as click-events, as an HTTP SSE endpoint. I like the compact nature of this code. How do I write this code to use click-events topic as a KTable instead of a stream that it is using now?
@Path("/clicks")
public class ClickStreamer {
@Incoming("click-events")
@Outgoing("click-sse")
public String process(JsonObject in) {
return in.encode();
}
@Inject
@Channel("click-sse")
Multi<String> clickSse;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
return clickSse;
}
}