Convert an MLTable object into a PySpark dataframe

I have a data asset in Azure Machine Learning. I want to convert it into a PySpark dataframe. In the Consume tab of the data asset, I get the code to convert it into a Pandas dataframe. However, this data is huge (1 TB+), so it will not fit into a Pandas dataframe.

This is the code I am using:

import mltable
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Connect to the workspace and fetch the registered data asset
ml_client = MLClient.from_config(credential=DefaultAzureCredential())
data_asset = ml_client.data.get("data_asset_name", version="1")

# Load the MLTable definition
tbl = mltable.load(data_asset.path)

# Materializes the entire table in memory on a single node
df = tbl.to_pandas_dataframe()
df

The function to_pandas_dataframe() converts the MLTable into a Pandas dataframe.

Is there any function or way to convert it into a PySpark dataframe instead?

Answer from JayashankarGS:

You can pass the mltable data asset directly as an input to an Azure Machine Learning Spark job, so there is no need to convert it to a Spark DataFrame beforehand. Here is sample code that submits such a job:

from azure.ai.ml import MLClient, spark, Input, Output
from azure.identity import DefaultAzureCredential

# Fill in your own subscription, resource group, and workspace names
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

# The registered data asset to pass into the Spark job
data_asset = ml_client.data.get("data_asset_name", version="1")

spark_job = spark(
    display_name="Titanic-Spark-Job-SDK-6",
    code="./src",                   # folder containing the entry script
    entry={"file": "titanic.py"},   # script that runs on the Spark cluster
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    resources={
        "instance_type": "Standard_E4s_v3",
        "runtime_version": "3.2.0",
    },
    inputs={
        "titanic_data": Input(
            type="mltable",
            path=data_asset.id,     # pass the data asset's ID
            mode="direct"
        ),
    },
    outputs={
        "wrangled_data": Output(
            type="uri_folder",
            path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
            mode="direct",
        ),
    },
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)

returned_spark_job = ml_client.jobs.create_or_update(spark_job)

# Wait until the job completes
ml_client.jobs.stream(returned_spark_job.name)

Here, the input type is mltable and the path is the data asset's ID.

The output is written to the wrangled folder.
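
For completeness, here is a minimal sketch of what the entry script (titanic.py) might contain. This is a hypothetical illustration: the argument names match the args string above, but the assumption that the underlying data is CSV, and that spark.read can consume the URI the input resolves to in direct mode, must be adapted to your data. The point is that spark.read distributes the 1 TB+ load across the executors instead of the driver:

import argparse
from pyspark.sql import SparkSession

# The argument names match the job's `args` string
parser = argparse.ArgumentParser()
parser.add_argument("--titanic_data")   # input URI (mode="direct")
parser.add_argument("--wrangled_data")  # output folder URI
args = parser.parse_args()

spark = SparkSession.builder.getOrCreate()

# Assumption: the asset is backed by CSV files with a header row.
# Reading with spark.read keeps the load distributed across executors.
df = spark.read.option("header", "true").csv(args.titanic_data)

# ... wrangle with ordinary PySpark transformations, e.g. ...
df = df.dropna()

# Write the wrangled result to the job's output location
df.write.mode("overwrite").parquet(args.wrangled_data)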

If you still want to create the dataframe interactively, the only way is to go through a Pandas dataframe on serverless Spark compute and convert it:

spark.createDataFrame(pandas_dataframe)
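
Putting that together, here is a minimal sketch of this route in a serverless Spark session, reusing data_asset from the question. Note that to_pandas_dataframe() still materializes the whole table on the driver, so this only works when the data fits in driver memory:

import mltable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the MLTable and materialize it as Pandas on the driver
tbl = mltable.load(data_asset.path)
pandas_df = tbl.to_pandas_dataframe()

# Distribute the in-memory Pandas dataframe as a PySpark dataframe
spark_df = spark.createDataFrame(pandas_df)
spark_df.show(5)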

Refer to this GitHub repository for more information.