Error in Zeppelin notebook stating unable to create a sink

118 Views Asked by At

I am trying to write to from 1 KDS to another after processing in Apache Flink. I have used Zeppelin notebook to create sink table with the following query :

%flink.ssql

CREATE TABLE seller_revenue (
    seller_id    VARCHAR,
    window_end   TIMESTAMP,
    sales        DOUBLE
) 
WITH (
    'connector'           = 'kinesis',
    'stream'              = 'seller_stream_window',
    'aws.region'          = 'ap-south-1',
    'scan.stream.initpos' = 'LATEST',
    'format'              = 'json'
)

then I am writing data using following

%flink.ssql(parallelism=1)

INSERT INTO seller_revenue
SELECT
    seller_id,
    TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
    SUM(product_quantity * product_price) AS sales
FROM seller_sales
GROUP BY
    TUMBLE(proctime, INTERVAL '30' SECONDS),
    seller_id

however getting following error:

Unable to create a sink for writing table 'hive.flink2.seller_revenue'.
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.

Unsupported options:

scan.stream.initpos

Could someone please help to resolve it?

I tried removing unsupported option scan.stream.initpos however no data is getting written after this.

1

There are 1 best solutions below

0
Jazzzzie On

if you deploy the Zeppelin notebook as a streaming application then the code will work.

In the Zeppelin notebook itself you aren't able to perform these steps, I had a similar issue.