I have a Flink job that won't print to standard output, and I can't understand why. If I remove my filter and watermark, I see the raw messages from my Kafka topic. But once I apply an aggregation and a watermark, I don't get any output. Here's my code under main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create Kafka source
KafkaSource<VideoAdCompletedEvent> videoAdsSource = KafkaSource
.<VideoAdCompletedEvent>builder()
.setBootstrapServers(jobConfig.kafkaSourceServer)
.setTopics(jobConfig.sourceKafkaTopic).setGroupId(jobConfig.consumerGroupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new VideoDeserSchema())
.build();
WatermarkStrategy<VideoAdCompletedEvent> watermarkStrategy = WatermarkStrategy
.<VideoAdCompletedEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30)) // allow events up to 30 seconds out of order
.withTimestampAssigner(
(event, timestamp) -> event.getOriginTs()
)
.withIdleness(Duration.ofMillis(500));
DataStream<VideoAdCompletedEvent> videoAdsStream = env
.fromSource(videoAdsSource, watermarkStrategy, "Mobile Analytics Filtered Video Ads Topic")
.assignTimestampsAndWatermarks(watermarkStrategy);
DataStream<VideoAdCompletedEvent> filteredStream = videoAdsStream
.filter(new FilterFunction<VideoAdCompletedEvent>() {
@Override
public boolean filter(VideoAdCompletedEvent message) throws Exception {
return (message.getCompletedReason().equals("VIDEO_FINISHED") || message.getCompletedReason().equals("video_finished"));
}
});
DataStream<Tuple2<String, Integer>> aggStream = filteredStream
.keyBy((VideoAdCompletedEvent event) -> event.getVideoId())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.aggregate(new DistinctUserIdAggregate(), new VideoIdCountProcessFunction());
SinkFunction<Tuple2<String, Integer>> printSink = new PrintSinkFunction<>();
aggStream.addSink(printSink);
env.execute("Test Job");
Any ideas?
A few possible explanations...
- The FilterFunction is removing all events.
- The VideoIdCountProcessFunction isn't generating any results.

Adding additional logging and/or counters would help figure out where events are being removed.
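As a rough sketch of the counter idea, the filter condition can be checked on its own, outside Flink, against values copied from the topic. Everything here is illustrative: the FilterProbe class, its counters, and the sample reasons are hypothetical, and the predicate works on the raw completedReason string rather than on VideoAdCompletedEvent, since that class isn't shown in the question.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

public class FilterProbe {
    // Same two accepted values as the filter in the question. Written
    // constant-first, so a null reason is rejected instead of throwing.
    static final Predicate<String> IS_FINISHED =
            reason -> "VIDEO_FINISHED".equals(reason) || "video_finished".equals(reason);

    // How many values were checked, and how many the filter kept.
    final AtomicLong seen = new AtomicLong();
    final AtomicLong passed = new AtomicLong();

    boolean test(String completedReason) {
        seen.incrementAndGet();
        boolean keep = IS_FINISHED.test(completedReason);
        if (keep) {
            passed.incrementAndGet();
        }
        return keep;
    }

    public static void main(String[] args) {
        FilterProbe probe = new FilterProbe();
        // Hypothetical sample values; replace with reasons seen in the real topic.
        for (String reason : List.of("VIDEO_FINISHED", "video_finished", "Video_Finished", "SKIPPED")) {
            probe.test(reason);
        }
        // If passed stays at 0 for real data, the filter is dropping everything.
        System.out.println("seen=" + probe.seen.get() + " passed=" + probe.passed.get());
    }
}
```

With the sample values above this prints seen=4 passed=2, because the comparison is case-sensitive and "Video_Finished" is dropped. Inside the job, the same two counts could be kept in the filter itself and written to the log, or registered as Flink metric counters.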
Note that the call to .assignTimestampsAndWatermarks() should be removed, as you're already passing the watermark strategy to the Kafka source via the .fromSource() call.