AWS Glue batch processing using the Spark engine

This is my scenario.

I have a source database, an Aurora database running the PostgreSQL engine, with a table named payments. This table contains millions of records. At the end of every day I need to read the data and check for any payments that are past their due date; if there are any, I need to mark those payments as "Overdue".

How can I achieve this in an optimized way using the AWS glue spark job? What are the AWS components that can be leveraged to achieve this requirement?

Thanks in advance!

Code:

from pyspark.sql import SparkSession


# SparkSession
spark = SparkSession.builder.config("spark.jars", "absolutepath/postgresql-42.7.3.jar") \
    .appName("PostgresApp").getOrCreate()



# PostgreSQL connection details
pg_host = "localhost"
pg_port = 5432
pg_database = "test"
pg_user = "test"
pg_password = "test"

# JDBC connection string
jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_database}?currentSchema=product"



# Load the product table into a DataFrame over JDBC
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "product") \
    .option("user", pg_user) \
    .option("password", pg_password) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.show()  # Display the loaded DataFrame


update_query = """
    UPDATE product
    SET product = 'product2'
    WHERE productid= '1'
"""

print("SQL Query:", update_query)
spark.sql(update_query)

# Close connections to avoid resource leaks
spark.stop()

There are 3 answers below.

VonC (Best Answer)

You would need to use AWS Glue to extract the data from your Aurora PostgreSQL payments table.
The awsglue Python package should help: there are many examples in aws-samples/aws-glue-samples, and you can also load the table into a DataFrame through a JDBC driver (for instance DataDirect or the standard PostgreSQL driver).

Aurora DB (PostgreSQL) ──┐
                         │
                         ├─> AWS Glue (ETL) ──> Process ──> S3 (Temporary Storage)
                         │
                         └─<──────────────────<──────────<─ AWS Lambda & RDS Data API

From there, filter the records with Spark to identify overdue payments and mark them accordingly. Save the processed data in an Amazon S3 bucket temporarily, and use AWS Lambda along with the RDS Data API (web-services interface to your Aurora DB cluster) to update the original payments table based on the data in S3.

import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import current_date, col, lit
from pyspark.sql import SparkSession

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Reading data from Aurora
payments_df = spark.read.jdbc(
    url="jdbc:postgresql://<your-aurora-endpoint>:5432/yourdatabase",
    table="payments",
    properties={"user": "yourusername",
                "password": "yourpassword",
                "driver": "org.postgresql.Driver"})

# Identify overdue payments
overdue_payments_df = payments_df.filter(col("due_date") < current_date())

# Mark payments as Overdue
overdue_payments_marked_df = overdue_payments_df.withColumn("status", lit("Overdue"))

# Save processed data to S3
overdue_payments_marked_df.write.mode("overwrite").parquet("s3://your-bucket-name/overdue_payments/")

Use AWS Lambda, triggered either manually or on a schedule (via Amazon EventBridge, formerly CloudWatch Events), to read the processed data from S3 and update the Aurora database using the RDS Data API:

import boto3
import pandas as pd
import s3fs  # must be installed so pandas can read Parquet directly from S3

def lambda_handler(event, context):
    # Read the processed Parquet data from S3 (pandas delegates to s3fs/pyarrow)
    overdue_payments_df = pd.read_parquet('s3://your-bucket-name/overdue_payments/')

    # Convert your DataFrame to SQL update statements

    # Execute SQL updates via RDS Data API
    rds_client = boto3.client('rds-data')
    # Example: update a single record
    rds_client.execute_statement(
        resourceArn='arn:aws:rds:region:account-id:cluster:your-cluster-id',
        secretArn='arn:aws:secretsmanager:region:account-id:secret:your-secret-name',
        database='yourdatabase',
        sql="UPDATE payments SET status = 'Overdue' WHERE id = :id",
        parameters=[{'name': 'id', 'value': {'longValue': 123}}]
    )
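
For the scheduling part, here is a rough sketch of creating the EventBridge rule with boto3 (the rule name, Lambda ARN and daily 01:00 UTC schedule are assumptions; replace them with your own):

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Hypothetical names/ARNs: replace with your own
rule_name = 'mark-overdue-payments-daily'
lambda_arn = 'arn:aws:lambda:region:account-id:function:update-overdue-payments'

# Run the rule every day at 01:00 UTC
rule = events.put_rule(Name=rule_name, ScheduleExpression='cron(0 1 * * ? *)')

# Point the rule at the Lambda function
events.put_targets(Rule=rule_name, Targets=[{'Id': '1', 'Arn': lambda_arn}])

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=lambda_arn,
    StatementId=f'{rule_name}-invoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)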

The suggestion in the other answers to directly update records in the PostgreSQL payments table with a SQL UPDATE statement is a good alternative, especially if you want transactional integrity and want to minimize data movement. It simplifies the process by eliminating the need to temporarily store processed data in S3 and then use AWS Lambda for updates.

An AWS Glue Spark job can issue the SQL UPDATE statement directly against the Aurora PostgreSQL database from within the Spark script, but note that Spark itself does not natively execute SQL UPDATE statements on a JDBC data source.
Instead, you open a raw JDBC connection and execute the SQL command yourself, which means managing the connection within the Spark job; this is not typical Spark usage and may require additional error handling and performance considerations.

An alternative and more Spark-native approach would be to leverage AWS Glue's capability to run a job that triggers an AWS Lambda function, which in turn performs the SQL update operation.
That keeps the heavy lifting within AWS services optimized for these tasks. It is fairly similar to the first part of this answer.
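
A minimal sketch of that hand-off from the Glue job (the function name is a placeholder, and the Glue job's IAM role needs lambda:InvokeFunction):

import boto3

lambda_client = boto3.client('lambda')
lambda_client.invoke(
    FunctionName='update-overdue-payments',  # hypothetical function name
    InvocationType='Event'  # asynchronous fire-and-forget from the Glue job
)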

For the direct approach, make sure your AWS Glue job has access to a PostgreSQL JDBC driver. That might involve uploading the driver to S3 and referencing it in your job's dependencies.
Then open a raw JDBC connection from the Spark driver (for instance through the JVM's java.sql.DriverManager, reachable via py4j) and execute an SQL update statement, as in this example.

import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, col
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Database connection properties
database_url = "jdbc:postgresql://<your-aurora-endpoint>:5432/yourdatabase"
connection_properties = {
    "user": "yourusername",
    "password": "yourpassword",
    "driver": "org.postgresql.Driver"
}

# SQL to update the payments table
update_query = """
    UPDATE payments
    SET status = 'Overdue'
    WHERE due_date < CURRENT_DATE
"""

# Execute the update query over a raw JDBC connection obtained from the JVM (py4j)
def execute_update_query():
    try:
        jvm = sc._gateway.jvm
        jvm.Class.forName(connection_properties["driver"])
        conn = jvm.java.sql.DriverManager.getConnection(database_url,
                                                        connection_properties["user"],
                                                        connection_properties["password"])
        stmt = conn.createStatement()
        rows_updated = stmt.executeUpdate(update_query)
        conn.close()
        print(f"Update query executed successfully: {rows_updated} rows updated.")
    except Exception as e:
        print(f"Error executing update query: {e}")

# Execute the update function
execute_update_query()

Make sure your AWS Glue job has the necessary permissions to access the Aurora PostgreSQL database and execute updates. That typically involves configuring the database's security group to allow connections from AWS Glue and providing the appropriate IAM roles. That would execute the update without explicit transaction management. Depending on your use case, you may need to handle transactions more granularly, especially if executing multiple updates or if rollback functionality is required.
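
If you do need explicit transaction handling, here is a rough sketch along the same lines (reusing sc, database_url and connection_properties from the job above): turn off autocommit on the raw JDBC connection and commit or roll back yourself.

# Open a raw JDBC connection via py4j, as before
jvm = sc._gateway.jvm
conn = jvm.java.sql.DriverManager.getConnection(database_url,
                                                connection_properties["user"],
                                                connection_properties["password"])
conn.setAutoCommit(False)  # group the updates into a single transaction
try:
    stmt = conn.createStatement()
    stmt.executeUpdate("UPDATE payments SET status = 'Overdue' WHERE due_date < CURRENT_DATE")
    conn.commit()      # persist the change only if everything succeeded
except Exception as e:
    conn.rollback()    # undo the partial update on failure
    print(f"Update rolled back: {e}")
finally:
    conn.close()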


If you then hit this error with your original spark.sql() attempt:

pyspark.errors.exceptions.captured.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view payments cannot be found.

You are trying to use Spark SQL's spark.sql() method for executing an SQL UPDATE statement, which is not supported directly by Spark SQL. Spark SQL is designed primarily for executing SQL queries that return a DataFrame representing the result set of a SELECT statement or operations like CREATE TABLE, DROP TABLE, etc., within the Spark SQL context. It does not support direct DML operations (INSERT, UPDATE, DELETE) on external databases like PostgreSQL through the spark.sql() method.

To execute an UPDATE statement on a PostgreSQL database from a Spark job, you would need to use a JDBC connection directly, bypassing Spark SQL for the UPDATE operation. That would involve using the JDBC API to execute your SQL commands, not through Spark SQL but by establishing a direct connection to your PostgreSQL database (using, for instance, psycopg2).

from pyspark.sql import SparkSession
import psycopg2  # Make sure psycopg2 is installed

# SparkSession
spark = SparkSession.builder.config("spark.jars", "absolutepath/postgresql-42.7.3.jar") \
    .appName("PostgresApp").getOrCreate()

# PostgreSQL connection details
pg_host = "localhost"
pg_port = 5432
pg_database = "test"
pg_user = "test"
pg_password = "test"

# Establish a direct JDBC connection to PostgreSQL
conn = psycopg2.connect(
    host=pg_host,
    port=pg_port,
    dbname=pg_database,
    user=pg_user,
    password=pg_password
)
conn.autocommit = True
cursor = conn.cursor()

# Execute the UPDATE command directly via psycopg2, not through Spark SQL
update_query = """
    UPDATE product
    SET product = 'product2'
    WHERE productid = '1'
"""
cursor.execute(update_query)
print(f"{cursor.rowcount} rows updated.")

# Cleanup: close the cursor and connection
cursor.close()
conn.close()

# Do not forget to stop SparkSession at the end of your application
spark.stop()

While the UPDATE operation is executed outside the Spark SQL context, you can still use Spark for distributed data processing tasks, including reading from and writing to PostgreSQL with .read.format("jdbc") and .write.format("jdbc").
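
For example, here is a sketch of a JDBC read/write round trip, reusing the connection variables from the snippet above and writing to a hypothetical product_copy table:

# Build the JDBC URL from the connection details above
jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_database}?currentSchema=product"

# Read the product table into a DataFrame over JDBC
df = (spark.read.format("jdbc")
      .option("url", jdbc_url)
      .option("dbtable", "product")
      .option("user", pg_user)
      .option("password", pg_password)
      .option("driver", "org.postgresql.Driver")
      .load())

# Write it back to PostgreSQL into a (hypothetical) product_copy table
(df.write.format("jdbc")
   .option("url", jdbc_url)
   .option("dbtable", "product_copy")
   .option("user", pg_user)
   .option("password", pg_password)
   .option("driver", "org.postgresql.Driver")
   .mode("append")
   .save())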

Bogdan

Do you really need to import all the data into Glue just to update the status? If the logic is simple, I would simply schedule a job that runs a statement like this against the database:

UPDATE payments SET status = 'Overdue' WHERE due_date < current_timestamp

You can do this within a Glue job or a Lambda function; I would personally use Glue, since I can add new features to the script later if needed.

Otherwise, I would use PySpark JDBC to import the data more efficiently and skip the S3 step, since the data can be written directly back to Aurora (see the sketch below).
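
A rough sketch of that direct write-back, assuming a Glue Spark session, placeholder connection details, a hypothetical id key column, and a hypothetical payments_overdue_staging table that a follow-up UPDATE ... FROM statement would merge into payments:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_date, lit

spark = GlueContext(SparkContext.getOrCreate()).spark_session

jdbc_url = "jdbc:postgresql://<your-aurora-endpoint>:5432/yourdatabase"
props = {"user": "yourusername", "password": "yourpassword", "driver": "org.postgresql.Driver"}

# Read the payments table and flag the overdue rows
payments_df = spark.read.jdbc(url=jdbc_url, table="payments", properties=props)
overdue_df = (payments_df
              .filter(col("due_date") < current_date())
              .withColumn("status", lit("Overdue"))
              .select("id", "status"))  # "id" is a hypothetical key column

# Write the flagged rows straight back to Aurora into a staging table (no S3 hop);
# a follow-up UPDATE ... FROM payments_overdue_staging can then merge them into payments
overdue_df.write.jdbc(url=jdbc_url, table="payments_overdue_staging",
                      mode="overwrite", properties=props)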

sridhar5999

You are using AWS Glue, so I would suggest using a Python Shell job instead of Spark, because Python Shell is much cheaper than Spark. Your requirement is to update a certain field based on a condition. Spark doesn't support upserts; it can only overwrite or append data. To perform an upsert you need to find the changed records and update them, for example with psycopg2's execute_values, which is the fastest method I know of (see the sketch at the end of this answer).

UPDATE payments SET status = 'Overdue' WHERE due_date < now()

Here is the code for the Python Shell job, with all the DB details passed in as job parameters.

import sys
from awsglue.utils import getResolvedOptions  # used to read the job parameters
import psycopg2

# Read the DB details passed in as job parameters
args = getResolvedOptions(sys.argv,
                          ['db_name', 'db_user', 'db_password', 'db_host', 'db_port'])

conn = psycopg2.connect(database=args['db_name'],
                        user=args['db_user'],
                        password=args['db_password'],
                        host=args['db_host'],
                        port=args['db_port'])
cur = conn.cursor()

update_query = "UPDATE payments SET status = 'Overdue' WHERE due_date < now()"
cur.execute(update_query)
conn.commit()

Provide your DB details and you can then schedule it, invoke it from a Lambda function, or trigger it from another job. If the UPDATE statement is slow, try increasing your database's shared memory (for example, shared_buffers).
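
For the execute_values approach mentioned above, a minimal sketch (reusing cur and conn from the job above, with a hypothetical list of payment ids and a hypothetical id column) could look like this:

from psycopg2.extras import execute_values

# Hypothetical (payment_id, new_status) pairs identified elsewhere
overdue_rows = [(101, 'Overdue'), (102, 'Overdue'), (103, 'Overdue')]

# Bulk update via a VALUES list joined against the payments table
execute_values(
    cur,
    """UPDATE payments AS p
       SET status = v.status
       FROM (VALUES %s) AS v (id, status)
       WHERE p.id = v.id""",
    overdue_rows
)
conn.commit()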