Testing Flink streams with windowAll


I have been trying to test a windowAll operation in Apache Flink with a window of type TumblingProcessingTimeWindows, but when I try to retrieve the results from the sink, it is always empty (0 elements). Using TumblingProcessingTimeWindows is mandatory and part of the spec, so I cannot use a window triggered by events. Specifically, I have the following flow:

public void executeStream() {

    var stream = env.addSource(convertToSourceFunction)
            .returns(new TypeHint<SomeObject>() {})
            .windowAll(some window of type TumblingProcessingTimeWindows)
            // no semicolon here, so that addSink stays part of the chain
            .process(a class of type ProcessAllWindowFunction<SomeObject, ListOfObjects, TimeWindow>)
            .addSink(myKafkaSink);
    env.execute();
}

When trying to test the code above I create the following resources:

private FromElementsFunction<SomeObject> convertToSourceFunction(SomeObject element) {
    def hint = new TypeHint<SomeObject>(){}
    return new FromElementsFunction<>(hint.typeInfo.createSerializer(env.getConfig()), element)
}

class MyKafkaSink implements SinkFunction<ListOfObjects> {
    static def values = []

    @Override
    void invoke(ListOfObjects value, Context context) throws Exception {
        values << value
    }
}

When trying to compare and validate that I have exactly one element in the sink, I get false in the following check:

MyKafkaSink.values.size() == 1 // real value is 0

Of course if I don't use window functions, everything works as expected. What is the proper way to test a case like this? I am using Spock to test Java code.

There are 2 best solutions below

David Anderson

Your test job almost certainly runs to completion before the window has a chance to fire. Testing applications that rely on processing time can be challenging, because Flink will not wait for processing time timers to fire before exiting.

You'll need to find some way to keep the job from exiting prematurely, artificially advance the time-of-day clock so the window has a chance to fire, or both.

For what it's worth, the test harnesses provided by Flink have a setProcessingTime method that handles this, but they only support unit tests of single operators, and not end-to-end integration tests.

kkrugler

For testing something like this, you can (a) use a very short window size, and then (b) use a custom source that will first send out the N test events, and then delay for some amount of time greater than your window size.

This is still not completely deterministic, but it's generally pretty reliable. If you wanted to harden it, you could use an atomic counter that your sink would increment when it received an event, and in your source you'd delay until that counter had reached the target value. You'd still need to run your workflow async, and have a timeout (error case) if it didn't complete soon enough.
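A minimal sketch of the counter-plus-timeout idea, in plain Java so it does not depend on any particular Flink version. SinkProbe and its methods are hypothetical names, not part of Flink. The assumption is that the test sink's invoke() calls SinkProbe.record(value), the custom source calls SinkProbe.awaitCount(...) after emitting its test events (so the job stays alive until the window has fired), and the test treats a false return as a failure. The state is static, so this only works when the job runs in the same JVM as the test, as it does with a local environment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper shared by the test sink, the test source, and the test itself. */
final class SinkProbe {
    // Static on purpose: operator instances are serialized and copied by the
    // framework, so only static state is visible to the test in a single-JVM run.
    private static final List<Object> VALUES = new CopyOnWriteArrayList<>();
    private static final AtomicInteger COUNT = new AtomicInteger();

    private SinkProbe() {}

    /** Clears state; call at the start of every test. */
    static void reset() {
        VALUES.clear();
        COUNT.set(0);
    }

    /** Assumed to be called from the sink for every window result it receives. */
    static void record(Object value) {
        VALUES.add(value);
        COUNT.incrementAndGet();
    }

    /** Returns a snapshot of everything recorded so far. */
    static List<Object> values() {
        return new ArrayList<>(VALUES);
    }

    /**
     * Polls until at least {@code target} results have been recorded.
     * Returns false if the timeout elapses first (the error case).
     */
    static boolean awaitCount(int target, long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (COUNT.get() < target) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }
}
```

In a Spock test this could look like: call SinkProbe.reset(), start the job without blocking the test thread, then assert that SinkProbe.awaitCount(1, 10, TimeUnit.SECONDS) is true and that SinkProbe.values().size() == 1. The 10 second timeout is an arbitrary illustrative value; it only needs to be comfortably longer than the window size.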