Using AsyncDataStream and RichAsyncFunction with SingleOutputStreamOperator


I'm aggregating over a keyed stream using a SingleOutputStreamOperator object in the form

stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...)

After I obtain the aggregates, I would like to send each record to a REST endpoint using a POST request. To do this I've written a RichAsyncFunction. Unfortunately, AsyncDataStream.unorderedWait(...) appears to be incompatible with SingleOutputStreamOperator and instead needs a more generic DataStream object. The compiler reports:

The method unorderedWait(DataStream<IN>, AsyncFunction<IN,OUT>, long, TimeUnit, int) in the type AsyncDataStream is not applicable for the arguments (SingleOutputStreamOperator<Tuple6<String,String,Long,Long,Long,Long>>, AsyncFunction<String,String>, long, TimeUnit, int)

How can I get the data stream from the SingleOutputStreamOperator so that I can use the RichAsyncFunction I've created? Or should I just use a process window function instead?

Best answer, by kkrugler:

SingleOutputStreamOperator<T> extends DataStream<T>, so I don't think that's your problem.

It looks like you've defined an AsyncFunction<String,String>, so it expects a String as input, but based on the error message you're passing it Tuple6<String,String,Long,Long,Long,Long>.
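
To illustrate the point, here is a minimal sketch of the type check using plain-Java stand-ins. The classes below are hypothetical and are not Flink's; they only mirror the generic shape of the signature in the error message, and Aggregate stands in for your Tuple6. In the real job the equivalent fix would presumably be to declare your function as RichAsyncFunction<Tuple6<String,String,Long,Long,Long,Long>, String>, or to map each tuple to a String before the async call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mirror only the generic shape of the types
// named in the error message. They are NOT Flink's classes.
class DataStream<T> {
    final List<T> elements;
    DataStream(List<T> elements) { this.elements = elements; }
}

// The subclass relationship is why passing this type is not the problem.
class SingleOutputStreamOperator<T> extends DataStream<T> {
    SingleOutputStreamOperator(List<T> elements) { super(elements); }
}

interface AsyncFunction<IN, OUT> {
    OUT invoke(IN input);
}

class AsyncDataStream {
    // IN appears in both parameters, so the stream's element type and the
    // function's input type must be the same type.
    static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN element : in.elements) {
            out.add(fn.invoke(element));
        }
        return new DataStream<>(out);
    }
}

// Stand-in for the Tuple6<String,String,Long,Long,Long,Long> aggregate.
class Aggregate {
    final String key;
    final long count;
    Aggregate(String key, long count) { this.key = key; this.count = count; }
}

class AsyncTypeDemo {
    static List<String> run() {
        SingleOutputStreamOperator<Aggregate> aggregated =
                new SingleOutputStreamOperator<>(List.of(new Aggregate("a", 3L)));

        // Mirrors the failing call: the function expects String, the stream holds Aggregate.
        AsyncFunction<String, String> takesString = s -> "POST " + s;
        // AsyncDataStream.unorderedWait(aggregated, takesString); // does not compile

        // Typing the function on the stream's element type makes the call compile.
        AsyncFunction<Aggregate, String> takesAggregate = a -> "POST " + a.key + "=" + a.count;
        return AsyncDataStream.unorderedWait(aggregated, takesAggregate).elements;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [POST a=3]
    }
}
```

Note that the subclass instance is accepted as the first argument without any cast; only the function's input type had to change.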