PySpark performance chained transformations vs successive reassignment


I'm somewhat new to PySpark. I understand that it uses lazy evaluation, meaning that execution of a series of transformations will be deferred until some action is requested, at which point the Spark engine optimizes the entire set of transformations and then executes them.

Therefore, from both a functional and a performance standpoint, I would expect this:

Approach A

from pyspark.sql import functions as F

df = spark.read.parquet(path)
df = df.filter(F.col('state') == 'CA')
df = df.select('id', 'name', 'subject')
df = df.groupBy('subject').count()

to be the same as this:

Approach B

df = spark.read.parquet(path)\
    .filter(F.col('state') == 'CA')\
    .select('id', 'name', 'subject')\
    .groupBy('subject').count()

I like the style of Approach A because it would allow me to break up my logic into smaller (and re-usable) functions.
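
For example, here is a rough sketch of the kind of thing I have in mind (the function names are just illustrative):

from pyspark.sql import functions as F

def only_california(df):
    return df.filter(F.col('state') == 'CA')

def keep_core_columns(df):
    return df.select('id', 'name', 'subject')

def count_by_subject(df):
    return df.groupBy('subject').count()

df = spark.read.parquet(path)
df = only_california(df)
df = keep_core_columns(df)
df = count_by_subject(df)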

However, I have come across a few blog posts (e.g. here, here and here) that are confusing to me. These posts deal specifically with the use of successive withColumn statements and suggest using a single select instead. The underlying rationale seems to be that since dataframes are immutable, successive withColumn usage is detrimental to performance.

So my question is... would I have that same performance issue when using Approach A? Or is it just an issue specific to withColumn?

3 Answers

thebluephantom (Accepted Answer)

There are really two questions here.

First, the question illustrated by your code samples. Unlike one of the other answers here, I disagree that there is a performance difference.

Try both code segments with .explain() and you will see that the generated Physical Plan for execution is exactly the same.

Spark is based on lazy evaluation. That is to say:

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

The upshot of all this is that I ran code similar to yours with two filters applied. Because the action .count causes just-in-time evaluation, Catalyst combined the first and the second filter into a single Filter step. This is sometimes called "code fusing", and it is only possible because of late execution, aka lazy evaluation.

Snippet 1 and Physical Plan

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)

df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.filter(col('lastname') == 'Jones2')
df.groupBy('lastname').count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#212], functions=[finalmerge_count(merge count#233L) AS count(1)#228L])
   +- Exchange hashpartitioning(lastname#212, 200), ENSURE_REQUIREMENTS, [plan_id=391]
      +- HashAggregate(keys=[lastname#212], functions=[partial_count(1) AS count#233L])
         +- Project [lastname#212]
            +- Filter (isnotnull(lastname#212) AND ((lastname#212 = Jones) AND (lastname#212 = Jones2)))
               +- Scan ExistingRDD[firstname#210,middlename#211,lastname#212,id#213,gender#214,salary#215]

Snippet 2 and Same Physical Plan

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema2 = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df2 = spark.createDataFrame(data=data2,schema=schema2)

df2.filter(col('lastname') == 'Jones')\
    .select('firstname', 'lastname', 'salary')\
    .filter(col('lastname') == 'Jones2')\
    .groupBy('lastname').count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#299], functions=[finalmerge_count(merge count#320L) AS count(1)#315L])
   +- Exchange hashpartitioning(lastname#299, 200), ENSURE_REQUIREMENTS, [plan_id=577]
      +- HashAggregate(keys=[lastname#299], functions=[partial_count(1) AS count#320L])
         +- Project [lastname#299]
            +- Filter (isnotnull(lastname#299) AND ((lastname#299 = Jones) AND (lastname#299 = Jones2)))
               +- Scan ExistingRDD[firstname#297,middlename#298,lastname#299,id#300,gender#301,salary#302]

Second Question - withColumn

Doing this:

df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.withColumn("salary100", col("salary") * 100)
df.withColumn("salary200", col("salary") * 200).explain()

or writing it as a chain gives the same result. In other words, it does not matter how you write the transformations; the final Physical Plan is what counts. That optimization does carry some overhead, though, and depending on how much weight you give that, a single select is the alternative (see the sketch after the plan below).

== Physical Plan ==
*(1) Project [firstname#399, lastname#401, salary#404, (salary#404 * 100) AS salary100#414, (salary#404 * 200) AS salary200#419]
+- *(1) Filter (isnotnull(lastname#401) AND (lastname#401 = Jones))
   +- *(1) Scan ExistingRDD[firstname#399,middlename#400,lastname#401,id#402,gender#403,salary#404]
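
For reference, here is a rough sketch of that single-select alternative, re-using the data and schema from Snippet 1 (df3 is just a new name for this sketch); both derived columns are expressed in one projection:

from pyspark.sql.functions import col

df3 = spark.createDataFrame(data=data, schema=schema)

# One projection instead of two successive withColumn calls
df3.filter(col('lastname') == 'Jones') \
   .select('firstname', 'lastname', 'salary',
           (col('salary') * 100).alias('salary100'),
           (col('salary') * 200).alias('salary200')) \
   .explain()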

This article https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 takes a different perspective, focusing on the cost of the optimization itself rather than the projection issue. I was surprised that three withColumn calls would cause any issue; if they do, that points to poor optimization. I suspect that looping over many features is the real problem, not a small number of withColumn calls.

Prathik Kini

Approach A:

  • In this approach, each line creates a new DataFrame, and the previous DataFrame is replaced with the new one.
  • This can lead to performance issues, especially with large datasets, due to the overhead of DataFrame creation and management.
  • Successive withColumn statements can lead to performance issues in PySpark due to the immutability of DataFrames.

Approach B:

  • Instead of using successive withColumn statements, you can use a single select statement to achieve the same result.
  • It performs the transformations in a single pass through the data.
  • Each operation in the chain is applied directly to the DataFrame returned by the preceding operation without creating intermediate DataFrames.

Approach B is typically preferred for its better performance and cleaner, more concise code.

Vikas Sharma

I am assuming you have the following questions:

  1. Performance difference between not chaining the operations (Approach A in your question) vs chaining the operations (Approach B in your question)?
  2. Performance difference between multiple withColumn() statements vs a single select() statement?

Now, answering the 2nd question is quite straightforward. As noted in the Spark API reference for withColumn():

This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with multiple columns at once.

I hope this makes it clear that in certain situations (e.g. loops) there can be a performance difference between withColumn() and select(), and it's generally better to use the latter. You can read more on the hidden cost of withColumn() here.
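
To make the loop case concrete, here is a sketch (the column names and the derived-column logic are made up, and df is assumed to be any DataFrame containing numeric columns a, b and c); each withColumn in the loop adds another projection to the plan, while the select builds a single one:

from pyspark.sql import functions as F

cols_to_add = ['a', 'b', 'c']   # hypothetical numeric columns assumed to exist in df

# Anti-pattern: each iteration adds another projection to the logical plan
df_loop = df
for c in cols_to_add:
    df_loop = df_loop.withColumn(f'{c}_x100', F.col(c) * 100)

# Alternative: one select that builds all derived columns in a single projection
df_select = df.select('*', *[(F.col(c) * 100).alias(f'{c}_x100') for c in cols_to_add])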

Finally, coming to the 1st question: short answer - no, there is no (or at most a negligible) performance difference between the two approaches.

Maybe (correct me if I am wrong) your doubt arises from the fact that not chaining the DataFrame operations and assigning them separately creates multiple intermediate DataFrames, which looks like overhead; but exactly the same thing happens when chaining. Each intermediate DataFrame is just a lightweight wrapper around a query plan, so both styles produce the same physical plan and the same performance, with negligible differences if any.
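
If you want to convince yourself on your own data, here is a quick sketch (it assumes the same spark, path and F alias as in your question; DataFrame.sameSemantics requires Spark 3.1+ and compares the underlying query plans):

from pyspark.sql import functions as F

# Approach A: successive reassignment
df_a = spark.read.parquet(path)
df_a = df_a.filter(F.col('state') == 'CA')
df_a = df_a.select('id', 'name', 'subject')
df_a = df_a.groupBy('subject').count()

# Approach B: one chained expression
df_b = spark.read.parquet(path) \
    .filter(F.col('state') == 'CA') \
    .select('id', 'name', 'subject') \
    .groupBy('subject').count()

print(df_a.sameSemantics(df_b))   # should print True: same underlying query plan
df_a.explain()                    # and the physical plans match as well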

So, the only difference that remains is readability.