I'm new to PySpark and I'm having issues converting a JSON string returned from an API call into a DataFrame. I've tried a variety of solutions found online, with no success so far.
With just this command I get errors about not being able to infer the schema:
df = spark.read.json(retjson)
I tried generating a schema for the json format using this site: https://preetranjan.github.io/pyspark-schema-generator/
I pass the schema in and I get TypeErrors.
df = spark.createDataFrame(data=retjson,schema=s)
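One pattern suggested online wraps the string in an RDD so that spark.read.json can infer the schema from the JSON itself — a minimal sketch, assuming retjson holds the raw response text:

# Wrap the raw JSON text in a single-element RDD; spark.read.json accepts
# a path or an RDD of JSON strings, but not a raw Python string
rdd = spark.sparkContext.parallelize([retjson])
df = spark.read.json(rdd)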
Here's a partial sample of the JSON format and the schema in the following two code blocks.
I'm really only interested in the "Records" in the QueryResults node of the JSON. Also notice that when a field element has data, it is in a "value" sub-node.
Any suggestions on how to get just the QueryResults section into a DataFrame so it can be persisted to a table in the data lake?
{
"id": "384d5bca-7082-462b-8708-xxxxxxx",
"rowNumber": 1,
"note": null,
"QueryResults": [
{
"id": "febfdf20-b7bf-4667-b1ac-668f17d023b0",
"rowNumber": 1,
"note": null,
"ClientTime": {},
"CommandName": {},
"CommandTarget": {},
"EventsCount": {
"value": 1
},
"ExceptionsCount": {
"value": 0
},
"Name": {},
"Headers": {
"value": "7364c0daae3c455ebd6adb49f09937a1"
},
"InstallationID": {
"value": "D96034A1-28B95B7F-B440D442-BB08F62B-E3F8EC8B"
},
"ItemstoProcess": {},
"LoggedEventsCount": {
"value": 0
},
"LoggedExceptionsCount": {
"value": 0
},
"LoggedSQLCount": {
"value": 2
},
"ManagedMemory": {
"value": 919.33244
},
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('rowNumber', IntegerType(), True),
    StructField('note', StringType(), True),
    StructField('QueryResults', ArrayType(StructType([
        StructField('id', StringType(), True),
        StructField('rowNumber', IntegerType(), True),
        StructField('note', StringType(), True),
        StructField('ClientTime', StructType([]), True),
        StructField('CommandName', StructType([]), True),
        StructField('CommandTarget', StructType([]), True),
        StructField('EventsCount', StructType([StructField('value', IntegerType(), True)]), True),
        StructField('ExceptionsCount', StructType([StructField('value', IntegerType(), True)]), True),
        StructField('Name', StructType([]), True),
        StructField('Headers', StructType([StructField('value', StringType(), True)]), True),
        StructField('InstallationID', StructType([StructField('value', StringType(), True)]), True),
        StructField('ItemstoProcess', StructType([]), True),
You can use the Python API for Apache Spark (PySpark).
I have tried the approach below:
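The original code block did not come through here, so the following is a minimal sketch of what the description implies; the sample response string and the table name json_query_results are placeholder assumptions:

import json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder URL-encoded JSON string standing in for the API response
retjson = '{%22id%22:%20%22384d5bca-7082-462b-8708-xxxxxxx%22,%20%22rowNumber%22:%201,%20%22note%22:%20null,%20%22QueryResults%22:%20[]}'

# Replace %20 with spaces and %22 with double quotes, then parse the string
parsed = json.loads(retjson.replace('%20', ' ').replace('%22', '"'))

# Schema with four fields; QueryResults is kept as a plain JSON string here
schema = StructType([
    StructField('id', StringType(), True),
    StructField('rowNumber', IntegerType(), True),
    StructField('note', StringType(), True),
    StructField('QueryResults', StringType(), True)
])

# A single row of data as a list of tuples
data = [(parsed['id'], parsed['rowNumber'], parsed['note'], json.dumps(parsed['QueryResults']))]

dilip_df = spark.createDataFrame(data=data, schema=schema)

# Persist to a Delta table (placeholder table name)
dilip_df.write.format('delta').mode('overwrite').saveAsTable('json_query_results')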
In the above code, the json library is used for handling the JSON data: the string is parsed with json.loads after replacing %20 with spaces and %22 with double quotes. The schema for the Spark DataFrame contains four fields: id (String), rowNumber (Integer), note (String), and QueryResults (String). Next, a single row of data is created as a list of tuples (data), and the createDataFrame method is used to create a Spark DataFrame (dilip_df) with that schema. Finally, the DataFrame is written to a Delta table; I have used mode("overwrite") so that if the table already exists it is overwritten with the new data.