
MLFlow

Repo for end-to-end MLOps workflow using MLFlow

Prerequisites:

  • Upgrade pip
pip install --upgrade pip
  • Create & Activate a Virtual env.
python3 -m venv mlflow-env
source mlflow-env/bin/activate
  • Install ipykernel and other libraries such as pandas, scikit-learn, etc.
pip install ipykernel
  • Install MLflow
pip install mlflow

This installs MLflow Tracking, MLflow Projects, MLflow Models, and the MLflow CLI/UI. You can then start the MLflow UI on your local machine:

mlflow ui
  • There's another way, using Conda. First install Conda, then create a virtual environment with conda, then create a requirements.txt file and add "mlflow" to it.
conda create -p venv python==3.10
conda activate venv/                                # activate the virtual environment
pip install -r requirements.txt                     # mention mlflow in requirements.txt to install it
  • Then, in the console, run this command to start the MLflow tracking server (the MLflow UI)
mlflow ui

MLflow UI will be accessible at: http://127.0.0.1:5000

MLFlow Tracking server: whenever we create a project, we need a server with MLflow's tracking capabilities so that we can track it along with all our other projects. There we can see all the Runs, Evaluations, Traces, etc.

Steps to create Experiments, Plots in MLFlow:

  • First create a notebook (a .ipynb file) to run the commands and check that everything is working.
import mlflow                                                           ## run this only after starting "mlflow ui"
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("Check localhost connection")                     ## just to check that everything is working

After we run this set_experiment, we can see in the MLFlow UI that a new experiment is added.

  • Now check whether we're able to store and track metrics on our server.
with mlflow.start_run():
  mlflow.log_metric("test",1)
  mlflow.log_metric("vivek",2)

We can log an ML experiment's loss, accuracy, training accuracy, test accuracy, etc. here.
After running these, we can click on the experiment name "Check localhost connection" > Runs and see these values under Metrics.

We can also compare multiple runs, since MLflow provides visualizations such as scatter plots, box plots, etc.


Since we are developing an end-to-end ML project, we will need other libraries as well, such as scikit-learn, pandas, and numpy, so add them all to requirements.txt.

Training the model:

We use MLflow here to track various parameters and metrics. We'll train an ML model with various parameters, and each and every parameter of each specific experiment will be logged.

  • In the requirements.txt add these:
mlflow
scikit-learn
pandas
numpy

Then run the command:

pip install -r requirements.txt
  • Import pandas and datasets from scikit-learn, because we're going to use some datasets that are available in the scikit-learn library. We'll use the Logistic Regression algorithm, so that is also imported from sklearn.
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import mlflow
from mlflow.models import infer_signature
import pandas as pd
  • Set the tracking uri
mlflow.set_tracking_uri(uri="http://127.0.0.1:5000")
  • Load the dataset. We'll use the "Iris" dataset, which is already available inside the datasets module we imported. The Iris dataset has 4 features: petal length, petal width, sepal length, and sepal width. The output is one of three flower categories (0, 1, or 2), which the model will predict from the input features.
X,y=datasets.load_iris(return_X_y=True)
  • Split the data into training and test sets. Test size is set to 0.20 which means 20% of data will be our test data.
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.20)  
params = {"penalty":"l2", "solver":"lbfgs", "max_iter":1000, "multi_class": "auto", "random_state":8888}

Here we assume that these parameters are a good choice for our model.

  • Train the model.
lr=LogisticRegression(**params)
lr.fit(X_train,y_train)

We'll get output showing the fitted LogisticRegression estimator.

  • Prediction on the test set. The values in y_pred are predictions for the inputs we provided in X_test.
y_pred=lr.predict(X_test)
y_pred                              ## to print y_pred
  • Calculate Accuracy.
accuracy=accuracy_score(y_test,y_pred)
print(accuracy)
  • MLFlow Tracking. Start the MLFlow UI and then run this block of code.
mlflow.set_tracking_uri(uri="http://127.0.0.1:5000")

mlflow.set_experiment("MLFlow Quickstart")                             # create a new experiment

with mlflow.start_run():                                               # start the MLFlow run
  mlflow.log_params(params)                                            # log the hyperparameters
  mlflow.log_metric("accuracy",accuracy)                               # log the accuracy metric
  mlflow.set_tag("Training Info","basic LR model for iris data")       # set a tag that we can use to remind ourselves what this run was for
  signature=infer_signature(X_train,lr.predict(X_train))

  # log the model
  model_info=mlflow.sklearn.log_model(
      sk_model=lr,                                                     # lr is the fitted model
      artifact_path="iris_model",
      signature=signature,
      input_example=X_train,
      registered_model_name="tracking-quickstart"
  )                                
  

infer_signature() is used to infer the model signature from the training data (input), model predictions (output), and parameters (for inference). The signature represents model input and output as data frames with named columns. This method will raise an exception if the user data contains incompatible types. In the "mlruns" folder we can see all the artifacts, metrics like accuracy, and parameters like max_iter.

Difference between Inference & Training?
Inference is the process of feeding new, unseen data into a trained ML model to get an output. It is what happens after training.
Example: You train a model on thousands of cat/dog images → that’s training.
Later, you give it a new image and ask “Is this a cat or a dog?” → that’s inference.
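The distinction in code, using the same Iris setup as above (the sample measurements below are hypothetical):

```python
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)

# Training: the model learns its parameters from labelled data
model = LogisticRegression(max_iter=1000).fit(X, y)

# Inference: the trained model answers "which class is this?"
# for a new, unseen sample
new_flower = [[5.1, 3.5, 1.4, 0.2]]  # hypothetical measurements
prediction = model.predict(new_flower)
print(prediction)  # one of the classes 0, 1, or 2
```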

Inferencing & Validating the model:

  • We'll need the model URI, so first we fetch it. Inside this artifact we can find the pickle file and the other files referenced by the MLflow UI.
    The model is logged with an input example. MLflow converts it into the serving payload format for the deployed model endpoint and saves it to "serving_input_payload.json".
model_info.model_uri                  # the URI recorded when the model was logged
  • Load the model and provide the input. The input is given under the "inputs" key as a list of lists.
## Inferencing & Validation

from mlflow.models import validate_serving_input

model_uri = 'models:/m-5d38b7f1e310456396c370e662a99571'

serving_payload = """{
    "inputs": [
        [5.7, 3.8, 1.7, 0.3],
        [4.8, 3.4, 1.6, 0.2],
        [5.6, 2.9, 3.6, 1.3],
        [5.4, 3.7, 1.5, 0.2],
        [6.7, 3.3, 5.7, 2.5],
        [6.7, 3.3, 5.7, 2.5],
        [6.7, 3.3, 5.7, 2.5],
        [6.7, 3.3, 5.7, 2.5],
        [5.4, 3.7, 1.5, 0.2],
        [5.4, 3.7, 1.5, 0.2],
        [5.7, 3.8, 1.7, 0.3],
        [5.7, 3.8, 1.7, 0.3],
        [5.7, 3.8, 1.7, 0.3]
    ]
}"""

# Validate the serving payload works on the model
validate_serving_input(model_uri, serving_payload)

Once we execute this, we'll get the model's predictions for the payload as output.
This is how we validate and see whether everything works for new test data.

  • There's another way of doing it: loading the model as a generic Python function. The best thing about this is that it's generic, so you don't have to set the input and output each time; one generic format loads the model regardless of which library it was built with.
## Load the model back for prediction as a generic Python function model

loaded_model=mlflow.pyfunc.load_model(model_info.model_uri)
predictions=loaded_model.predict(X_test)

iris_features_name=datasets.load_iris().feature_names

result=pd.DataFrame(X_test,columns=iris_features_name)
result["actual_class"]=y_test
result["predicted_class"]=predictions

## show the result
result

## Show the top 5 rows
result[:5]

The result will be a DataFrame of the four features together with the actual_class and predicted_class columns.

Model Registry

The MLflow Model Registry component is a centralized model store, set of APIs, and UI to collaboratively manage the full lifecycle of an MLflow model. It provides model lineage, model versioning, model aliasing, model tagging, and annotations.
The best use case of the model registry is that it acts as a central store for models.

How to register a model after we validate things.

Whenever we are solving an ML problem statement, we should not register the model name right away, because we first need to validate whether it is the best model. Only if it is should we register that particular model name.
So how do we register a model at a later stage, after we've validated things?

House Price Prediction with MLFlow

We'll develop an end-to-end MLOps workflow for a House Price Prediction Model.
We will:

  • Run a hyperparameter tuning while training a model
  • Log every Hyperparameter and metrics in the MLFlow UI
  • Compare the results of the various runs in the MLFlow UI
  • Choose the best run and register it as a model

Steps:

  • Import libraries. With GridSearchCV we will perform hyperparameter tuning. Here we use the California housing dataset.
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split,GridSearchCV
  • Prepare the data in a proper format so that we can work with it.
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
data = pd.DataFrame(housing.data, columns=housing.feature_names)
data.head()

Using data.head() we can see the input features and their values.

  • We will create a new output feature whose values come from the dataset's "target" variable.
    "Price" is the output feature and all the rest are input features.
data['Price']=housing.target

data.head(10)                     # displaying top 10 values
  • Now we divide the data into Independent & Dependent features.
## Independent & Dependent features
## X denotes Independent
## y denotes Dependent

X=data.drop(columns=["Price"])
y=data["Price"]

We took the parameters from the link above and modified the values in param_grid.

Before we start the MLflow experiments we need to perform the hyperparameter tuning; we did not do it earlier because we need param_grid first. So in the notebook, a code block for hyperparameter tuning was created before the data-split code block.

In hyperparameter tuning, we'll create a variable called grid_search, in which we use: the estimator (which is RandomForest), param_grid, the number of cross-validation folds cv, n_jobs to specify how many CPU cores to use (n_jobs=-1 uses all cores), verbose, and the scoring parameter (options are listed here: https://scikit-learn.org/stable/modules/model_evaluation.html).

For evaluation, the model predicts on the test data and we find out how much mean squared error we get.
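Concretely, mean squared error averages the squared differences between predicted and actual values (the numbers below are made up):

```python
from sklearn.metrics import mean_squared_error

y_true = [3.0, 2.0, 4.0]  # actual prices (dummy values)
y_pred = [2.5, 2.0, 5.0]  # model predictions (dummy values)

# ((3.0-2.5)**2 + (2.0-2.0)**2 + (4.0-5.0)**2) / 3
mse = mean_squared_error(y_true, y_pred)
print(mse)  # 0.4166...
```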

The best estimator is the model fitted with the best combination of the hyperparameters that were tried.

# this will run before the code block below
# hyperparameter tuning using GridSearchCV
def hyperparamater_tuning(X_train,y_train,param_grid):
    rf=RandomForestRegressor()
    grid_search=GridSearchCV(estimator=rf,param_grid=param_grid,cv=3,n_jobs=-1,verbose=2,scoring="neg_mean_squared_error")
    grid_search.fit(X_train,y_train)
    return grid_search
# Split the data into training and test data
# X_train will have training input features
# X_test will have test input features
# y_train is output
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.20)

# infer_signature is used so that we set our schema with respect to our input and output
from mlflow.models import infer_signature
signature=infer_signature(X_train,y_train)

# Define the hyperparameter grid
param_grid= {
    'n_estimators':[100,200],
    'max_depth':[5,10,None], 
    'min_samples_split':[2,5],
    'min_samples_leaf':[1,2]
}
# start with MLFlow experiments
from urllib.parse import urlparse
from sklearn.metrics import mean_squared_error

with mlflow.start_run():

    # perform hyperparameter tuning
    grid_search=hyperparamater_tuning(X_train,y_train,param_grid)

    # get the best model
    best_model=grid_search.best_estimator_

    # Evaluate the best model
    y_pred=best_model.predict(X_test)
    mse=mean_squared_error(y_test,y_pred)

    # log best parameters and metrics
    mlflow.log_param("best_n_estimators",grid_search.best_params_['n_estimators'])
    mlflow.log_param("best_max_depth",grid_search.best_params_['max_depth'])
    mlflow.log_param("best_min_samples_split",grid_search.best_params_['min_samples_split'])
    mlflow.log_param("best_min_samples_leaf",grid_search.best_params_['min_samples_leaf'])
    mlflow.log_metric("mse",mse)

    # Tracking url
    mlflow.set_tracking_uri(uri="http://127.0.0.1:5000")
    tracking_url_type_store=urlparse(mlflow.get_tracking_uri()).scheme

    # If it is a file, we'll just go ahead and setup a signature but if it is a remote server tracking uri, 
    # then we can directly register that particular model based on the best model that we've got.
    if tracking_url_type_store !='file':
        mlflow.sklearn.log_model(best_model,"model",registered_model_name="Best RandomForest model")
    else: 
        mlflow.sklearn.log_model(best_model,"model",signature=signature)
    
    print(f"Best Hyperparameters: {grid_search.best_params_}")
    print(f"Mean Squared Error: {mse}")

This training will take time, depending on the size of the hyperparameter grid. Once it finishes, the run, its best parameters, and the MSE metric appear in the MLflow UI.
