I have a UDF function defined as
%flink.pyflink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.FLOAT(), input_types=[DataTypes.STRING(), DataTypes.FLOAT()])
def calculate_cmgr(date_str: str, close_price: float, start_price: float = 92.30) -> float:
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
if date_obj.day > 3 or (date_obj.month == 1 and date_obj.year == 2021):
return 0.0
start_date = datetime(2021, 1, 1)
months_diff = (date_obj.year - start_date.year) * 12 + date_obj.month - start_date.month
if months_diff <= 0:
return 0.0
cmgr = ((close_price / start_price) ** (1 / months_diff) - 1) * 100
return cmgr
st_env = StreamTableEnvironment.create(StreamExecutionEnvironment.get_execution_environment())
st_env.create_temporary_function("calculate_cmgr", calculate_cmgr)
And in a cell below I have this ssql query
%flink.ssql(type=update)
SELECT
event_time,
close_price,
calculate_cmgr(CAST(event_time AS STRING), close_price) AS cmgr
FROM
stock_table
WHERE
calculate_cmgr(CAST(event_time AS STRING), close_price) <> 0;
When I run all paragraphs I get error SQL validation failed. From line 8, column 5 to line 8, column 59: No match found for function signature calculate_cmgr(, )
What am I doing wrong?