Filter and sum array columns inside an aggregation in Spark Scala


This feels like a basic question, but here I am. I have two ordered array columns, and I want to filter them on a combination of their values and then count the matching occurrences in an aggregation. This is what I have so far:

import spark.implicits._

val df = Seq(
  ("id1", Array(0, 1, 2), Array(2, 3, 4)),
  ("id2", Array(0, 1, 2), Array(2, 3, 4)),
  ("id3", Array(0, 1, 2), Array(2, 3, 4))
).toDF("id", "feature1", "feature2")

First I zipped the arrays so I could process the paired elements together:

import org.apache.spark.sql.functions.arrays_zip

val dfz = df.withColumn("zipped", arrays_zip($"feature1", $"feature2"))
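
For reference, printing the schema shows the zipped column comes out as an array of structs, one struct per aligned pair of elements (the field names follow the input columns here):

dfz.printSchema()
// zipped: array<struct<feature1: int, feature2: int>>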

Then I have this for the aggregation:

dfz.groupBy("id").agg(sum(($"zipped").filter(x => x._1 > 0 && x._2 == 0).size))

I get error: value filter is not a member of org.apache.spark.sql.ColumnName, so I think I am not filtering right (presumably because $"zipped" is a Column, not a Scala collection). All help appreciated!

I also tried writing a UDF, but that ends up being a map/case/count that doesn't seem very simple, so I figured there might be a better way; roughly, it looked like the sketch below.
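
The UDF attempt, roughly (a sketch; countMatches is just what I'm calling it here, and it assumes the two arrays always line up):

import org.apache.spark.sql.functions.{sum, udf}

// zip the two arrays and count pairs where feature1 > 0 and feature2 == 0
val countMatches = udf { (f1: Seq[Int], f2: Seq[Int]) =>
  f1.zip(f2).count { case (x, y) => x > 0 && y == 0 }
}

df.groupBy("id").agg(sum(countMatches($"feature1", $"feature2")))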


1 Answer

Answered by Callie:

A coworker helped me by pointing out that zip_with is the function I was looking for! Problem solved like this (note that Column equality needs ===, and a filter before size so that only the matching pairs get counted):

import org.apache.spark.sql.functions._

df.groupBy("id").agg(sum(size(filter(
  zip_with(df("feature1"), df("feature2"), (x, y) => x > 0 && y === 0),
  matched => matched  // keep only the pairs that satisfied the condition
))))
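
For the sample data above no pair satisfies x > 0 && y === 0, so each id sums to 0. If you'd rather skip the intermediate boolean array, the aggregate higher-order function (Spark 3.0+) can keep a running count directly; a minimal sketch:

import org.apache.spark.sql.functions._

df.groupBy("id").agg(
  sum(aggregate(
    // 1 where the pair matches, 0 otherwise, summed by the accumulator
    zip_with(df("feature1"), df("feature2"), (x, y) => (x > 0 && y === 0).cast("int")),
    lit(0),
    (acc, matched) => acc + matched
  ))
)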