Join two tables by column name when the column names for joining are stored in a table


I am trying to join two tables using a list that is created from another table, but I haven't found a way to make it work.

Base would be:

data = [('B_YEAR, B_ORG', 'YEAR, ORG', 1), ('B_YEAR', 'YEAR', 2) ]
test = spark.createDataFrame(data, ['key_orig', 'key_map', 'Fil_Number']) 
fc_year == 2024

Now I try to join:

for i in range (1,x):
test_1= test.filter(col("Fil_Number")==1)
list_key_orig = test_1.select("key_orig").collect()
list_key_map = test_1.select("key_map").collect()

df_calc_values =spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")

df_calc_values = df_calc_values.filter((col("GJ")==fc_year))
display(df_calc_values)

df_new= df_orig.join(df_calc_values, on = (key_orig1.key_orig == key_map1.key_map) ,how = "left")

I already searched on Google but found no result.

Tables:

Table - df_calc_values (screenshot in original post)

Table - df_orig (screenshot in original post)


There is 1 answer below

Samuel Demir (BEST ANSWER)

First of all, your question is very hard to follow. There is no information about where the original DataFrame comes from. Your for loop is not correctly indented, so it is not possible to tell what is part of the loop and what is not. Your fc_year variable is never assigned: fc_year == 2024 is a comparison, not an assignment. You also import the entire namespace of pyspark's functions module via from pyspark.sql.functions import *, which you should absolutely avoid to prevent naming conflicts in your code. You cannot expect a good answer from code snippets plucked out of thin air with no relation to each other.
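The star-import warning can be demonstrated without Spark installed: pyspark.sql.functions exports names such as max, sum, and abs, so a * import silently overwrites the Python builtins of the same name. A simulated sketch of that shadowing (column_max is a stand-in, not a real pyspark function):

```python
import builtins

def column_max(col_name):
    """Stand-in for pyspark.sql.functions.max, which returns a Column, not a number."""
    return f"max({col_name})"

# This assignment is effectively what `from pyspark.sql.functions import *` does:
max = column_max
shadowed = max("GJ")       # 'max(GJ)' -- the builtin max() is gone

# With `from pyspark.sql import functions as f` the builtin is never touched:
max = builtins.max
restored = max([1, 2, 3])  # 3

print(shadowed, restored)
```

This is why the answer below references everything through the `f` alias.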

Anyway, I tried to provide an answer showing how you can build a join string out of the given information and how to join two DataFrames with it.

from pyspark.sql import functions as f

df_calc_values = spark.read.table("hive_metastore.reporting_datalake.df_calc_fc_values")
df_orig = spark.read.table("original_table from wherever the table came from")

fc_year = 2024
max_fil_no = test.agg(f.max("Fil_Number").alias("max")).first().max

alias_original = "original"
alias_calc_val = "calc_vals"

# The year filter does not depend on the loop variable, so apply it once up front.
df_calc_values = df_calc_values.filter(f.col("GJ") == fc_year)

dfs = {}

for i in range(1, max_fil_no + 1):
    # One row of the mapping table per Fil_Number, holding both key-column lists.
    test_1 = test.filter(f.col("Fil_Number") == i)
    list_key_orig = test_1.select("key_orig").collect()
    list_key_map = test_1.select("key_map").collect()

    # Split the comma-separated key strings into clean column-name lists.
    join_cols_original = [x.replace(" ", "") for x in list_key_orig[0].key_orig.split(",")]
    join_cols_calc_vals = [x.replace(" ", "") for x in list_key_map[0].key_map.split(",")]

    # Build "original.B_YEAR == calc_vals.YEAR AND ..." pairwise.
    join_str_combos = []
    for key_orig, key_cal_vals in zip(join_cols_original, join_cols_calc_vals):
        join_str_combos.append(f"{alias_original}.{key_orig} == {alias_calc_val}.{key_cal_vals}")

    join_str = " AND ".join(join_str_combos)

    df_new = (
        df_orig.alias(alias_original)
        .join(
            df_calc_values.alias(alias_calc_val),
            on=f.expr(join_str),
            how="left",
        )
    )
    dfs[i] = df_new
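The core of the answer above, turning a pair of comma-separated key strings into a SQL join expression, can be isolated into a plain-Python helper and checked without a Spark session (build_join_condition is a hypothetical name for illustration, not part of the original answer):

```python
def build_join_condition(key_orig: str, key_map: str,
                         alias_orig: str = "original",
                         alias_calc: str = "calc_vals") -> str:
    """Pair up two comma-separated column lists into an equi-join expression
    suitable for f.expr(), e.g. 'original.B_YEAR == calc_vals.YEAR AND ...'."""
    orig_cols = [c.strip() for c in key_orig.split(",")]
    map_cols = [c.strip() for c in key_map.split(",")]
    return " AND ".join(
        f"{alias_orig}.{o} == {alias_calc}.{m}"
        for o, m in zip(orig_cols, map_cols)
    )

print(build_join_condition("B_YEAR, B_ORG", "YEAR, ORG"))
# -> original.B_YEAR == calc_vals.YEAR AND original.B_ORG == calc_vals.ORG
```

Inside the loop this would replace the two list comprehensions and the inner for loop with a single call per Fil_Number row, which also makes the parsing logic unit-testable.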