How to close flink event time windows with idle streams?

I'm currently using Flink event time, but sometimes the streams go idle. During that time I want the windows to close so the data gets output, rather than waiting for another event to come through. Can I emit periodic watermarks while the stream is idle? I tried AssignerWithPeriodicWatermarks, but it is now deprecated. What is the current way to emit periodic watermarks? The documentation is rubbish and I can't get my head around how to implement it. Here's the source I currently have set up:

```
val SomeStream = env
  .addSource(new FlinkKafkaConsumer011[String](Pattern.compile(config.getInputFaultStream), new SimpleStringSchema(), properties))
  .map(x => JsonUtil.fromJson[SomeInput](x) match {
    case Success(value) => value
    case Failure(f) => null
  })
  .filter(x => x != null && x.timestamp != null && x.faultStatus != null && fixDate.isInstant(x.timestamp))
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ofSeconds(60))
      .withIdleness(Duration.ofSeconds(300))
      .withTimestampAssigner(new SerializableTimestampAssigner[SomeInput] {
        override def extractTimestamp(element: SomeInput, recordTimestamp: Long): Long = fixDate.makeInstant(element.timestamp).toEpochMilli
      }))
  .name("ReadFaultEventInput")
```

David Anderson On

This situation -- where all partitions are completely idle -- can be difficult to work with. The withIdleness method you're using is only effective so long as there are still messages arriving on at least one partition.

If you can arrange for keep-alive messages, that can be a good way to keep the watermarks advancing despite the lack of real event traffic.
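As a rough illustration of the keep-alive idea, here is a minimal sketch of a standalone heartbeat producer that writes one message to every partition of the input topic on a fixed schedule. Everything specific in it is an assumption and not taken from the question: the topic name, the broker address, the 30-second interval, and the JSON payload with a "KEEP_ALIVE" fault status. The payload has to pass your existing parsing and filter so that it reaches the watermark assigner, and you would then drop the keep-alive records somewhere downstream of assignTimestampsAndWatermarks.

```scala
import java.time.Instant
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.collection.JavaConverters._

object KeepAliveProducer {
  def main(args: Array[String]): Unit = {
    val topic = "fault-events" // hypothetical topic name

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // hypothetical broker address
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val heartbeat = new Runnable {
      override def run(): Unit = {
        // Hypothetical payload: it must deserialize into SomeInput and pass the filter.
        val payload = s"""{"timestamp":"${Instant.now()}","faultStatus":"KEEP_ALIVE"}"""
        // Send to every partition explicitly, so that no partition stays idle.
        for (p <- producer.partitionsFor(topic).asScala) {
          producer.send(new ProducerRecord[String, String](topic, Int.box(p.partition()), null, payload))
        }
        producer.flush()
      }
    }

    // Emit a heartbeat every 30 seconds (arbitrary interval chosen for this sketch).
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(heartbeat, 0L, 30L, TimeUnit.SECONDS)
  }
}
```

Because the heartbeats carry current timestamps, the watermark keeps tracking wall-clock time minus the configured out-of-orderness bound even when no real events arrive.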

Otherwise you can implement a watermark strategy that uses a processing time timer to advance the watermark despite the lack of events. This can be a bit risky, since it won't be able to distinguish between an outage and a quiet period. There's also some risk that when events resume, some will be late because they're behind the artificially advanced watermark.
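A minimal sketch of that second approach with the newer WatermarkGenerator interface might look like the following. It is not an official Flink class: the name IdleAdvancingWatermarks and its strategy helper are made up for this example. It behaves like bounded-out-of-orderness watermarking while events flow, and once no event has been seen for idleTimeout of wall-clock time, it lets the watermark advance at wall-clock speed. It relies on Flink calling onPeriodicEmit at the configured auto-watermark interval whether or not events arrive.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}

// Hypothetical generator (not part of Flink): bounded out-of-orderness while
// events arrive, wall-clock-driven advancement once the input has been idle.
class IdleAdvancingWatermarks[T](maxOutOfOrderness: Duration, idleTimeout: Duration)
    extends WatermarkGenerator[T] {

  private val delayMs = maxOutOfOrderness.toMillis
  private val idleMs = idleTimeout.toMillis

  // Highest event timestamp seen so far; the start value avoids underflow below.
  private var maxTimestamp: Long = Long.MinValue + delayMs + 1
  // Wall-clock time at which the most recent event arrived.
  private var lastEventWallClock: Long = System.currentTimeMillis()
  // Last watermark handed to Flink, kept so this generator never goes backwards.
  private var lastEmitted: Long = Long.MinValue

  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)
    lastEventWallClock = System.currentTimeMillis()
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    val idleFor = System.currentTimeMillis() - lastEventWallClock
    // Zero while events are flowing; grows with the wall clock after idleTimeout.
    val idleAdvance = math.max(0L, idleFor - idleMs)
    val candidate = maxTimestamp - delayMs - 1 + idleAdvance
    lastEmitted = math.max(lastEmitted, candidate)
    output.emitWatermark(new Watermark(lastEmitted))
  }
}

object IdleAdvancingWatermarks {
  // Wraps the generator in a WatermarkStrategy; a timestamp assigner still has to be added.
  def strategy[T](maxOutOfOrderness: Duration, idleTimeout: Duration): WatermarkStrategy[T] =
    WatermarkStrategy.forGenerator[T](new WatermarkGeneratorSupplier[T] {
      override def createWatermarkGenerator(ctx: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T] =
        new IdleAdvancingWatermarks[T](maxOutOfOrderness, idleTimeout)
    })
}
```

In the job from the question, this would replace the forBoundedOutOfOrderness(...).withIdleness(...) part with IdleAdvancingWatermarks.strategy[SomeInput](Duration.ofSeconds(60), Duration.ofSeconds(300)), followed by the same withTimestampAssigner call. With those values, a window closes once the input has been silent for roughly the idle timeout, plus the out-of-orderness bound, plus whatever remained of the window. The caveats above still apply: events that resume with timestamps behind the artificially advanced watermark will be treated as late.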

Here's an example of how to do this with the old watermark interface. I'm afraid I don't have an example showing how to do this with the newer WatermarkStrategy interface. Perhaps the best way to understand how this works is by reading the code. You could start with https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java.