I would like to build a historical dataset to which I append all NEW records of an input dataset.
By NEW records I mean new or modified records: rows that do not match any existing row when all columns except 'reference_date' ('dt_run' in the example below) are compared.
Here is the code that lets me do this when comparing all columns, but I can't figure out how to exclude one column from the comparison.
Inputs:

historical (previous):

| ID | A | B | dt_run |
|---|---|---|---|
| 1 | abc | football | 2022-02-14 21:00:00 |
| 2 | dba | volley | 2022-02-14 21:00:00 |
| 3 | wxy | tennis | 2022-02-14 21:00:00 |

input_df (new data):

| ID | A | B |
|---|---|---|
| 1 | abc | football |
| 2 | dba | football |
| 3 | wxy | tennis |
| 7 | abc | tennis |

DESIRED OUTPUT (new records in bold):

| ID | A | B | dt_run |
|---|---|---|---|
| 1 | abc | football | 2022-02-14 21:00:00 |
| 2 | dba | volley | 2022-02-15 21:00:00 |
| 3 | wxy | tennis | 2022-02-01 21:00:00 |
| **2** | **dba** | **football** | **2022-03-15 14:00:00** |
| **7** | **abc** | **tennis** | **2022-03-15 14:00:00** |

My code, which doesn't work:

    @incremental(snapshot_inputs=['input_df'])
    @transform(historical = Output(....), input_df = Input(....))
    def append(input_df, historical):
        input_df = input_df.dataframe().withColumn('dt_run', F.to_timestamp(F.lit(datetime.now())))
        historical = historical.write_dataframe(dataset_input_df.distinct()\
            .subtract(historical.dataframe('previous', schema=input_df.schema)))
        return historical

I've tested the following script and it works. In the following example, you don't need to drop/select columns. Using `withColumn` you create the missing column in `input_df` and also change the values in the existing column in `historical`. This way you can safely do `subtract` on the whole dataframe. Later, since you append the data rows, the old `historical` rows will stay intact with their old timestamps.
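
To see why overwriting the timestamp on both sides makes the subtraction safe, here is a minimal pure-Python sketch of the idea. It is not Foundry or PySpark code: rows are modelled as tuples, `subtract` is modelled as a set difference, and the helper names `with_dt_run` and `new_rows` are hypothetical, introduced only for illustration. The data is the sample from the question.

```python
from datetime import datetime

# Rows are modelled as (ID, A, B, dt_run) tuples; a "dataframe" is a list of rows.
historical = [
    (1, "abc", "football", datetime(2022, 2, 14, 21, 0)),
    (2, "dba", "volley", datetime(2022, 2, 14, 21, 0)),
    (3, "wxy", "tennis", datetime(2022, 2, 14, 21, 0)),
]
input_rows = [
    (1, "abc", "football"),
    (2, "dba", "football"),
    (3, "wxy", "tennis"),
    (7, "abc", "tennis"),
]


def with_dt_run(rows, run_time):
    """Model of withColumn('dt_run', ...): add or overwrite the fourth field."""
    return [row[:3] + (run_time,) for row in rows]


def new_rows(input_rows, historical, run_time):
    """Return the input rows that are absent from historical, ignoring old dt_run values."""
    stamped_input = with_dt_run(input_rows, run_time)
    # Overwriting dt_run on the historical side makes it irrelevant to the comparison.
    stamped_hist = set(with_dt_run(historical, run_time))
    # Modelled as a set difference, so duplicate rows collapse into one.
    return sorted(set(stamped_input) - stamped_hist)


run_time = datetime(2022, 3, 15, 14, 0)
to_append = new_rows(input_rows, historical, run_time)

# Appending leaves the existing historical rows, and their timestamps, untouched.
result = historical + to_append
```

Here `to_append` holds only the modified row (ID 2, now football) and the new row (ID 7), both stamped with the current run time. The stamped copy of `historical` is used only for the comparison and is never written back.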