I'm working with data coming from Kafka in real time. Processing a single source record produces several rows (due to JOIN and EXPLODE operations). From all these rows I then need to select only one: the row with the highest value of a certain field.

The question is how to collect all these rows obtained from one source record into a window and apply an aggregate function to them. In Spark Structured Streaming I managed to solve a similar problem using foreachBatch, but I'm stuck in Flink.
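To make the goal concrete, the selection I need is essentially a per-key "argmax". The sketch below is plain Java, not the Flink API; `uniqueId`, `startTime`, and the `Joined` record are stand-ins for my actual schema (real rows are `org.apache.flink.types.Row`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArgMaxDemo {
    // Stand-in for one joined/exploded row.
    record Joined(String uniqueId, long startTime, String payload) {}

    // For each uniqueId, keep only the row with the highest startTime.
    // This is the operation the streaming job needs to perform.
    static Map<String, Joined> pickMax(List<Joined> rows) {
        Map<String, Joined> best = new HashMap<>();
        for (Joined r : rows) {
            // On a tie, keep the row seen first.
            best.merge(r.uniqueId(), r,
                    (a, b) -> a.startTime() >= b.startTime() ? a : b);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Joined> rows = List.of(
                new Joined("id-1", 10, "a"),
                new Joined("id-1", 30, "b"),
                new Joined("id-1", 20, "c"),
                new Joined("id-2", 5,  "d"));
        System.out.println(pickMax(rows));
    }
}
```

The hard part is not the argmax itself but knowing, in a stream, when all rows derived from one source record have arrived.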

The closest I got to the desired result was with Top-N, but it produces an updating (non-append) stream, so it doesn't fit append mode, and I'm not sure how to write its output to the target storage. Am I even going in the right direction?

I'm new to Flink, all answers are appreciated.


Update:

OK, I tried using session windows with an aggregate function to find the maximum:

// Convert the joined Table back to a DataStream of Rows
DataStream<Row> joined = tableEnv.toDataStream(tableEnv.from("joined_msip"));

// Group rows spawned by one source record and keep the one
// with the maximum start time
DataStream<Row> filtered = joined
        .keyBy((KeySelector<Row, Object>) value -> value.getField("unique_id"))
        .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(1000)))
        .aggregate(new MaxStartTimeAggregate());
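For reference, `MaxStartTimeAggregate` is roughly the following. In the real job it implements Flink's `org.apache.flink.api.common.functions.AggregateFunction` over `Row`; here a local interface with the same shape and `Map`-based rows stand in so the sketch is self-contained, and `start_time` is assumed to be a `long` field:

```java
import java.util.HashMap;
import java.util.Map;

public class MaxStartTimeSketch {
    // Local stand-in mirroring org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Row stand-in: field name -> value (the real code works on org.apache.flink.types.Row).
    static class MaxStartTimeAggregate
            implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {

        @Override public Map<String, Object> createAccumulator() { return null; }

        @Override public Map<String, Object> add(Map<String, Object> row, Map<String, Object> acc) {
            // Keep whichever row has the larger start_time.
            if (acc == null) return row;
            long cur = (Long) acc.get("start_time");
            long in  = (Long) row.get("start_time");
            return in > cur ? row : acc;
        }

        @Override public Map<String, Object> getResult(Map<String, Object> acc) { return acc; }

        @Override public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
            if (a == null) return b;
            if (b == null) return a;
            return (Long) a.get("start_time") >= (Long) b.get("start_time") ? a : b;
        }
    }

    public static void main(String[] args) {
        MaxStartTimeAggregate agg = new MaxStartTimeAggregate();
        Map<String, Object> acc = agg.createAccumulator();
        for (long t : new long[] {10, 30, 20}) {
            Map<String, Object> row = new HashMap<>();
            row.put("unique_id", "id-1");
            row.put("start_time", t);
            acc = agg.add(row, acc);
        }
        System.out.println(agg.getResult(acc).get("start_time")); // prints 30
    }
}
```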

And it works, but I'm forced to choose a session gap, and waiting out the gap adds latency compared to simply emitting a result as soon as all the records spawned by the original record have been collected.

Well, this does not look like the best solution, so the question remains.
