How do I grab all data from multiple tables found in a certain table in PySpark?


I am using PySpark/Spark SQL. I have a table (MAIN_TABLE) that contains three columns:

DATABASE_NAME
TABLE_NAME
SOURCE_TYPE
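
For illustration, the rows in MAIN_TABLE look something like this (made-up values):

DATABASE_NAME | TABLE_NAME | SOURCE_TYPE
sales_db      | orders     | STANDARD
sales_db      | raw_events | RAW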

I would like to pull all the data from each database/table pair listed in the DATABASE_NAME and TABLE_NAME columns of MAIN_TABLE. However, I only want the data from tables whose row has SOURCE_TYPE = 'STANDARD'; anything else should be skipped.

Basically, I need a union of the data from all the tables listed in MAIN_TABLE where SOURCE_TYPE = 'STANDARD' and that meet certain conditions. I tried running the code further below, but it's not picking up the data from all of the MAIN_TABLE entries with SOURCE_TYPE = 'STANDARD'. Is there something I'm missing?
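As a first sanity check (assuming the column names are exactly DATABASE_NAME, TABLE_NAME, and SOURCE_TYPE), something like this should list the database/table pairs the filter actually matches:

# Show which database/table pairs match the STANDARD filter
spark.table("MAIN_TABLE") \
    .filter("SOURCE_TYPE = 'STANDARD'") \
    .select("DATABASE_NAME", "TABLE_NAME") \
    .show(truncate=False)

Here is the full job: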

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    # Row field access is case-sensitive; use the actual column names
    database_name = row["DATABASE_NAME"]
    table_name = row["TABLE_NAME"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'STANDARD' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
        """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union with the running result; unionByName avoids column-order
    # mismatches (unionAll is deprecated in favor of union/unionByName)
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.unionByName(source_data_df)

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
# (guard against the case where no table matched the filter)
if result_df is not None:
    result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()

Is there anything I'm not doing right in order to grab all the data from all the tables for which SOURCE_TYPE = 'STANDARD'?
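For completeness, here is a condensed sketch of the same idea using functools.reduce for the union, in case it makes the intent clearer. It assumes the same MAIN_TABLE layout and the same nested header/attributes/transform columns in the source tables:

from functools import reduce
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Collect the (database, table) pairs flagged as STANDARD
pairs = [
    (r["DATABASE_NAME"], r["TABLE_NAME"])
    for r in spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'").collect()
]

# Build one DataFrame per source table, all with the same column list
frames = [
    spark.table(f"{db}.{tbl}").selectExpr(
        "header.profile",
        "attributes.id",
        "header.location",
        "'STANDARD' as source_type",
        "header.actionname",
        "transform.date",
        "header.ip",
        "header.country",
        f"'{db}' as source_database_name",
        f"'{tbl}' as source_event_name",
    )
    for db, tbl in pairs
]

# Union everything and append, guarding against an empty match
if frames:
    reduce(DataFrame.unionByName, frames) \
        .write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")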
