I am performing an aggregation on the following dataframe to get a list of advertisers, each with an array of brands:
+------------+------+
|advertiser |brand |
+------------+------+
|Advertiser 1|Brand1|
|Advertiser 1|Brand2|
|Advertiser 2|Brand3|
|Advertiser 2|Brand4|
|Advertiser 3|Brand5|
|Advertiser 3|Brand6|
+------------+------+
Here is my code:
import org.apache.spark.sql.functions.collect_list
df2
.groupBy("advertiser")
.agg(collect_list("brand").as("brands"))
That gives me the following dataframe:
+------------+----------------+
|advertiser |brands |
+------------+----------------+
|Advertiser 1|[Brand1, Brand2]|
|Advertiser 2|[Brand3, Brand4]|
|Advertiser 3|[Brand5, Brand6]|
+------------+----------------+
During the aggregation, I want to filter the list of brands using the following table of brands:
+------+------------+
|brand |brand name |
+------+------------+
|Brand1|Brand_name_1|
|Brand3|Brand_name_3|
+------+------------+
In order to get the following result:
+------------+--------+
|advertiser |brands |
+------------+--------+
|Advertiser 1|[Brand1]|
|Advertiser 2|[Brand3]|
|Advertiser 3|null |
+------------+--------+
I see two solutions to your question, which I will call the Collect solution and the Join solution.
Collect solution
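Here is a minimal sketch of the Collect solution, written against Spark 3.x APIs and assuming the brands dataframe is small enough to collect to the driver. It rebuilds the question's sample data so it runs on its own (for example in spark-shell). keptBrands is a hypothetical name used for illustration. Rather than wrapping brands in arrays and flattening them afterwards, this variant filters with isin inside collect_list and relies on collect_list skipping nulls, so it needs no flatten call.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, lit, size, when}

val spark = SparkSession.builder().master("local[*]").appName("collect-solution").getOrCreate()
import spark.implicits._

// Sample data from the question
val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")
val brands = Seq(("Brand1", "Brand_name_1"), ("Brand3", "Brand_name_3")).toDF("brand", "brand name")

// Collect the (small) brands dataframe to the driver (hypothetical name keptBrands)
val keptBrands: Seq[String] = brands.select("brand").as[String].collect().toSeq

val result = df2
  .groupBy("advertiser")
  // when(...) without otherwise() yields null for brands not in keptBrands,
  // and collect_list skips nulls, so only kept brands end up in the array
  .agg(collect_list(when(col("brand").isin(keptBrands: _*), col("brand"))).as("brands"))
  // Advertisers with no kept brand get an empty array: replace it with null
  .withColumn("brands", when(size(col("brands")) === 0, lit(null)).otherwise(col("brands")))

result.orderBy("advertiser").show(false)
```

Because isin receives a literal list, the filter runs inside the single groupBy and needs no extra shuffle.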
If you can collect your brands dataframe, you can use the collected values to keep only the right brands when performing collect_list, then flatten your array and replace empty arrays with null.
Join solution
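Here is a minimal sketch of the Join solution, written against Spark 3.x APIs. filterBrandsByJoin and the filtered_brand column are hypothetical names chosen for illustration, and the question's sample data is rebuilt so the block runs on its own. The distinct() call is an extra safeguard not mentioned in the description: it keeps duplicated brands from multiplying rows in the join.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, lit, size, when}

val spark = SparkSession.builder().master("local[*]").appName("join-solution").getOrCreate()
import spark.implicits._

// Sample data from the question
val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")
val brands = Seq(("Brand1", "Brand_name_1"), ("Brand3", "Brand_name_3")).toDF("brand", "brand name")

// Hypothetical helper: keeps, per advertiser, only the brands present in brandTable
def filterBrandsByJoin(ads: DataFrame, brandTable: DataFrame): DataFrame = {
  // Keep only the brand column, renamed so the join condition is unambiguous
  val kept = brandTable.select(col("brand").as("filtered_brand")).distinct()
  ads
    // filtered_brand holds the brand if it is in brandTable, null otherwise
    .join(kept, col("brand") === col("filtered_brand"), "left_outer")
    .groupBy("advertiser")
    // collect_list skips nulls, so only kept brands remain
    .agg(collect_list("filtered_brand").as("brands"))
    // Empty arrays (advertisers with no kept brand) become null
    .withColumn("brands", when(size(col("brands")) === 0, lit(null)).otherwise(col("brands")))
}

val result = filterBrandsByJoin(df2, brands)
result.orderBy("advertiser").show(false)
```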
If your brands dataframe doesn't fit in memory, you can first left join df2 with brands to get a column containing the brand if it is in the brands dataframe, and null otherwise. Then do your groupBy, and finally replace the empty arrays, left for advertisers whose brands were all filtered out, with null.
Details
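So if we start with the df2 and brands dataframes shown in the question, the first step, a left outer join between df2 and brands, gives a dataframe that keeps every advertiser/brand row of df2 and adds a column holding the brand when it appears in brands, and null otherwise. When you group this dataframe by advertiser, collecting the list of filtered brands, you get one array per advertiser: [Brand1] for Advertiser 1, [Brand3] for Advertiser 2, and an empty array for Advertiser 3, because collect_list skips nulls.
And finally, when you apply the last step, replacing empty arrays with null, you get your expected result.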
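To reproduce the intermediate dataframes of this walk-through, here is a sketch that keeps each step of the Join solution in its own value and prints it. It assumes Spark 3.x; filtered_brand is a hypothetical column name, and the row order printed by show may vary between runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, lit, size, when}

val spark = SparkSession.builder().master("local[*]").appName("join-steps").getOrCreate()
import spark.implicits._

val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")
val brands = Seq(("Brand1", "Brand_name_1"), ("Brand3", "Brand_name_3")).toDF("brand", "brand name")

// Step 1: left outer join, filtered_brand is null for brands not in `brands`
val joined = df2.join(
  brands.select(col("brand").as("filtered_brand")),
  col("brand") === col("filtered_brand"),
  "left_outer")
joined.show(false)

// Step 2: group by advertiser; nulls are skipped, Advertiser 3 gets an empty array
val grouped = joined.groupBy("advertiser").agg(collect_list("filtered_brand").as("brands"))
grouped.show(false)

// Step 3: replace empty arrays with null
val result = grouped.withColumn("brands", when(size(col("brands")) === 0, lit(null)).otherwise(col("brands")))
result.show(false)
```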
Conclusion
The Collect solution creates only one expensive shuffle step (during the groupBy), and should be preferred if your brands dataframe is small. The Join solution works if your brands dataframe is big, but it creates more expensive shuffle steps, since both the groupBy and the join shuffle data.
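One way to check the shuffle claim is to count the shuffle (Exchange) nodes in each physical plan. The sketch below assumes Spark 3.x, where the node class is ShuffleExchangeExec. It turns off adaptive execution so executedPlan is the plain physical plan, and turns off broadcast joins to mimic a brands dataframe too big to broadcast. shuffleCount is a hypothetical helper, and the final empty-array-to-null step is left out because it adds no shuffle.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions.{col, collect_list, when}

val spark = SparkSession.builder().master("local[*]").appName("shuffle-count").getOrCreate()
import spark.implicits._
// Plain physical plans (no AQE wrapper) and no broadcast joins
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df2 = Seq(
  ("Advertiser 1", "Brand1"), ("Advertiser 1", "Brand2"),
  ("Advertiser 2", "Brand3"), ("Advertiser 2", "Brand4"),
  ("Advertiser 3", "Brand5"), ("Advertiser 3", "Brand6")
).toDF("advertiser", "brand")
val brands = Seq(("Brand1", "Brand_name_1"), ("Brand3", "Brand_name_3")).toDF("brand", "brand name")

// Hypothetical helper: number of shuffle nodes in a physical plan
def shuffleCount(plan: SparkPlan): Int = plan.collect { case e: ShuffleExchangeExec => e }.size

val keptBrands = brands.select("brand").as[String].collect().toSeq
val collectVersion = df2.groupBy("advertiser")
  .agg(collect_list(when(col("brand").isin(keptBrands: _*), col("brand"))).as("brands"))
val joinVersion = df2
  .join(brands.select(col("brand").as("filtered_brand")), col("brand") === col("filtered_brand"), "left_outer")
  .groupBy("advertiser")
  .agg(collect_list("filtered_brand").as("brands"))

val collectShuffles = shuffleCount(collectVersion.queryExecution.executedPlan)
val joinShuffles = shuffleCount(joinVersion.queryExecution.executedPlan)
println(s"Collect solution: $collectShuffles shuffle(s), Join solution: $joinShuffles shuffle(s)")
```

Calling explain() on either dataframe prints the same plans, with each shuffle shown as an Exchange operator.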