Find the mean and mode of lists in column values of a PySpark dataframe

I have a pyspark dataframe that looks like this:

+----+--------------------------+--------------+
|id  | score                    |review        |
+----+--------------------------+--------------+
|1   |[83.52, 81.79, 84, 75]    |[P,N,P,P]     |
|2   |[86.13, 85.48]            |[N,N,N,P]     |
+----+--------------------------+--------------+

The schema of this dataframe is this:

root
 |-- id: integer (nullable = false)
 |-- score: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- review: array (nullable = true)
 |    |-- element: string (containsNull = true)

I want to find the mean of each score array and the mode of each review array, and create new columns with those values. The data types of these new columns should be float and string respectively. Like this:

+----+--------------------------+--------------+---------+----------+
|id  | score                    |review        |scoreMean|reviewMode|
+----+--------------------------+--------------+---------+----------+
|1   |[83.52, 81.79, 84, 75]    |[P,N,P,P]     |81.08    |P         |
|2   |[86.13, 85.48]            |[N,N,N,P]     |85.81    |N         |
+----+--------------------------+--------------+---------+----------+

I have tried it using UDFs:

import statistics
import pyspark.sql.functions as F

#define UDFs
def mean_udf(data):
    if len(data) == 0:
        return None
    data_float = [eval(i) for i in data]
    return statistics.mean(data_float)

def mode_udf(data):
    if len(data) == 0:
        return None    
    return statistics.mode(data)

#register the UDFs
mean_func = F.udf(mean_udf)
mode_func = F.udf(mode_udf)

#apply UDFs
df= (df.withColumn("scoreMean", mean_func(F.col("score")))
       .withColumn("reviewMode", mode_func(F.col("review")))
    )

But it throws a calculation-failed error during evaluation (when I call show or collect).

Any help is welcome. Thanks.

There are 3 answers below

Answer from Lingesh.K

I have tried a mix of approaches: computing mean_score with explode and groupBy expressions, and using a custom UDF to get mode_review, as follows:

from pyspark.sql.functions import avg, col, explode, udf
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Define the schema for the DataFrame
schema = StructType([
    StructField("score", ArrayType(DoubleType(), True), True),
    StructField("review", ArrayType(StringType(), True), True)
])

# Define the data to be used in the DataFrame
data = [([83.52, 81.79, 84.0, 75.0], ["P", "N", "P", "P"]), ([86.13, 85.48], ["N", "N", "N", "P"])]

# Create a DataFrame with the defined schema and data
df = spark.createDataFrame(data, schema)

# Define a function to calculate the mode of a list
def mode_of_list(input_list):
    return max(set(input_list), key=input_list.count)


# Register the function as a UDF (User Defined Function)
modeUDF = udf(mode_of_list, StringType())

# Create a new DataFrame with the following columns: score, review, mean score and mode review
df_result = (df
             .withColumn("score_exploded", explode(col("score")))
             .groupBy("review")
             .agg(avg("score_exploded").alias("mean_score"))
             .withColumn("mode_review", modeUDF(col("review")))
             .join(df.alias("input"), on="review")
             .selectExpr("input.score", "input.review", "mean_score", "mode_review"))

# Display the DataFrame with the mean score and mode review
df_result.show(truncate=False)
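
Note that mean_score here is the raw average (81.0775 for the first row of the sample data). If you want it rounded to two decimals, as in the question's expected output, one possible follow-up is the sketch below, which reuses df_result from above:

from pyspark.sql.functions import round as spark_round

# Round the mean score to two decimals to match the expected output
df_result_rounded = df_result.withColumn("mean_score", spark_round(col("mean_score"), 2))
df_result_rounded.show(truncate=False)
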
Answer from karan

I resolved this without using any UDF. The method is a bit similar to one of the answers already here. For calculating the mean, I exploded the list in every column value and then used the groupBy and aggregate functions.

# First explode the lists in the 'score' column
# This will generate a new column with the default name 'col'
df_score_exploded = (
    df.select(df.id, 
              F.explode(df.score)
             )
)

# Then aggregate on the new exploded column, grouping at the level of the original table
df_score_exploded = (
    df_score_exploded 
    .groupby("id")
    .agg(F.avg("col").alias("scoreMean"))
)

# Finally join this newly created table with the original table
df= (df.join(df_score_exploded,
             df["id"] == df_score_exploded["id"]
            )                                 
            .select(df["id"], df["score"], df["review"], df_score_exploded["scoreMean"])
     )

Similarly, for calculating the mode, I exploded the list in every column value, used groupBy and aggregate functions to count each review value in a list, and then used a window function to select the value with the maximum count.

# First explode the lists in the 'review' column
# This will generate a new column with the default name 'col'
df_review_exploded = (
    df.select(df.id,
              F.explode(df.review)
             )
)

# Then aggregate on the new exploded column, grouping at the level of 'original table + exploded column'
df_review_exploded = (
    df_review_exploded
    .groupby("id", "col")
    .agg(F.count("*").alias("countReview"))
)

from pyspark.sql.window import Window

# Create a window spec ordering each id's review counts in descending order
w = Window().partitionBy("id").orderBy(F.col("countReview").desc())

# Select the review value with the highest count for each 'id'
df_review_exploded = (
    df_review_exploded
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
)

# Finally join this newly created table with the original table
df = (df.join(df_review_exploded,
              df["id"] == df_review_exploded["id"]
             )
             .select(df["id"], df["score"], df["review"], df["scoreMean"], df_review_exploded["col"].alias("reviewMode"))
    )
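
One detail to be aware of: if two review values appear equally often for an id, row_number picks one of them arbitrarily. If you need a deterministic result, a possible tweak (a sketch, keeping the column names used above) is to add a secondary sort key to the window:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Break ties deterministically by also ordering on the review value itself
w = Window().partitionBy("id").orderBy(F.col("countReview").desc(), F.col("col").asc())
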
Answer from Vincent Doba

Since Spark 3.1, you can use Spark built-in functions to compute the mean and mode of an array without using windows, groupBy, or joins, which may trigger inefficient shuffles.

For the mean, you use aggregate to compute the sum of elements in an array, then divide it by the size of the array, and finally round the result, as follows:

from pyspark.sql import functions as F

array_mean = F.round(
    F.aggregate(F.col('score'), F.lit(0.0), lambda acc, c: c.cast('float') + acc) / F.size(F.col('score')),
    2
)
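
For the first row of the sample data, for example, aggregate sums [83.52, 81.79, 84, 75] to 324.31, dividing by the array size 4 gives 81.0775, and rounding to 2 decimals yields 81.08, the expected scoreMean.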

Computing the mode is more difficult. First, you build a map whose keys are the elements of the array and whose values are how often each element appears. Then you extract the key with the greatest value, using a combination of the transform and array_max functions:

from pyspark.sql import functions as F

array_mode = F.array_max(
    F.transform(
        F.map_entries(
            F.aggregate(
                F.col('review'),
                F.create_map().cast("map<string,int>"),
                lambda acc, c: F.map_concat(
                    F.create_map(c, 1 + F.coalesce(F.element_at(acc, c), F.lit(0))),
                    F.map_filter(acc, lambda k, v: k != c)
                )
            )
        ),
        lambda e: F.struct(e.getField('value'), e.getField('key'))
    )
).getField('key')
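
To trace this on the second row: the aggregate folds [N, N, N, P] into the map {N -> 3, P -> 1}, map_entries and transform turn it into an array of (count, value) structs, [(3, N), (1, P)], array_max picks the struct with the highest count, (3, N), and getField('key') returns N.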

You can apply those expressions using the DataFrame's withColumn method, as follows:

result = df.withColumn('scoreMean', array_mean).withColumn('reviewMode', array_mode)
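
The question asks for scoreMean as a float; the expression above should produce a double, so if the exact type matters, a possible extra step (a sketch) is:

from pyspark.sql import functions as F

# Optionally cast the mean from double to float to match the requested column type
result = result.withColumn('scoreMean', F.col('scoreMean').cast('float'))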

With the input dataframe df defined in the question, you get the following result dataframe:

+---+--------------------------+------------+---------+----------+
|id |score                     |review      |scoreMean|reviewMode|
+---+--------------------------+------------+---------+----------+
|1  |[83.52, 81.79, 84.0, 75.0]|[P, N, P, P]|81.08    |P         |
|2  |[86.13, 85.48]            |[N, N, N, P]|85.81    |N         |
+---+--------------------------+------------+---------+----------+