Hyperopt unable to reference SparkContext from a broadcast variable

I have a problem parallelizing hyperopt with SparkTrials. I followed the example shown here, and it works in a Databricks notebook with plain functions, as shown below. Note that I have to load the data with the deltalake library's DeltaTable and not with spark.read.format('delta'), otherwise it does not work (see the failing variant sketched after the code block).

from hyperopt import STATUS_OK, fmin, hp, tpe, SparkTrials, Trials
import numpy as np
import pandas as pd
from deltalake import DeltaTable

np.random.seed(0)
n = 300000

x = np.random.normal(size=n)
y = 0.05 + 0.2 * x + np.random.normal(loc=0, scale=1, size=n)
df = pd.DataFrame({'x': x, 'y': y})
spark.createDataFrame(df).write.format('delta').mode('overwrite').save('/FileStore/mv/sample_data/random_x')

# Same design, but with plain functions outside of a class (this works)

def model(data, a, b):
  return a + b * data

def rmse(x, y, a, b):
  e = model(x, a, b) - y
  return np.sqrt(np.mean(e ** 2))

def _load_data():
  # `path` is a module-level global (set below); /dbfs/ is the fuse mount
  return DeltaTable(f'/dbfs/{path}').to_pandas()

def _rmse(params):
  data = _load_data()
  x = data.loc[:, 'x'].to_numpy()
  y = data.loc[:, 'y'].to_numpy()
  return rmse(x, y, **params)

def run_hyperopt():
  trials = SparkTrials()

  space = {'a': hp.uniform('a', 0, 1),
           'b': hp.uniform('b', -1, 1)}

  best = fmin(fn=_rmse,
              space=space,
              algo=tpe.suggest,
              max_evals=10,
              trials=trials)
  return best

path = '/FileStore/mv/sample_data/random_x'
run_hyperopt()
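
For context, this is the spark.read variant of the loader that fails under SparkTrials. My assumption (based on SPARK-5063) is that spark.read needs the driver-side SparkContext, so calling it from the objective on a worker raises the RuntimeError quoted further down:

# Hypothetical illustration: fails when the objective runs on a worker,
# because `spark` (and its SparkContext) only exists on the driver.
def _load_data_with_spark():
  return spark.read.format('delta').load(path).toPandas()

The deltalake loader above avoids this because DeltaTable reads the files through the /dbfs/ fuse mount without touching Spark at all.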


However, as soon as I encapsulate the same design in a class, this error appears: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. I have also tried loading everything directly inside rmse2 (commented out below), and that fails the same way; my suspicion about the cause is sketched after the class code.

class SomeClass:
  
  def __init__(self, spark, path):
    self.spark = spark
    self.path = path
  
  def model(self, data, a, b):
    return a + b * data

  def rmse(self, x, y, a, b):
    e = self.model(x, a, b) - y
    return np.sqrt(np.mean(e ** 2))
  
  def _load_data(self):
    return DeltaTable(f'/dbfs/{self.path}').to_pandas()

  def _rmse(self, params):
    data = self._load_data()
    x = data.loc[:, 'x'].to_numpy()
    y = data.loc[:, 'y'].to_numpy()
    return self.rmse(x, y, **params)
  
  # def rmse2(self, params):
  #   data = self._load_data()
  #   x = data.loc[:, 'x'].to_numpy()
  #   y = data.loc[:, 'y'].to_numpy()
  #   e = self.model(x, params['a'], params['b']) - y
  #   return np.sqrt(np.mean(e ** 2))

  def run_hyperopt(self):
    trials = SparkTrials()

    space = {'a': hp.uniform('a', 0, 1),
             'b': hp.uniform('b', -1, 1)}

    best = fmin(fn=self._rmse,
                space=space,
                algo=tpe.suggest,
                max_evals=10,
                trials=trials)
    return best
  


path = '/FileStore/mv/sample_data/random_x'
c = SomeClass(spark, path)
c.run_hyperopt()
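
My suspicion is that fmin pickles the bound method self._rmse, which drags the whole instance along with it, including self.spark, and serializing a SparkSession to the workers is exactly what SPARK-5063 forbids. Below is a minimal sketch of the workaround I would expect to help; make_objective is a hypothetical module-level factory, so the closure captures only the path string and nothing driver-side crosses to the workers:

# Sketch, assuming the captured self.spark is the culprit: build the objective
# as a closure over plain data only, so pickling never touches the SparkSession.
def make_objective(path):
  def objective(params):
    data = DeltaTable(f'/dbfs/{path}').to_pandas()
    e = params['a'] + params['b'] * data['x'].to_numpy() - data['y'].to_numpy()
    return np.sqrt(np.mean(e ** 2))
  return objective

# Inside run_hyperopt, pass the plain function instead of the bound method:
#   best = fmin(fn=make_objective(self.path), space=space,
#               algo=tpe.suggest, max_evals=10, trials=trials)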
