I am trying to do an upsert from a PySpark dataframe to a SQL table. sparkdf is my PySpark dataframe and Test is my SQL table in an Azure SQL database.
I have the following so far:
def write_to_sqldatabase(final_table, target_table):
    # Append the dataframe's rows to the target SQL table over JDBC
    final_table.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", f'....{target_table}') \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("append") \
        .save()
and
spark.sql("""
    MERGE INTO target t
    USING source s
    ON s.Id = t.Id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")
and
jdbc_url = f"jdbc:sqlserver://{SERVER};database={DATABASE};user={USERNAME};password={PASSWORD}"
sparkdf.createOrReplaceTempView('source')
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "(merge into target t using source s on s.Id = t.Id when matched then update set * when not matched then insert *) AS subquery") \
    .load()
The last approach is not working, since UPDATE SET * / INSERT * does not seem to be supported by Azure SQL Server; I think you have to list the columns and values explicitly instead of using the *. I want to do that dynamically, however, because I have a lot of columns and a lot of different tables for which I want to do the upsert, roughly along the lines of the sketch below.
I have tried different options, but nothing has worked so far.
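For illustration, this is the kind of statement I would like to generate from the dataframe schema (untested sketch; build_merge_sql, the staging_table name and the Id key column are just placeholders I made up):

def build_merge_sql(df, target_table, staging_table, key_column="Id"):
    # Build explicit column lists from the dataframe schema instead of using *
    cols = df.columns
    update_clause = ", ".join(f"t.[{c}] = s.[{c}]" for c in cols if c != key_column)
    insert_cols = ", ".join(f"[{c}]" for c in cols)
    insert_vals = ", ".join(f"s.[{c}]" for c in cols)
    return f"""
        MERGE {target_table} AS t
        USING {staging_table} AS s
            ON s.[{key_column}] = t.[{key_column}]
        WHEN MATCHED THEN
            UPDATE SET {update_clause}
        WHEN NOT MATCHED THEN
            INSERT ({insert_cols}) VALUES ({insert_vals});
    """

But I am not sure how to execute such a statement against the Azure SQL database from Spark.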
As per https://issues.apache.org/jira/browse/SPARK-19335, the Spark DataFrame writer API does not have an upsert feature for JDBC. You have to roll your own solution.
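A common roll-your-own pattern is to append the dataframe to a staging table with the JDBC writer and then run a MERGE from that staging table into the target over a direct connection, for example with pyodbc. A rough sketch under those assumptions (the staging table name, ODBC driver string and connection details are placeholders; merge_sql would be a dynamically generated statement like the one sketched in the question):

import pyodbc

def upsert_via_staging(df, staging_table, merge_sql):
    # 1. Bulk-load the dataframe into a staging table (recreated on each run)
    df.write.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{SERVER};databaseName={DATABASE}") \
        .option("dbtable", staging_table) \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .mode("overwrite") \
        .save()

    # 2. Execute the MERGE from staging into the target on the SQL server itself
    conn = pyodbc.connect(
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={SERVER};DATABASE={DATABASE};UID={USERNAME};PWD={PASSWORD}"
    )
    try:
        cursor = conn.cursor()
        cursor.execute(merge_sql)
        conn.commit()
    finally:
        conn.close()

This keeps the heavy data movement in Spark's JDBC writer while the actual upsert logic runs as a single MERGE statement on the database side.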