I have a problem with parallelizing hyperopt with SparkTrials. I have followed the example shown in here. This is working in the case of a Databricks notebook and simple functions, as shown below. Note that I have to load with DeltaLake and not with spark.read.format('delta'), otherwise this will not work.
from hyperopt import STATUS_OK, fmin, hp, tpe, SparkTrials, Trials, hp
import numpy as np
import pandas as pd
from deltalake import DeltaTable
np.random.seed(0)
n = 300000
x = np.random.normal(size=n)
y = 0.05 + 0.2 * x + np.random.normal(loc=0, scale=1, size=n)
df = pd.DataFrame.from_dict({'x':x, 'y':y}, orient='index').T
spark.createDataFrame(df).write.format('delta').mode('overwrite').save('/FileStore/mv/sample_data/random_x')
# Outside of a class (same design)
def model(data, a, b):
return a + b * data
def rmse(x, y, a, b):
e = model(x, a, b) - y
return np.sqrt(np.mean(e ** 2))
def _load_data():
return DeltaTable(f'/dbfs/{path}').to_pandas()
def _rmse(params):
data = _load_data()
x = data.loc[:, 'x'].to_numpy()
y = data.loc[:, 'y'].to_numpy()
return rmse(x, y, **params)
def run_hyperopt():
trials = SparkTrials()
space = {'a': hp.uniform('a', 0, 1),
'b': hp.uniform('b', -1,1)}
best = fmin(fn=_rmse,
space=space,
algo=tpe.suggest,
max_evals=10,
trials=trials)
return best
path = '/FileStore/mv/sample_data/random_x'
run_hyperopt()
However, as soon as one would try to encapsulate this in a class, the same error appears: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.. I have also tried loading everything directly in rmse2 (it is commented out), and that is also not working.
class SomeClass:
def __init__(self, spark, path):
self.spark = spark
self.path = path
def model(self, data, a, b):
return a + b * data
def rmse(self, x, y, a, b):
e = self.model(x, a, b) - y
return np.sqrt(np.mean(e ** 2))
def _load_data(self):
return DeltaTable(f'/dbfs/{self.path}').to_pandas()
def _rmse(self, params):
data = self._load_data()
x = data.loc[:, 'x'].to_numpy()
y = data.loc[:, 'y'].to_numpy()
return self.rmse(x, y, **params)
# def rmse2(self, params):
# data = self._load_data()
# x = data.loc[:, 'x'].to_numpy()
# y = data.loc[:, 'y'].to_numpy()
# e = self.model(x, a, b) - y
# return np.sqrt(np.mean(e ** 2))
def run_hyperopt(self):
trials = SparkTrials()
space = {'a': hp.uniform('a', 0, 1),
'b': hp.uniform('b', -1,1)}
best = fmin(fn=self._rmse,
space=space,
algo=tpe.suggest,
max_evals=10,
trials=trials)
return best
path = '/FileStore/mv/sample_data/random_x'
c = SomeClass(spark, path)
c.run_hyperopt()