How to update pyspark dataframe inside a Python function


I have a Python function that receives a pyspark DataFrame and checks whether it has all the columns expected by other functions in a script. In particular, if the column 'weight' is missing, I want to update the DataFrame passed by the user by assigning a new column to it.

For example:

from pyspark.sql import functions as F

def verify_cols(df):
    if 'weight' not in df.columns:
        df = df.withColumn('weight', F.lit(1))  # Can I update `df` inside this function?

As you can see, I want the function to update df. How can I achieve this? If possible, I would like to avoid using a return statement.

This post is very similar but uses pandas' inplace argument.
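For context on why the snippet above does not work: `df.withColumn(...)` returns a new DataFrame (pyspark DataFrames are immutable), and assigning it to `df` only rebinds the function-local name; the caller's variable still points at the original object. A minimal plain-Python sketch of the same behavior, using a hypothetical dict in place of a DataFrame:

```python
def add_key(d):
    # Rebinding the parameter creates a new local object;
    # the caller's variable is unaffected.
    d = {**d, 'weight': 1}

data = {'name': 'a'}
add_key(data)
print('weight' in data)  # False: the caller's dict is unchanged
```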

1 Answer

Ivannpy (best answer)

To avoid a return statement, you can declare a class and keep the DataFrame as an instance attribute.

from pyspark.sql import functions as F
from pyspark.sql import DataFrame

class Validator:
    def __init__(self, df: DataFrame):
        self.df = df

    def verify_cols(self):
        if 'weight' not in self.df.columns:
            self.df = self.df.withColumn('weight', F.lit(1))

After calling the verify_cols method, the df attribute of the instance will be updated, so callers read self.df (or validator.df) rather than their original variable.
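The pattern itself is independent of Spark: rebinding an instance attribute persists, whereas rebinding a function-local name does not. A minimal sketch with a hypothetical `Holder` class and a plain dict standing in for the DataFrame:

```python
class Holder:
    """Wraps an object so methods can replace it via an attribute."""

    def __init__(self, data):
        self.data = data

    def ensure_weight(self):
        # Assigning to self.data persists after the call returns,
        # because it is an instance attribute, not a local name.
        if 'weight' not in self.data:
            self.data = {**self.data, 'weight': 1}

h = Holder({'name': 'a'})
h.ensure_weight()
print(h.data)  # {'name': 'a', 'weight': 1}
```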