I'm aggregating over a keyed stream using a SingleOutputStreamOperator object in the form
stream = env.fromSource(...)...sideOutput(...).window(...).aggregate(...)
After I obtain the aggregates I would like to send each record to a REST endpoint using a POST request. To do this I've written a RichAsyncFunction. Unfortunately AsyncDataStream.unorderedWait(...) is incompatible with SingleOutputStreamOperator and instead needs a more generic DataStreams object.
The method unorderedWait(DataStream<IN>, AsyncFunction<IN,OUT>, long, TimeUnit, int) in the type AsyncDataStream is not applicable for the arguments (SingleOutputStreamOperator<Tuple6<String,String,Long,Long,Long,Long>>, AsyncFunction<String,String>, long, TimeUnit, int)
How can I use get the data stream from SingleOutputStreamOperator to use the RichAsyncFunction I've created. Or should I just use a process window function instead?
SingleOutputStreamOperator<T>extendsDataStream<T>, so I don't think that's your problem.It looks like you've defined an
AsyncFunction<String,String>, so it expects aStringas input, but based on the error message you're passing itTuple6<String,String,Long,Long,Long,Long>.