Apache Flink (AWS) does not recognize saved temporary function

15 Views Asked by At

I have a UDF function defined as

%flink.pyflink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.FLOAT(), input_types=[DataTypes.STRING(), DataTypes.FLOAT()])
def calculate_cmgr(date_str: str, close_price: float, start_price: float = 92.30) -> float:
    date_obj = datetime.strptime(date_str, '%Y-%m-%d')
    
    if date_obj.day > 3 or (date_obj.month == 1 and date_obj.year == 2021):
        return 0.0
    
    start_date = datetime(2021, 1, 1)
    months_diff = (date_obj.year - start_date.year) * 12 + date_obj.month - start_date.month
   
    if months_diff <= 0:
        return 0.0
    
    cmgr = ((close_price / start_price) ** (1 / months_diff) - 1) * 100
    return cmgr

st_env = StreamTableEnvironment.create(StreamExecutionEnvironment.get_execution_environment())

st_env.create_temporary_function("calculate_cmgr", calculate_cmgr)

And in a cell below I have this ssql query

%flink.ssql(type=update)

SELECT
    event_time,
    close_price,
    calculate_cmgr(CAST(event_time AS STRING), close_price) AS cmgr
FROM
    stock_table
WHERE
    calculate_cmgr(CAST(event_time AS STRING), close_price) <> 0;

When I run all paragraphs I get error SQL validation failed. From line 8, column 5 to line 8, column 59: No match found for function signature calculate_cmgr(, )

What am I doing wrong?

0

There are 0 best solutions below