Merge (insert/update)


I need to build logic in PySpark that updates tables containing millions of records. The tables are read from BigQuery. When a new flow arrives, it should be compared with the existing records, and the comparison must be done via keys: if the keys of an existing record match those of the new flow, the entire record is updated with all its fields; otherwise the new records are inserted based on their keys. All this is to avoid duplicates. Do you have suggestions?

At the moment I am thinking of creating a temporary table and doing a merge, but I don't like this idea because it would create too much extra data.


There is 1 best solution below

Ram Ghadiyaram

I would prefer to use foreachBatch; the same micro-batch logic also works for static DataFrames, not only for Spark Structured Streaming.

Here is a sample way of doing this. First, read the table from BigQuery:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("read from bigquery and upsert") \
    .getOrCreate()

# target table read via the spark-bigquery connector
mydf = spark.read.format("bigquery") \
    .option("table", "yourtable") \
    .load()

Then implement the micro-batch UPSERT logic for foreachBatch:

1. Filter the existing records: select the key columns from the target table.
2. Identify existing records that need to be updated, using an inner join.
3. Identify new records that need to be inserted, using a left_anti join.
4. Update the existing records.
5. Insert the new records.

def do_micro_batch(df, batch_id):
    # keys used for the comparison
    keys = ['key1', 'key2']

    # key columns of the records already in the target table
    existing_records = mydf.select(*keys)

    # records whose keys already exist -> candidates for update (inner join)
    updates = df.join(existing_records, on=keys, how='inner')

    # records whose keys do not exist yet -> new inserts (left_anti join)
    new_records = df.join(existing_records, on=keys, how='left_anti')

    # note: appending the updated rows adds new versions of those records;
    # the old versions still have to be removed, e.g. with a MERGE (see below).
    # depending on your setup the connector may also need
    # .option("writeMethod", "direct") or a temporaryGcsBucket option
    updates.write.format("bigquery") \
        .option("table", "yourtable") \
        .mode("append") \
        .save()

    new_records.write.format("bigquery") \
        .option("table", "yourtable") \
        .mode("append") \
        .save()



Finally, run the batch function. Because mydf here is a static DataFrame, call it directly; with a streaming source you would register it via foreachBatch:

do_micro_batch(mydf, 0)

# with a streaming DataFrame the same function plugs into foreachBatch:
# streaming_df.writeStream \
#     .foreachBatch(do_micro_batch) \
#     .start() \
#     .awaitTermination()
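
To actually end up with no duplicates in the target table, one common variant of this pattern is to land each batch in a staging table and run a BigQuery MERGE from it, which is essentially the temporary-table idea from the question. Below is a minimal sketch using the google-cloud-bigquery client; the project/dataset/table names, the staging table yourtable_staging, and the key and field columns are placeholders, not names from the original post:

from google.cloud import bigquery

client = bigquery.Client()

# hypothetical names: adapt the project, dataset, tables, keys and fields
merge_sql = """
MERGE `yourproject.yourdataset.yourtable` AS target
USING `yourproject.yourdataset.yourtable_staging` AS source
ON  target.key1 = source.key1
AND target.key2 = source.key2
WHEN MATCHED THEN
  UPDATE SET target.field1 = source.field1,
             target.field2 = source.field2
WHEN NOT MATCHED THEN
  INSERT ROW
"""

client.query(merge_sql).result()  # block until the MERGE completes

Since the staging table only ever holds one batch and can be truncated (or given a short expiration) after each MERGE, the extra data stays bounded, which should address the "too much data" concern.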