Time series PySpark summation


I have a PySpark DataFrame whose rows represent events. I want to calculate the total duration within a window for a given event.

Let's assume I have the following PySpark DataFrame:

from datetime import timedelta

from pyspark.sql.types import (
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

from pyspark.sql.functions import to_timestamp

def load_traffic_df(datetime_now):
    # Each tuple is (id, duration in seconds, start epoch, end epoch).
    result = [
        (
            "1",
            600,
            (datetime_now - timedelta(hours=1, minutes=10)).timestamp(),
            (datetime_now - timedelta(hours=1)).timestamp(),
        ),
        (
            "1",
            3600,
            (datetime_now - timedelta(hours=1)).timestamp(),
            datetime_now.timestamp(),
        ),
        (
            "1",
            901,
            (datetime_now - timedelta(minutes=15, seconds=1)).timestamp(),
            datetime_now.timestamp(),
        ),
        (
            "1",
            901,
            (datetime_now - timedelta(hours=1)).timestamp(),
            (datetime_now - timedelta(hours=1) + timedelta(minutes=15, seconds=1)).timestamp(),
        ),
        (
            "1",
            600,
            datetime_now.timestamp(),
            (datetime_now + timedelta(minutes=10)).timestamp(),
        ),
    ]

    df = (
        spark.createDataFrame(data=result, schema=["id", "duration", "start", "end"])
        .withColumn("start", to_timestamp("start"))
        .withColumn("end", to_timestamp("end"))
    )

    # Re-create the DataFrame with an explicit schema so the column types are fixed.
    df_fixed_schema = spark.createDataFrame(
        df.rdd,
        schema=StructType(
            [
                StructField("id", StringType(), True),
                StructField("duration", LongType(), True),
                StructField("start", TimestampType(), True),
                StructField("end", TimestampType(), True),
            ]
        ),
    )
    return df_fixed_schema

I want to end up with a column with the running sum within a given window. Let's assume the window size is 1 hour (3600 seconds).

From these datapoints I then want to end up with the following:

Running duration
3901
4501
2400
600

My initial solution was something like this:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from datetime import datetime, timedelta

datetime_now = datetime.now()

# Initialize Spark session
spark = SparkSession.builder.appName("RunningDurationExample").getOrCreate()

df = load_traffic_df(datetime_now)

# Define the window specification
window_spec = (
    Window.partitionBy("id")
    .orderBy(F.unix_timestamp("start"))
    .rangeBetween(0, 3600)
)

# Calculate the running_duration within the window
df_with_duration = df.withColumn(
    "running_duration",
    F.sum("duration").over(window_spec)
).withColumn(
    "forced_window_end",
    F.to_timestamp(F.expr("cast(start as long) + 3600"))
)

df_with_duration = df_with_duration.withColumn(
    "running_end",
    F.max("end").over(window_spec)
)


# Show the result
df_with_duration.show(truncate=False)

However, that yields this:

id  duration  start                end                  running_duration  forced_window_end    running_end
1   600       2023-09-28 17:15:07  2023-09-28 17:25:07  5100              2023-09-28 18:15:07  2023-09-28 18:25:07
1   3600      2023-09-28 17:25:07  2023-09-28 18:25:07  5100              2023-09-28 18:25:07  2023-09-28 18:35:07
1   900       2023-09-28 18:10:07  2023-09-28 18:25:07  1500              2023-09-28 19:10:07  2023-09-28 18:35:07
1   600       2023-09-28 18:25:07  2023-09-28 18:35:07  600               2023-09-28 19:25:07  2023-09-28 18:35:07

The issue here is that I end up counting extra duration that extends beyond forced_window_end, as can be seen wherever running_end > forced_window_end. I then tried to calculate how many seconds running_end exceeds forced_window_end and subtract that, but it still doesn't subtract the correct amount, because this example contains overlapping events. When I subtract, I only account for one of the overlapping events and not the other, and so I subtract only 600 instead of the 1200 needed to reach 3900 for the first row.
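
For clarity, that subtraction attempt looked roughly like this (just a sketch; overshoot and corrected_duration are illustrative names I picked):

from pyspark.sql import functions as F

# Roughly what I tried: clamp each row's window to forced_window_end by
# subtracting however many seconds running_end overshoots it (and nothing
# when there is no overshoot).
df_attempt = df_with_duration.withColumn(
    "overshoot",
    F.greatest(
        F.lit(0).cast("long"),
        F.unix_timestamp("running_end") - F.unix_timestamp("forced_window_end"),
    ),
).withColumn(
    "corrected_duration",
    F.col("running_duration") - F.col("overshoot"),
)

For the first row this only subtracts the 600-second overshoot of the latest event, which leaves 4500 rather than the 3900 I want.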

I've then thought about "absorbing" the overlapping event into the bigger event and just summing the durations before performing the running summation. But what if three events overlap? Then I would need to absorb recursively until no events overlap. It would work, but I have no idea how to do this recursively in PySpark.
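
The closest I've come to expressing that absorbing step without recursion is something like the sketch below, which flags group boundaries with a running maximum of previous ends instead of absorbing row by row (prev_max_end, new_group and group_id are names I made up). I'm still not sure how to combine it with the running-window sum afterwards:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Flag a row as starting a new group when it begins after every earlier
# event has ended; a running sum of that flag then labels each cluster of
# overlapping ("absorbable") events, which can be collapsed with groupBy.
order_w = Window.partitionBy("id").orderBy("start")
prev_end_w = order_w.rowsBetween(Window.unboundedPreceding, -1)

absorbed = (
    df
    .withColumn("prev_max_end", F.max("end").over(prev_end_w))
    .withColumn(
        "new_group",
        (
            F.col("prev_max_end").isNull()
            | (F.col("start") > F.col("prev_max_end"))
        ).cast("int"),
    )
    .withColumn("group_id", F.sum("new_group").over(order_w))
    .groupBy("id", "group_id")
    .agg(
        F.min("start").alias("start"),
        F.max("end").alias("end"),
        F.sum("duration").alias("duration"),
    )
)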

Is there anyone out there savvy enough to crack this? I've been at it for hours to no avail. Again, my preferred end result would be 3900, 4500, 2400, 600.

Any help is highly appreciated.
