ZenML + MLflow: Model not deploying on localhost

Step deployment_trigger has started.
Step deployment_trigger has finished in 0.051s.
Caching disabled explicitly for mlflow_model_deployer_step.
Step mlflow_model_deployer_step has started.
Updating an existing MLflow deployment service: MLFlowDeploymentService[680269df-ff43-44fd-b6d2-9643e4691755] (type: model-serving, flavor: mlflow)
MLflow deployment service started and reachable at:
    http://127.0.0.1:8000/invocations

Stopping existing services...
Step mlflow_model_deployer_step has finished in 13.595s.
Run continous_deployment_pipeline-2023_12_25-17_37_02_483102 has finished in 44.326s.
You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml up.
You can run:
     mlflow ui --backend-store-uri 'https://dagshub.com/Adiii1436/CNN_MLFLOW.mlflow
 ...to inspect your experiment runs within the MLflow UI.
You can find your runs tracked within the `mlflow_example_pipeline` experiment. There you'll also be able to compare two or more runs.


No MLflow prediction server is currently running. The deployment pipeline must run first to train a model and deploy it. Execute the same command with the `--deploy` argument to deploy a model.

I don't know why it says "Stopping existing services..." and then prevents the model from deploying. If it is stopping the previous service (the previously deployed model), it should deploy the current one afterwards. Previously I was running this in WSL, but I later installed Ubuntu directly on my system and got the same issue.
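
To check the state of the deployed service outside the pipeline, something like the following can be used (a sketch that reuses the same ZenML APIs imported in run_deployment.py below; the pipeline, step, and model names are the ones from my code):

from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)

# Look up the service(s) registered by the deployer step
model_deployer = MLFlowModelDeployer.get_active_model_deployer()
services = model_deployer.find_model_server(
    pipeline_name="continuous_deployment_pipeline",
    pipeline_step_name="mlflow_model_deployer_step",
    model_name="CNN_MODEL",
)

for service in services:
    # uuid, is_running and status are the same attributes used in run_deployment.py
    print(service.uuid, service.is_running, service.status.state.value)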

run_deployment.py

from pipelines.deployment_pipeline import continous_deployment_pipeline, inference_pipeline
import click 
from rich import print 
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from typing import cast

DEPLOY = "deploy"
PREDICT = "predict"
DEPLOY_AND_PREDICT = "deploy_and_predict"

@click.command()
@click.option(
    "--config",
    "-c",
    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
    default=DEPLOY_AND_PREDICT,
    help="Optionally you can choose to only run the deployment "
    "pipeline to train and deploy a model (`deploy`), or to "
    "only run a prediction against the deployed model "
    "('predict'). By default both will be run "
    "('deploy_and_predict').",
)

@click.option(
    "--min-accuracy",
    default=70,
    help="Minimum accuracy for the model to be deployed.",
)

def run_deployment(config:str, min_accuracy:float):
    mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()

    deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT
    predict = config == PREDICT or config == DEPLOY_AND_PREDICT

    if deploy:
        continous_deployment_pipeline(
            min_accuracy=min_accuracy,
            workers=1,
            timeout=6000,
        )

    if predict:
        inference_pipeline(
            pipeline_name="continuous_deployment_pipeline",
            pipeline_step_name="mlflow_model_deployer_step",
        )   

    print(
        "You can run:\n "
        f"[italic green]    mlflow ui --backend-store-uri '{get_tracking_uri()}"
        "[/italic green]\n ...to inspect your experiment runs within the MLflow"
        " UI.\nYou can find your runs tracked within the "
        "`mlflow_example_pipeline` experiment. There you'll also be able to "
        "compare two or more runs.\n\n"
    )

    # fetch existing services with same pipeline name, step name and model name
    existing_services = mlflow_model_deployer_component.find_model_server(
        pipeline_name="continuous_deployment_pipeline",
        pipeline_step_name="mlflow_model_deployer_step",
        model_name="CNN_MODEL",
    )

    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])
        if service.is_running:
            print(
                f"The MLflow prediction server is running locally as a daemon "
                f"process service and accepts inference requests at:\n"
                f"    {service.prediction_url}\n"
                f"To stop the service, run "
                f"[italic green]`zenml model-deployer models delete "
                f"{str(service.uuid)}`[/italic green]."
            )
        elif service.is_failed:
            print(
                f"The MLflow prediction server is in a failed state:\n"
                f" Last state: '{service.status.state.value}'\n"
                f" Last error: '{service.status.last_error}'"
            )
    else:
        print(
            "No MLflow prediction server is currently running. The deployment "
            "pipeline must run first to train a model and deploy it. Execute "
            "the same command with the `--deploy` argument to deploy a model."
        ) 

if __name__ == "__main__":
    run_deployment()

deployment_pipeline.py

import torch
from steps.batch_data import batch_df
from steps.helper_functions import accuracy_fn
from steps.ingest_data import ingest_df
from zenml import pipeline, step
from zenml.config import DockerSettings
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from torch.utils.data import DataLoader
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
from zenml.steps import BaseParameters

from steps.initialize_model import initialize_model
from steps.train_test_model import train_test_model
from .utils import get_data_for_test

docker_settings = DockerSettings(required_integrations={MLFLOW})

@step(enable_cache=False)
def dynamic_importer() -> DataLoader:
    data = get_data_for_test()
    return data

class DeploymentTriggerConfig(BaseParameters):
    min_accuracy: float = 70.0

@step
def deployment_trigger(accuracy:float,config:DeploymentTriggerConfig)->bool:
    return accuracy>=config.min_accuracy    

class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    pipeline_name: str
    step_name: str
    running: bool = True

@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "CNN_MODEL",
) -> MLFlowDeploymentService:
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()

    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )

    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    print(existing_services)
    print(type(existing_services))
    return existing_services[0]


@step
def predictor(
    service: MLFlowDeploymentService,
    data: DataLoader,
) -> float:
    
    test_acc = 0
    
    with torch.inference_mode():
        for X, y in data:
            test_pred = service(X)
            test_acc +=  accuracy_fn(y, test_pred.argmax(dim=1))
        
    test_acc /= len(data)
    return test_acc
    
    
@pipeline(enable_cache=False, settings={"docker":docker_settings})
def continous_deployment_pipeline(
    min_accuracy: float = 0,
    workers: int = 3,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    train_data, test_data, classes = ingest_df()
    train_dataloader, test_dataloader = batch_df(train_data,test_data)
    model,model_path = initialize_model(class_names=classes, hidden_units=10)

    _, train_acc, _, _ = train_test_model(
        model_path=model_path,
        model=model, 
        train_dataloader=train_dataloader, 
        test_dataloader=test_dataloader,
        hidden_units=10,
        classes=classes
    )

    deployment_decision = deployment_trigger(train_acc)

    mlflow_model_deployer_step(
        model=model_path,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout
    )


@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name: str):
    batch_data = dynamic_importer()
    model_deployment_service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=False,
    )
    predictor(service=model_deployment_service, data=batch_data)

There is 1 answer below.

Answer by Arslan Mehmood:

Hi, I faced the same error today. There's a simple solution: right after the service is stopped automatically, you have to explicitly start it again with

service.start(timeout=60)

Write the above line right after the `service = cast(MLFlowDeploymentService, existing_services[0])` statement in run_deployment.py.
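
For context, this is roughly where the call fits in run_deployment.py from the question (a sketch; the 60-second timeout is just an example value):

    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])
        # The deployer step stopped the old daemon; start the service again
        # explicitly so the prediction endpoint comes back up.
        if not service.is_running:
            service.start(timeout=60)
        if service.is_running:
            print(
                f"The MLflow prediction server is running locally as a daemon "
                f"process service and accepts inference requests at:\n"
                f"    {service.prediction_url}"
            )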