Step deployment_trigger has started.
Step deployment_trigger has finished in 0.051s.
Caching disabled explicitly for mlflow_model_deployer_step.
Step mlflow_model_deployer_step has started.
Updating an existing MLflow deployment service: MLFlowDeploymentService[680269df-ff43-44fd-b6d2-9643e4691755] (type: model-serving, flavor: mlflow)
MLflow deployment service started and reachable at:
http://127.0.0.1:8000/invocations
Stopping existing services...
Step mlflow_model_deployer_step has finished in 13.595s.
Run continous_deployment_pipeline-2023_12_25-17_37_02_483102 has finished in 44.326s.
You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally, please run zenml up.
You can run:
 mlflow ui --backend-store-uri 'https://dagshub.com/Adiii1436/CNN_MLFLOW.mlflow'
...to inspect your experiment runs within the MLflow UI.
You can find your runs tracked within the `mlflow_example_pipeline` experiment. There you'll also be able to compare two or more runs.
No MLflow prediction server is currently running. The deployment pipeline must run first to train a model and deploy it. Execute the same command with the `--deploy` argument to deploy a model.
I don't know why it says "Stopping existing services" and prevents the model from being deployed. If it stops the previous service (the previously deployed model), it should then deploy the current one. I was originally running this in WSL; later I installed Ubuntu natively on my system, but I hit the same issue.
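For what it's worth, the state of the registered model servers can be inspected with the same ZenML CLI group that the script's own hint uses for deletion; assuming a standard ZenML installation, something like:

zenml model-deployer models list

should list each deployed MLflow daemon and whether it is currently running or stopped.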
run_deployment.py
from pipelines.deployment_pipeline import continous_deployment_pipeline, inference_pipeline
import click
from rich import print
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from typing import cast
DEPLOY = "deploy"
PREDICT = "predict"
DEPLOY_AND_PREDICT = "deploy_and_predict"
@click.command()
@click.option(
    "--config",
    "-c",
    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
    default=DEPLOY_AND_PREDICT,
    help="Optionally you can choose to only run the deployment "
    "pipeline to train and deploy a model (`deploy`), or to "
    "only run a prediction against the deployed model "
    "('predict'). By default both will be run "
    "('deploy_and_predict').",
)
@click.option(
    "--min-accuracy",
    default=70,
    help="Minimum accuracy for the model to be deployed.",
)
def run_deployment(config: str, min_accuracy: float):
    mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()
    deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT
    predict = config == PREDICT or config == DEPLOY_AND_PREDICT

    if deploy:
        continous_deployment_pipeline(
            min_accuracy=min_accuracy,
            workers=1,
            timeout=6000,
        )

    if predict:
        inference_pipeline(
            pipeline_name="continuous_deployment_pipeline",
            pipeline_step_name="mlflow_model_deployer_step",
        )

    print(
        "You can run:\n "
        f"[italic green] mlflow ui --backend-store-uri '{get_tracking_uri()}'"
        "[/italic green]\n ...to inspect your experiment runs within the MLflow"
        " UI.\nYou can find your runs tracked within the "
        "`mlflow_example_pipeline` experiment. There you'll also be able to "
        "compare two or more runs.\n\n"
    )
    # fetch existing services with same pipeline name, step name and model name
    existing_services = mlflow_model_deployer_component.find_model_server(
        pipeline_name="continuous_deployment_pipeline",
        pipeline_step_name="mlflow_model_deployer_step",
        model_name="CNN_MODEL",
    )

    if existing_services:
        service = cast(MLFlowDeploymentService, existing_services[0])
        if service.is_running:
            print(
                f"The MLflow prediction server is running locally as a daemon "
                f"process service and accepts inference requests at:\n"
                f" {service.prediction_url}\n"
                f"To stop the service, run "
                f"[italic green]`zenml model-deployer models delete "
                f"{str(service.uuid)}`[/italic green]."
            )
        elif service.is_failed:
            print(
                f"The MLflow prediction server is in a failed state:\n"
                f" Last state: '{service.status.state.value}'\n"
                f" Last error: '{service.status.last_error}'"
            )
    else:
        print(
            "No MLflow prediction server is currently running. The deployment "
            "pipeline must run first to train a model and deploy it. Execute "
            "the same command with the `--deploy` argument to deploy a model."
        )
if __name__ == "__main__":
    run_deployment()
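For reference, a typical invocation follows the two click options defined above, e.g.:

python run_deployment.py --config deploy --min-accuracy 70

where --config accepts deploy, predict, or deploy_and_predict (the default, which runs both pipelines).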
deployment_pipeline.py
import torch
from steps.batch_data import batch_df
from steps.helper_functions import accuracy_fn
from steps.ingest_data import ingest_df
from zenml import pipeline, step
from zenml.config import DockerSettings
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (
    MLFlowModelDeployer,
)
from torch.utils.data import DataLoader
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW
from zenml.integrations.mlflow.services import MLFlowDeploymentService
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
from zenml.steps import BaseParameters
from steps.initialize_model import initialize_model
from steps.train_test_model import train_test_model
from .utils import get_data_for_test
docker_settings = DockerSettings(required_integrations={MLFLOW})


@step(enable_cache=False)
def dynamic_importer() -> DataLoader:
    data = get_data_for_test()
    return data


class DeploymentTriggerConfig(BaseParameters):
    min_accuracy: float = 70.0


@step
def deployment_trigger(accuracy: float, config: DeploymentTriggerConfig) -> bool:
    return accuracy >= config.min_accuracy


class MLFlowDeploymentLoaderStepParameters(BaseParameters):
    pipeline_name: str
    step_name: str
    running: bool = True
@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "CNN_MODEL",
) -> MLFlowDeploymentService:
    model_deployer = MLFlowModelDeployer.get_active_model_deployer()
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )
    if not existing_services:
        raise RuntimeError(
            f"No MLflow prediction service deployed by the "
            f"{pipeline_step_name} step in the {pipeline_name} "
            f"pipeline for the '{model_name}' model is currently "
            f"running."
        )
    print(existing_services)
    print(type(existing_services))
    return existing_services[0]
@step
def predictor(
    service: MLFlowDeploymentService,
    data: DataLoader,
) -> float:
    test_acc = 0
    with torch.inference_mode():
        for X, y in data:
            test_pred = service(X)
            test_acc += accuracy_fn(y, test_pred.argmax(dim=1))
        test_acc /= len(data)
    return test_acc
@pipeline(enable_cache=False, settings={"docker":docker_settings})
def continous_deployment_pipeline(
min_accuracy: float = 0,
workers: int = 3,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
train_data, test_data, classes = ingest_df()
train_dataloader, test_dataloader = batch_df(train_data,test_data)
model,model_path = initialize_model(class_names=classes, hidden_units=10)
_, train_acc, _, _ = train_test_model(
model_path=model_path,
model=model,
train_dataloader=train_dataloader,
test_dataloader=test_dataloader,
hidden_units=10,
classes=classes
)
deployment_decision = deployment_trigger(train_acc)
mlflow_model_deployer_step(
model=model_path,
deploy_decision=deployment_decision,
workers=workers,
timeout=timeout
)
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name: str):
batch_data = dynamic_importer()
model_deployment_service = prediction_service_loader(
pipeline_name=pipeline_name,
pipeline_step_name=pipeline_step_name,
running=False,
)
predictor(service=model_deployment_service, data=batch_data)
Hi, I faced the same error today. There's a simple solution: after the service is stopped automatically, you have to explicitly start it again. Add the code sketched below right after the service = cast(...) line in run_deployment.py.
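A minimal sketch of that restart, assuming the standard ZenML service API (MLFlowDeploymentService inherits is_running and a blocking start(timeout=...) from ZenML's base service class; the 60-second timeout is an arbitrary choice):

service = cast(MLFlowDeploymentService, existing_services[0])

# The deployer step may have stopped the previous daemon; restart it explicitly.
# start() blocks until the daemon is up or the timeout (in seconds) expires.
if not service.is_running:
    service.start(timeout=60)  # timeout is an assumed value; tune as needed

After this, service.prediction_url should again point at the local /invocations endpoint.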