How to create table using Table Api in flink for modelling
I am working on creating a table using Table API from the data-stream of confluent-avro (from kafka topic). I am trying using the below code ( Flink version is 1.17) and it is creating table in such a way that resultTable column contains the complete json in one column of table. Can someone please help me in how to create table in such a way that each field of event is mapped to one column in the table.
DataStream<GenericRecord> dataStream = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table attribution = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("attribution", attribution);
Table resultTable = tableEnv.sqlQuery("SELECT f0 FROM attribution");
tableEnv.createTemporaryView("resultTable", resultTable);
tableEnv.sqlQuery("SELECT `field_name` from resultTable").execute().collect().forEachRemaining(System.out::println); // Throwing Exception.
You can use the variant of
fromDataStreamthat takes a Schema as the second argument. Here's an example I copied from the documentation:SOURCE_WATERMARK()is for cases where the DataStream being converted to a Table already has suitable watermarks.