How is Spark's "exploding" of array/map fields a SELECT operation?

I am new to Python and Spark, and am currently working through this tutorial on Spark's explode operation for array/map fields of a DataFrame.

Based on the tutorial's very first section (1. PySpark explode array or map column to rows), it's very intuitive. The minimum working example DataFrame is created in the Annex below. The schema and DataFrame table are:

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

>>> df.show(truncate=False)
+----------+-------------------+-----------------------------+
|name      |knownLanguages     |properties                   |
+----------+-------------------+-----------------------------+
|James     |[Java, Scala]      |{eye -> brown, hair -> black}|
|Michael   |[Spark, Java, null]|{eye -> null, hair -> brown} |
|Robert    |[CSharp, ]         |{eye -> , hair -> red}       |
|Washington|null               |null                         |
|Jefferson |[1, 2]             |{}                           |
+----------+-------------------+-----------------------------+

The explode function is illustrated as follows:

>>> df \
... .select(df.name,explode(df.knownLanguages)) \
... .show()
+---------+------+
|name     |col   |
+---------+------+
|James    |Java  |
|James    |Scala |
|Michael  |Spark |
|Michael  |Java  |
|Michael  |null  |
|Robert   |CSharp|
|Robert   |      |
|Jefferson|1     |
|Jefferson|2     |
+---------+------+

The explode function is shown in the context of a SELECT operation, however, which I find very unintuitive. A SELECT prunes away rows and never increases the height of a data frame. Only joins can potentially increase height, but even there, the row filtering is conceptually applied to a Cartesian join [1], so the result is still a potential reduction in height relative to that product rather than an increase. Correct me if I'm wrong, but the above SELECT is not being applied to a join, since it is invoked as a method of DataFrame df.
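For concreteness, here is my own sketch (not from the tutorial) that rebuilds the exploded output with an RDD flatMap; it makes the per-record row replication explicit instead of expressing it as a selection:

# My own sketch: reproduce the exploded result with flatMap, so the row
# replication is explicit rather than hidden inside select().
exploded = (
    df.rdd
      .filter(lambda row: row.knownLanguages is not None)  # drop the null-array row
      .flatMap(lambda row: [(row.name, lang) for lang in row.knownLanguages])
      .toDF(['name', 'col'])
)
exploded.show()

This yields the same nine rows as the tutorial's select above, which is exactly what puzzles me about calling it a selection.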

I tried to better understand how explode fits SELECT through the latter's doc string: "Projects a set of expressions and returns a new :class:DataFrame". Projection refers to choosing column expressions. I unsuccessfully tried to get insight into how the above explode code fits selection/projection by examining its content:

explode(df.knownLanguages) # Shows no columnar data
Out[114]: Column<'explode(knownLanguages)'>

Later, I found that it is not possible to examine the columnar data content of a Column object, as described here.
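The closest I got is the snippet below (my own), which only confirms that a Column is an unevaluated expression whose values materialize once it is placed inside a select:

col_expr = explode(df.knownLanguages)  # just an expression: Column<'explode(knownLanguages)'>
df.select(col_expr).show()             # only here do the exploded values appear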

The prototype for explode returns a Column object, while the doc string says that it "Returns a new row for each element in the given array or map". It's difficult to picture such a column, as there is no single "given array or map" -- there are as many heterogeneous arrays/maps as there are records in DataFrame df.

Even if we accept that the Column object doesn't contain such columnar data, it's necessary to picture how such a column would be conceptually constructed in order to see how the SELECT makes sense. I can't come up with such a column of data that would make sense in the SELECT query because no matter how the explode column is constructed, it will be of a different height than DataFrame df.

Would it be correct to conclude that explode() can yield no column expression that fits SELECT's projection/selection operation as applied to DataFrame df, and that it is simply a signal to the select() method to create a new DataFrame by replicating each record $i$ $n_i$ times, where $n_i$ is the number of items in the record's array/map?
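For comparison, I also ran the same query through Spark SQL (the temp view name people is my own choice); the generator either sits directly in the SELECT list or appears as a LATERAL VIEW, which is the behaviour I'm trying to reconcile with the projection model:

df.createOrReplaceTempView('people')
# Generator directly in the SELECT list, as in the DataFrame API above.
spark.sql("SELECT name, explode(knownLanguages) AS lang FROM people").show()
# Equivalent LATERAL VIEW form, where the generator behaves more like a joined table.
spark.sql("SELECT name, lang FROM people LATERAL VIEW explode(knownLanguages) t AS lang").show()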

I'm just starting to find my way around Spark. I anticipate, however, that if explode() breaks the projection/selection model of SELECT, it may be difficult to craft queries more complex than those in the tutorial based only on knowledge of the designed-for behaviour.

Notes

[1] SELECT filters a Cartesian join in concept, though of course not in execution. This is reflected by the fact that early SQL used WHERE in place of ON: all the WHERE clauses are (conceptually) applied to a Cartesian join.
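As a toy illustration of this note (the tables a and b below are made up for the example):

# Tiny toy tables just for this note; both queries describe the same result,
# one as an explicit ON join, one as a WHERE filter over a Cartesian product.
spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'a_val']).createOrReplaceTempView('a')
spark.createDataFrame([(1, 'p'), (3, 'q')], ['id', 'b_val']).createOrReplaceTempView('b')
spark.sql("SELECT a_val, b_val FROM a JOIN b ON a.id = b.id").show()
spark.sql("SELECT a_val, b_val FROM a CROSS JOIN b WHERE a.id = b.id").show()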

Annex: Create minimum working example DataFrame table

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode  # used by the explode examples above

spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})
]

df = spark.createDataFrame(data=arrayData, schema=['name', 'knownLanguages', 'properties'])