pyspark parsing complex json

I'm new to PySpark and I'm having issues converting a JSON string returned from an API call into a DataFrame. I've tried a variety of solutions found online, with no success so far.

With just this command I get errors about not being able to infer the schema:

df = spark.read.json(retjson)
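Note: spark.read.json expects a file path or an RDD of JSON strings, not a raw Python string, which is one likely cause of the schema-inference error. A minimal sketch of a workaround, assuming retjson holds a single JSON document:

# Wrap the string in an RDD so spark.read.json can consume it
rdd = spark.sparkContext.parallelize([retjson])
df = spark.read.json(rdd)
df.printSchema()  # verify what Spark inferred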

I tried generating a schema for the JSON format using this site: https://preetranjan.github.io/pyspark-schema-generator/

When I pass the schema in, I get TypeErrors:

df = spark.createDataFrame(data=retjson,schema=s)
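For reference, createDataFrame expects rows (a list of dicts, tuples, or Rows), not a raw JSON string, which would explain the TypeErrors. A minimal sketch, assuming retjson parses to a single top-level object and the schema s matches its shape:

import json

parsed = json.loads(retjson)                     # one dict
df = spark.createDataFrame([parsed], schema=s)   # wrap in a list to make one row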

Here's a partial sample of the JSON format and the schema in the following two code blocks.

I'm really only interested in the "Records" in the QueryResults node of the JSON. Also notice that when there is data in a field element, it is in a "value" sub-node.

Any suggestions on how to get only the QueryResults section into a DataFrame so it can be persisted to a table in the data lake?

{
  "id": "384d5bca-7082-462b-8708-xxxxxxx",
  "rowNumber": 1,
  "note": null,
  "QueryResults": [
    {
      "id": "febfdf20-b7bf-4667-b1ac-668f17d023b0",
      "rowNumber": 1,
      "note": null,
      "ClientTime": {},
      "CommandName": {},
      "CommandTarget": {},
      "EventsCount": {
        "value": 1
      },
      "ExceptionsCount": {
        "value": 0
      },
      "Name": {},
      "Headers": {
        "value": "7364c0daae3c455ebd6adb49f09937a1"
      },
      "InstallationID": {
        "value": "D96034A1-28B95B7F-B440D442-BB08F62B-E3F8EC8B"
      },
      "ItemstoProcess": {},
      "LoggedEventsCount": {
        "value": 0
      },
      "LoggedExceptionsCount": {
        "value": 0
      },
      "LoggedSQLCount": {
        "value": 2
      },
      "ManagedMemory": {
        "value": 919.33244
      },

from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

schema = StructType([
    StructField('id', StringType(), True),
    StructField('rowNumber', IntegerType(), True),
    StructField('note', StringType(), True),
    StructField('QueryResults', ArrayType(StructType([
        StructField('id', StringType(), True),
        StructField('rowNumber', IntegerType(), True),
        StructField('note', StringType(), True),
        StructField('ClientTime', StructType([]), True),
        StructField('CommandName', StructType([]), True),
        StructField('CommandTarget', StructType([]), True),
        StructField('EventsCount', StructType([StructField('value', IntegerType(), True)]), True),
        StructField('ExceptionsCount', StructType([StructField('value', IntegerType(), True)]), True),
        StructField('Name', StructType([]), True),
        StructField('Headers', StructType([StructField('value', StringType(), True)]), True),
        StructField('InstallationID', StructType([StructField('value', StringType(), True)]), True),
        StructField('ItemstoProcess', StructType([]), True),
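Once the truncated schema above is completed, a minimal sketch of applying it and pulling out just the QueryResults records, assuming retjson is a single JSON string:

rdd = spark.sparkContext.parallelize([retjson])
df = spark.read.schema(schema).json(rdd)
# One row per element of the QueryResults array
records = df.selectExpr("explode(QueryResults) AS rec").select("rec.*")
records.show(truncate=False)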

1 answer below

Answer by DileeprajnarayanThumula:

You can use the Python API for Apache Spark (PySpark).

I have tried the below approach:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json  # needed for json.loads / json.dumps below

spark = SparkSession.builder.getOrCreate()
retjson = '{"id": "384d5bca-7082-462b-8708-xxxxxxx", "rowNumber": 1, "note": null, "QueryResults": [{"id": "febfdf20-b7bf-4667-b1ac-668f17d023b0", "rowNumber": "2", "note": null, "ClientTime": {}, "CommandName": {}, "CommandTarget": {}, "EventsCount": {"value": 1}, "ExceptionsCount": {"value": 0}, "Name": {}, "Headers": {"value": "7364c0daae3c455ebd6adb49f09937a1"}, "InstallationID": {"value": "D96034A1-28B95B7F-B440D442-BB08F62B-E3F8EC8B"}, "ItemstoProcess": {}, "LoggedEventsCount": {"value": 0}, "LoggedExceptionsCount": {"value": 0}, "LoggedSQLCount": {"value": 2}, "ManagedMemory": {"value": 919.33244}}]}'
# Undo URL encoding (%20 -> space, %22 -> double quote), then parse to a dict
retjson_dict = json.loads(retjson.replace('%20', ' ').replace('%22', '"'))
schema = StructType([
    StructField("id", StringType(), True),
    StructField("rowNumber", IntegerType(), True),
    StructField("note", StringType(), True),
    StructField("QueryResults", StringType(), True)
])
# Build one row; QueryResults is re-serialized to a JSON string for now
data = [(retjson_dict.get("id"), int(retjson_dict.get("rowNumber")), retjson_dict.get("note"), json.dumps(retjson_dict.get("QueryResults")))]
df = spark.createDataFrame(data, schema=schema)
table_name = "new_json_table"
df.write.format("delta").mode("overwrite").saveAsTable(f"{table_name}")

Results:

select * from new_json_table
id  rowNumber   note    QueryResults
384d5bca-7082-462b-8708-xxxxxxx 1   null    [{"id": "febfdf20-b7bf-4667-b1ac-668f17d023b0", "rowNumber": "2", "note": null, "ClientTime": {}, "CommandName": {}, "CommandTarget": {}, "EventsCount": {"value": 1}, "ExceptionsCount": {"value": 0}, "Name": {}, "Headers": {"value": "7364c0daae3c455ebd6adb49f09937a1"}, "InstallationID": {"value": "D96034A1-28B95B7F-B440D442-BB08F62B-E3F8EC8B"}, "ItemstoProcess": {}, "LoggedEventsCount": {"value": 0}, "LoggedExceptionsCount": {"value": 0}, "LoggedSQLCount": {"value": 2}, "ManagedMemory": {"value": 919.33244}}]
  • The json library is used for handling the JSON payload.

  • The JSON string is parsed with json.loads, after first replacing %20 with a space and %22 with a double quote, and a schema is defined for the Spark DataFrame.

  • The schema contains four fields: id (String), rowNumber (Integer), note (String), and QueryResults (String).

  • A single row of data is built as a list of tuples (data), and createDataFrame produces the Spark DataFrame (df) with that schema.

  • The DataFrame (df) is then written to a Delta table.

  • mode("overwrite") is used so that if the table already exists, it is replaced with the new data.
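To go a step further and turn the stored QueryResults string back into real columns (the original goal), one possible sketch is to parse it with from_json and explode the array. The element schema below covers only a few of the sample fields and is an assumption based on the partial JSON shown:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Schema for one element of QueryResults (subset of the sample fields);
# each wrapper object is modeled as a struct with a single "value" field.
element_schema = ArrayType(StructType([
    StructField("id", StringType(), True),
    StructField("EventsCount", StructType([StructField("value", IntegerType(), True)]), True),
    StructField("Headers", StructType([StructField("value", StringType(), True)]), True),
    StructField("InstallationID", StructType([StructField("value", StringType(), True)]), True),
]))

records = (
    df.withColumn("qr", F.from_json("QueryResults", element_schema))
      .withColumn("rec", F.explode("qr"))          # one row per array element
      .select(
          F.col("rec.id").alias("record_id"),
          F.col("rec.EventsCount.value").alias("events_count"),
          F.col("rec.Headers.value").alias("headers"),
          F.col("rec.InstallationID.value").alias("installation_id"),
      )
)
records.show(truncate=False)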