I am using PySpark with Spark SQL. I have a table (MAIN_TABLE) that contains three columns:
DATABASE_NAME
TABLE_NAME
SOURCE_TYPE
I would like to read the data from each actual database and table listed in the DATABASE_NAME and TABLE_NAME columns of MAIN_TABLE. However, I only want the data from tables whose SOURCE_TYPE = 'STANDARD'; tables with any other SOURCE_TYPE should be skipped.
Essentially, I need a union of the data from all the tables listed in MAIN_TABLE where SOURCE_TYPE = 'STANDARD' and that meet certain conditions. I tried running the code below, but it's not grabbing the data from all the tables listed in MAIN_TABLE that have SOURCE_TYPE = 'STANDARD'. Am I missing something?
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    database_name = row["database_name"]
    table_name = row["table_name"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'SOURCE_TYPE' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
    """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union the source_data_df with the result_df
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.unionAll(source_data_df)

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()
Is there anything I'm doing wrong that prevents this from grabbing all the data from all the tables where SOURCE_TYPE = 'STANDARD'?
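
For reference, here is a minimal sketch of the structure I am aiming for, not a confirmed fix. It builds one query per config row and combines the results in a single step with `functools.reduce` and `DataFrame.unionByName`, which matches columns by name where `unionAll` matches them by position. The helper names `build_query` and `union_standard_tables` are hypothetical. The sketch also assumes the `source_type` column should hold the actual value ('STANDARD') instead of the literal string 'SOURCE_TYPE', and that the row keys are lowercase as in my loop above.

```python
from functools import reduce


def build_query(database_name, table_name, source_type):
    """Return the SELECT for one source table (column list copied from the question)."""
    return f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            '{source_type}' AS source_type,
            header.actionname AS actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' AS source_database_name,
            '{table_name}' AS source_event_name
        FROM {database_name}.{table_name}
    """


def union_standard_tables(spark, config_rows, source_type="STANDARD"):
    """Run one query per config row and union the results by column name.

    `spark` is an existing SparkSession; `config_rows` is the collected,
    already-filtered content of MAIN_TABLE (assumed lowercase row keys).
    """
    frames = [
        spark.sql(build_query(row["database_name"], row["table_name"], source_type))
        for row in config_rows
    ]
    if not frames:
        return None  # no config row matched, so there is nothing to write
    return reduce(lambda left, right: left.unionByName(right), frames)


# Intended usage (requires a live SparkSession, so it is left commented out):
# rows = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'").collect()
# result_df = union_standard_tables(spark, rows)
# if result_df is not None:
#     result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")
```

Returning `None` for an empty config makes the "no matching tables" case explicit, so the write step can be skipped instead of failing on a `None` DataFrame.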