I have a scenario where there will be multiple JSON files in an S3 bucket, which my PySpark script loads into a dataframe (a minimal sketch of the load step follows the sample below). Each JSON file has the following structure:
[
  {
    "RECORDS_HEADER": {
      "RECORD_TYPE": "HEADER",
      "DATA": {
        "COUNT_OF_OBJECTS_RECORD_TYPE_001": 1,
        "COUNT_OF_OBJECTS_RECORD_TYPE_002": 3,
        "COUNT_OF_OBJECTS_RECORD_TYPE_003": 6,
        "COUNT_OF_OBJECTS_RECORD_TYPE_004": 3,
        "COUNT_OF_OBJECTS_RECORD_TYPE_005": 1,
        "COUNT_OF_OBJECTS_RECORD_TYPE_006": 1,
        "COUNT_OF_OBJECTS_RECORD_TYPE_007": 0
      }
    },
    "RECORDS_001": {
      "RECORD_TYPE": "001",
      "DATA": {
        "MEMBER_ID": "U3652928373",
        "INDIV_ID": "3003790",
        "MBR_PREF_LANG": "English",
        "MBR_PREF_LARGE_PRINT": "N",
        "BATCH_BEGIN_DATE": "01/01/2023",
        "BATCH_END_DATE": "11/30/2023",
        "BATCH_RUN_DATE": "12/18/2023",
        "BATCH_ID": 1000000078,
        "BATCH_STATUS": "COMPLETE",
        "COLATERAL_TYPE": "EOB"
      }
    },
    "RECORDS_002": {
      "RECORD_TYPE": "002",
      "DATA": [
        {
          "CLAIM_NUMBER": "232423113200",
          "RECEIVED_DATE": "08/30/2023",
          "PROCESSED_DATE": "09/16/2023",
          "PAID_DATE": "09/15/2023",
          "CLAIM_DOS_BEGIN_DT": "08/29/2023",
          "CLAIM_DOS_END_DT": "08/29/2023",
          "CONTRACT_ID": "H0354",
          "PBP": "027",
          "SEGMENT": "000",
          "GROUP_ID": "",
          "CLASS_ID": "",
          "CLASS_PLAN_ID": "",
          "RENDERING_PROVIDER_NAME": "ESCHLER, DAVID J",
          "CLAIM_TYP": "H",
          "PAR_IND": "Y",
          "CLAIM_QMB_ELIG": "N"
        },
        {
          "CLAIM_NUMBER": "23P001125500",
          "RECEIVED_DATE": "05/30/2023",
          "PROCESSED_DATE": "06/16/2023",
          "PAID_DATE": "06/15/2023",
          "CLAIM_DOS_BEGIN_DT": "05/25/2023",
          "CLAIM_DOS_END_DT": "05/25/2023",
          "CONTRACT_ID": "H0354",
          "PBP": "001",
          "SEGMENT": "000",
          "GROUP_ID": "",
          "CLASS_ID": "",
          "CLASS_PLAN_ID": "",
          "RENDERING_PROVIDER_NAME": "PRAC HH, MHK HOME G.",
          "CLAIM_TYP": "H",
          "PAR_IND": "Y",
          "CLAIM_QMB_ELIG": "N"
        },
        {
          "CLAIM_NUMBER": "23E002114300",
          "RECEIVED_DATE": "01/30/2024",
          "PROCESSED_DATE": "01/16/2024",
          "PAID_DATE": "01/15/2024",
          "CLAIM_DOS_BEGIN_DT": "01/12/2024",
          "CLAIM_DOS_END_DT": "01/12/2024",
          "CONTRACT_ID": "H0354",
          "PBP": "028",
          "SEGMENT": "000",
          "GROUP_ID": "",
          "CLASS_ID": "",
          "CLASS_PLAN_ID": "",
          "RENDERING_PROVIDER_NAME": "ZEN, MAX",
          "CLAIM_TYP": "H",
          "PAR_IND": "Y",
          "CLAIM_QMB_ELIG": "N"
        }
      ]
    },
    "RECORDS_003": {
      "RECORD_TYPE": "003",
      "DATA": [
        {
          "CLAIM_NUMBER": "23E002113200",
          "CLAIM_LINE_NUMBER": "2",
          "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
          "CLAIMLN_DOS_END_DT": "08/29/2023",
          "BILLING_CODE": "96401",
          "BILLING_CODE_DESC": "Complex medication injected",
          "LINE_NOTES_CODE": "B",
          "TOTAL_CHARGE": "475.00",
          "ALLOWED": "0.00",
          "PAID_AMT": "0.00",
          "PATIENT_RESPONSIBILITY": "0.00",
          "DEDUCTIBLE": "0.00",
          "COINSURANCE": "0.00",
          "COINSURANCE_PCT": "0.00",
          "COPAY": "0.00",
          "CLAIM_STATUS_DENIED": "Y",
          "OVERAGE_AMOUNT": "0.00"
        },
        {
          "CLAIM_NUMBER": "23E002113200",
          "CLAIM_LINE_NUMBER": "4",
          "CLAIMLN_DOS_BEGIN_DT": "08/29/2023",
          "CLAIMLN_DOS_END_DT": "08/29/2023",
          "BILLING_CODE": "96401",
          "BILLING_CODE_DESC": "Complex medication injected",
          "LINE_NOTES_CODE": "B",
          "TOTAL_CHARGE": "475.00",
          "ALLOWED": "0.00",
          "PAID_AMT": "0.00",
          "PATIENT_RESPONSIBILITY": "0.00",
          "DEDUCTIBLE": "0.00",
          "COINSURANCE": "0.00",
          "COINSURANCE_PCT": "0.00",
          "COPAY": "0.00",
          "CLAIM_STATUS_DENIED": "Y",
          "OVERAGE_AMOUNT": "0.00"
        }
      ]
    },
    "RECORDS_004": {
      "RECORD_TYPE": "004",
      "DATA": [
        {
          "CLAIM_NUMBER": "123443536",
          "CLAIM_TOTAL_CHARGE": "1800.00",
          "CLAIM_ALLOWED": "900.00",
          "CLAIM_PAID_AMT": "700.00",
          "CLAIM_PATIENT_RESPONSIBILITY": "200.00",
          "CLAIM_COPAY": "200.00",
          "CLAIM_COINSURANCE": "0.00",
          "CLAIM_DEDUCTIBLE": "0.00",
          "CLAIM_OVERAGE_AMOUNT": "0.00"
        },
        {
          "CLAIM_NUMBER": "123443536",
          "CLAIM_TOTAL_CHARGE": "1000.00",
          "CLAIM_ALLOWED": "335.77",
          "CLAIM_PAID_AMT": "268.62",
          "CLAIM_PATIENT_RESPONSIBILITY": "67.15",
          "CLAIM_COPAY": "0.00",
          "CLAIM_COINSURANCE": "67.15",
          "CLAIM_DEDUCTIBLE": "0.00",
          "CLAIM_OVERAGE_AMOUNT": "0.00"
        },
        {
          "CLAIM_NUMBER": "123443536",
          "CLAIM_TOTAL_CHARGE": "1900.00",
          "CLAIM_ALLOWED": "0.00",
          "CLAIM_PAID_AMT": "0.00",
          "CLAIM_PATIENT_RESPONSIBILITY": "0.00",
          "CLAIM_COPAY": "0.00",
          "CLAIM_COINSURANCE": "0.00",
          "CLAIM_DEDUCTIBLE": "0.00",
          "CLAIM_OVERAGE_AMOUNT": "0.00"
        }
      ]
    },
    "RECORDS_005": {
      "RECORD_TYPE": "005",
      "DATA": {
        "PROCESSED_PERIOD_AMOUNT_CHARGED": "4700.00",
        "PROCESSED_PERIOD_ALLOWED_AMT": "1235.77",
        "PROCESSED_PERIOD_PAID_AMT": "968.62",
        "PROCESSED_PERIOD_PATIENT_RESPONSIBILITY": "267.15"
      }
    },
    "RECORDS_006": {
      "RECORD_TYPE": "006",
      "DATA": {
        "YEAR_BEGIN_DT": "01/01/2024",
        "YEAR_END_DT": "12/31/2024",
        "START_DATE": "01/01/2024",
        "END_DATE": "11/30/2024"
      }
    }
  }
]
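For reference, here is a minimal sketch of my load step (the bucket path is a placeholder; multiLine is needed because each file is a single JSON array spread over many lines):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("eob-load").getOrCreate()

# Each file is one JSON array spanning multiple lines,
# so the multiLine option must be enabled; the S3 path is a placeholder.
df = spark.read.option("multiLine", True).json("s3://my-bucket/input/*.json")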
When all the JSON files are loaded into a single dataframe, it looks like this:
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| RECORDS_001| RECORDS_002| RECORDS_003| RECORDS_004| RECORDS_005| RECORDS_006| RECORDS_HEADER|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{{01/01/2023, 11/...|{[{08/29/2023, 08...|{[{0.00, 96401, C...|{[{900.00, 0.00, ...|{{1235.77, 4700.0...|{{B, 11/30/2024, ...|{{1, 3, 6, 3, 1, ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
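(The truncated show() output hides the nesting, so to check the inferred layout I run:

df.printSchema()

which shows, for example, that RECORDS_002.DATA was inferred as an array of structs, while RECORDS_001.DATA is a single struct.)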
Now I have transformations to do on the nested JSON. For example, in RECORDS_002 I need to change the date format of the PROCESSED_DATE field for every record in the nested array, across all rows; similar transformations are required on other nested JSON fields. Please let me know how I can perform such transformations on nested JSON fields in a PySpark dataframe without making the process too slow.
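For the PROCESSED_DATE example, here is an untested sketch of what I was considering, using the transform higher-order function together with Column.withField (this assumes Spark 3.1+; the target format yyyy-MM-dd is just an example):

from pyspark.sql import functions as F

# Rewrite PROCESSED_DATE (MM/dd/yyyy -> yyyy-MM-dd) for every element of the
# RECORDS_002.DATA array, in a single column expression (no UDF, no explode).
df2 = df.withColumn(
    "RECORDS_002",
    F.col("RECORDS_002").withField(
        "DATA",
        F.transform(
            "RECORDS_002.DATA",
            lambda rec: rec.withField(
                "PROCESSED_DATE",
                F.date_format(F.to_date(rec["PROCESSED_DATE"], "MM/dd/yyyy"), "yyyy-MM-dd"),
            ),
        ),
    ),
)

My concern is whether chaining many withColumn/withField expressions like this, one per field to fix, is the right approach performance-wise, or whether there is a better pattern (e.g. rebuilding each struct once).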
I am open to any further questions if needed.
Thanks!!