Dynamic tables via DMS -> Kinesis -> Iceberg (Transactional data lake)

Using a Glue Streaming job that receives events from DMS via Kinesis, how do I take those events and push them into an Iceberg table in S3?

The problem I'm running into is that when I get an event from the Kinesis stream, it doesn't line up with what's in the Glue Catalog: some columns are missing, some columns are all lowercase instead of camel case, and so on.

I believe this is because I read the event from Kinesis into a DynamicFrame first and then attempt to manipulate it into the shape I want; this feels very manual and not like the correct approach.
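
To give a sense of the kind of manual fix-up I mean, it ends up being something along these lines (a simplified sketch with made-up column names, not my actual code):

from pyspark.sql import functions as f

# Hypothetical target schema, hard-coded here instead of being read from the Glue Catalog
catalog_columns = {"customerId": "string", "orderTotal": "double", "updatedAt": "timestamp"}

def align_to_catalog(df):
    # Rename the lowercased columns coming off the stream back to the catalog's casing,
    # and add anything the event is missing as nulls so the write doesn't blow up
    incoming = {c.lower(): c for c in df.columns}
    for name, data_type in catalog_columns.items():
        if name.lower() in incoming:
            df = df.withColumnRenamed(incoming[name.lower()], name)
        else:
            df = df.withColumn(name, f.lit(None).cast(data_type))
    return df.select(*catalog_columns.keys())

This works, but it means keeping that mapping in sync with the catalog by hand, which is why it feels wrong.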

This is how I get the frame:

event_df = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "typeOfData": "kinesis",
        "streamARN": "arn:aws:kinesis:<region>:<account_id>:stream/my-stream",
        "classification": "json",
        "startingPosition": "earliest",
        "inferSchema": "true",
    },
    transformation_ctx="transform_ctx_kinesis_stream",
)
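
For context, each record on the stream is the standard DMS-to-Kinesis envelope, roughly this shape (values made up, and the metadata block carries more fields than I've shown):

# Roughly the shape of a single change event once the JSON is inferred
example_event = {
    "data": {"id": 42, "somecolumn": "some value"},
    "metadata": {
        "operation": "insert",            # insert / update / delete, plus load for full-load rows
        "schema-name": "my-schema",
        "table-name": "my-table",
        "timestamp": "2023-01-01T00:00:00.000000Z",
    },
}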

I'm then using forEachBatch to process the event_df:

glueContext.forEachBatch(
    frame=event_df,
    batch_function=process,
    options={
        "windowSize": "1 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)

I then have a lot of work in the process function, where I have to group the records inside the frame by the table they belong to, since a single Kinesis batch contains changes for many tables:

def process(data_frame: DataFrame, batchId):
    additional_options = {"write.parquet.compression-codec": "snappy", "enableUpdateCatalog": True}

    tables_collection = spark.catalog.listTables(glue_database)
    table_names_in_db = [table.name for table in tables_collection]

    if data_frame.count() > 0:
        dyf = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )

        # Pull the source schema/table out of the DMS metadata and flatten the data payload
        mapped_df_with_data = dyf.toDF().select('metadata.schema-name', 'metadata.table-name', 'data.*')

        # Work out the distinct (schema, table) pairs present in this batch
        source_tables = (
            mapped_df_with_data
            .select('schema-name', 'table-name')
            .distinct()
            .collect()
        )

        # For each table we found, look up whether it already exists and decide what to do with it
        for row in source_tables:
            schema_name = row['schema-name']
            table_name = row['table-name']

            print(f"Finding all records where table-name = {table_name}")
            df = mapped_df_with_data.where(
                (f.col('schema-name') == schema_name) & (f.col('table-name') == table_name)
            )
            df.show(n=1, vertical=True, truncate=False)

            # Drop the metadata columns we no longer need
            df = df.drop("schema-name", "table-name")

            # Build the catalog table name the same way the tables are named (hyphens -> underscores)
            complete_table_name = f"{schema_name}_{table_name}".replace("-", "_")
            if complete_table_name in table_names_in_db:
                append(additional_options, df, complete_table_name)
            else:
                create(additional_options, df, complete_table_name)

This mostly works, except it fails if an event comes in with extra columns that the already-created table doesn't know about. Should I be manually updating the catalog? I did go down that route but ended up in a mess of schema conflicts, and it doesn't seem like the right approach.
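
The closest thing I've found so far is Iceberg's schema merging on write, which (if I'm reading the Iceberg docs right, and depending on the Iceberg version Glue bundles) would look roughly like this inside the append path:

# Allow writes whose schema doesn't exactly match the table (one-off, per table)
spark.sql(f"ALTER TABLE glue_catalog.{glue_database}.{complete_table_name} "
          "SET TBLPROPERTIES ('write.spark.accept-any-schema'='true')")

# Then ask Iceberg to merge any new columns into the table schema on append.
# NOTE: 'glue_catalog' is whatever the Iceberg catalog is registered as in this job,
# and the option is spelled 'merge-schema' in some Iceberg versions.
(df.writeTo(f"glue_catalog.{glue_database}.{complete_table_name}")
   .option("mergeSchema", "true")
   .append())

But I haven't been able to confirm whether that's the intended way to handle this, or how well it plays with the Glue Catalog entries for the table.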

Ultimately, what I'm trying to achieve is the lowest possible latency from an event leaving DMS to it landing in Iceberg. Is there a better tool/approach, or some more code, that can help me out?

All the examples/blogs I've seen focus on writing to a single known table, which is easy, but what happens when you have many dynamic tables?
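
For reference, that single-table pattern looks roughly like the sketch below, where the destination is hard-coded (glue_catalog.my_db.known_table is just a placeholder for an Iceberg table registered in the catalog):

def process_single_table(data_frame, batchId):
    # Trivial when there is exactly one destination table known up front
    if data_frame.count() > 0:
        data_frame.writeTo("glue_catalog.my_db.known_table").append()

glueContext.forEachBatch(
    frame=event_df,
    batch_function=process_single_table,
    options={
        "windowSize": "1 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)

That falls over as soon as the destination has to be derived per record, which is what all the code above is trying to deal with.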
