Read the latest S3 parquet files partitioned by date key using Polars


I have parquet files stored in an S3 location, partitioned by date key. Using Polars, I need to read the parquet file(s) from the latest date-key folder. Here is an example of my S3 structure:

Amazon S3 > Buckets > Bucket name > dev/ > target/ > refined/ > STUDENTS.parquet/

Under STUDENTS.parquet/, there are several folders partitioned by DATE_KEY, e.g.:

[screenshot: folders named DATE_KEY=<date>/, the latest being DATE_KEY=2024-03-06/]

Each of these folders contains parquet file(s).

Using Polars, I need to read the parquet file from the latest date-key folder (DATE_KEY=2024-03-06/ in this example) into a Polars DataFrame.

Would doing a descending sort on the folder names be a way to achieve this?

Can someone please help me with this? I'm after a Polars DataFrame, not Pandas.


Best answer (by Hericks):

I see two ways to achieve this.

  1. Scan the entire dataset into a LazyFrame and filter on the fly.
  2. Read the names of the folders in S3 and scan only the parquet files within the folder with the latest date.

Option 1. Scan the entire dataset into a pl.LazyFrame and filter on the fly.

import boto3
import polars as pl

# configuration
profile = "your-profile"
s3_path = "s3://path/to/your/dataset/*/*.parquet"

# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()

df = (
    # scan entire dataset
    pl.scan_parquet(
        s3_path,
        storage_options={
            "aws_access_key_id": credentials.access_key,
            "aws_secret_access_key": credentials.secret_key,
            "aws_session_token": credentials.token,
            "aws_region": session.region_name,
        },
    )
    # filter for latest partition
    .filter(
        pl.col("DATE_KEY") == pl.col("DATE_KEY").max()
    )
    .collect()
)
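
Note that this relies on Polars parsing the hive-style DATE_KEY=... directory names into a DATE_KEY column. Recent Polars versions do this automatically for such paths, but you can also request it explicitly. A minimal sketch reusing s3_path, credentials, and session from above (the hive_partitioning parameter is available in recent Polars versions):

lf = pl.scan_parquet(
    s3_path,
    # parse DATE_KEY=... directory names into a DATE_KEY column
    hive_partitioning=True,
    storage_options={
        "aws_access_key_id": credentials.access_key,
        "aws_secret_access_key": credentials.secret_key,
        "aws_session_token": credentials.token,
        "aws_region": session.region_name,
    },
)

Since DATE_KEY comes from the directory names rather than the file contents, filtering on it is cheap, though whether non-matching partitions are skipped entirely depends on the Polars version.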

Option 2. Obtain the name of the latest partition folder and read only the corresponding data.

import polars as pl
import boto3

# configuration
profile = "your-profile"
s3_bucket = "bucket-name"
s3_prefix = "path/to/dataset/"  # trailing slash so Delimiter="/" lists the partition folders

# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()

# get the path of the latest partition folder; max() works because the
# ISO-formatted DATE_KEY=YYYY-MM-DD names sort lexicographically in
# chronological order
response = session.client("s3").list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter="/")
s3_prefix_latest = max(prefix["Prefix"] for prefix in response["CommonPrefixes"])
s3_path_latest = f"s3://{s3_bucket}/{s3_prefix_latest}*.parquet"

# read data only from latest partition
df = pl.read_parquet(
    s3_path_latest,
    storage_options={
        "aws_access_key_id": credentials.access_key,
        "aws_secret_access_key": credentials.secret_key,
        "aws_session_token": credentials.token,
        "aws_region": session.region_name,
    },
)
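
One caveat: list_objects_v2 returns at most 1,000 results per call, so with many partition folders the CommonPrefixes list can be truncated and max() would miss the latest one. A sketch of a pagination-safe variant, reusing session, s3_bucket, and s3_prefix from above:

# iterate over all result pages so no partition folder is missed
paginator = session.client("s3").get_paginator("list_objects_v2")
prefixes = [
    p["Prefix"]
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter="/")
    for p in page.get("CommonPrefixes", [])
]
s3_prefix_latest = max(prefixes)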