Flagging records after meeting a condition using Spark Scala


I need some expert opinion on the scenario below:

I have the following DataFrame df1:

+------------+------------+-------+-------+
|   Date1    | OrderDate  | Value | group |
+------------+------------+-------+-------+
| 10/10/2020 | 10/01/2020 | hostA | grp1  |
| 10/01/2020 | 09/30/2020 | hostB | grp1  |
| NULL       | 09/15/2020 | hostC | grp1  |
| 08/01/2020 | 08/30/2020 | hostD | grp1  |
| NULL       | 10/01/2020 | hostP | grp2  |
| NULL       | 09/28/2020 | hostQ | grp2  |
| 07/11/2020 | 08/08/2020 | hostR | grp2  |
| 07/01/2020 | 08/01/2020 | hostS | grp2  |
| NULL       | 07/01/2020 | hostL | grp2  |
| NULL       | 08/08/2020 | hostM | grp3  |
| NULL       | 08/01/2020 | hostN | grp3  |
| NULL       | 07/01/2020 | hostO | grp3  |
+------------+------------+-------+-------+

Each group is ordered by OrderDate in descending order. After ordering, each Value having Current_date < (Date1 + 31 days), or having Date1 as NULL, needs to be flagged as Valid, until the first row where Current_date > (Date1 + 31 days) is reached. From that row onward, every Value should be marked as Invalid, irrespective of its Date1 value.

If all the Date1 values in a group are NULL, every Value in that group should be tagged as Valid.
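To make the rule concrete, the per-row "still within the window" condition I have in mind can be sketched like this (assuming Date1 has already been parsed into a DateType column; the sample above shows it as a string):

```scala
import org.apache.spark.sql.functions._

// A row is still within the window when Date1 is NULL
// or the current date has not yet passed Date1 + 31 days.
val withinWindow = col("Date1").isNull || (current_date() < date_add(col("Date1"), 31))
```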

My output DataFrame should look like the following:

+------------+------------+-------+-------+---------+
|   Date1    | OrderDate  | Value | group |  Flag   |
+------------+------------+-------+-------+---------+
| 10/10/2020 | 10/01/2020 | hostA | grp1  | Valid   |
| 10/01/2020 | 09/30/2020 | hostB | grp1  | Valid   |
| NULL       | 09/15/2020 | hostC | grp1  | Valid   |
| 08/01/2020 | 08/30/2020 | hostD | grp1  | Invalid |
| NULL       | 10/01/2020 | hostP | grp2  | Valid   |
| NULL       | 09/28/2020 | hostQ | grp2  | Valid   |
| 07/11/2020 | 08/08/2020 | hostR | grp2  | Invalid |
| 07/01/2020 | 08/01/2020 | hostS | grp2  | Invalid |
| NULL       | 07/01/2020 | hostL | grp2  | Invalid |
| NULL       | 08/08/2020 | hostM | grp3  | Valid   |
| NULL       | 08/01/2020 | hostN | grp3  | Valid   |
| NULL       | 07/01/2020 | hostO | grp3  | Valid   |
+------------+------------+-------+-------+---------+

My approach:

I create a row_number for each group after ordering by OrderDate. Then, for each group, I take the min(row_number) of the rows having Current_date > (Date1 + 31 days) and save it as a new DataFrame, dfMin.

I then join df1 with dfMin on group and filter based on row_number (row_number < min(row_number)). A rough code sketch is shown below.
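In (roughly) code form, the approach looks like this (a sketch only; dfRn stands for df1 with the per-group row_number column already attached, and column names are illustrative):

```scala
import org.apache.spark.sql.functions._

// dfRn = df1 plus a per-group "row_number" column, ordered by OrderDate descending
// (built via zipWithIndex, see the note at the end).

// For each group, the smallest row_number whose record is already past the 31-day window.
val dfMin = dfRn
  .filter(col("Date1").isNotNull && current_date() > date_add(col("Date1"), 31))
  .groupBy("group")
  .agg(min("row_number").as("min_rn"))

// Keep only the rows before that cut-off; these are the Valid ones.
// Groups that never produce a row in dfMin are dropped by the inner join,
// which is the failure described below.
val dfValid = dfRn
  .join(dfMin, dfRn("group") === dfMin("group"))
  .filter(dfRn("row_number") < dfMin("min_rn"))
```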

This approach works for most cases, but it fails when all values of Date1 in a group are NULL.

Is there a better approach that covers this scenario as well?

Note: I am using a pretty old version of Spark, Spark 1.5. Window functions also won't work in my environment (it's a custom framework and there are many restrictions in place). For row_number, I used the zipWithIndex method.
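For reference, a simplified sketch of how the per-group row_number can be generated with zipWithIndex and plain RDD operations (the actual framework code differs; the MM/dd/yyyy date format and column names are assumed from the sample above):

```scala
import java.text.SimpleDateFormat
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Per-group row numbers, ordered by OrderDate descending, without window functions.
val numberedRdd = df1.rdd
  .groupBy(_.getAs[String]("group"))
  .flatMap { case (_, rows) =>
    val fmt = new SimpleDateFormat("MM/dd/yyyy")
    rows.toSeq
      .sortBy(r => fmt.parse(r.getAs[String]("OrderDate")).getTime)(Ordering[Long].reverse)
      .zipWithIndex
      .map { case (row, idx) => Row.fromSeq(row.toSeq :+ (idx + 1)) }
  }

// Re-attach the schema, with the new row_number column appended.
val dfRn = sqlContext.createDataFrame(
  numberedRdd,
  StructType(df1.schema.fields :+ StructField("row_number", IntegerType, nullable = false)))
```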
