I have a data frame with three columns, shown below.

I would like to calculate the potential risk of each material. For each material, the potential risk starts on the date of the first non-zero value in the `Cut` column and ends on the date after which there are three consecutive zero values in the `Cut` column. `Cut_Total` is the sum of the `Cut` values within this time frame.

| Material | Date | Cut |
|---|---|---|
| Soap | 01/01/2024 | 1 |
| Soap | 01/02/2024 | 5 |
| Soap | 01/03/2024 | 0 |
| Soap | 01/04/2024 | 0 |
| Soap | 01/05/2024 | 0 |
| Soap | 01/06/2024 | 0 |
| Soap | 01/07/2024 | 2 |
| Soap | 01/08/2024 | 0 |
| Soap | 01/09/2024 | 2 |
| Soap | 01/10/2024 | 0 |
| Brush | 01/01/2024 | 0 |
| Brush | 01/02/2024 | 0 |
| Brush | 01/03/2024 | 2 |
| Brush | 01/04/2024 | 3 |
| Brush | 01/05/2024 | 0 |
| Brush | 01/06/2024 | 0 |
| Brush | 01/07/2024 | 5 |
| Brush | 01/08/2024 | 4 |
| Brush | 01/09/2024 | 0 |
| Brush | 01/10/2024 | 1 |
I tried to do this using window functions but could not get the expected result. Please help me with an approach or code to solve this. Thanks!
I tried this:
```python
from pyspark.sql import Window, functions as F

window = Window.partitionBy("Material").orderBy("Date")
# Define a window to look forward up to three rows
forward_window = window.rowsBetween(0, 3)
# Add a new column that checks for non-zero 'Cut' values
df = df.withColumn("non_zero_cut", F.when(F.col("Cut") > 0, True).otherwise(False))
# Add a column to count consecutive zeros using forward-looking window
df = df.withColumn("consecutive_zeros", F.sum(F.when(F.col("Cut") == 0, 1).otherwise(0)).over(forward_window))
# Find rows where risk period starts (first non-zero 'Cut')
df = df.withColumn("start_of_risk", F.when((F.col("non_zero_cut") == True) & (F.lag("Cut", 1).over(window) == 0) | (F.lag("Cut", 1).over(window).isNull()), F.col("Date")).otherwise(None))
# Fill down 'start_of_risk' to propagate the start date
df = df.withColumn("Risk_Start_Date", F.max("start_of_risk").over(window.rowsBetween(Window.unboundedPreceding, 0)))
# Identify the end of the risk period (three consecutive 'Cut' values of zero)
df = df.withColumn("end_of_risk", F.when(F.col("consecutive_zeros") >= 3, F.lag("Date", -3).over(window)).otherwise(None))
# Fill down 'end_of_risk' to propagate the end date
df = df.withColumn("Risk_End_Date", F.max("end_of_risk").over(window.rowsBetween(Window.unboundedPreceding, 0)))
# Filter to keep only rows where risk period ends
df = df.filter(F.col("Risk_End_Date").isNotNull())
# Calculate 'Cut_Total' by summing 'Cut' values between 'Risk_Start_Date' and 'Risk_End_Date'
cut_risk_summary = df.groupBy("Material", "Risk_Start_Date", "Risk_End_Date").agg(F.sum("Cut").alias("Cut_Total"))
cut_risk_summary.display()
```
The result obtained is below, but it is not the intended result:
| Material | Risk_Start_Date | Risk_End_Date | Cut_Total |
|---|---|---|---|
| Soap | 01/01/2024 | 01/05/2024 | 5 |
| Soap | 01/01/2024 | 01/06/2024 | 0 |
| Soap | 01/01/2024 | 01/07/2024 | 0 |
| Soap | 01/01/2024 | 01/08/2024 | 0 |
| Soap | 01/07/2024 | 01/08/2024 | 2 |
| Soap | 01/09/2024 | 01/08/2024 | 2 |
The expected result of this exercise is as follows:
| Material | Cut_Total | Risk_Start_Date | Risk_End_Date |
|---|---|---|---|
| Soap | 6 | 01/01/2024 | 01/02/2024 |
| Soap | 4 | 01/07/2024 | 01/09/2024 |
| Brush | 15 | 01/03/2024 | 01/10/2024 |
Let's think this through. I will use native SQL in this answer, which you can then use as is or break up into function calls. First, let's work out what our risk start dates are:

Here we find the records for which there is no row with the same material and with `Date` being the previous day.

Similarly, risk end dates:
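The idea behind both lookups can be sketched as follows. This is a minimal, runnable illustration and not the original queries: it runs the SQL through Python's built-in `sqlite3` module purely for convenience, the table name `cuts` is hypothetical, and the dates are assumed to be consecutive days stored as ISO strings. It also assumes the lookback and lookahead should span three days, to match the three-zero rule in the question, so a non-zero row is a start when no non-zero row for the same material exists in the three preceding days, and an end when none exists in the three following days.

```python
import sqlite3

# Sample data from the question. ASSUMPTION: the dates are consecutive days,
# stored here as ISO strings so that SQLite's date() can shift them.
sample = {
    "Soap": [1, 5, 0, 0, 0, 0, 2, 0, 2, 0],
    "Brush": [0, 0, 2, 3, 0, 0, 5, 4, 0, 1],
}
rows = [
    (material, f"2024-01-{day:02d}", cut)
    for material, values in sample.items()
    for day, cut in enumerate(values, start=1)
]

conn = sqlite3.connect(":memory:")
# "cuts" is a hypothetical table name used only for this sketch.
conn.execute("CREATE TABLE cuts (Material TEXT, Date TEXT, Cut INTEGER)")
conn.executemany("INSERT INTO cuts VALUES (?, ?, ?)", rows)

# Start dates: non-zero rows with no non-zero row for the same material
# in the three preceding days.
START_SQL = """
SELECT Material, Date
FROM cuts AS cur
WHERE Cut > 0
  AND NOT EXISTS (
      SELECT 1 FROM cuts AS prev
      WHERE prev.Material = cur.Material
        AND prev.Cut > 0
        AND prev.Date < cur.Date
        AND prev.Date >= date(cur.Date, '-3 days')
  )
ORDER BY Material, Date
"""

# End dates: the mirror image, looking at the three following days.
END_SQL = """
SELECT Material, Date
FROM cuts AS cur
WHERE Cut > 0
  AND NOT EXISTS (
      SELECT 1 FROM cuts AS nxt
      WHERE nxt.Material = cur.Material
        AND nxt.Cut > 0
        AND nxt.Date > cur.Date
        AND nxt.Date <= date(cur.Date, '+3 days')
  )
ORDER BY Material, Date
"""

starts = conn.execute(START_SQL).fetchall()
ends = conn.execute(END_SQL).fetchall()
print("starts:", starts)
print("ends:  ", ends)
```

Note that both subqueries compare `Material` as well as `Date`; without that condition, a row of one material could cancel a start or end of another.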
Okay. So, our records will result from querying the table, joining the start and the end days with no overlaps:
We apply the same logic here: we join the start dates and the end dates to our records. Then, to exclude overextended intervals, we left join with any start and end dates that would fall in between. The intervals that have no further start or end date in between are the actual results we are seeking. Now that this is clear, we group the results so we can compute the sum of `Cut`.
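The pairing and grouping step can be sketched like this. Again, this is an illustration under assumptions and not the original query: it uses Python's `sqlite3`, a hypothetical `cuts` table, ISO-formatted consecutive daily dates, and a three-day lookback and lookahead for the start and end dates. The left join looks for another end date between the start and the candidate end (checking only end dates is enough for this sketch), and only pairs with no such row are kept.

```python
import sqlite3

# Sample data from the question. ASSUMPTION: consecutive daily dates in ISO form.
sample = {
    "Soap": [1, 5, 0, 0, 0, 0, 2, 0, 2, 0],
    "Brush": [0, 0, 2, 3, 0, 0, 5, 4, 0, 1],
}
rows = [
    (material, f"2024-01-{day:02d}", cut)
    for material, values in sample.items()
    for day, cut in enumerate(values, start=1)
]

conn = sqlite3.connect(":memory:")
# "cuts" is a hypothetical table name used only for this sketch.
conn.execute("CREATE TABLE cuts (Material TEXT, Date TEXT, Cut INTEGER)")
conn.executemany("INSERT INTO cuts VALUES (?, ?, ?)", rows)

SUMMARY_SQL = """
WITH risk_start AS (      -- non-zero rows with no non-zero row in the 3 days before
    SELECT Material, Date FROM cuts AS cur
    WHERE Cut > 0 AND NOT EXISTS (
        SELECT 1 FROM cuts AS prev
        WHERE prev.Material = cur.Material AND prev.Cut > 0
          AND prev.Date < cur.Date
          AND prev.Date >= date(cur.Date, '-3 days'))
),
risk_end AS (             -- non-zero rows with no non-zero row in the 3 days after
    SELECT Material, Date FROM cuts AS cur
    WHERE Cut > 0 AND NOT EXISTS (
        SELECT 1 FROM cuts AS nxt
        WHERE nxt.Material = cur.Material AND nxt.Cut > 0
          AND nxt.Date > cur.Date
          AND nxt.Date <= date(cur.Date, '+3 days'))
)
SELECT s.Material,
       SUM(c.Cut) AS Cut_Total,
       s.Date     AS Risk_Start_Date,
       e.Date     AS Risk_End_Date
FROM risk_start AS s
JOIN risk_end AS e
  ON e.Material = s.Material AND e.Date >= s.Date
-- An earlier end date inside the interval means the pair is overextended.
LEFT JOIN risk_end AS inner_end
  ON inner_end.Material = s.Material
 AND inner_end.Date >= s.Date AND inner_end.Date < e.Date
JOIN cuts AS c
  ON c.Material = s.Material AND c.Date BETWEEN s.Date AND e.Date
WHERE inner_end.Date IS NULL
GROUP BY s.Material, s.Date, e.Date
ORDER BY s.Material DESC, s.Date
"""

summary = conn.execute(SUMMARY_SQL).fetchall()
for row in summary:
    print(row)
```

On the sample data this yields the three periods from the expected result, with totals 6, 4 and 15. Every join condition includes a `Material` match, so dates from one material never pair with dates from another.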
Of course, this solution can be greatly simplified if we create temporary tables for the start and end dates, insert-select the appropriate records into them, and use those tables instead of the subqueries here.
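A rough sketch of the temporary-table variant follows, under the same assumptions as before (Python's `sqlite3` as the runner, a hypothetical `cuts` table, ISO-formatted consecutive daily dates, three-day lookback and lookahead). The temporary table names `risk_start` and `risk_end` are also made up for the example. As a further simplification that is my own substitution, each start is paired with the earliest end on or after it via `MIN`, instead of the left-join exclusion described above.

```python
import sqlite3

# Sample data from the question. ASSUMPTION: consecutive daily dates in ISO form.
sample = {
    "Soap": [1, 5, 0, 0, 0, 0, 2, 0, 2, 0],
    "Brush": [0, 0, 2, 3, 0, 0, 5, 4, 0, 1],
}
rows = [
    (material, f"2024-01-{day:02d}", cut)
    for material, values in sample.items()
    for day, cut in enumerate(values, start=1)
]

conn = sqlite3.connect(":memory:")
# "cuts", "risk_start" and "risk_end" are hypothetical names for this sketch.
conn.execute("CREATE TABLE cuts (Material TEXT, Date TEXT, Cut INTEGER)")
conn.executemany("INSERT INTO cuts VALUES (?, ?, ?)", rows)

# Materialise the start and end dates once, so the final query stays short.
conn.executescript("""
CREATE TEMP TABLE risk_start AS
    SELECT Material, Date FROM cuts AS cur
    WHERE Cut > 0 AND NOT EXISTS (
        SELECT 1 FROM cuts AS prev
        WHERE prev.Material = cur.Material AND prev.Cut > 0
          AND prev.Date < cur.Date
          AND prev.Date >= date(cur.Date, '-3 days'));

CREATE TEMP TABLE risk_end AS
    SELECT Material, Date FROM cuts AS cur
    WHERE Cut > 0 AND NOT EXISTS (
        SELECT 1 FROM cuts AS nxt
        WHERE nxt.Material = cur.Material AND nxt.Cut > 0
          AND nxt.Date > cur.Date
          AND nxt.Date <= date(cur.Date, '+3 days'));
""")

summary = conn.execute("""
SELECT p.Material, SUM(c.Cut) AS Cut_Total, p.Risk_Start_Date, p.Risk_End_Date
FROM (
    -- Pair every start with the earliest end on or after it.
    SELECT s.Material AS Material,
           s.Date      AS Risk_Start_Date,
           MIN(e.Date) AS Risk_End_Date
    FROM risk_start AS s
    JOIN risk_end AS e
      ON e.Material = s.Material AND e.Date >= s.Date
    GROUP BY s.Material, s.Date
) AS p
JOIN cuts AS c
  ON c.Material = p.Material
 AND c.Date BETWEEN p.Risk_Start_Date AND p.Risk_End_Date
GROUP BY p.Material, p.Risk_Start_Date, p.Risk_End_Date
ORDER BY p.Material DESC, p.Risk_Start_Date
""").fetchall()

for row in summary:
    print(row)
```

On Spark, the date arithmetic would need to be adapted to that engine's own date functions; the join structure itself carries over.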
EDIT
This is how I converted your script data into row insertions:
This results in these insertions:
I created the table as
This way I managed to reproduce the issue, and it turned out that I forgot to check for a `Material` match in the `on` conditions of the joins. Here's the fix:

EDIT

Further improvements: