Spark Databricks: Stream-Stream LeftOuter Join returning an empty result


Databricks, with Delta Live Tables, Spark 3.4

I have a streaming dataframe (let's call it "original") containing some records. I then filter it on some conditions, modify some column values, and get a new "modified" dataframe.

I want to merge these two dataframes so that the records in "modified" replace the corresponding records in "original". My current approach is to "subtract" the modified dataframe from the original and then union the result with the "modified" dataframe.
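The intended "replace by ID" semantics can be modeled in plain Python to make the expected result concrete. This is not Spark code: the function names and the `status` field are hypothetical, and records are plain dicts keyed by `ID`. Rows of original whose ID appears in modified are dropped, and the modified rows are appended.

```python
# Minimal model of the intended "replace by ID" merge on plain Python lists.
# Records are dicts with an "ID" key; this illustrates the semantics only, it is not Spark code.


def subtract_by_id(original, modified):
    """Keep the rows of `original` whose ID does not appear in `modified`."""
    modified_ids = {row["ID"] for row in modified}
    return [row for row in original if row["ID"] not in modified_ids]


def replace_by_id(original, modified):
    """Subtract the modified rows from original, then union the modified rows back in."""
    return subtract_by_id(original, modified) + list(modified)


original = [
    {"ID": 1, "status": "new"},
    {"ID": 2, "status": "new"},
    {"ID": 3, "status": "new"},
]
modified = [{"ID": 2, "status": "fixed"}]

print(replace_by_id(original, modified))
# [{'ID': 1, 'status': 'new'}, {'ID': 3, 'status': 'new'}, {'ID': 2, 'status': 'fixed'}]
```

With an empty `modified`, this model simply returns every original row unchanged.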

Each record has an ID field.

I soon realized that what I want to achieve can be done with either PySpark's subtract() function or a left anti join. However, neither is supported when the right-side dataframe is a streaming one. So, I tried to replicate a left anti join with a left outer join:

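```python
# Emulate a left anti join: keep only the original rows with no matching ID in modified
subtracted = original.join(modified, original['ID'] == modified['ID_mod'], 'leftOuter') \
                     .where(modified['ID_mod'].isNull()).select(original['*'])
```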
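For static (batch) data, the left-outer-plus-null-filter trick is equivalent to a left anti join. A minimal pure-Python model of that equivalence is sketched below; it is not Spark code, the helper names are hypothetical, and `None` stands in for the NULL columns Spark fills in on the right side when there is no match.

```python
def left_outer_join(left, right, left_key, right_key):
    """Pair each left row with every matching right row, or with None if nothing matches."""
    out = []
    for l in left:
        matches = [r for r in right if r[right_key] == l[left_key]]
        if matches:
            out.extend((l, r) for r in matches)
        else:
            out.append((l, None))  # models the NULL right-side columns
    return out


def left_anti_via_outer(left, right, left_key="ID", right_key="ID_mod"):
    """Emulate a left anti join: keep the left rows whose right side came back empty."""
    return [l for l, r in left_outer_join(left, right, left_key, right_key) if r is None]


left = [{"ID": 1}, {"ID": 2}, {"ID": 3}]
right = [{"ID_mod": 2}, {"ID_mod": 2}]  # duplicate IDs on the right are harmless here

print(left_anti_via_outer(left, right))
# [{'ID': 1}, {'ID': 3}]
```

With an empty right side every left row survives, which is the four-record result the question expected. In the streaming case, Spark additionally has to wait for the watermark before it can finalize a left row as unmatched.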

However, I then got an error saying that stream-stream left outer joins are only supported with watermarks and a time-range join condition. So, following Spark's documentation, I did the following:

```python
import dlt
from pyspark.sql.functions import expr

@dlt.table
def final_records():
    # origTime and modTime are two timestamp columns
    original = dlt.readStream("original_table").withWatermark('origTime', '2 hours')
    modified = dlt.readStream("modified_table").withWatermark('modTime', '3 hours')

    # Should give me original without the modified records
    subtracted = (
        original.join(
            modified,
            expr("""
                ID = ID_mod AND
                modTime >= origTime AND
                modTime <= origTime + interval 1 hour
            """),
            'leftOuter',
        )
        .where(modified['ID_mod'].isNull())
        .select(original['*'])
    )

    return subtracted.unionByName(modified, allowMissingColumns=True)
```

However, when I run the DLT pipeline with four records in original and zero records in modified, subtracted contains zero records, although I expect all four. As a result, the union is also empty.

What could the issue be here?


Answer by rick:

Upon further investigation, we determined that when one of the dataframes (in our case the right-hand "modified" table) is empty, Spark has no way to determine a valid watermark for the join. Because a stream-stream left outer join only emits an unmatched left row once the watermark has passed that row's time-range bound, those rows are never emitted, and the join returns an empty dataframe.
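The mechanism can be illustrated with a simplified pure-Python model; it is not Spark code and only approximates Spark's internals. It assumes Spark's default `min` policy for combining watermarks (the `spark.sql.streaming.multipleWatermarkPolicy` setting), that an input which has not seen any rows contributes an initial watermark (modeled here as the Unix epoch), and that an unmatched left row is released only once the watermark passes origTime + 1 hour, the upper bound of the question's time-range condition. The Spark Structured Streaming guide likewise notes that outer NULL results are generated with a delay, and may be delayed further when one input receives no data. Function names are hypothetical.

```python
from datetime import datetime, timedelta

# Modeled initial watermark for an input that has not seen any rows yet (assumption).
EPOCH = datetime(1970, 1, 1)


def input_watermark(event_times, delay):
    """Max event time seen minus the watermark delay, or EPOCH if the input is empty."""
    if not event_times:
        return EPOCH
    return max(event_times) - delay


def emitted_unmatched(original, modified, orig_delay, mod_delay, time_range):
    """IDs from `original` that the modeled left outer join can already emit with NULLs.

    original / modified are lists of (id, event_time) tuples. The global watermark is the
    minimum of the two input watermarks (modeling the default 'min' policy), and an
    unmatched left row is finalized only once the watermark passes origTime + time_range.
    """
    watermark = min(
        input_watermark([t for _, t in original], orig_delay),
        input_watermark([t for _, t in modified], mod_delay),
    )
    emitted = []
    for oid, otime in original:
        # Same predicate as the question: ID = ID_mod AND origTime <= modTime <= origTime + range
        matched = any(
            mid == oid and otime <= mtime <= otime + time_range
            for mid, mtime in modified
        )
        if not matched and otime + time_range < watermark:
            emitted.append(oid)
    return emitted


ORIG_DELAY, MOD_DELAY, RANGE = timedelta(hours=2), timedelta(hours=3), timedelta(hours=1)
t0 = datetime(2024, 1, 1, 10, 0)
original = [(i, t0 + timedelta(minutes=i - 1)) for i in range(1, 5)]  # IDs 1-4, 10:00-10:03

# The question's scenario: four original rows, no modified rows -> nothing is emitted.
print(emitted_unmatched(original, [], ORIG_DELAY, MOD_DELAY, RANGE))
# []

# Once both inputs have seen later event times, the watermark reaches 12:00 and IDs 1-4 are released.
later_original = original + [(5, datetime(2024, 1, 1, 14, 0))]
later_modified = [(99, datetime(2024, 1, 1, 15, 0))]
print(emitted_unmatched(later_original, later_modified, ORIG_DELAY, MOD_DELAY, RANGE))
# [1, 2, 3, 4]
```

In this model, even the left input alone would not release the four rows: its watermark (10:03 minus 2 hours) is still earlier than origTime + 1 hour, so small, closely spaced test data can stay buffered in join state regardless of the right side.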