How does Spark get the size of a DataFrame for broadcast?


I set this configuration:

--conf spark.sql.autoBroadcastJoinThreshold=209715200  // 200 MB
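For reference, the same threshold can also be set at runtime through the session configuration. A minimal sketch, assuming an existing SparkSession named sparkSession (matching the code below):

// Equivalent to passing --conf on the command line; applies to subsequent queries.
// Setting the value to -1 disables automatic broadcast joins entirely.
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", 209715200L)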

I want to decrease this value so that it is just a little higher than the size of a specific DataFrame (let's call it bdrDf).

I tried to estimate the size of bdrDf:

import org.apache.commons.io.FileUtils

// Catalyst's size estimate for the optimized logical plan.
// Note: stats(conf) is the Spark 2.2-era signature; newer versions expose stats with no argument.
val bytes = sparkSession.sessionState.executePlan(bdrDf.queryExecution.logical)
  .optimizedPlan.stats(sparkSession.sessionState.conf).sizeInBytes

println("bdrDf size: " + FileUtils.byteCountToDisplaySize(bytes.toLong))

I got: 58 MB.

Is this the size that Spark will use when it checks whether the DataFrame is below spark.sql.autoBroadcastJoinThreshold?

I also saw this metric in the Spark UI:

[Spark UI screenshot missing from the original post]

It corresponds to 492 MB.

Is either of my values correct? If not, how can I estimate the size of my DataFrame?

The full code:

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

val df = readFromHive()

def checkSize(df: DataFrame)(implicit spark: SparkSession) = {
  // cache() is lazy, so an action is needed to actually materialize the data
  df.cache.foreach(_ => ())
  val catalystPlan = df.queryExecution.logical

  // re-plan so the optimizer picks up the cached (in-memory) relation and its statistics
  val dfSizeInBytes = spark.sessionState.executePlan(catalystPlan)
    .optimizedPlan.stats(spark.sessionState.conf).sizeInBytes

  logger.info("size: " + FileUtils.byteCountToDisplaySize(dfSizeInBytes.toLong))
  logger.info("size in bytes: " + dfSizeInBytes)
}

checkSize(df)
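As an aside, if the 492 MB above comes from the Spark UI's Storage tab, it is the size of the cached data in memory, which can differ substantially from Catalyst's plan estimate. A sketch of reading that storage number from the driver, assuming the sparkSession from earlier; getRDDStorageInfo is a developer API, so treat this as illustrative only:

// Inspect the in-memory size of cached data, i.e. what the Storage tab shows.
df.cache.count() // materialize the cache first
sparkSession.sparkContext.getRDDStorageInfo
  .filter(_.isCached)
  .foreach(info => println(s"${info.name}: ${info.memSize} bytes in memory"))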

There is 1 answer below.

Emiliano Martinez answered:

I used this function:

  def checkSize(df: DataFrame)(implicit spark: SparkSession) = {
    // caching is lazy, so the foreach action forces materialization;
    // afterwards the optimizer uses the in-memory relation's statistics
    df.cache.foreach(el => el)
    val catalyst_plan = df.queryExecution.logical
    // note: .statistics is the older (pre-2.2) API; on newer Spark versions
    // use .stats(conf) or .stats instead
    val df_size_in_bytes = spark.sessionState.executePlan(
      catalyst_plan).optimizedPlan.statistics.sizeInBytes
    df_size_in_bytes
  }

With this method it is mandatory to cache the DataFrame, and because caching is a lazy operation you need to perform the foreach action to actually materialize it. A little weird, but check if that works for you.
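For completeness, a minimal usage sketch of the answer's function; checkSize and readFromHive come from the question, while the SparkSession wiring below is an assumption:

import org.apache.spark.sql.{DataFrame, SparkSession}

implicit val spark: SparkSession = SparkSession.builder()
  .appName("df-size-check")
  .getOrCreate()

val df = readFromHive() // from the question; assumed to return a DataFrame
val estimatedBytes = checkSize(df)
println(s"estimated size in bytes: $estimatedBytes")

If the real goal is just to make one particular join broadcast, the explicit hint (org.apache.spark.sql.functions.broadcast) sidesteps size estimation and the threshold altogether: largeDf.join(broadcast(df), Seq("key")).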