I am writing ORC Snappy files to a Google Cloud Storage bucket using PySpark and Hive. Hive produces a single output file that is significantly smaller than the output produced by PySpark. How can I make PySpark produce output the size that Hive does?
Here is example output from two identical queries I ran to produce the exact same data in GCS, one with Hive and the other with PySpark.
PySpark output totaling 86.4 MB across 9 parts
Edit: Here is the code I've been using.
To create the Hive table based on a CSV file with 15 million rows and 6 columns of random integers (a sketch of how such a file could be generated follows the DDL):
drop table if exists dummy_data;
create external table if not exists dummy_data (
column1 int,
column2 int,
column3 int,
column4 int,
column5 int,
column6 int)
row format delimited
fields terminated by ','
stored as textfile
location 'gs://dummy_bucket_mlw/dummy_data/'
tblproperties("skip.header.line.count"="1");
To create the ORC table in GCS that I load data into from the dummy_data table using Hive:
drop table if exists hive_test_table;
create external table if not exists hive_test_table (
column1 int,
column2 int,
column3 int,
column4 int,
column5 int,
column6 int)
stored as orc
location 'gs://dummy_bucket_mlw/hive_test_table'
tblproperties ('orc.compress' = 'SNAPPY');
To load data into the table from the dummy_data table via Hive:
INSERT INTO hive_test_table
SELECT *
FROM dummy_data;
To create a table in Hive that I will load data into using PySpark:
drop table if exists pyspark_test_table;
create external table if not exists pyspark_test_table (
column1 int,
column2 int,
column3 int,
column4 int,
column5 int,
column6 int)
stored as orc
location 'gs://dummy_bucket_mlw/pyspark_test_table'
tblproperties ('orc.compress' = 'SNAPPY');
Updated PySpark code that combines all the file parts into one, but still does not compress the output down to the size the Hive method produces:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from google.cloud import storage
client = storage.Client()
xr = client.get_bucket("dummy_bucket_mlw")
spark = SparkSession.builder \
.appName("pyspark_test_load") \
.enableHiveSupport() \
.getOrCreate()
spark.conf.set("hive.exec.dynamic.partition.mode", 'nonstrict')
spark.conf.set("hive.server2.builtin.udf.blacklist", "emply_bl")
# Execute the query to retrieve the data
my_query = """
SELECT * FROM dummy_data
"""
df = spark.sql(my_query)
# Compress the data and insert into the existing Hive table
output_table = "pyspark_test_table"
# Configure compression and insert the data into the Hive table
df.repartition(1) \
    .write \
    .format("orc") \
    .option("compression", "snappy") \
    .mode("append") \
    .insertInto(output_table)
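A quick way to compare the two outputs is to sum the object sizes under each table's location. A minimal sketch using the same storage client imported above (the prefixes assume the table locations from the DDL):

from google.cloud import storage

client = storage.Client()

def total_size_mb(bucket_name, prefix):
    # Sum the sizes of every object under the given prefix
    blobs = client.list_blobs(bucket_name, prefix=prefix)
    return sum(blob.size for blob in blobs) / (1024 * 1024)

print("Hive output MB:   ", total_size_mb("dummy_bucket_mlw", "hive_test_table/"))
print("PySpark output MB:", total_size_mb("dummy_bucket_mlw", "pyspark_test_table/"))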
At the end, when you are writing to the table, use repartition(1). Example:
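A minimal sketch, reusing the df and pyspark_test_table from the question:

# Collapse all partitions into one so the write produces a single ORC part file
df.repartition(1) \
    .write \
    .mode("append") \
    .insertInto("pyspark_test_table")

coalesce(1) would also give a single output file and avoids the full shuffle that repartition(1) triggers.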