I need to build logic in PySpark that updates tables containing millions of records. The tables are loaded from BigQuery. When a new flow arrives, it should compare the existing records with the new ones using keys: if the keys of an existing record match those of the new flow, the entire record should be updated with all fields; otherwise, the new records should be inserted based on their keys. All of this is to avoid duplicates. Do you have suggestions?
At the moment I am thinking of creating a temporary (staging) table and doing a MERGE, but I don't like this idea because it would create too much data.
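For reference, a minimal sketch of that staging-table + MERGE idea might look like the following. The project/dataset/table names, the key column, the updated columns, and the GCS bucket are placeholders (assumptions), and it assumes the spark-bigquery-connector plus the google-cloud-bigquery client library are available:

```python
# Hedged sketch: land the new flow in a staging table, then let BigQuery
# resolve the upsert with a single MERGE statement (no duplicates on keys).
from google.cloud import bigquery

STAGING = "my_project.my_dataset.staging_table"   # assumed staging table
TARGET = "my_project.my_dataset.target_table"     # assumed target table
GCS_BUCKET = "my-temp-bucket"                     # assumed temporary GCS bucket


def merge_via_staging(new_df):
    # 1. Overwrite the staging table with the incoming batch.
    (
        new_df.write.format("bigquery")
        .option("table", STAGING)
        .option("temporaryGcsBucket", GCS_BUCKET)
        .mode("overwrite")
        .save()
    )

    # 2. Run the MERGE inside BigQuery: update matching keys, insert the rest.
    merge_sql = f"""
        MERGE `{TARGET}` AS t
        USING `{STAGING}` AS s
        ON t.id = s.id                                   -- assumed key column
        WHEN MATCHED THEN UPDATE SET t.col_a = s.col_a,  -- assumed columns
                                     t.col_b = s.col_b
        WHEN NOT MATCHED THEN INSERT ROW
    """
    bigquery.Client().query(merge_sql).result()
```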
I would prefer to use foreachBatch, which (as I understand it) should also work for static dataframes, not only for Spark Streaming. Could someone give a sample way of doing this?
The idea is to do micro-batch UPSERT logic using foreachBatch:
- filter the existing records,
- identify existing records that need to be updated, using an inner join,
- identify new records that need to be inserted, using a left_anti join,
- update the existing records,
- insert the new records,
and process all of this with foreachBatch (see the sketch after this list).
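A minimal sketch of that micro-batch upsert, assuming the spark-bigquery-connector and placeholder table, key, and bucket names, could look like this. The same function can be called directly on a static DataFrame or passed to foreachBatch for a streaming source:

```python
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.appName("bq-upsert").getOrCreate()

KEYS = ["id"]                                    # assumed business key column(s)
TARGET = "my_project.my_dataset.target_table"    # assumed BigQuery target table
GCS_BUCKET = "my-temp-bucket"                    # assumed temporary GCS bucket


def upsert_batch(batch_df: DataFrame, batch_id: int = 0) -> None:
    """Upsert one batch of records into the BigQuery target table."""
    existing_df = (
        spark.read.format("bigquery")
        .option("table", TARGET)
        .load()
    )

    # Batch records whose keys already exist -> they replace the old rows.
    to_update = batch_df.join(existing_df.select(*KEYS), on=KEYS, how="inner")

    # Batch records whose keys do not exist yet -> plain inserts.
    to_insert = batch_df.join(existing_df.select(*KEYS), on=KEYS, how="left_anti")

    # Existing rows not touched by the batch are kept as-is, because Spark
    # cannot update BigQuery rows in place from a DataFrame write.
    untouched = existing_df.join(batch_df.select(*KEYS), on=KEYS, how="left_anti")

    result = untouched.unionByName(to_update).unionByName(to_insert)

    # NOTE: the target is both read and overwritten here; in production you may
    # want to persist the result or land it in a staging table first.
    (
        result.write.format("bigquery")
        .option("table", TARGET)
        .option("temporaryGcsBucket", GCS_BUCKET)
        .mode("overwrite")
        .save()
    )


# Static DataFrame: call the function directly.
# upsert_batch(new_flow_df)

# Streaming source: plug the same function into foreachBatch.
# new_flow_stream.writeStream.foreachBatch(upsert_batch).start()
```

Since the BigQuery connector cannot update individual rows from Spark, this sketch rebuilds the table from the untouched rows plus the incoming batch; if rewriting the full table is too heavy, the staging-table + MERGE approach above pushes the row-level upsert down to BigQuery instead.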