I have a table like the one below, where the key column encodes the hierarchy.

columns = ["structure_id", "position", "hierarchy", "key", "key_text"]
data = [(1, 101, 1, 10000, "Europe"), (1, 102, 2, 11000, "France"), (1, 103, 3, 11100, "Paris"), (1, 104, 2, 12000, "Italy"), (1, 105, 3, 12100, "Rome"), (1, 106, 1, 20000, "Asia"), (1, 107, 2, 21000, "Japan")]
df = spark.createDataFrame(data).toDF(*columns)
df.show()
#+------------+--------+---------+-----+--------+
#|structure_id|position|hierarchy|  key|key_text|
#+------------+--------+---------+-----+--------+
#|           1|     101|        1|10000|  Europe|
#|           1|     102|        2|11000|  France|
#|           1|     103|        3|11100|   Paris|
#|           1|     104|        2|12000|   Italy|
#|           1|     105|        3|12100|    Rome|
#|           1|     106|        1|20000|    Asia|
#|           1|     107|        2|21000|   Japan|
#+------------+--------+---------+-----+--------+

For a given structure_id and position, e.g. structure_id = 1 and position = 105, I want the key_text from that position together with the key_texts of all higher levels, i.e. in this example [Europe, Italy, Rome].

The "real" data contains approximately 50 different structure ids and > 10 hierarchy levels.

I have transformed the original table in a very cumbersome way (lots of joins and temporarily creating a new column for each hierarchy level) into this:

#+------------+--------+---------+--------------------+
#|structure_id|position|hierarchy|           key_array|
#+------------+--------+---------+--------------------+
#|           1|     101|        1|            [Europe]|
#|           1|     102|        2|    [Europe, France]|
#|           1|     103|        3|[Europe, France, ...|
#|           1|     104|        2|     [Europe, Italy]|
#|           1|     105|        3|[Europe, Italy, R...|
#|           1|     106|        1|              [Asia]|
#|           1|     107|        2|       [Asia, Japan]|
#+------------+--------+---------+--------------------+

If this output format makes sense, I wonder how to transform the original table into it efficiently. Otherwise, I would like to know what would make more sense: would it perhaps be better to use a tree structure?
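To make the goal concrete, here is the lookup I am after expressed in plain Python with a simple stack over the rows in position order (just to illustrate the intended logic, not a Spark solution; `path_for` is my own helper name):

```python
def path_for(rows, structure_id, position):
    """Walk the rows of one structure in position order, keeping a stack of
    key_texts indexed by hierarchy level; return the stack at the requested
    position, i.e. the key_text of that row and of all its ancestors."""
    stack = []
    for sid, pos, level, _key, key_text in sorted(rows, key=lambda r: (r[0], r[1])):
        if sid != structure_id:
            continue
        del stack[level - 1:]   # drop levels at or below the current one
        stack.append(key_text)
        if pos == position:
            return list(stack)
    return None

data = [(1, 101, 1, 10000, "Europe"), (1, 102, 2, 11000, "France"),
        (1, 103, 3, 11100, "Paris"),  (1, 104, 2, 12000, "Italy"),
        (1, 105, 3, 12100, "Rome"),   (1, 106, 1, 20000, "Asia"),
        (1, 107, 2, 21000, "Japan")]

print(path_for(data, 1, 105))  # ['Europe', 'Italy', 'Rome']
```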

Answer from BlueBike:

You could use a combination of a window and the collect_list aggregation like this:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Collect all key_text values from level 1 up to the current hierarchy level.
ws = Window.partitionBy('structure_id').orderBy('hierarchy').rangeBetween(Window.unboundedPreceding, 0)

(
    df
    .withColumn('key_text_new', F.collect_list('key_text').over(ws))
    .show()
)

This returns the dataframe in the following format:

+------------+--------+---------+-----+--------+--------------------+
|Structure_ID|Position|Hierarchy|  Key|Key_Text|        key_text_new|
+------------+--------+---------+-----+--------+--------------------+
|           1|     101|        1|10000|  Europe|            [Europe]|
|           1|     102|        2|11000|  France|    [Europe, France]|
|           1|     103|        3|11100|   Paris|[Europe, France, ...|
|           2|     104|        1|20000|    Asia|              [Asia]|
|           2|     105|        2|21000|   Japan|       [Asia, Japan]|
+------------+--------+---------+-----+--------+--------------------+

Please note that I adjusted the structure_id for Asia and Japan from your example; judging by the key column you provided, I guess it should have been 2.
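If the data really does keep everything under one structure_id, as in the question, ordering by hierarchy alone mixes the subtrees (which is why the structure_id had to be adjusted above). An alternative is to walk the rows in position order and, for every row, take the most recent preceding entry at each level. Here is a plain-Python sketch of that logic (`build_key_arrays` is a hypothetical helper, not Spark code; in Spark itself the same idea could likely be expressed with collect_list over a position-ordered window combined with the higher-order functions transform, filter, and element_at, though I have not benchmarked that):

```python
def build_key_arrays(rows):
    """For every row, return (structure_id, position, key_array), where
    key_array holds the key_text of the most recent row at each level
    1..hierarchy, walking the rows in position order."""
    out = []
    last = {}  # (structure_id, level) -> most recent key_text at that level
    for sid, pos, level, _key, key_text in sorted(rows, key=lambda r: (r[0], r[1])):
        last[(sid, level)] = key_text
        out.append((sid, pos, [last[(sid, l)] for l in range(1, level + 1)]))
    return out

data = [(1, 101, 1, 10000, "Europe"), (1, 102, 2, 11000, "France"),
        (1, 103, 3, 11100, "Paris"),  (1, 104, 2, 12000, "Italy"),
        (1, 105, 3, 12100, "Rome"),   (1, 106, 1, 20000, "Asia"),
        (1, 107, 2, 21000, "Japan")]

for row in build_key_arrays(data):
    print(row)  # e.g. (1, 105, ['Europe', 'Italy', 'Rome'])
```

This reproduces the key_array column from the question for all seven rows without touching the structure_id values.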