How can I process this Dataproc job faster?


The code reads a CSV of 628,360 rows from GCS, applies a transformation to the resulting DataFrame with the withColumn method, and writes the result to a partitioned BigQuery table.
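For reference, this is roughly what the job does. A minimal sketch, assuming placeholder bucket, table, column, and partition-field names (none of these are the real ones from the job):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder.appName("csv-to-bigquery").getOrCreate()

# Read the CSV from GCS.
df = spark.read.csv("gs://my-bucket/input/file.csv", header=True, inferSchema=True)

# Apply a per-column transformation with withColumn (placeholder transformation).
transformed_df = df.withColumn("col1", upper(col("col1")))

# Write to a partitioned BigQuery table through the spark-bigquery connector.
(
    transformed_df.write.format("bigquery")
    .option("table", "my_project.my_dataset.my_table")
    .option("partitionField", "ingestion_date")  # must be a DATE/TIMESTAMP column
    .option("temporaryGcsBucket", "my-temp-bucket")
    .mode("append")
    .save()
)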

Despite this simple workflow, the job took 19h 42min to be processed. What can I do to process it faster?

I am using an autoscaling policy, and I know the cluster is not scaling up because there is no YARN pending memory, as shown in the following screenshot.

The configuration of the cluster is the following:

gcloud dataproc clusters create $CLUSTER_NAME \
    --project $PROJECT_ID_PROCESSING \
    --region $REGION \
    --image-version 2.0-ubuntu18 \
    --num-masters 1 \
    --master-machine-type n2d-standard-2 \
    --master-boot-disk-size 100GB \
    --confidential-compute \
    --num-workers 4 \
    --worker-machine-type n2d-standard-2 \
    --worker-boot-disk-size 100GB \
    --secondary-worker-boot-disk-size 100GB \
    --autoscaling-policy $AUTOSCALING_POLICY \
    --secondary-worker-type=non-preemptible \
    --subnet $SUBNET \
    --no-address \
    --shielded-integrity-monitoring \
    --shielded-secure-boot \
    --shielded-vtpm \
    --labels label \
    --gce-pd-kms-key $KMS_KEY \
    --service-account $SERVICE_ACCOUNT \
    --scopes 'https://www.googleapis.com/auth/cloud-platform' \
    --zone "" \
    --max-idle 3600s


Redtwin (BEST ANSWER)

As discussed in Google Dataproc Pyspark - BigQuery connector is super slow, I processed the same job without the deidentifier transformation:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the row-level deidentify() call in a UDF, passing null/empty values through.
udf_deidentifier = udf(
    lambda x: deidentify(
        content=x,
        project_id_processing=args.project_id_processing,
    )
    if x is not None and x != ""
    else None,
    StringType(),
)

# Apply the UDF to both columns.
deidentified_df = transformed_df.withColumn(
    colName="col1", col=udf_deidentifier("col1")
).withColumn(colName="col2", col=udf_deidentifier("col2"))

it took 23 seconds to process a file with approximately 20,000 rows. I conclude that this transformation was what was delaying the job, but I still don't know whether I should use the withColumn method.
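If deidentify() makes one external call per row (for example to the Cloud DLP API), most of the runtime is per-row overhead rather than Spark itself. A hedged sketch of one way to reduce it, reusing the names from the snippet above and assuming a hypothetical deidentify_batch() helper that de-identifies a whole list of strings in a single call:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def udf_deidentifier_batched(values: pd.Series) -> pd.Series:
    # One external call per batch instead of one per row.
    # deidentify_batch is hypothetical; it would wrap the same de-identification logic.
    return pd.Series(deidentify_batch(values.tolist()))

deidentified_df = (
    transformed_df
    .repartition(16)  # spread the UDF work across executors; tune to the cluster size
    .withColumn("col1", udf_deidentifier_batched("col1"))
    .withColumn("col2", udf_deidentifier_batched("col2"))
)

Whether this helps depends on whether the de-identification backend accepts batched input; if it does not, increasing parallelism with repartition alone is still worth trying.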

Dagang Wei

I see 2 major issues for performance:

  1. Disks are way too small. Disk I/O throughput and IOPS are proportional to disk size. Small disks won't save you money; instead, they significantly slow down the job, so you end up paying more because the expensive CPUs stay busy longer. The recommended minimum size is 1 TB for each persistent disk, or use local SSDs; see the sketch after this list.

  2. Many security features are enabled, such as Confidential Computing and CMEK-encrypted persistent disks. There is a tradeoff between security and performance, so enable them only when necessary. If these security features are required for your use case, you have to either accept the performance hit or use faster CPUs and disks.
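A sketch of how the create command could change along these lines. The disk sizes and SSD count below are illustrative examples only, not values tested against this workload, and every other flag stays as it is in the question:

gcloud dataproc clusters create $CLUSTER_NAME \
    --master-boot-disk-size 1000GB \
    --worker-boot-disk-size 1000GB \
    --secondary-worker-boot-disk-size 1000GB \
    --num-worker-local-ssds 1
    # ...plus the remaining flags from the question; per point 2, drop
    # --confidential-compute and --gce-pd-kms-key only if your security
    # requirements allow it.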