Schema Mismatch while reading from Postgres and writing to Google BigQuery


I have created a PySpark script to migrate data from my Postgres database to Google BigQuery via Dataproc. However, I am facing an error while running the job on Dataproc:

Caused by: java.lang.NullPointerException: java.lang.IllegalStateException: Unexpected type: DecimalType(10,0)
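
For context, the script follows the usual pattern of reading the table over JDBC and writing it out with the spark-bigquery connector; a minimal sketch is below (host, credentials, project, dataset, and bucket names are placeholders, not the real values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pg_to_bq_migration").getOrCreate()

# Read the source table from Postgres over JDBC (placeholder connection details)
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://<host>:5432/<database>")
    .option("dbtable", "public.ml_model_logs")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Write to BigQuery with the spark-bigquery connector (placeholder table/bucket)
(
    df.write.format("bigquery")
    .option("table", "<project>.<dataset>.ml_model_logs")
    .option("temporaryGcsBucket", "<temp-gcs-bucket>")
    .mode("append")
    .save()
)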

Source table schema

                                             Table "public.ml_model_logs"

      Column       |            Type             | Collation | Nullable |                    Default                    
-------------------+-----------------------------+-----------+----------+-----------------------------------------------
 log_id            | numeric(12,0)               |           | not null | nextval('seq_ml_model_logs_log_id'::regclass)
 prediction_date   | timestamp without time zone |           |          | 
 fk_process_id     | numeric(10,0)               |           |          | 
 record_id         | numeric(20,0)               |           |          | 
 model_output_json | json                        |           |          | 
 predicted_ban     | numeric(1,0)                |           |          | 

Indexes:
    "ml_model_logs_pkey" PRIMARY KEY, btree (log_id)

DataFrame schema


 |-- log_id: decimal(12,0) (nullable = true)
 |-- prediction_date: timestamp (nullable = true)
 |-- fk_process_id: decimal(10,0) (nullable = true)
 |-- record_id: decimal(20,0) (nullable = true)
 |-- model_output_json: string (nullable = true)
 |-- predicted_ban: decimal(1,0) (nullable = true)

and

Destination Table schema

log_id              NUMERIC   NULLABLE
prediction_date     DATETIME  NULLABLE
fk_process_id       NUMERIC   NULLABLE
record_id           NUMERIC   NULLABLE
model_output_json   JSON      NULLABLE
predicted_ban       NUMERIC   NULLABLE

What I understood is that there is a datatype mismatch between long and NUMERIC datatypes, so I tried typecasting the Long attributes to int, Float, DecimalType, and LongType to make them compatible with the destination table's NUMERIC datatype, but the error still persists. A sketch of one such attempt is below.
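
For illustration only, one of the cast attempts looked roughly like this (a sketch; the column choice and target types varied across attempts):

from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# One of the attempted workarounds: cast the numeric columns to LongType.
# Other attempts used IntegerType, FloatType, and various DecimalType precisions,
# but the write to BigQuery kept failing with the same DecimalType error.
df = (
    df.withColumn("fk_process_id", col("fk_process_id").cast(LongType()))
      .withColumn("predicted_ban", col("predicted_ban").cast(LongType()))
)
df.printSchema()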

Can someone please help with this? Thanks in advance.

Edit: added the source table schema and updated the DataFrame schema.

1 Answer

Answer from TSCAmerica.com:

The DataFrame columns that map to BigQuery's NUMERIC type should be cast to PySpark's DecimalType with the appropriate precision and scale. In the example below, the columns log_id, fk_process_id, record_id, and predicted_ban are cast to DecimalType(38, 0) to match BigQuery's NUMERIC datatype. Try the following:

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

# Assuming df is your DataFrame: cast the columns that map to BigQuery NUMERIC
# to DecimalType(38, 0) so the connector writes them as NUMERIC.
for column in ["log_id", "fk_process_id", "record_id", "predicted_ban"]:
    df = df.withColumn(column, col(column).cast(DecimalType(38, 0)))