I am trying to write from one Kinesis Data Stream (KDS) to another after processing in Apache Flink. I used a Zeppelin notebook to create the sink table with the following query:
%flink.ssql
CREATE TABLE seller_revenue (
  seller_id VARCHAR,
  window_end TIMESTAMP,
  sales DOUBLE
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'seller_stream_window',
  'aws.region' = 'ap-south-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json'
)
Then I write data using the following:
%flink.ssql(parallelism=1)
INSERT INTO seller_revenue
SELECT
  seller_id,
  TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
  SUM(product_quantity * product_price) AS sales
FROM seller_sales
GROUP BY
  TUMBLE(proctime, INTERVAL '30' SECONDS),
  seller_id
However, I get the following error:
Unable to create a sink for writing table 'hive.flink2.seller_revenue'.
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
Could someone please help me resolve this?
I tried removing the unsupported option scan.stream.initpos, but no data is written after that.
If you deploy the Zeppelin notebook as a streaming application, the code will work. You cannot run these steps interactively in the Zeppelin notebook itself; I had a similar issue.
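For reference, `scan.stream.initpos` is a read (source) option of the Kinesis connector, so it does not belong in a sink table's WITH clause, which is why validation rejects it. A sketch of the sink DDL without it, assuming the same stream name and region as in your question:

```sql
%flink.ssql
-- Sink table: only connector, stream, region and format are needed here;
-- scan.* options apply to tables read as sources, not written as sinks.
CREATE TABLE seller_revenue (
  seller_id VARCHAR,
  window_end TIMESTAMP,
  sales DOUBLE
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'seller_stream_window',
  'aws.region' = 'ap-south-1',
  'format' = 'json'
)
```

With the source-only option removed the validation error should go away; whether data actually flows then depends on running the INSERT as a deployed streaming application rather than interactively, as noted above.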