Our codebase includes a structure like the following:
- a UDF defined as a static method that references another static method
- another method in the same class that references the UDF:
from datetime import timedelta
from typing import Optional

from pyspark.sql.functions import lead, udf
from pyspark.sql.window import Window


class DataFrameUtils:
    @staticmethod
    def subtract_microsecond(datetime_to_convert) -> Optional[str]:
        # Plain Python helper: shift the timestamp back by one microsecond.
        if datetime_to_convert is None:
            return datetime_to_convert
        return (datetime_to_convert - timedelta(microseconds=1)).strftime(
            "%Y-%m-%d %H:%M:%S.%f%Z"
        )

    @staticmethod
    @udf
    def subtract_microsecond_udf(datetime_to_convert) -> Optional[str]:
        return DataFrameUtils.subtract_microsecond(datetime_to_convert)

    @staticmethod
    def compute_something(data_frame, primary_key_list, order_by_col):
        window_group = Window.partitionBy(*primary_key_list).orderBy(order_by_col)
        return data_frame.withColumn(
            "computed_column",
            DataFrameUtils.subtract_microsecond_udf(lead(order_by_col).over(window_group)),
        )
This structure had been working for over a year, but after what seem to be completely unrelated changes it stopped working: CloudPickle now raises a ModuleNotFoundError for the package/module containing this class.
converted = PythonException('\n An exception was thrown from the Python worker. Please see the stack trace below.\nTraceback (mos...1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t... 1 more\n')
def deco(*a: Any, **kw: Any) -> Any:
try:
return f(*a, **kw)
except Py4JJavaError as e:
converted = convert_exception(e.java_exception)
if not isinstance(converted, UnknownException):
# Hide where the exception came from that shows a non-Pythonic
# JVM exception message.
> raise converted from None
E pyspark.sql.utils.PythonException:
E An exception was thrown from the Python worker. Please see the stack trace below.
E Traceback (most recent call last):
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 670, in main
E func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 507, in read_udfs
E udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 289, in read_single_udf
E f, return_type = read_command(pickleSer, infile)
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
E command = serializer._read_with_length(file)
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
E return self.loads(obj)
E File "/usr/local/Cellar/apache-spark/3.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
E return cloudpickle.loads(obj, encoding=encoding)
E ModuleNotFoundError: No module named 'ourproject.utils.dataframe_utils'
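From what I understand (and I may be wrong here), cloudpickle serializes a function that lives in an importable module by reference - module path plus qualified name - rather than by value, so the executors need to be able to import `ourproject.utils.dataframe_utils` themselves when they unpickle the UDF. Below is a rough sketch of how I've been checking this on the driver; the only assumptions are PySpark's vendored cloudpickle and the stdlib pickletools, nothing project-specific beyond the class above.

import pickletools

from pyspark import cloudpickle  # vendored copy used by PySpark's serializers

# Pickle the plain helper with the same cloudpickle the driver uses.
payload = cloudpickle.dumps(DataFrameUtils.subtract_microsecond)

# If the function is pickled by reference, the disassembly contains the
# module name 'ourproject.utils.dataframe_utils' and the qualified name
# 'DataFrameUtils.subtract_microsecond' rather than the function's code,
# which is why the worker has to be able to import that module.
pickletools.dis(payload)

That at least matches the traceback above, but it doesn't tell me why the module suddenly stopped being importable on the workers.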
I've seen cloudpickle/serialization errors in the past, for example when accessing Accumulators within a UDF, but here I'm not sure what the root problem is or what a clean fix would look like. In the short term I have copied the helper and the UDF into the client module, next to the method that uses them. This does work, but it means a copy is needed in every place the UDF is used - the opposite of DRY.
# Local copies in the client module (same imports as in the shared module):
def subtract_microsecond(datetime_to_convert) -> Optional[str]:
    if datetime_to_convert is None:
        return datetime_to_convert
    return (datetime_to_convert - timedelta(microseconds=1)).strftime(
        "%Y-%m-%d %H:%M:%S.%f%Z"
    )


@udf
def subtract_microsecond_udf(datetime_to_convert) -> Optional[str]:
    # Call the local copy so the pickled UDF never references DataFrameUtils.
    return subtract_microsecond(datetime_to_convert)


def compute_something(data_frame, primary_key_list, order_by_col):
    window_group = Window.partitionBy(*primary_key_list).orderBy(order_by_col)
    # Reference the local copy of `subtract_microsecond_udf`
    return data_frame.withColumn(
        "computed_column",
        subtract_microsecond_udf(lead(order_by_col).over(window_group)),
    )
Any pointers on how to reliably share UDFs across different methods would be appreciated.