I'm trying to use a UDF that accepts certain fields from my PySpark DataFrame and returns a struct. The idea is to store this struct as a separate column for further processing.
The Function:
def get_category_details_udf(id, category_list, sub_type):
    selected_categories = (df_registry_checklist.filter(col("registry_id") == id)
                           .select("checklist_id").agg(collect_list("checklist_id")).head()[0])
    total_subcategory_items = category_list + selected_categories
    if sub_type == "BABY":
        completed_category = baby_checklist_template.filter(
            baby_checklist_template["subcategory_child_ids"].isin(total_subcategory_items))
        completed_percentage = round((completed_category.select("checklist_id").distinct().count() / total_baby_checklist_count) * 100)
        incomplete_checklist_df = baby_checklist_template.join(
            completed_category,
            baby_checklist_template["checklist_id"] == completed_category["checklist_id"],
            "left_anti")
        checklist_suggestion = (incomplete_checklist_df.select("category_id").distinct()
                                .limit(5).agg(collect_list("category_id")).head()[0])
    else:
        completed_category = wedding_checklist_template.filter(
            wedding_checklist_template["subcategory_child_ids"].isin(total_subcategory_items))
        completed_percentage = round((completed_category.select("checklist_id").distinct().count() / total_wedding_checklist_count) * 100)
        incomplete_checklist_df = wedding_checklist_template.join(
            completed_category,
            wedding_checklist_template["checklist_id"] == completed_category["checklist_id"],
            "left_anti")
        checklist_suggestion = (incomplete_checklist_df.select("category_id").distinct()
                                .limit(5).agg(collect_list("category_id")).head()[0])
    return struct(completed_percentage, checklist_suggestion)
Some of the DataFrames used here (df_registry_checklist, baby_checklist_template, wedding_checklist_template) are defined in the main function, and I expect them to behave as global variables.
Registering the UDF and its invocation:
get_category_details_udf_schema = StructType([
    StructField("completed_percentage", IntegerType()),
    StructField("checklist_suggestion", ArrayType(StringType()))
])
get_category_details = udf(get_category_details_udf, returnType=get_category_details_udf_schema)
dataframe = dataframe.withColumn(
    "udf_result",
    get_category_details(dataframe["id"], dataframe["category_id_list"], dataframe["subtype"]))
The Error:
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o89.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
I don't know what's going wrong. The only thing left for me to try is upgrading to Python 3.8 or higher; I'm currently on 3.6.x.
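My current guess at the mechanism (speculation on my part, illustrated with a stand-in class rather than real Spark objects): Spark has to pickle the UDF and everything its closure captures to ship it to the executors, and a captured DataFrame holds a py4j handle into the JVM that cannot be pickled — roughly this situation:

```python
import pickle
import threading


class FakeDataFrame:
    """Hypothetical stand-in for a PySpark DataFrame: it holds a handle
    that cannot be pickled, much like a real DataFrame holds a py4j
    reference to a JVM object."""

    def __init__(self):
        self._jvm_handle = threading.Lock()  # unpicklable, like a py4j proxy


df_registry_checklist = FakeDataFrame()


def get_category_details_udf(reg_id):
    # The closure captures df_registry_checklist, so serializing this
    # function for the executors would mean serializing the DataFrame too.
    return df_registry_checklist


pickle_failed = False
try:
    pickle.dumps(df_registry_checklist)
except TypeError as exc:
    pickle_failed = True
    print("pickling failed:", exc)
```

If that guess is right, it would explain why the error appears at UDF invocation rather than inside my function's logic.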
Things I tried to debug:
- I tried using only pyspark.sql functions within the UDF.
- I removed earlier flatMap and RDD operations within the UDF.
- I tried passing the global variables as parameters to the UDF function.
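The next thing I'm planning to try: collect the small lookup DataFrames down to plain Python structures on the driver before defining the UDF, so the closure only captures picklable data. A sketch with made-up lookup values (not my real tables):

```python
import pickle

# Hypothetical precomputed lookup: in the real job this would be built once
# on the driver, e.g. from something like
#   df_registry_checklist.groupBy("registry_id")
#       .agg(collect_list("checklist_id")).collect()
registry_lookup = {
    "r1": ["c1", "c2"],
    "r2": ["c3"],
}


def get_category_details_udf(reg_id, category_list):
    # Closes over a plain dict, which pickles fine, unlike a DataFrame.
    return category_list + registry_lookup.get(reg_id, [])


payload = pickle.dumps(registry_lookup)  # succeeds: only plain Python data
result = get_category_details_udf("r1", ["c9"])
print(result)  # ['c9', 'c1', 'c2']
```

I'd still need to rework the percentage and suggestion logic the same way, but at least the closure would no longer drag JVM-backed objects into the pickle.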