How to reduce file size of PySpark output to that of Hive?

I am writing ORC Snappy files to a Google Cloud Storage bucket using PySpark and Hive. Hive produces a single output file that is significantly smaller than the output produced by PySpark. How can I make PySpark produce an output the size that Hive does?

Here is the output of two identical queries I ran to produce the same data in GCS, one with Hive and the other with PySpark.

Hive output of 72.9 MB

PySpark output totaling 86.4 MB across 9 parts
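
For anyone who wants to reproduce the comparison, the per-table sizes can be summed with the google.cloud.storage client; this is a minimal sketch, assuming the object prefixes match the table locations in the DDL below:

    from google.cloud import storage

    # Sum the sizes of all output files under each table's GCS prefix
    client = storage.Client()
    bucket = client.get_bucket("dummy_bucket_mlw")

    for prefix in ("hive_test_table/", "pyspark_test_table/"):
        blobs = list(bucket.list_blobs(prefix=prefix))
        total_mb = sum(b.size for b in blobs) / (1024 * 1024)
        print(f"{prefix}: {len(blobs)} file(s), {total_mb:.1f} MB")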

Edit: Here is the code I've been using:

To create the Hive table, based on a CSV file with 15 million rows and 6 columns of random integers:

    drop table if exists dummy_data;
    create external table if not exists dummy_data (
    column1 int,
    column2 int,
    column3 int,
    column4 int,
    column5 int,
    column6 int)
    row format delimited
    fields terminated by ','
    stored as textfile
    location 'gs://dummy_bucket_mlw/dummy_data/'
    tblproperties("skip.header.line.count"="1");

To create the GCS-backed table that I will load data into from the dummy_data table using Hive:

    drop table if exists hive_test_table;
    create external table if not exists hive_test_table (
    column1 int,
    column2 int,
    column3 int,
    column4 int,
    column5 int,
    column6 int)
    stored as orc
    location 'gs://dummy_bucket_mlw/hive_test_table'
    tblproperties ('orc.compress' = 'SNAPPY');

To load data into the table from the dummy_data table via Hive:

    INSERT INTO hive_test_table
    SELECT *
    FROM dummy_data;

Results in this output

To create the table in Hive that I will load data into using PySpark:

    drop table if exists pyspark_test_table;
    create external table if not exists pyspark_test_table (
    column1 int,
    column2 int,
    column3 int,
    column4 int,
    column5 int,
    column6 int)
    stored as orc
    location 'gs://dummy_bucket_mlw/pyspark_test_table'
    tblproperties ('orc.compress' = 'SNAPPY');
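
The same DDL can also be issued from the PySpark session used further down (a sketch, assuming enableHiveSupport() and that the session is available as spark):

    # Create the target table from PySpark instead of the Hive CLI
    spark.sql("drop table if exists pyspark_test_table")
    spark.sql("""
        create external table if not exists pyspark_test_table (
            column1 int, column2 int, column3 int,
            column4 int, column5 int, column6 int)
        stored as orc
        location 'gs://dummy_bucket_mlw/pyspark_test_table'
        tblproperties ('orc.compress' = 'SNAPPY')
    """)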

Updated PySpark code that combines all file parts into one, but does not compress the output to the size of the Hive method:

    import findspark
    findspark.init()
    from pyspark.sql import SparkSession
    from google.cloud import storage

    # GCS client (used only to inspect the bucket; the write below goes through Hive)
    client = storage.Client()
    bucket = client.get_bucket("dummy_bucket_mlw")

    spark = SparkSession.builder \
        .appName("pyspark_test_load") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.conf.set("hive.server2.builtin.udf.blacklist", "emply_bl")

    # Build a DataFrame over the source table (evaluated lazily at write time)
    my_query = """
        SELECT * FROM dummy_data
    """
    df = spark.sql(my_query)

    # Existing Hive table to insert into
    output_table = "pyspark_test_table"

    # Collapse to a single partition, request snappy compression, and insert into the Hive table
    df.repartition(1).write.format("orc").mode("append").option("compression", "snappy").insertInto(output_table)

Results in this output
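
One unverified variant, in case the writer-level option("compression", "snappy") is ignored when inserting into a Hive table: set the codec at the session level through spark.sql.orc.compression.codec instead (whether that setting governs this insert is an assumption on my part):

    # Variant (unverified): rely on the session-level ORC codec rather than
    # the DataFrameWriter option when inserting into the Hive table
    spark.conf.set("spark.sql.orc.compression.codec", "snappy")
    df.repartition(1).write.mode("append").insertInto(output_table)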

1 Answer

Answered by notNull:

At the end, when you are writing to the table, use repartition(1).

Example:

    dataframe.repartition(1)...
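
Applied to the DataFrame from the question, that suggestion looks roughly like the sketch below; coalesce(1) is included as a common alternative (not part of the original answer) that avoids the full shuffle repartition(1) triggers:

    # Single output file via repartition(1), as suggested above
    df.repartition(1).write.mode("append").insertInto("pyspark_test_table")

    # Alternative: coalesce(1) also yields one file but skips the full shuffle;
    # the final write still runs on a single task either way
    df.coalesce(1).write.mode("append").insertInto("pyspark_test_table")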