How to create a custom transformer using PySpark?


I am trying to build a custom transformer and use it in a PySpark pipeline, but I don't know how.

My goal is to create this transformer to estimate a probability. Here is the code that I use without the transformer:

# Calculate the pass probability for all softbins
softbins_df = (
    counts_df
    .groupBy('softbin_first_test', 'time_window')
    .agg(F.sum('PASS').alias('total_pass'), F.sum('FAIL').alias('total_fail'))
) 
# Calculate the prior pass probability for each softbin
softbins_total_counts_df = softbins_df.withColumn('total', F.col('total_pass') + F.col('total_fail'))
prior_pass_prob = F.col('total_pass') / F.col('total')
prior_fail_prob = F.col('total_fail') / F.col('total')
# Calculate probabilities using Bayesian estimation with empirical prior
rho = 0.3
prior_window = (
    Window.partitionBy('softbin_first_test')
    .orderBy(F.col('time_window'))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
prior_pass = F.mean(prior_pass_prob).over(prior_window)
prior_fail = F.mean(prior_fail_prob).over(prior_window)
alpha = prior_pass * (1 - rho) / rho
beta = prior_fail * (1 - rho) / rho
pass_prob = alpha / (alpha + beta)

I want to create this transformer to avoid data leakage during the train/test split. In the code above, the probability is calculated over the whole dataset, so after splitting, the training set contains information from the future (the calculated probability uses the whole dataset). That is why I want to wrap this logic in a transformer and incorporate it into a pipeline.
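
For reference, the split I have in mind is time-ordered, something like this (the cutoff is just an example value, not a real value from my data):

split_point = 'some_time_window_value'  # example cutoff; in practice, a value of time_window
train_df = counts_df.filter(F.col('time_window') <= split_point)
test_df = counts_df.filter(F.col('time_window') > split_point)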

1 Answer

Anay (accepted answer):

Creating a custom transformer is a good choice for dealing with data leakage.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark import keyword_only

# Initialize SparkSession
spark = SparkSession.builder.appName("ProbabilityEstimator").getOrCreate()

# Sample data
data = [
    (1, "A", 2, 4),
    (1, "A", 3, 6),
    (1, "B", 4, 3),
    (1, "B", 5, 7),
    (2, "A", 1, 2),
    (2, "A", 2, 1),
    (2, "B", 2, 2),
    (2, "B", 3, 3)
]

columns = ["softbin_first_test", "time_window", "PASS", "FAIL"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Custom Transformer
class ProbabilityEstimator(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(ProbabilityEstimator, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        # Cumulative window: for each softbin, only the current row and earlier
        # time windows are visible, so no future information leaks in.
        softbins_window = (
            Window.partitionBy('softbin_first_test')
            .orderBy('time_window')
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        total = F.sum(F.col('PASS') + F.col('FAIL')).over(softbins_window)
        prior_pass_prob = F.sum('PASS').over(softbins_window) / total
        prior_fail_prob = F.sum('FAIL').over(softbins_window) / total

        # Bayesian smoothing controlled by rho; note that the common factor
        # (1 - rho) / rho cancels in alpha / (alpha + beta), so pass_prob
        # reduces to the cumulative pass rate.
        rho = 0.3
        alpha = prior_pass_prob * (1 - rho) / rho
        beta = prior_fail_prob * (1 - rho) / rho
        pass_prob = alpha / (alpha + beta)

        return df.withColumn(self.getOutputCol(), pass_prob)

# Initialize the custom transformer (inputCol is unused in this demo;
# _transform reads the PASS/FAIL columns directly)
probability_estimator = ProbabilityEstimator(inputCol='input_col', outputCol='output_col')

# Define pipeline
pipeline = Pipeline(stages=[probability_estimator])

# Fit the pipeline to transform the DataFrame
model = pipeline.fit(df)
df_transformed = model.transform(df)

# Show the results
df_transformed.show()

Output:

+------------------+-----------+----+----+-------------------+
|softbin_first_test|time_window|PASS|FAIL|         output_col|
+------------------+-----------+----+----+-------------------+
|                 1|          A|   2|   4| 0.3333333333333333|
|                 1|          A|   3|   6| 0.3333333333333333|
|                 1|          B|   4|   3|0.40909090909090906|
|                 1|          B|   5|   7|  0.411764705882353|
|                 2|          A|   1|   2| 0.3333333333333333|
|                 2|          A|   2|   1|                0.5|
|                 2|          B|   2|   2|                0.5|
|                 2|          B|   3|   3|                0.5|
+------------------+-----------+----+----+-------------------+

Make appropriate changes as per your data; this is just to showcase how a custom transformer works.
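
To tie this back to the leakage concern in the question, here is a rough sketch of how the fitted pipeline could be applied after a time-ordered split (the cutoff 'A' is just a placeholder based on the sample data):

# Hypothetical time-ordered split on the sample data
train_df = df.filter(F.col('time_window') <= 'A')
test_df = df.filter(F.col('time_window') > 'A')

model = pipeline.fit(train_df)  # the transformer is stateless, so fit() learns nothing
train_features = model.transform(train_df)
test_features = model.transform(test_df)

One caveat: because the transformer keeps no state from fit(), transform() on test_df computes the cumulative statistics from the rows of test_df alone. If the test rows should also see the training history, transform the combined, time-ordered data first and split afterwards.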