Retain the most recent record for each primary key when deduplicating CDC transactions in a DataFrame


I am working on a personal data project exploring the Delta format and Delta tables. The initial load file contains an Op column, an operation flag set to 'I'; I read this file in PySpark and save it in Delta format.

The subsequent files are change data capture (CDC) files. When I load them into a PySpark DataFrame and try to deduplicate the records, I run into an issue.

from pyspark.sql.types import *
from pyspark.sql.functions import col, row_number, to_timestamp
from pyspark.sql.window import Window

# Sample data as a list of tuples
data = [
    ("I", "2024-03-22 22:49:56.000000", 71, 104.75),
    ("U", "2024-03-22 22:50:00.000000", 72, 114.75),
    ("I", "2024-03-22 22:49:56.000000", 73, 10.00),
    ("U", "2024-03-22 22:50:56.000000", 73, 20.00),
    ("U", "2024-03-22 22:51:56.000000", 73, 30.00),
    ("I", "2024-03-22 22:55:57.000000", 74, 30.00),
    ("U", "2024-03-22 22:49:56.000000", 75, 40.00),
    ("U", "2024-03-22 22:52:56.000000", 75, 50.00),
    ("U", "2024-03-22 22:57:56.000000", 75, 60.00)
]

# Define the schema 
schema = StructType([
    StructField("Op", StringType(), True),
    StructField("sourceRecordTime", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("amount", DoubleType(), True)
])

# Create the DataFrame
df = spark.createDataFrame(data, schema)
df = df.withColumn("sourceRecordTime", to_timestamp(df["sourceRecordTime"], "yyyy-MM-dd HH:mm:ss.SSSSSS"))
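
A quick df.printSchema() confirms that sourceRecordTime is parsed as a timestamp:

df.printSchema()
# root
#  |-- Op: string (nullable = true)
#  |-- sourceRecordTime: timestamp (nullable = true)
#  |-- id: integer (nullable = true)
#  |-- amount: double (nullable = true)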

# Conditions
# 1. If an id has only an 'I' operation, retain that record.
# 2. If an id has a single 'U' operation, retain that record.
# 3. If an id has multiple 'U' operations, retain the latest record based on sourceRecordTime.
# 4. If an id has both 'I' and 'U' operations, retain the latest 'U' record and change its Op from 'U' to 'I' in the final output.

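# Keep only the most recent record per id, ordered by sourceRecordTime descending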
sort_order = Window.partitionBy(col('id')).orderBy(col('sourceRecordTime').desc())
update_df = df.withColumn("rec_val", row_number().over(sort_order)).filter("rec_val=1").drop("rec_val")
update_df.show()

Output:

+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  U|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+

Expected Output:

+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  I|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+


1 Answer

ScootCork:

You are almost there, as taking the last record per id already satisfies the first three conditions. You can cover the last condition by creating another window, partitioning only by id, and taking the min of the Op column (PySpark's min, i.e. pyspark.sql.functions.min, not the Python builtin). Note that this works because, when ordering strings, "I" < "U". If you have other operations, say "D", then you'd need to make some adjustments; a sketch of one possible adjustment follows the output below.

from pyspark.sql.functions import min  # shadows the builtin; this is pyspark.sql.functions.min

update_df = (
    df
    .withColumn("Op", min("Op").over(Window.partitionBy(col("id"))))
    .withColumn("rec_val", row_number().over(sort_order)).filter("rec_val=1")
    .drop("rec_val")
)
update_df.show()

+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  I|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+
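
If the feed also contains 'D' (delete) records, the lexical-min trick no longer captures the intent on its own. Here is a minimal sketch of one possible adjustment, assuming a delete should survive as the final record for its id while the 'I'-over-'U' rule from above still applies otherwise:

from pyspark.sql.functions import col, row_number, when, max as spark_max
from pyspark.sql.window import Window

id_window = Window.partitionBy("id")
sort_order = Window.partitionBy("id").orderBy(col("sourceRecordTime").desc())

update_df = (
    df
    # remember whether this id ever saw an insert
    .withColumn("has_insert", spark_max(when(col("Op") == "I", 1).otherwise(0)).over(id_window))
    # keep only the latest record per id
    .withColumn("rec_val", row_number().over(sort_order))
    .filter("rec_val = 1")
    # a delete stays a delete; otherwise an id that was ever inserted ends up flagged 'I'
    .withColumn("Op", when(col("Op") == "D", "D")
                      .when(col("has_insert") == 1, "I")
                      .otherwise(col("Op")))
    .drop("rec_val", "has_insert")
)

With no 'D' rows in the input this reduces to the same result as the min-based version above, so it behaves as a drop-in replacement when the operation set grows.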