I'm trying to get a Storage-Partitioned Join (SPJ) to kick in while doing a MERGE in Apache Spark on Iceberg tables.
Apache Spark Version: 3.5.1
Apache Iceberg Version: 1.4.3
Both source and target are Iceberg tables with the same partition strategy.
After reading https://github.com/apache/iceberg/issues/7832, I am setting all the Spark configs required for SPJ.
Iceberg Tables:
CREATE OR REPLACE TABLE default.emp_table (id BIGINT, dep STRING, cmt STRING)
USING iceberg
PARTITIONED BY (bucket(2, id));

INSERT INTO default.emp_table VALUES
  (1, 'a', null), (2, 'b', null), (3, 'c', null), (4, 'd', null), (5, 'e', null), (6, 'f', null);

CREATE OR REPLACE TABLE default.emp_table_stage (id BIGINT, dep STRING, cmt STRING)
USING iceberg
PARTITIONED BY (bucket(2, id));

INSERT INTO default.emp_table_stage VALUES
  (1, 'a', 'test'), (2, 'b', 'test'), (3, 'c', 'test'), (4, 'd', 'test'), (5, 'e', 'test'), (6, 'f', 'test');
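To double-check that both tables really share the same bucket(2, id) layout, I can query Iceberg's partitions metadata table (just a sanity check, not part of the SPJ setup):

spark.sql("SELECT * FROM default.emp_table.partitions").show(truncate=False)
spark.sql("SELECT * FROM default.emp_table_stage.partitions").show(truncate=False)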
Spark code:
spark.conf.set("spark.sql.sources.bucketing.enabled",True)
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
df = spark.sql(
    "EXPLAIN MERGE INTO default.emp_table target "
    "USING default.emp_table_stage source "
    "ON target.id = source.id "
    "WHEN MATCHED THEN UPDATE SET * "
    "WHEN NOT MATCHED THEN INSERT *"
)
df.explain(extended=True)
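Because the EXPLAIN statement is itself executed through spark.sql(), calling .explain() on the result wraps everything in a CommandResult node (as seen below). Printing the returned row gives the same plan in a slightly cleaner form (just a readability tweak, assuming the single-row, single-column result that EXPLAIN returns):

print(df.collect()[0][0])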
== Optimized Logical Plan ==
CommandResult [plan#1547], Execute ExplainCommand, [[== Physical Plan ==
ReplaceData IcebergWrite(table=demo.default.emp_table, format=PARQUET)
+- Exchange hashpartitioning(staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 2, id#1566L, IntegerType, LongType, false, true, true), 5), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=4530]
+- *(4) Project [id#1566L, dep#1567, cmt#1568]
+- MergeRowsExec[id#1566L, dep#1567, cmt#1568, _file#1569]
+- *(3) SortMergeJoin [id#1552L], [id#1555L], FullOuter
:- *(1) Sort [id#1552L ASC NULLS FIRST], false, 0
: +- *(1) Project [id#1552L, dep#1553, cmt#1554, _file#1560, true AS __row_from_target#1563, monotonically_increasing_id() AS __row_id#1564L]
: +- *(1) ColumnarToRow
: +- BatchScan demo.default.emp_table[id#1552L, dep#1553, cmt#1554, _file#1560] demo.default.emp_table (branch=null) [filters=, groupedBy=id_bucket] RuntimeFilters: [dynamicpruningexpression(_file#1560 IN subquery#1601)]
: +- Subquery subquery#1601, [id=#4481]
: +- *(4) HashAggregate(keys=[_file#1600], functions=[])
: +- Exchange hashpartitioning(_file#1600, 5), ENSURE_REQUIREMENTS, [plan_id=4477]
: +- *(3) HashAggregate(keys=[_file#1600], functions=[])
: +- *(3) Project [_file#1600]
: +- *(3) SortMergeJoin [id#1597L], [id#1576L], LeftSemi
: :- *(1) Sort [id#1597L ASC NULLS FIRST], false, 0
: : +- *(1) ColumnarToRow
: : +- BatchScan demo.default.emp_table[id#1597L, _file#1600] demo.default.emp_table (branch=null) [filters=id IS NOT NULL, groupedBy=id_bucket] RuntimeFilters: []
: +- *(2) Sort [id#1576L ASC NULLS FIRST], false, 0
: +- *(2) Project [id#1576L]
: +- *(2) Filter isnotnull(id#1576L)
: +- *(2) ColumnarToRow
: +- BatchScan demo.default.emp_table_stage[id#1576L, dep#1577, cmt#1578] demo.default.emp_table_stage (branch=null) [filters=, groupedBy=id_bucket] RuntimeFilters: []
+- *(2) Sort [id#1555L ASC NULLS FIRST], false, 0
+- *(2) Project [id#1555L, dep#1556, cmt#1557, true AS __row_from_source#1565]
+- *(2) ColumnarToRow
+- BatchScan demo.default.emp_table_stage[id#1555L, dep#1556, cmt#1557] demo.default.emp_table_stage (branch=null) [filters=, groupedBy=id_bucket] RuntimeFilters: []
I have tried almost every combination of configs (listed below), but the plan still goes for a SortMergeJoin. Any help on how we can trigger an SPJ?
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.sources.bucketing.enabled",True)
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.shuffle.partitions",5)
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")
In Iceberg, the partition scheme is managed entirely by Iceberg's metadata, so partition pushdown and pruning are handled inside the Iceberg table format itself (much like in an RDBMS or any other storage engine). That is why you do not see the partition pushdown in the Spark SQL plan.
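For example (a rough sketch, using Iceberg's standard metadata tables), the partition layout and file-level grouping are visible through the metadata tables rather than the plan:

spark.sql("SELECT file_path, partition, record_count FROM default.emp_table.files").show(truncate=False)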