How to perform an upsert (insert + update) from a PySpark dataframe to an Azure SQL Database table?


I am trying to do an upsert from a PySpark dataframe to a SQL table.

sparkdf is my PySpark dataframe; Test is my SQL table in an Azure SQL database.

I have the following so far:

def write_to_sqldatabase(final_table, target_table):
    # Write the dataframe to the target SQL table
    final_table.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", f'....{target_table}') \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("append") \
        .save()

and

spark.sql("""
merge into target t
using source s
on s.Id = t.Id
when matched then 
update set *
when not matched then insert *
""")

and

jdbc_url = f"jdbc:sqlserver://{SERVER};database={DATABASE};user={USERNAME};password={PASSWORD}"
sparkdf.createOrReplaceTempView('source')
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(merge into target t using source s on s.Id = t.Id when matched then  update set * when not matched then insert *) AS subquery") \
    .load()

The latter is not working because update set * and insert * are not supported by Azure SQL Server; you have to list the columns and values explicitly in the MERGE statement. I want to do this dynamically, however, because I have a lot of columns and a lot of different tables for which I want to do the upsert.
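One way to avoid the * would be to generate the column lists from the dataframe schema itself. A minimal sketch of that idea — the build_merge_sql helper, the staging table name, and the bracketed identifier quoting are illustrative assumptions, not part of the original code:

```python
def build_merge_sql(target_table, staging_table, key_columns, columns):
    """Build a T-SQL MERGE with explicit column lists derived from a schema.

    target_table / staging_table: table names in the database
    key_columns: columns used to match rows (the ON clause)
    columns: all dataframe columns, e.g. sparkdf.columns
    """
    on_clause = " AND ".join(f"t.[{c}] = s.[{c}]" for c in key_columns)
    non_keys = [c for c in columns if c not in key_columns]
    update_clause = ", ".join(f"t.[{c}] = s.[{c}]" for c in non_keys)
    insert_cols = ", ".join(f"[{c}]" for c in columns)
    insert_vals = ", ".join(f"s.[{c}]" for c in columns)
    return (
        f"MERGE {target_table} AS t USING {staging_table} AS s "
        f"ON {on_clause} "
        f"WHEN MATCHED THEN UPDATE SET {update_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) "
        f"VALUES ({insert_vals});"
    )
```

For example, build_merge_sql("dbo.Test", "dbo.Test_staging", ["Id"], sparkdf.columns) would return a MERGE statement listing every column explicitly, which sidesteps the unsupported * syntax.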

I have tried different options, but nothing has worked so far.

There is 1 answer below.

Answer from thebluephantom:

As per https://issues.apache.org/jira/browse/SPARK-19335, the Spark DataFrame writer API does not support upsert for JDBC sources. You have to roll your own solution.
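The usual roll-your-own pattern is two steps: land the dataframe in a staging table over JDBC, then run an explicit-column MERGE from staging into the target through a direct connection. A hedged sketch of that pattern, assuming pyodbc with ODBC Driver 17, a <table>_staging naming convention, and the SERVER/DATABASE/USERNAME/PASSWORD variables from the question; merge_sql and upsert_via_staging are illustrative names, not an established API:

```python
def merge_sql(target, staging, key, columns):
    # Azure SQL's MERGE needs explicit column lists (no SET * / INSERT *),
    # so build them from the dataframe's column names.
    non_keys = [c for c in columns if c != key]
    return (
        f"MERGE {target} AS t USING {staging} AS s "
        f"ON t.[{key}] = s.[{key}] "
        "WHEN MATCHED THEN UPDATE SET "
        + ", ".join(f"t.[{c}] = s.[{c}]" for c in non_keys)
        + " WHEN NOT MATCHED THEN INSERT ("
        + ", ".join(f"[{c}]" for c in columns)
        + ") VALUES ("
        + ", ".join(f"s.[{c}]" for c in columns)
        + ");"
    )

def upsert_via_staging(sparkdf, target_table, key="Id"):
    import pyodbc  # imported here so merge_sql stays usable without pyodbc

    staging_table = f"{target_table}_staging"  # hypothetical staging table name

    # Step 1: land the dataframe in the staging table over JDBC.
    sparkdf.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", staging_table) \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("overwrite") \
        .save()

    # Step 2: merge staging into the target over a direct connection,
    # since Spark's JDBC writer cannot issue a MERGE itself.
    conn = pyodbc.connect(
        f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={SERVER};"
        f"DATABASE={DATABASE};UID={USERNAME};PWD={PASSWORD}"
    )
    try:
        cur = conn.cursor()
        cur.execute(merge_sql(target_table, staging_table, key, sparkdf.columns))
        cur.execute(f"DROP TABLE {staging_table}")  # discard staging data
        conn.commit()
    finally:
        conn.close()
```

Because the column lists are generated from sparkdf.columns, the same function can be reused for any of the tables mentioned in the question, as long as each has a single key column to match on.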