Fuzzy Matching Optimization in PySpark


I am trying to perform some fuzzy matching on some data through PySpark. To accomplish this I am using the fuzzywuzzy package and running it on Databricks.

My dataset is very simple. It is stored in a CSV file and contains two columns: Name1 and Name2. However, I don't just want to compare the two values in the same row; I want to compare each Name1 value to every available Name2 value.

This is what my code looks like:

from pyspark.sql import functions as f
from fuzzywuzzy import fuzz
from pyspark.sql.types import IntegerType

# create a simple function that performs fuzzy matching on two strings
def match_string(s1, s2):
    return fuzz.token_sort_ratio(s1, s2)

# convert the function into a UDF (token_sort_ratio returns an integer score from 0 to 100)
MatchUDF = f.udf(match_string, IntegerType())

# separate the two Name columns into individual DataFrames
df1 = raw_df.select('Name1')
df2 = raw_df.select('Name2')

# perform a CROSS JOIN on the two DataFrames
# CAN THIS BE AVOIDED?
df = df1.crossJoin(df2)

# use the UDF from before to calculate a similarity score for each combination
df = df.withColumn("similarity_score", MatchUDF(f.col("Name1"), f.col("Name2")))

Once I have the similarity scores, I can calculate a rank for each name and thereby get the best match.
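For example, the ranking step could look something like this (just a sketch, using a window function over the cross-joined df from above):

from pyspark.sql.window import Window

# for each Name1, rank the candidate Name2 values by descending similarity score
w = Window.partitionBy("Name1").orderBy(f.col("similarity_score").desc())

best_matches = (
    df.withColumn("rank", f.row_number().over(w))
      .filter(f.col("rank") == 1)
      .drop("rank")
)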

What I am worried about is the CROSS JOIN. It produces the Cartesian product of the two columns, so the number of rows grows quadratically. Is there any way this can be avoided?

I am also open to completely different approaches that will accomplish what I need to do in a more optimized manner.

There are 2 answers below.

Answer by Viktor Dremov

There is an extractBests function in the fuzzywuzzy package (in its process module) that returns a list of the best matches from a collection of choices (the Name2 column).

This function can be applied to a single value from the Name1 column and the whole collection of Name2 values, so it can be wrapped in a UDF without the need to cross join the columns.
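A rough sketch of that idea, assuming the distinct Name2 values are small enough to collect to the driver (the name name2_choices and the score_cutoff and limit values are illustrative):

from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from fuzzywuzzy import process, fuzz

# collect the choices once; assumes the distinct Name2 values fit in driver memory
name2_choices = [row["Name2"] for row in raw_df.select("Name2").distinct().collect()]

def best_match(s1):
    # extractBests returns a list of (choice, score) tuples above the cutoff
    matches = process.extractBests(s1, name2_choices,
                                   scorer=fuzz.token_sort_ratio,
                                   score_cutoff=70, limit=1)
    return matches[0][0] if matches else None

best_match_udf = f.udf(best_match, StringType())

result = raw_df.withColumn("best_match", best_match_udf(f.col("Name1")))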

Answer by Leonardo Pedroso

Given that you need to check every Name1 value against every Name2 value and your DataFrames are small, the straightforward solution is the cross join.

However, you can apply a few tricks if you need to scale up your data pipeline in the future or want to tune performance.

  • Split the workload into mini-batches: You can break this operation into small chunks (mini-batches) by using .option("maxFilesPerTrigger", 100) on a streaming read or by splitting your DataFrame.

  • Broadcast df2: If df2 is a small DataFrame, you can broadcast it to all workers (from pyspark.sql.functions import broadcast) and avoid shuffling the larger side of the join, e.g. df1.crossJoin(broadcast(df2)); see the sketch after this list.

  • Adjust the number of partitions: After the cross join, the number of partitions will probably skyrocket (df1 partitions * df2 partitions). The number of DataFrame partitions has an impact on the run time of your computations: with too few partitions, your computations cannot use all the parallelism available in the cluster; with too many, there is excessive overhead in managing many small tasks, making your computations very slow. Cross-joining DataFrames tends to fall into the "too many partitions" case. Use df1.rdd.getNumPartitions() to check the current partition count and apply .repartition() to reduce it (see the sketch after this list).

  • Convert the UDF to a pandas UDF: This is currently one of the most efficient options, given that it leverages Apache Arrow for transferring data between the JVM and Python (a combined sketch of the broadcast and pandas UDF suggestions follows this list).
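A combined sketch of the broadcast and pandas UDF suggestions, reusing df1/df2 from the question and assuming a Spark 3.x-style pandas UDF (the repartition target of 200 is illustrative):

import pandas as pd
from pyspark.sql import functions as f
from pyspark.sql.functions import broadcast, pandas_udf
from pyspark.sql.types import IntegerType
from fuzzywuzzy import fuzz

# broadcast the small Name2 side so the cross join does not shuffle df1
df = df1.crossJoin(broadcast(df2))

# pandas UDF: data is transferred in Arrow batches instead of row by row
@pandas_udf(IntegerType())
def match_score(s1: pd.Series, s2: pd.Series) -> pd.Series:
    return pd.Series([fuzz.token_sort_ratio(a, b) for a, b in zip(s1, s2)])

df = df.withColumn("similarity_score", match_score(f.col("Name1"), f.col("Name2")))

# check the partition count after the join and reduce it if it exploded
print(df.rdd.getNumPartitions())
df = df.repartition(200)  # illustrative target; tune for your cluster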

Another option for calculating similarity scores is to use native functions such as pyspark.sql.functions.levenshtein, if applicable.
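For example (a minimal sketch; levenshtein returns an edit distance, so lower means more similar):

from pyspark.sql import functions as f

# edit distance between the two name columns; 0 means identical strings
df = df1.crossJoin(df2).withColumn(
    "levenshtein_distance", f.levenshtein(f.col("Name1"), f.col("Name2"))
)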