"Early return" for WindowedStream aggregate or reduce


tl;dr: Is there a way to emit the first element of a WindowedStream that matches some condition, and to stop processing all events in that window for a given key once the condition has been matched?


I am performing a reduce operation on a WindowedStream. I am trying to see whether any element in a window matches a given condition. If an element does match the condition, that element should be the output for that key and window. Once this element has been found, there is no need to process any further events for that window.
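To pin down the behaviour being asked for, here is a plain-Kotlin model of a single key's window. It is not Flink code and uses no Flink API; `EarlyReturnPane`, `onEvent` and `onWindowEnd` are hypothetical names, and falling back to the last event when nothing fails is an assumption borrowed from the `reduce` lambda in the question. The first failed event is emitted as soon as it arrives, and every later event in that window is ignored.

```kotlin
// Hypothetical stand-in for the question's Event type.
data class Event(val eventId: Int, val timestampMillis: Long, val failed: Boolean)

// Plain-Kotlin model (NOT Flink API) of the desired "early return" for ONE
// key's window: emit the first failed event immediately, ignore everything
// after it, and fall back to the last event when the window closes.
class EarlyReturnPane {
    private var latest: Event? = null // most recent event while nothing has failed
    private var done = false          // true once a failed event has been emitted

    // Returns an event to emit right away, or null if nothing is emitted yet.
    fun onEvent(e: Event): Event? {
        if (done) return null         // condition already matched: skip the rest
        if (e.failed) {
            done = true
            latest = null
            return e                  // early emit
        }
        latest = e
        return null
    }

    // Called when the window ends (in Flink terms, once the watermark passes
    // the window end). Emits the fallback result only if nothing failed.
    fun onWindowEnd(): Event? = if (done) null else latest
}

fun main() {
    val pane = EarlyReturnPane()
    val emitted = listOf(
        Event(7, 1_000, failed = false),
        Event(7, 2_000, failed = true),
        Event(7, 3_000, failed = true),
    ).mapNotNull { pane.onEvent(it) } + listOfNotNull(pane.onWindowEnd())
    println(emitted) // [Event(eventId=7, timestampMillis=2000, failed=true)]
}
```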

Here is a simplified example. (It is written in Kotlin, but I think it should be understandable to any Java or Scala developer who knows Flink. Please feel free to post an answer in any Flink-supported language.)

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


data class Event(val eventId: Int, val timestampMillis: Long, val failed: Boolean)


fun eventAggregator(stream: DataStream<Event>): DataStream<Event> {
  // Use the event's timestamp for watermarks.
  val watermarks = WatermarkStrategy.forMonotonousTimestamps<Event>()
                                    .withTimestampAssigner { event, _ -> event.timestampMillis }
  
  // Key the stream by eventId
  val keyedStream = stream.assignTimestampsAndWatermarks(watermarks).keyBy(Event::eventId)

  // One-minute window
  val windowed = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)))
  
  // If any event with a given eventId failed, we want to mark that ID as failed.
  return windowed.reduce { a, b -> if (a.failed) a else b }
}
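For comparison, here is a batch model of what the windowed `reduce` above computes, again in plain Kotlin rather than Flink. It assumes a zero window offset and non-negative timestamps, so a window starts at `ts - ts % 60_000`; `windowStart` and `windowedReduce` are hypothetical helpers. A result for each `(eventId, windowStart)` pair only exists after every event in that window has been folded in, which is the work the question wants to skip.

```kotlin
// Hypothetical stand-in for the question's Event type.
data class Event(val eventId: Int, val timestampMillis: Long, val failed: Boolean)

val windowMillis = 60_000L // one-minute tumbling windows

// Start of the tumbling window containing ts (assumes zero offset, ts >= 0).
fun windowStart(ts: Long): Long = ts - ts % windowMillis

// Batch model of keyBy(eventId) + tumbling window + reduce: one result per
// (eventId, windowStart), using the same reduce lambda as the question.
fun windowedReduce(events: List<Event>): Map<Pair<Int, Long>, Event> =
    events
        .groupBy { it.eventId to windowStart(it.timestampMillis) }
        .mapValues { (_, pane) -> pane.reduce { a, b -> if (a.failed) a else b } }

fun main() {
    val events = listOf(
        Event(1, 5_000, failed = false),
        Event(1, 10_000, failed = true),
        Event(2, 20_000, failed = false),
        Event(1, 30_000, failed = false),
        Event(1, 65_000, failed = false), // falls into the next window for key 1
    )
    for ((key, result) in windowedReduce(events)) println("$key -> $result")
}
```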

I tried implementing a custom trigger that delegates to EventTimeTrigger unless the incoming element has failed, in which case it returns FIRE. However, this fires the entire window, leading to incorrect results for most other events.

import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class FireOnFirstFail() : Trigger<Event, TimeWindow>() {

  private val delegate = EventTimeTrigger.create()

  override fun onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext) =
    if (element.failed) {
      TriggerResult.FIRE // Very buggy
    } else {
      delegate.onElement(element, timestamp, window, ctx)
    }
  
  override fun onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext) =
    delegate.onProcessingTime(time, window, ctx)

  override fun onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext) =
    delegate.onEventTime(time, window, ctx)

  override fun clear(window: TimeWindow, ctx: TriggerContext) =
    delegate.clear(window, ctx)
}

...

  return windowed.trigger(FireOnFirstFail()).reduce...

I am running this on Kinesis Data Analytics, so it is using Flink 1.15.2, which is the latest available version at the time of writing.
