Transferring the latest data from Redshift to DynamoDB with AWS Glue


I'm new to DynamoDB and AWS Glue. I'm trying to transfer data from a Redshift cluster to DynamoDB tables using AWS Glue, but I want to keep only the most recent data from the cluster table.

As I understand it, dropping the entire DynamoDB table and re-creating it is the logical approach. But if I do that, does it mean I have to create the DynamoDB tables by hand each time? The Glue job runs weekly.
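
If dropping and re-creating is the way to go, this is roughly what I was thinking of running at the start of the job with boto3 (just a sketch: I've guessed the region and the key schema, and the snapshot_day GSI would also need to be declared in create_table):

import boto3

# Rough sketch: drop and re-create the target table before writing.
# Assumes the job role has dynamodb:DeleteTable / dynamodb:CreateTable
# permissions; the key schema below is a guess, adjust to the real table.
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
table_name = "VENDOR_INFO"

# Delete the existing table and wait until it is actually gone
dynamodb.delete_table(TableName=table_name)
dynamodb.get_waiter("table_not_exists").wait(TableName=table_name)

# Re-create it (the snapshot_day GSI would need to be added here too)
dynamodb.create_table(
    TableName=table_name,
    KeySchema=[
        {"AttributeName": "vendor_code", "KeyType": "HASH"},
        {"AttributeName": "snapshot_day", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "vendor_code", "AttributeType": "S"},
        {"AttributeName": "snapshot_day", "AttributeType": "S"},
    ],
    BillingMode="PAY_PER_REQUEST",
)
dynamodb.get_waiter("table_exists").wait(TableName=table_name)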

I tried Job Bookmarks, but the previous data was still in the DynamoDB table (as far as I can tell, bookmarks only stop the job from re-reading already-processed source data; they never delete anything from the target).
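
For reference, I enable bookmarks per run through the --job-bookmark-option argument, something like this (the job name is just a placeholder):

import boto3

# Bookmarks are turned on per run via the --job-bookmark-option argument
glue = boto3.client("glue", region_name="us-east-1")
glue.start_job_run(
    JobName="redshift-to-dynamodb-weekly",  # placeholder name
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)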


Here is the Glue job; in the SQL node you can see I'm trying to select only the latest snapshot_day:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    # Register each DynamicFrame as a temp view, run the SQL against the
    # module-level spark session, and wrap the result back in a DynamicFrame
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Redshift Cluster
RedshiftCluster_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="dbname",
    redshift_tmp_dir=args["TempDir"],
    table_name="tablename",
    transformation_ctx="RedshiftCluster_node1",
)

# Script generated for node SQL
SqlQuery0 = """
select * from vendor_info
WHERE snapshot_day = (SELECT MAX(snapshot_day) FROM vendor_info)

"""
SQL_node1660806425146 = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"vendor_info": RedshiftCluster_node1},
    transformation_ctx="SQL_node1660806425146",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=SQL_node1660806425146,
    mappings=[
        ("owning_buyer", "string", "owning_buyer", "string"),
        ("business_name", "string", "business_name", "string"),
        ("vendor_code", "string", "vendor_code", "string"),
        ("vendor_group_id", "int", "vendor_group_id", "int"),
        ("vendor_group_status_name", "string", "vendor_group_status_name", "string"),
        ("rand_id", "string", "rand_id", "string"),
        ("snapshot_day", "timestamp", "snapshot_day", "timestamp"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node DynamoDB bucket
Datasink1 = glueContext.write_dynamic_frame_from_options(
    frame=ApplyMapping_node2,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": "VENDOR_INFO",
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()

This is the DynamoDB table structure; snapshot_day is a global secondary index. Should I try changing the partition key, sort key, or secondary index? Or is there a way to drop/truncate DynamoDB tables dynamically in the AWS Glue job before transferring the data?
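
For the truncate option, the closest thing I've found is scanning the table and batch-deleting every item with boto3, something like this sketch (it assumes the primary key is vendor_code plus snapshot_day, which may not match my actual schema, and a full scan could be slow and costly on a large table):

import boto3

# Sketch: empty the table by scanning and batch-deleting all items.
# Assumes the primary key is (vendor_code, snapshot_day) -- adjust as needed.
table = boto3.resource("dynamodb", region_name="us-east-1").Table("VENDOR_INFO")

# Only fetch the key attributes, since that's all delete_item needs
scan_kwargs = {"ProjectionExpression": "vendor_code, snapshot_day"}
with table.batch_writer() as batch:
    while True:
        page = table.scan(**scan_kwargs)
        for item in page["Items"]:
            batch.delete_item(Key=item)
        if "LastEvaluatedKey" not in page:
            break
        scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]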


Thank you
