Dictionary comprehension while creating column in spark dataframe

83 Views Asked by At

I have to create a column in a dataframe which tracks old values vs new values

I have two types of columns in the dataframe, one is sot (source of truth) another are normal columns (metrics).

Example value of the resultant column which is a comparison of both types of columns would look like

"{'template_name': {'old_value': '1-en_US-Travel-Guide-Hotels-citymc-Blossom-Desktop-Like-HSR', 'new_value': '1-en_US-HTG_CMC_BEXUS_SecondaryKW_Test_Variant'}, 'template_id': {'old_value': '14949', 'new_value': '37807'}, 'num_questions': {'old_value': 29.0, 'new_value': 28}, 'duplicate_questions': {'old_value': '[]', 'new_value': []}}"

If we want to do something similar with normal dictionary comprehension in python it looks like this

>>> metrics = [1,2,3,4,5,6]
>>> sot = [3,1,6,2,5,1]
>>> str({i: {"old_value":sot[i], "new_value": metrics[i]} for i in range(6) if metrics[i] != sot[i]})
"{0: {'old_value': 3, 'new_value': 1}, 1: {'old_value': 1, 'new_value': 2}, 2: {'old_value': 6, 'new_value': 3}, 3: {'old_value': 2, 'new_value': 4}, 5: {'old_value': 1, 'new_value': 6}}"

But I can't do something similar with spark dataframe

metrics_cols = extract_metrics_spark_df.columns
temp.withColumn("flagged", str({ i : {"old_value" : f.col("sot_"+i) , "new_value": f.col(i)} for i in metrics_cols if f.col(i) != f.col("sot_"+i) }))

I couldn't figure how I could also use a udf in this case

Any help trying to create the column is appreciated.

2

There are 2 best solutions below

0
linpingta On

I think you could use a UDF for it, here is a scala example (pyspark should have similar functionality, I don't test the function)

import org.apache.spark.sql.functions._
import scala.collection.mutable.Map

// define udf
val transferFunc = udf((col1: Integer, col_slot1: Integer) => {
  val resultMap = Map[String, Integer]()
  resultMap["old_value"] = col1
  resultMap["new_value"] = col_slot1

  resultMap
})

// call udf
df.withColumn("flagged", transferFunc(col("col1"), col("col_slot1")))
0
Akshay Hazari On

I did something like this taking all columns zipping it and then comparing them to create the output. It may not be the best way.

from pyspark.sql.functions import udf

metric_cols = temp.columns
def check_differences(*cols):
    n = len(cols)
    columns = list(zip(cols[:n//2], cols[n//2:]))
    return str({metric_cols[i] : {"new_value": columns[i][0], "old_value": columns[i][1]} for i in range(n//2) if columns[i][0] != columns[i][1]})

check_differences_udf = udf(check_differences, StringType())


temp.withColumn("flagged", check_differences_udf('url', 'status_code', 'wizard_found', 'faq_module_found', 'faq_module_count', 'product_schema_found', 'template_name', 'template_id', 'meta_robots', 'x_robots_tag', 'page_type', 'canonical_url', 'canonical_url_status_code', 'canonical_url_x_robots_tag', 'canonical_url_redirects', 'canonical_url_hops', 'canonical_url_infinite_redirect', 'canonical_url_redirect_urls', 'canonical_url_final_url', 'canonical_url_final_status_code', 'num_questions', 'duplicate_questions_bool', 'num_duplicate_questions', 'duplicate_questions', 'gaia_id', 'redirects', 'hops', 'infinite_redirect', 'redirect_urls', 'final_url', 'final_status_code', 'googlebot_access', 'baiduspider_access', 'bingbot_access', 'sot_url', 'sot_status_code', 'sot_wizard_found', 'sot_faq_module_found', 'sot_faq_module_count', 'sot_product_schema_found', 'sot_template_name', 'sot_template_id', 'sot_meta_robots', 'sot_x_robots_tag', 'sot_page_type', 'sot_canonical_url', 'sot_canonical_url_status_code', 'sot_canonical_url_x_robots_tag', 'sot_canonical_url_redirects', 'sot_canonical_url_hops', 'sot_canonical_url_infinite_redirect', 'sot_canonical_url_redirect_urls', 'sot_canonical_url_final_url', 'sot_canonical_url_final_status_code', 'sot_num_questions', 'sot_duplicate_questions_bool', 'sot_num_duplicate_questions', 'sot_duplicate_questions', 'sot_gaia_id', 'sot_redirects', 'sot_hops', 'sot_infinite_redirect', 'sot_redirect_urls', 'sot_final_url', 'sot_final_status_code', 'sot_googlebot_access', 'sot_baiduspider_access', 'sot_bingbot_access'))

It gives the required output

"{'template_name': {'old_value': '1-en_US-Travel-Guide-Hotels-citymc-Blossom-Desktop-Like-HSR', 'new_value': '1-en_US-HTG_CMC_BEXUS_SecondaryKW_Test_Variant'}, 'template_id': {'old_value': '14949', 'new_value': '37807'}, 'num_questions': {'old_value': 29.0, 'new_value': 28}}"