I have the following data in a PySpark DataFrame:
| Quarter A | Quarter B | Item 1 | Item 2 |
|---|---|---|---|
| 1 | 3 | A | C |
| 2 | 3 | B | C |
| 2 | 5 | B | E |
| 3 | 4 | C | D |
| 3 | 5 | C | E |
| 4 | 5 | D | E |
| 2 | 3 | F | G |
In the example above, A, B, C, D and E all refer to the same item, recorded in different quarters under different IDs. F and G relate to another item, recorded in quarters 2 and 3. Each row is a known link between two such records: row 2, for instance, tells us that item B in quarter 2 is the same item as item C in quarter 3.
What I want to do is assign a single ID, the ID from the quarter in which the item first appeared, to every record that the provided links show to belong to the same item. So in the above example we would get:
| Quarter A | Quarter B | Item 1 | Item 2 | item ID |
|---|---|---|---|---|
| 1 | 3 | A | C | A |
| 2 | 3 | B | C | A |
| 2 | 5 | B | E | A |
| 3 | 4 | C | D | A |
| 3 | 5 | C | E | A |
| 4 | 5 | D | E | A |
| 2 | 3 | F | G | F |
Here the first six rows all relate to one item and the last row to another. The real data contains millions of such rows, with different combinations of known links across different quarters (though never spanning more than 5 quarters).
I have tried to do this via self-joins, but chaining the links requires numerous joins and seems very inefficient. The order in which the links are processed is clearly critical, and the approach is prone to issues with how the IDs end up being assigned.
My other idea is to use a window function partitioned on the Item 1 ID, returning a list of all the IDs each ID appears alongside, as below:
| Quarter A | Quarter B | Item 1 | Item 2 | ID list |
|---|---|---|---|---|
| 1 | 3 | A | C | A, C |
| 2 | 3 | B | C | B, C, E |
| 2 | 5 | B | E | B, C, E |
| 3 | 4 | C | D | C, D, E |
| 3 | 5 | C | E | C, D, E |
| 4 | 5 | D | E | D, E |
| 2 | 3 | F | G | F, G |
From this we can see how the items are linked: there is a trail of IDs from A to E (and from F to G) across the records, with no elements shared between lists belonging to different items. However, I cannot work out the logic to compare these lists and reduce them to the final item ID. Can you help using only PySpark APIs?