Using Flink SQL with Table Aggregate Function (UDTAGG)


I am using Flink 1.18 with Java and am trying to use a user-defined table aggregate function (UDTAGG). The documentation only includes a Table API example for UDTAGGs. When I try to achieve the same result with a Flink SQL query, however, I get planning errors.

My records are "Event" logs, where each event has a timestamp (ts), an eventId, and a payload:

{
    "ts": BIGINT,
    "eventId": BIGINT,
    "payload": VARCHAR
}
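The program below calls new Event(ts, eventId, payload) but the Event class itself is not shown. A minimal sketch of what it could look like, assuming the usual Flink POJO rules (public class, public no-argument constructor, public fields) so that fromDataStream can derive the columns ts, eventId and payload. The field types are an assumption chosen to match the schema above; boxed Long maps to nullable BIGINT, whereas primitive long would map to BIGINT NOT NULL.

```java
// Hypothetical Event class matching the schema above.
// Flink can treat it as a POJO type: public class, public no-arg constructor, public fields.
public class Event {
    public Long ts;         // BIGINT
    public Long eventId;    // BIGINT
    public String payload;  // VARCHAR / STRING

    // Public no-argument constructor, required for Flink's POJO type extraction.
    public Event() {}

    public Event(Long ts, Long eventId, String payload) {
        this.ts = ts;
        this.eventId = eventId;
        this.payload = payload;
    }

    @Override
    public String toString() {
        return "Event{ts=" + ts + ", eventId=" + eventId + ", payload=" + payload + "}";
    }
}
```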

I am trying to group events by their eventId values and apply my user-defined table aggregate function to each group. The following Table API program works fine:

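import static org.apache.flink.table.api.Expressions.$;

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();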
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

final DataStream<Event> events = env.fromCollection(
                                  Arrays.asList(
                                   new Event(1710104562L, 1001L, "ABC"),
                                   new Event(1710105162L, 1002L, "DEF")
                                  ...));
final Table table = tableEnv.fromDataStream(events);
tableEnv.createTemporaryView("tab", table);

// Define a UDTAGG
tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION aggrFunc as '...' using jar '...'");

final Table results = tableEnv.from("tab")
                        .groupBy($("eventId"))
                        .flatAggregate(
                            Expressions.call("aggrFunc", Row.of($("ts"), $("eventId"), $("payload")))
                                .as("ts", "eid", "payload"))
                        .select($("ts"), $("eid").as("eventId"), $("payload"));
results.execute().print();
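The actual aggrFunc implementation is elided above, so here is a plain-Java model (no Flink classes) of what groupBy(...).flatAggregate(...) computes. Unlike a regular aggregate, which yields exactly one row per group, a table aggregate keeps an accumulator per group and its emit step may output zero, one, or several rows. The "keep the two latest events per eventId" logic, the names FlatAggregateModel, Acc, accumulate and emitValue, and the extra sample events are all hypothetical, chosen only to illustrate the shape of the computation. In a real streaming job Flink updates the emitted rows as events arrive; this model only shows the result for a finite input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy(eventId).flatAggregate(...); not the Flink API.
public class FlatAggregateModel {

    // One input/output row: (ts, eventId, payload).
    public record Event(long ts, long eventId, String payload) {}

    // Per-group state, playing the role of a TableAggregateFunction accumulator.
    static final class Acc {
        Event latest;       // newest event seen so far
        Event secondLatest; // second-newest event seen so far
    }

    // Hypothetical accumulate step: keep the two events with the largest ts.
    static void accumulate(Acc acc, Event e) {
        if (acc.latest == null || e.ts() > acc.latest.ts()) {
            acc.secondLatest = acc.latest;
            acc.latest = e;
        } else if (acc.secondLatest == null || e.ts() > acc.secondLatest.ts()) {
            acc.secondLatest = e;
        }
    }

    // Hypothetical emit step: a single group can produce several output rows.
    static void emitValue(Acc acc, List<Event> out) {
        if (acc.latest != null) out.add(acc.latest);
        if (acc.secondLatest != null) out.add(acc.secondLatest);
    }

    // Group by eventId (in first-seen order), accumulate, then emit per group.
    public static List<Event> flatAggregate(List<Event> input) {
        Map<Long, Acc> groups = new LinkedHashMap<>();
        for (Event e : input) {
            accumulate(groups.computeIfAbsent(e.eventId(), k -> new Acc()), e);
        }
        List<Event> out = new ArrayList<>();
        for (Acc acc : groups.values()) {
            emitValue(acc, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
            new Event(1710104562L, 1001L, "ABC"),
            new Event(1710105162L, 1002L, "DEF"),
            new Event(1710105762L, 1001L, "GHI"),   // illustrative extra event
            new Event(1710106362L, 1001L, "JKL"));  // illustrative extra event
        // Group 1001 emits two rows (JKL, GHI); group 1002 emits one row (DEF).
        flatAggregate(input).forEach(System.out::println);
    }
}
```

This multi-row-per-group output is exactly what a plain SQL GROUP BY with an aggregate call cannot express, since that form returns one value per group.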

Now I am trying to do the same with a Flink SQL query, but I get a type mismatch error from Calcite:

String sql = "SELECT eventId, aggrFunc(CAST((ts, eventId, payload) AS ROW<ts BIGINT, eventId BIGINT, payload STRING>)) "
           + "FROM tab "
           + "GROUP BY eventId";
tableEnv.executeSql(sql).print();

// error
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(BIGINT EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType:peek_no_expand(BIGINT ts, BIGINT id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" payload) EXPR$0) NOT NULL
Difference:
EXPR$0: BIGINT -> RecordType:peek_no_expand(BIGINT ts, BIGINT id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" payload)

at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
...

Any suggestions on how a UDTAGG should be used via the SQL API in Flink?

Answer by CodeWOD:

As of Flink 1.18, user-defined table aggregate functions (UDTAGG) are supported only through the Table API (for example via flatAggregate), not through SQL.