Multiply a vector embedding column with itself to generate similarity scores for all combinations in a Spark DataFrame

The dataset below contains an id and a dense vector embedding generated with the MiniLM-L6-v2 embedding model.

I want to generate a similarity score between each embedding and every other embedding in the dataset.

+---+-----------------------------------------------------------------------------------------+
|id |summary_embedding                                                                        |
+---+-----------------------------------------------------------------------------------------+
|1  |[-0.14059880375862122, -0.046212051063776016, -0.034053537994623184, ...384 dimensions] |
|2  |[-0.13473762571811676, -0.014029680751264095, -0.006330421194434166, ...384 dimensions] |
+---+-----------------------------------------------------------------------------------------+
df.dtypes
[('id', 'int'), ('summary_embedding', 'array<double>')]
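
By "similarity score" I mean the score for a single pair of embeddings; in my attempts below this is the plain dot product of the two arrays (with L2-normalised embeddings that equals cosine similarity). A single-pair sketch in plain Python, just to pin the definition down:

import numpy as np

def pair_score(emb_a, emb_b):
    # dot product of two 384-dimensional embedding arrays
    a, b = np.asarray(emb_a), np.asarray(emb_b)
    return float(np.dot(a, b))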

I tried using BlockMatrix and a UDF (both shown below). Both work well on a small dataset but never finish on the full dataset of 20,000 rows, which produces 20,000 x 20,000 = 400 million combinations.

I need the output as follows for all the combinations:

id1   | id2   | similarity_score
1     | 2     | 0.134356
1     | 3     | 0.232352
1     | 4     | 0.331353
...
1     | 20000 | 0.134356
2     | 3     | 0.134356
...
2     | 20000 | 0.634354
...
20000 | 2     | 0.134356
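To make the expected output above concrete, here is a plain DataFrame version that produces exactly that schema (assuming Spark 3.1+ for zip_with/aggregate; the column logic is only illustrative). A self-join like this is also far too slow on 20,000 rows, which is the whole problem:

from pyspark.sql import functions as F

# every ordered pair of distinct ids, dot product computed with array functions
pairs = (df.alias("l")
           .join(df.alias("r"), F.col("l.id") != F.col("r.id"))
           .select(F.col("l.id").alias("id1"),
                   F.col("r.id").alias("id2"),
                   F.aggregate(
                       F.zip_with("l.summary_embedding", "r.summary_embedding",
                                  lambda x, y: x * y),
                       F.lit(0.0),
                       lambda acc, x: acc + x).alias("similarity_score")))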

BlockMatrix:

from pyspark.sql import Window
from pyspark.sql.functions import row_number
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# assign a dense 1-based id to each distinct embedding
df = src_df.select("summary_embedding").distinct().withColumn("id", row_number().over(Window.orderBy("summary_embedding"))).repartition(1024)

# build a block matrix from the embeddings (blockRows / blockCols are block sizes defined elsewhere)
mat = IndexedRowMatrix(df.select("id", "summary_embedding").rdd.map(lambda row: IndexedRow(row.id - 1, row.summary_embedding))).toBlockMatrix(rowsPerBlock=blockRows, colsPerBlock=blockCols)

# pairwise dot products of every row with every other row
dot = mat.multiply(mat.transpose())
coord_matrix = dot.toCoordinateMatrix()
coord_df = coord_matrix.entries.map(lambda entry: (entry.i, entry.j, entry.value)).toDF(["id1", "id2", "similarity"]).cache()
coord_df.count()  # this action never finishes on the 20,000-row dataset

UDF: I tried using a DataFrame UDF to first convert the array column to VectorUDT(). However, it either fails or is too slow.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

array_to_vector = udf(lambda l: Vectors.dense(l), VectorUDT())
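
Roughly how the conversion fits in (a sketch rather than my exact code; vec_dot and the self-join below are just illustrative names and steps):

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

vec_df = df.withColumn("embedding_vec", array_to_vector(col("summary_embedding")))

# illustrative pairwise step: dot product of the two DenseVectors for each pair
vec_dot = udf(lambda a, b: float(a.dot(b)), DoubleType())
scores = (vec_df.alias("l")
                .join(vec_df.alias("r"), col("l.id") != col("r.id"))
                .select(col("l.id").alias("id1"),
                        col("r.id").alias("id2"),
                        vec_dot("l.embedding_vec", "r.embedding_vec").alias("similarity_score")))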

I'm looking for code that performs well on this large dataset. The Spark cluster has 10 executors with 4 CPUs and 4 GB of RAM each. Thanks for your guidance.
