This guide provides an in-depth exploration of MLflow, a platform for managing the complete machine learning lifecycle. We'll cover essential concepts, practical implementation examples, and best practices for data scientists and ML engineers.

Introduction to MLflow

MLflow is an open-source platform designed to manage the machine learning lifecycle, including experimentation, reproducibility, deployment, and a central model registry. Created by Databricks in 2018, it addresses common challenges in machine learning workflows, such as tracking experiments, packaging code into reproducible runs, and sharing and deploying models.

MLflow works with any machine learning library and language (with primary support for Python), and can be used in any environment, from local machines to cloud platforms.

The key problems MLflow solves include:

  • Experiment tracking: Keeping track of parameters, metrics, and artifacts across multiple experiments
  • Reproducibility: Packaging ML code in a way that it can be reliably rerun
  • Model management: Versioning, staging, and deploying models
  • Collaborative workflows: Enabling teams to share experiments and models

Setting Up MLflow

Let’s start with installation and basic setup.

Installation

pip install mlflow

For additional features:

# SQLAlchemy-backed tracking storage is included in the base install;
# extra integrations are available via the "extras" option
pip install mlflow[extras]
 
# S3 artifact storage (AWS)
pip install boto3
 
# Azure Blob Storage artifact storage
pip install azure-storage-blob
 
# Google Cloud Storage artifact storage
pip install google-cloud-storage

Basic Configuration

MLflow can be configured using environment variables or directly in your Python code:

import os
 
# Configure tracking URI using environment variables
os.environ["MLFLOW_TRACKING_URI"] = "http://localhost:5000"
 
# Or configure in code
import mlflow
mlflow.set_tracking_uri("http://localhost:5000")

Starting the Tracking Server

mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0 --port 5000

For production environments, consider using more robust database backends like PostgreSQL or MySQL, and cloud storage for artifacts.
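
As a sketch of such a setup (the hostname, credentials, and bucket name below are placeholders), the same command can point the backend store at PostgreSQL and the artifact store at S3:

mlflow server \
    --backend-store-uri postgresql://mlflow_user:mlflow_password@db-host:5432/mlflow_db \
    --default-artifact-root s3://my-mlflow-artifacts/ \
    --host 0.0.0.0 --port 5000

The server machine needs the corresponding drivers installed (for example psycopg2 for PostgreSQL and boto3 for S3) and credentials for the object store available in its environment.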

MLflow Components

MLflow consists of four main components, each addressing a different aspect of the ML lifecycle.

MLflow Tracking

MLflow Tracking is a logging API that records parameters, code versions, metrics, and output files when running machine learning code. It works equally well with any ML library and can log results to local files or to a server, for sharing across a team.

Key Concepts

  • Runs: Individual executions of your code
  • Experiments: Groups of runs
  • Parameters: Key-value input parameters for your runs
  • Metrics: Numerical values to track (e.g., accuracy, loss)
  • Artifacts: Files associated with a run (e.g., models, plots)

Basic Usage

import mlflow
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Set experiment name
mlflow.set_experiment("iris_classification")
 
# Define hyperparameters to try
regularization_params = [0.01, 0.1, 1.0]
 
for reg_param in regularization_params:
    # Start a new run
    with mlflow.start_run(run_name=f"logreg_C={reg_param}"):
        # Log the hyperparameter
        mlflow.log_param("C", reg_param)
        
        # Train model
        model = LogisticRegression(C=reg_param, solver="lbfgs", max_iter=1000, multi_class="auto")
        model.fit(X_train, y_train)
        
        # Evaluate model
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        
        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        # Log a simple plot as an artifact
        import matplotlib.pyplot as plt
        plt.figure()
        plt.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis')
        plt.title(f'Iris Classification (C={reg_param})')
        plt.savefig("scatter_plot.png")
        mlflow.log_artifact("scatter_plot.png")
        
        print(f"Logged model with C={reg_param}, accuracy={accuracy}")
 

Viewing Results

After running experiments, you can view them using the MLflow UI:

mlflow ui

Or query programmatically:

from mlflow.tracking import MlflowClient
 
client = MlflowClient()
experiment = client.get_experiment_by_name("iris_classification")
runs = client.search_runs(experiment_ids=[experiment.experiment_id])
 
for run in runs:
    print(f"Run ID: {run.info.run_id}")
    print(f"C parameter: {run.data.params['C']}")
    print(f"Accuracy: {run.data.metrics['accuracy']}")
    print("-" * 40)

MLflow Projects

MLflow Projects provide a standard format for packaging ML code to make it reusable and reproducible. A project is defined by an MLproject file (in YAML format) that specifies dependencies, entry points, and parameters.

Project Structure

my_project/
├── MLproject
├── conda.yaml
└── train.py

MLproject File

name: iris_classification
 
conda_env: conda.yaml
 
entry_points:
  main:
    parameters:
      C: {type: float, default: 1.0}
      max_iter: {type: int, default: 1000}
      test_size: {type: float, default: 0.2}
    command: "python train.py --C {C} --max_iter {max_iter} --test_size {test_size}"

Conda Environment File (conda.yaml)

name: iris_proj
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.8
  - pip
  - scikit-learn=1.0
  - matplotlib
  - pip:
    - mlflow>=2.0.0

Running Projects

# Run locally
mlflow run . -P C=0.5
 
# Run from GitHub
mlflow run https://github.com/username/iris_project -P C=0.5
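
Projects can also be launched from Python, which is useful when embedding them in orchestration code. A minimal sketch; mlflow.projects.run blocks until the run finishes by default:

import mlflow
 
# Launch the project's "main" entry point with a parameter override
submitted = mlflow.projects.run(
    uri=".",
    entry_point="main",
    parameters={"C": 0.5}
)
print(f"Project run completed with run_id={submitted.run_id}")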

MLflow Models

MLflow Models provide a standard format for packaging machine learning models that can be used in a variety of downstream tools. MLflow includes built-in support for many model types and frameworks.

Saving Models

import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
 
# Load data
diabetes = load_diabetes()
X = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
y = diabetes.target
 
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Train model
model = RandomForestRegressor(n_estimators=100, max_depth=6, random_state=42)
model.fit(X_train, y_train)
 
# Log model with signature and input example
signature = mlflow.models.infer_signature(X_train, model.predict(X_train))
input_example = X_train.iloc[:5]
 
with mlflow.start_run() as run:
    mlflow.sklearn.log_model(
        model, 
        "random_forest_model",
        signature=signature,
        input_example=input_example
    )
    
    # Get the logged model URI
    model_uri = f"runs:/{run.info.run_id}/random_forest_model"
    print(f"Model saved at: {model_uri}")
    
# Load the model
loaded_model = mlflow.sklearn.load_model(model_uri)
 
# Make predictions
predictions = loaded_model.predict(X_test)
print(f"Predictions: {predictions[:5]}")
 

Model Flavors

MLflow supports multiple model β€œflavors,” enabling the same model to be used with different downstream tools:

  • Python function flavor (universal)
  • Framework-specific flavors (scikit-learn, TensorFlow, PyTorch, etc.)
  • R function flavors
  • ONNX flavor
  • Custom flavors
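
The python_function (pyfunc) flavor is what makes models interchangeable: anything logged with a framework-specific flavor can also be loaded through the generic interface. A brief sketch, reusing the model URI and test data from the example above:

import mlflow.pyfunc
 
# Load the scikit-learn model through the generic pyfunc interface
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
 
# pyfunc models accept pandas DataFrames (or numpy arrays)
print(pyfunc_model.predict(X_test)[:5])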

Serving Models

# Start a local REST server
mlflow models serve -m "runs:/your-run-id/model" -p 5001

Then make predictions:

import requests
import pandas as pd
import json
 
# Prepare data
data = X_test.iloc[:5].values.tolist()
payload = {"instances": data}
 
# Make prediction request
response = requests.post("http://localhost:5001/invocations", 
                        json=payload,
                        headers={"Content-Type": "application/json"})
 
predictions = json.loads(response.text)
print(f"Predictions: {predictions}")

MLflow Model Registry

The MLflow Model Registry is a centralized repository for managing the full lifecycle of MLflow Models. It provides model lineage, versioning, stage transitions, and annotations.

Key Concepts

  • Registered Model: A logical collection of models with the same name
  • Model Version: A specific instance of a registered model
  • Model Stage: The deployment stage of the model (e.g., Staging, Production); recent MLflow versions deprecate stages in favor of aliases and tags
  • Aliases: Named pointers to specific model versions
  • Annotations: Key-value pairs for adding metadata to models

Using the Model Registry

import mlflow
from mlflow.exceptions import MlflowException
from mlflow.tracking import MlflowClient
 
# Initialize client
client = MlflowClient()
 
# Register the model
model_name = "diabetes_predictor"
run_id = "your-run-id"  # Replace with actual run ID
model_uri = f"runs:/{run_id}/random_forest_model"
 
# Register the model if it doesn't exist
try:
    registered_model = client.get_registered_model(model_name)
    print(f"Model {model_name} already exists")
except MlflowException:
    registered_model = client.create_registered_model(model_name)
    print(f"Created new model: {model_name}")
 
# Add a new version to the model
model_version = client.create_model_version(
    name=model_name,
    source=model_uri,
    run_id=run_id
)
print(f"Created version {model_version.version} of model {model_name}")
 
# Transition the model to staging
client.transition_model_version_stage(
    name=model_name,
    version=model_version.version,
    stage="Staging"
)
print(f"Transitioned model {model_name} version {model_version.version} to Staging")
 
# Add metadata
client.update_model_version(
    name=model_name,
    version=model_version.version,
    description="Random Forest model trained on diabetes dataset"
)
 
# Set model alias
client.set_registered_model_alias(
    name=model_name,
    alias="latest",
    version=model_version.version
)
print(f"Set alias 'latest' to version {model_version.version}")
 
# Load the model by alias
model_uri = f"models:/{model_name}@latest"
model = mlflow.sklearn.load_model(model_uri)
print(f"Loaded model from {model_uri}")
 

Practical MLflow Python Programming

Let’s explore more practical examples of using MLflow in real-world scenarios.

End-to-End ML Pipeline with MLflow

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_breast_cancer
 
# Set experiment
mlflow.set_experiment("breast_cancer_classification")
 
# Load data
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.Series(data.target)
 
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Define preprocessing and model pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(random_state=42))
])
 
# Define parameter grid
param_grid = {
    'classifier__n_estimators': [50, 100],
    'classifier__max_depth': [5, 10, 15],
    'classifier__min_samples_split': [2, 5]
}
 
# Create grid search
grid_search = GridSearchCV(
    pipeline,
    param_grid=param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1
)
 
# Start MLflow run
with mlflow.start_run(run_name="rf_grid_search") as run:
    # Log dataset info
    mlflow.log_param("dataset_size", len(X))
    mlflow.log_param("train_size", len(X_train))
    mlflow.log_param("test_size", len(X_test))
    mlflow.log_param("feature_count", X.shape[1])
    
    # Log data summary statistics
    for col in X.columns:
        mlflow.log_param(f"feature_mean_{col}", X[col].mean())
        mlflow.log_param(f"feature_std_{col}", X[col].std())
    
    # Log parameter grid
    for param_name, param_values in param_grid.items():
        mlflow.log_param(param_name, str(param_values))
    
    # Fit grid search
    print("Training models with grid search...")
    grid_search.fit(X_train, y_train)
    
    # Log best parameters
    best_params = grid_search.best_params_
    for param, value in best_params.items():
        mlflow.log_param(f"best_{param}", value)
    
    # Get best model
    best_model = grid_search.best_estimator_
    
    # Make predictions
    y_pred = best_model.predict(X_test)
    y_prob = best_model.predict_proba(X_test)[:, 1]
    
    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    roc_auc = roc_auc_score(y_test, y_prob)
    
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1_score", f1)
    mlflow.log_metric("roc_auc", roc_auc)
    
    # Log cross-validation results
    cv_results = grid_search.cv_results_
    for i, (params, mean_score, std_score) in enumerate(zip(
            cv_results['params'],
            cv_results['mean_test_score'],
            cv_results['std_test_score'])):
        run_name = '_'.join(f"{k.split('__')[-1]}_{v}" for k, v in params.items())
        mlflow.log_metric(f"cv_mean_{run_name}", mean_score)
        mlflow.log_metric(f"cv_std_{run_name}", std_score)
    
    # Create feature importance plot
    rf_model = best_model.named_steps['classifier']
    feature_names = X.columns
    importances = rf_model.feature_importances_
    indices = np.argsort(importances)[::-1]
    
    plt.figure(figsize=(12, 8))
    plt.title('Feature Importances')
    plt.bar(range(len(indices[:15])), importances[indices[:15]], align='center')
    plt.xticks(range(len(indices[:15])), [feature_names[i] for i in indices[:15]], rotation=90)
    plt.tight_layout()
    plt.savefig("feature_importances.png")
    mlflow.log_artifact("feature_importances.png")
    
    # Create confusion matrix
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")
    
    # Create ROC curve
    from sklearn.metrics import roc_curve
    fpr, tpr, _ = roc_curve(y_test, y_prob)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'ROC (AUC = {roc_auc:.3f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.savefig("roc_curve.png")
    mlflow.log_artifact("roc_curve.png")
    
    # Log model with signature and input example
    signature = mlflow.models.infer_signature(X_train, best_model.predict(X_train))
    mlflow.sklearn.log_model(
        best_model, 
        "best_model",
        signature=signature,
        input_example=X_train.iloc[:5]
    )
    
    print(f"Best model: {best_params}")
    print(f"Test accuracy: {accuracy:.4f}")
    print(f"Run ID: {run.info.run_id}")
    
    # Register the model
    client = mlflow.tracking.MlflowClient()
    model_name = "breast_cancer_classifier"
    
    try:
        client.create_registered_model(model_name)
    except mlflow.exceptions.MlflowException:
        # model name is already registered
        pass
    
    model_details = mlflow.register_model(
        f"runs:/{run.info.run_id}/best_model",
        model_name
    )
    
    print(f"Registered model version: {model_details.version}")
 

Hyperparameter Tuning with MLflow

import mlflow
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.datasets import fetch_california_housing
import optuna
import joblib
import matplotlib.pyplot as plt
 
# Load dataset (load_boston was removed from scikit-learn,
# so the California housing dataset is used here instead)
housing = fetch_california_housing()
X, y = housing.data, housing.target
 
# Set experiment
mlflow.set_experiment("boston_housing_optuna")
 
# Define objective function for Optuna
def objective(trial):
    # Define hyperparameters to optimize
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 500),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0)
    }
    
    # Use cross-validation
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    rmse_scores = []
    r2_scores = []
    
    # Start nested MLflow run
    with mlflow.start_run(nested=True) as child_run:
        # Log parameters
        for param_name, param_value in params.items():
            mlflow.log_param(param_name, param_value)
        
        # Cross-validation
        for train_idx, val_idx in kf.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            
            # Train model
            model = GradientBoostingRegressor(**params, random_state=42)
            model.fit(X_train, y_train)
            
            # Evaluate
            y_pred = model.predict(X_val)
            rmse = np.sqrt(mean_squared_error(y_val, y_pred))
            r2 = r2_score(y_val, y_pred)
            
            rmse_scores.append(rmse)
            r2_scores.append(r2)
        
        # Calculate average metrics
        mean_rmse = np.mean(rmse_scores)
        mean_r2 = np.mean(r2_scores)
        std_rmse = np.std(rmse_scores)
        
        # Log metrics
        mlflow.log_metric("mean_rmse", mean_rmse)
        mlflow.log_metric("mean_r2", mean_r2)
        mlflow.log_metric("std_rmse", std_rmse)
        
        # Log trial number
        mlflow.log_param("trial_number", trial.number)
        
        # Return objective metric for Optuna to minimize
        return mean_rmse
 
# Run optimization
with mlflow.start_run(run_name="optuna_optimization") as parent_run:
    mlflow.log_param("optimization_algorithm", "TPE")
    mlflow.log_param("n_trials", 20)
    mlflow.log_param("dataset", "boston_housing")
    
    # Create and run the study
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=20)
    
    # Get best parameters and metrics
    best_params = study.best_params
    best_value = study.best_value
    
    # Log best results
    mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
    mlflow.log_metric("best_rmse", best_value)
    
    # Train final model on full dataset
    final_model = GradientBoostingRegressor(**best_params, random_state=42)
    final_model.fit(X, y)
    
    # Save importance plot
    feature_importance = final_model.feature_importances_
    sorted_idx = np.argsort(feature_importance)
    pos = np.arange(sorted_idx.shape[0])
    
    plt.figure(figsize=(12, 6))
    plt.barh(pos, feature_importance[sorted_idx], align='center')
    plt.yticks(pos, np.array(housing.feature_names)[sorted_idx])
    plt.title('Feature Importance (MDI)')
    plt.tight_layout()
    plt.savefig("feature_importance.png")
    mlflow.log_artifact("feature_importance.png")
    
    # Save optimization history plot
    plt.figure(figsize=(10, 6))
    optuna.visualization.matplotlib.plot_optimization_history(study)
    plt.tight_layout()
    plt.savefig("optimization_history.png")
    mlflow.log_artifact("optimization_history.png")
    
    # Save parameter importance plot
    plt.figure(figsize=(10, 6))
    optuna.visualization.matplotlib.plot_param_importances(study)
    plt.tight_layout()
    plt.savefig("param_importances.png")
    mlflow.log_artifact("param_importances.png")
    
    # Log final model
    mlflow.sklearn.log_model(final_model, "final_model")
    
    # Save study for later analysis
    joblib.dump(study, "optuna_study.pkl")
    mlflow.log_artifact("optuna_study.pkl")
    
    print(f"Best parameters: {best_params}")
    print(f"Best RMSE: {best_value:.4f}")
    print(f"Parent run ID: {parent_run.info.run_id}")
 

Advanced MLflow Usage

Custom Flavors

MLflow allows you to create custom flavors for specialized models or workflows. This is useful for:

  • Supporting frameworks not directly supported by MLflow
  • Custom preprocessing logic
  • Complex model ensembles
  • Special deployment requirements

import mlflow
import mlflow.pyfunc
import pandas as pd
import numpy as np
import pickle
 
# Define a custom model class
class TextPreprocessorModel(mlflow.pyfunc.PythonModel):
    def __init__(self, steps=None):
        self.steps = steps or []
    
    def fit(self, texts):
        """Fit the preprocessing steps on the training data"""
        # Example: Build vocabulary, learn frequencies, etc.
        self.vocabulary = set()
        for text in texts:
            words = text.lower().split()
            self.vocabulary.update(words)
        return self
    
    def add_step(self, step_func):
        """Add a preprocessing step"""
        self.steps.append(step_func)
        return self
    
    def predict(self, context, model_input):
        """Apply all preprocessing steps to the input data"""
        if isinstance(model_input, pd.DataFrame):
            text_col = model_input.columns[0]  # Assume first column is text
            texts = model_input[text_col].tolist()
        else:
            texts = model_input
        
        processed_texts = []
        for text in texts:
            # Apply each preprocessing step sequentially
            processed_text = text
            for step in self.steps:
                processed_text = step(processed_text)
            processed_texts.append(processed_text)
        
        # Return as DataFrame
        return pd.DataFrame({"processed_text": processed_texts})
 
# Example preprocessing steps
def lowercase(text):
    return text.lower()
 
def remove_punctuation(text):
    import string
    return ''.join(c for c in text if c not in string.punctuation)
 
def tokenize(text):
    return ' '.join(text.split())
 
# Create and train the model
texts = [
    "Hello, world! This is a sample text.",
    "MLflow is a great tool for ML experiments.",
    "Custom flavors let you extend MLflow's capabilities."
]
 
# Create and configure the model
preproc_model = TextPreprocessorModel()
preproc_model.add_step(lowercase)
preproc_model.add_step(remove_punctuation)
preproc_model.add_step(tokenize)
preproc_model.fit(texts)
 
# Define artifact paths
artifacts = {
    "preprocessor": "preprocessor.pkl"
}
 
# Serialize the model
with open(artifacts["preprocessor"], "wb") as f:
    pickle.dump(preproc_model, f)
 
# Define a loader function for the model
class TextPreprocessorWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        with open(context.artifacts["preprocessor"], "rb") as f:
            self.model = pickle.load(f)
    
    def predict(self, context, model_input):
        return self.model.predict(context, model_input)
 
# Log the model
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="text_preprocessor",
        python_model=TextPreprocessorWrapper(),
        artifacts=artifacts,
        conda_env={
            'channels': ['defaults'],
            'dependencies': [
                'python=3.8.0',
                'pip',
                {'pip': ['mlflow', 'pandas', 'numpy']}
            ],
            'name': 'text_preprocessor_env'
        }
    )
    
    # Log example inputs (mlflow.log_input expects an mlflow.data Dataset)
    example_input = pd.DataFrame({"text": ["This is a TEST example with punctuation!!!"]})
    example_dataset = mlflow.data.from_pandas(example_input, name="example_input")
    mlflow.log_input(example_dataset, context="example")
    
    print(f"Model logged with run_id: {run.info.run_id}")
 
# Load and use the model
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run.info.run_id}/text_preprocessor")
test_input = pd.DataFrame({"text": ["Hello, WORLD! Testing the custom model."]})
result = loaded_model.predict(test_input)
print(f"Original: {test_input['text'][0]}")
print(f"Processed: {result['processed_text'][0]}")
 

Autologging

MLflow provides autologging capabilities for popular ML frameworks, automatically capturing parameters, metrics, and models without requiring explicit logging calls:

import mlflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
 
# Enable autologging for scikit-learn
mlflow.sklearn.autolog(log_models=True, 
                       log_input_examples=True,
                       log_model_signatures=True,
                       log_post_training_metrics=True)
 
# Load California housing dataset
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = housing.target
 
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Create experiment
mlflow.set_experiment("california_housing_autolog")
 
# Start run
with mlflow.start_run(run_name="rf_pipeline_autolog"):
    # Create pipeline
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
    ])
    
    # Train model (mlflow will automatically log parameters, metrics, and model)
    pipeline.fit(X_train, y_train)
    
    # Make predictions
    y_pred = pipeline.predict(X_test)
    
    # Calculate additional metrics (these will be automatically logged)
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)
    
    # Log feature importances manually
    feature_importances = pipeline.named_steps['regressor'].feature_importances_
    for i, col in enumerate(X.columns):
        mlflow.log_metric(f"feature_importance_{col}", feature_importances[i])
    
    print(f"Training complete - metrics automatically logged")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"RΒ²: {r2:.4f}")
 

MLflow with PyTorch

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
 
# Enable PyTorch autologging (this hooks into PyTorch Lightning training;
# the plain training loop below logs its metrics manually)
mlflow.pytorch.autolog()
 
# Set experiment
mlflow.set_experiment("pytorch_digits_classification")
 
# Load digits dataset
digits = load_digits()
X, y = digits.data, digits.target
 
# Preprocess data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
 
# Split data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
 
# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test)
 
# Create datasets and dataloaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
 
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
 
# Define neural network
class DigitsClassifier(nn.Module):
    def __init__(self, input_size=64, hidden_size=128, num_classes=10, dropout_rate=0.2):
        super(DigitsClassifier, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size // 2, num_classes)
        )
        
    def forward(self, x):
        return self.model(x)
 
# Training function
def train_model(model, train_loader, test_loader, criterion, optimizer, epochs=10, device="cpu"):
    # Move model to device
    model.to(device)
    
    train_losses = []
    test_losses = []
    train_accuracies = []
    test_accuracies = []
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = correct / total
        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_acc)
        
        # Evaluation phase
        model.eval()
        running_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                
                # Forward pass
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        epoch_loss = running_loss / len(test_loader.dataset)
        epoch_acc = correct / total
        test_losses.append(epoch_loss)
        test_accuracies.append(epoch_acc)
        
        # Log metrics for each epoch
        mlflow.log_metric("train_loss", train_losses[-1], step=epoch)
        mlflow.log_metric("train_accuracy", train_accuracies[-1], step=epoch)
        mlflow.log_metric("test_loss", test_losses[-1], step=epoch)
        mlflow.log_metric("test_accuracy", test_accuracies[-1], step=epoch)
        
        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_losses[-1]:.4f}, Train Acc: {train_accuracies[-1]:.4f}, "
              f"Test Loss: {test_losses[-1]:.4f}, Test Acc: {test_accuracies[-1]:.4f}")
    
    # Create and log plots
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(1, epochs+1), train_losses, label='Train')
    plt.plot(range(1, epochs+1), test_losses, label='Test')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss Curves')
    
    plt.subplot(1, 2, 2)
    plt.plot(range(1, epochs+1), train_accuracies, label='Train')
    plt.plot(range(1, epochs+1), test_accuracies, label='Test')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy Curves')
    
    plt.tight_layout()
    plt.savefig("training_curves.png")
    mlflow.log_artifact("training_curves.png")
    
    return model
 
# Main training loop with MLflow
with mlflow.start_run(run_name="pytorch_digits_nn"):
    # Set hyperparameters
    input_size = 64  # Number of features in digits dataset
    hidden_size = 128
    num_classes = 10  # Digits 0-9
    dropout_rate = 0.2
    learning_rate = 0.001
    epochs = 15
    
    # Log hyperparameters
    mlflow.log_param("input_size", input_size)
    mlflow.log_param("hidden_size", hidden_size)
    mlflow.log_param("num_classes", num_classes)
    mlflow.log_param("dropout_rate", dropout_rate)
    mlflow.log_param("learning_rate", learning_rate)
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("epochs", epochs)
    
    # Define model, loss function, and optimizer
    model = DigitsClassifier(input_size, hidden_size, num_classes, dropout_rate)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Train model
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    mlflow.log_param("device", device.type)
    
    model = train_model(model, train_loader, test_loader, criterion, optimizer, epochs, device)
    
    # Final evaluation
    model.eval()
    with torch.no_grad():
        all_preds = []
        all_labels = []
        
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Log confusion matrix
    from sklearn.metrics import confusion_matrix, classification_report
    cm = confusion_matrix(all_labels, all_preds)
    
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    tick_marks = np.arange(num_classes)
    plt.xticks(tick_marks, range(num_classes))
    plt.yticks(tick_marks, range(num_classes))
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    
    # Add text annotations to confusion matrix
    thresh = cm.max() / 2
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                    horizontalalignment="center",
                    color="white" if cm[i, j] > thresh else "black")
    
    plt.tight_layout()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")
    
    # Log classification report
    report = classification_report(all_labels, all_preds, output_dict=True)
    for digit in range(10):
        mlflow.log_metric(f"precision_digit_{digit}", report[str(digit)]['precision'])
        mlflow.log_metric(f"recall_digit_{digit}", report[str(digit)]['recall'])
        mlflow.log_metric(f"f1_score_digit_{digit}", report[str(digit)]['f1-score'])
    
    # Log example predictions
    fig, axes = plt.subplots(3, 5, figsize=(15, 9))
    axes = axes.flatten()
    
    for i, ax in enumerate(axes):
        if i < len(X_test):
            # Get a random test example
            idx = np.random.randint(0, len(X_test))
            image = X_test[idx].reshape(8, 8)
            
            # Get prediction
            with torch.no_grad():
                input_tensor = torch.FloatTensor(X_test[idx]).to(device)
                output = model(input_tensor)
                _, prediction = torch.max(output, 0)
            
            # Display image and prediction
            ax.imshow(image, cmap='gray')
            ax.set_title(f"True: {y_test[idx]}, Pred: {prediction.item()}")
            ax.axis('off')
    
    plt.tight_layout()
    plt.savefig("example_predictions.png")
    mlflow.log_artifact("example_predictions.png")
    
    # Log PyTorch model
    mlflow.pytorch.log_model(model, "pytorch_model")
    
    print(f"Training complete. Model saved in run {mlflow.active_run().info.run_id}")
 

MLflow Deployment Scenarios

MLflow supports various deployment scenarios, from simple local testing to production systems.

Local Deployment

# Start MLflow tracking server locally
mlflow server \
    --backend-store-uri sqlite:///mlflow.db \
    --default-artifact-root ./mlflow-artifacts \
    --host 0.0.0.0 \
    --port 5000
 
# Serve a model locally
mlflow models serve -m "models:/my_model/production" -p 5001
 
# Batch inference using MLflow CLI
mlflow models predict \
    -m "models:/my_model/production" \
    -i input_data.csv \
    -o predictions.csv
 

Docker-based Deployment

# Build a Docker image for the model
mlflow models build-docker \
    -m "models:/my_model/production" \
    -n "my-model-image" \
    --enable-mlserver
 
# Run the Docker image
docker run -p 5001:8080 my-model-image
 
# Use the model REST API
curl -X POST -H "Content-Type:application/json" \
    --data '{"dataframe_split": {"columns":["feature1", "feature2"], "data":[[1.0, 2.0]]}}' \
    http://localhost:5001/invocations
 

Kubernetes Deployment

# mlflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-server
  labels:
    app: mlflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow
  template:
    metadata:
      labels:
        app: mlflow
    spec:
      containers:
      - name: mlflow
        image: my-model-image:latest
        ports:
        - containerPort: 8080
        env:
        - name: GUNICORN_CMD_ARGS
          value: "--timeout 60 --workers 2"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mlflow-service
spec:
  selector:
    app: mlflow
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
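
Assuming the model image from the previous section has been pushed to a registry the cluster can pull from, the manifest is applied with kubectl:

kubectl apply -f mlflow-deployment.yaml
kubectl get service mlflow-service   # external IP appears once the LoadBalancer is provisioned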
 

Best Practices

Organizing Experiments

# Hierarchical experiment organization
project_name = "fraud_detection"
dataset_version = "v2"
model_type = "gradient_boosting"
 
# Create a structured experiment name
experiment_name = f"{project_name}/{dataset_version}/{model_type}"
mlflow.set_experiment(experiment_name)
 
# Or use tags for better organization
with mlflow.start_run(run_name="hyperparameter_tuning_run"):
    mlflow.set_tags({
        "project": project_name,
        "dataset_version": dataset_version,
        "model_type": model_type,
        "stage": "development",
        "engineer": "jane_doe",
        "priority": "high"
    })
    
    # Rest of your experiment code...
 

Production-Ready MLflow Setup

# Database backend for the tracking server (PostgreSQL)
export BACKEND_STORE_URI=postgresql://mlflow_user:mlflow_password@localhost/mlflow_db
 
# Object storage for artifacts (S3)
export DEFAULT_ARTIFACT_ROOT=s3://mlflow-artifacts/
 
# Default experiment for clients (clients should point MLFLOW_TRACKING_URI
# at the server itself, e.g. http://mlflow-server:5000, not at the database)
export MLFLOW_EXPERIMENT_NAME=my_project/production
 
# Start the tracking server
mlflow server \
    --backend-store-uri ${BACKEND_STORE_URI} \
    --default-artifact-root ${DEFAULT_ARTIFACT_ROOT} \
    --host 0.0.0.0 \
    --port 5000 \
    --workers 4 \
    --gunicorn-opts "--timeout 120 --log-level info" \
    --expose-prometheus ./prometheus_metrics
 
# Example systemd service file (mlflow.service)
# [Unit]
# Description=MLflow Tracking Server
# After=network.target postgresql.service
#
# [Service]
# User=mlflow
# Environment="MLFLOW_TRACKING_URI=postgresql://mlflow_user:mlflow_password@localhost/mlflow_db"
# Environment="MLFLOW_ARTIFACT_ROOT=s3://mlflow-artifacts/"
# ExecStart=/usr/local/bin/mlflow server --backend-store-uri ${MLFLOW_TRACKING_URI} --default-artifact-root ${MLFLOW_ARTIFACT_ROOT} --host 0.0.0.0 --port 5000
# Restart=on-failure
#
# [Install]
# WantedBy=multi-user.target
 

CI/CD for MLflow Models

# .github/workflows/mlflow-pipeline.yml
name: MLflow Model Pipeline
 
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining
 
env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  MODEL_NAME: fraud_detection_model
 
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov
    - name: Run tests
      run: |
        pytest tests/ --cov=src/
 
  train:
    needs: test
    runs-on: ubuntu-latest
    outputs:
      run_id: ${{ steps.save_run_id.outputs.run_id }}
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Train model
      run: |
        python src/train.py --experiment_name "fraud_detection/prod" --run_name "github_action_${GITHUB_SHA}"
    - name: Save run ID
      id: save_run_id
      run: |
        RUN_ID=$(cat run_id.txt)
        echo "run_id=$RUN_ID" >> "$GITHUB_OUTPUT"
 
  evaluate:
    needs: train
    runs-on: ubuntu-latest
    outputs:
      passed: ${{ steps.check_performance.outputs.passed }}
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Evaluate model
      run: |
        python src/evaluate.py --run_id ${{ needs.train.outputs.run_id }} --test_data data/test.csv
    - name: Check performance
      id: check_performance
      run: |
        PASSED=$(jq -r '.passed' eval_results.json)
        echo "passed=$PASSED" >> "$GITHUB_OUTPUT"
 
  register:
    needs: [train, evaluate]
    if: ${{ needs.evaluate.outputs.passed == 'true' }}
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Register model
      run: |
        python src/register_model.py --run_id ${{ needs.train.outputs.run_id }} --model_name $MODEL_NAME
    - name: Promote to staging
      run: |
        python src/promote_model.py --model_name $MODEL_NAME --stage "Staging"
 
  deploy:
    needs: register
    runs-on: ubuntu-latest
    environment: staging
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt awscli
    - name: Create SageMaker endpoint
      run: |
        python src/deploy_sagemaker.py --model_name $MODEL_NAME --stage "Staging" --endpoint_name "fraud-detection-staging"
    - name: Run integration tests
      run: |
        python tests/integration/test_endpoint.py --endpoint_name "fraud-detection-staging"
 
  promote_to_production:
    needs: deploy
    runs-on: ubuntu-latest
    environment: production
    # Manual approval step through GitHub environments
    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Promote to production
      run: |
        python src/promote_model.py --model_name $MODEL_NAME --stage "Production"
    - name: Update production endpoint
      run: |
        python src/deploy_sagemaker.py --model_name $MODEL_NAME --stage "Production" --endpoint_name "fraud-detection-prod"
 

Integration with Other Tools

MLflow integrates well with many other tools in the ML ecosystem:

MLflow with Databricks

# In a Databricks notebook, MLflow is already configured
 
# Example using Databricks managed MLflow
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
 
# MLflow experiment is automatically set to the notebook's path
# You can optionally set it manually
mlflow.set_experiment("/Users/username@company.com/mlflow-examples/sklearn-iris")
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Define parameters
params = {
    "n_estimators": 100,
    "max_depth": 6,
    "min_samples_split": 2
}
 
# Train and log with MLflow
with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_params(params)
    
    # Train model
    rf = RandomForestClassifier(**params)
    rf.fit(X_train, y_train)
    
    # Evaluate
    y_pred = rf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    
    # Log model
    mlflow.sklearn.log_model(rf, "model")
    
    # Register model in Databricks Model Registry
    model_uri = f"runs:/{run.info.run_id}/model"
    model_details = mlflow.register_model(model_uri, "iris_classifier")
    
    # The model is now in the Databricks Model Registry and can be moved 
    # through stages (Staging, Production) via the Databricks UI or APIs
 

MLflow with Kubernetes and Kubeflow

# Example Kubeflow Pipeline using MLflow
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: mlflow-pipeline-
spec:
  entrypoint: mlflow-pipeline
  templates:
  - name: mlflow-pipeline
    dag:
      tasks:
      - name: prepare-data
        template: prepare-data
      - name: train-model
        dependencies: [prepare-data]
        template: train-model
      - name: evaluate-model
        dependencies: [train-model]
        template: evaluate-model
      - name: register-model
        dependencies: [evaluate-model]
        template: register-model
      - name: deploy-model
        dependencies: [register-model]
        template: deploy-model
        
  - name: prepare-data
    container:
      image: mlflow-pipeline:latest
      command: ["python", "/app/prepare_data.py"]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-service.kubeflow:5000"
      - name: MLFLOW_EXPERIMENT_NAME
        value: "kubeflow-pipeline-demo"
      volumeMounts:
      - name: data-volume
        mountPath: /data
        
  - name: train-model
    container:
      image: mlflow-pipeline:latest
      command: ["python", "/app/train_model.py"]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-service.kubeflow:5000"
      - name: MLFLOW_EXPERIMENT_NAME
        value: "kubeflow-pipeline-demo"
      volumeMounts:
      - name: data-volume
        mountPath: /data
        
  - name: evaluate-model
    container:
      image: mlflow-pipeline:latest
      command: ["python", "/app/evaluate_model.py"]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-service.kubeflow:5000"
      volumeMounts:
      - name: data-volume
        mountPath: /data
        
  - name: register-model
    container:
      image: mlflow-pipeline:latest
      command: ["python", "/app/register_model.py"]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-service.kubeflow:5000"
      - name: MODEL_NAME
        value: "kubeflow-demo-model"
        
  - name: deploy-model
    container:
      image: mlflow-pipeline:latest
      command: ["python", "/app/deploy_model.py"]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-service.kubeflow:5000"
      - name: MODEL_NAME
        value: "kubeflow-demo-model"
      - name: DEPLOYMENT_NAMESPACE
        value: "serving"
        
  volumes:
  - name: data-volume
    persistentVolumeClaim:
      claimName: pipeline-data-pvc
 

MLflow with Apache Airflow

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import mlflow
 
# Default arguments
default_args = {
    'owner': 'data_scientist',
    'depends_on_past': False,
    'start_date': datetime(2025, 4, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
 
# Define MLflow tracking URI
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
 
# Define functions
def prepare_data(**context):
    # Set experiment
    mlflow.set_experiment("airflow_pipeline")
    
    # Start run and get run ID
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        # Log dataset info
        mlflow.log_param("data_source", "s3://data/customer_churn.csv")
        mlflow.log_param("data_version", "1.2.0")
        
        # Push run_id to XCom for downstream tasks
        context['ti'].xcom_push(key='run_id', value=run_id)
        
        return run_id
 
def train_model(**context):
    # Get run_id from upstream task
    run_id = context['ti'].xcom_pull(task_ids='prepare_data', key='run_id')
    
    # Resume the run
    with mlflow.start_run(run_id=run_id) as run:
        # Log training parameters
        mlflow.log_params({
            "algorithm": "RandomForest",
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 5
        })
        
        # Simulate training process
        import time
        import numpy as np
        from sklearn.datasets import load_breast_cancer
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.model_selection import train_test_split
        
        # Load data
        data = load_breast_cancer()
        X, y = data.data, data.target
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # Train model
        model = RandomForestClassifier(n_estimators=100, max_depth=10, min_samples_split=5, random_state=42)
        model.fit(X_train, y_train)
        
        # Log metrics
        for i in range(10):
            mlflow.log_metric("training_loss", 0.9 - i * 0.05, step=i)
            mlflow.log_metric("training_accuracy", 0.7 + i * 0.02, step=i)
            time.sleep(1)
        
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        # Log feature importances
        feature_importance = model.feature_importances_
        for i, importance in enumerate(feature_importance):
            mlflow.log_metric(f"feature_importance_{i}", importance)
        
        # Re-publish the run_id so downstream tasks can pull it by key
        context['ti'].xcom_push(key='run_id', value=run_id)
        return run_id
 
def evaluate_model(**context):
    # Get run_id from upstream task
    run_id = context['ti'].xcom_pull(task_ids='train_model', key='run_id')
    
    # Resume the run
    with mlflow.start_run(run_id=run_id) as run:
        # Load model
        model_uri = f"runs:/{run_id}/model"
        model = mlflow.sklearn.load_model(model_uri)
        
        # Evaluate on test data
        from sklearn.datasets import load_breast_cancer
        from sklearn.model_selection import train_test_split
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
        
        # Load data
        data = load_breast_cancer()
        X, y = data.data, data.target
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # Make predictions
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
        
        # Calculate metrics (cast to plain floats so they serialize cleanly via XCom)
        metrics = {
            "test_accuracy": float(accuracy_score(y_test, y_pred)),
            "test_precision": float(precision_score(y_test, y_pred)),
            "test_recall": float(recall_score(y_test, y_pred)),
            "test_f1": float(f1_score(y_test, y_pred)),
            "test_roc_auc": float(roc_auc_score(y_test, y_prob))
        }
        
        # Log metrics
        mlflow.log_metrics(metrics)
        
        # Determine if model passes quality threshold
        passed_threshold = metrics["test_accuracy"] > 0.9
        mlflow.log_param("passed_evaluation", passed_threshold)
        
        # Push results to XCom for downstream tasks
        context['ti'].xcom_push(key='run_id', value=run_id)
        context['ti'].xcom_push(key='evaluation_passed', value=passed_threshold)
        context['ti'].xcom_push(key='model_metrics', value=metrics)
        
        return run_id, passed_threshold
 
def register_model(**context):
    # Get run_id and evaluation result from upstream tasks
    run_id = context['ti'].xcom_pull(task_ids='evaluate_model', key='run_id')
    passed = context['ti'].xcom_pull(task_ids='evaluate_model', key='evaluation_passed')
    
    if not passed:
        print("Model did not pass evaluation. Skipping registration.")
        return None
    
    # Register model
    model_uri = f"runs:/{run_id}/model"
    model_name = "breast_cancer_classifier"
    
    # Register the model
    result = mlflow.register_model(model_uri, model_name)
    
    # Log registration info
    with mlflow.start_run(run_id=run_id) as run:
        mlflow.log_param("registered_model_name", model_name)
        mlflow.log_param("registered_model_version", result.version)
    
    # Push model details to XCom
    context['ti'].xcom_push(key='model_name', value=model_name)
    context['ti'].xcom_push(key='model_version', value=result.version)
    
    return result.version
 
def deploy_model(**context):
    # Get model details from upstream task
    model_name = context['ti'].xcom_pull(task_ids='register_model', key='model_name')
    model_version = context['ti'].xcom_pull(task_ids='register_model', key='model_version')
    
    if not model_name or not model_version:
        print("No model to deploy. Skipping deployment.")
        return None
    
    # Transition model to staging
    client = mlflow.tracking.MlflowClient()
    client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage="Staging"
    )
    
    # Log deployment details
    run_id = context['ti'].xcom_pull(task_ids='evaluate_model', key='run_id')
    with mlflow.start_run(run_id=run_id) as run:
        mlflow.log_param("deployment_stage", "Staging")
        mlflow.log_param("deployment_timestamp", datetime.now().isoformat())
    
    # For this example, we'll just log that deployment was successful
    print(f"Model {model_name} version {model_version} deployed to Staging")
    
    return f"{model_name}/{model_version}"
 
# Create DAG
dag = DAG(
    'mlflow_end_to_end_pipeline',
    default_args=default_args,
    description='End-to-end ML pipeline with MLflow integration',
    schedule_interval=timedelta(days=1),
    catchup=False
)
 
# Define tasks (in Airflow 2.x the task context is passed to the callables
# automatically, so provide_context is no longer needed)
prepare_task = PythonOperator(
    task_id='prepare_data',
    python_callable=prepare_data,
    dag=dag
)
 
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
 
evaluate_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag
)
 
register_task = PythonOperator(
    task_id='register_model',
    python_callable=register_model,
    dag=dag
)
 
deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)
 
# Define task dependencies
prepare_task >> train_task >> evaluate_task >> register_task >> deploy_task
 

MLflow with AWS SageMaker

import mlflow.sagemaker as mfs
import boto3
import mlflow
from datetime import datetime
from mlflow.tracking import MlflowClient
 
# Set MLflow tracking URI
mlflow.set_tracking_uri("http://mlflow-server:5000")
 
# Define model and deployment parameters
model_name = "fraud_detection_model"
model_uri = f"models:/{model_name}/Production"
region = "us-west-2"
app_name = "fraud-detection-prod"
image_url = "123456789012.dkr.ecr.us-west-2.amazonaws.com/mlflow-sagemaker:latest"
model_s3_path = f"s3://mlflow-models/{model_name}"
execution_role_arn = "arn:aws:iam::123456789012:role/SageMakerExecutionRole"
bucket = "mlflow-models"
instance_type = "ml.m5.xlarge"
instance_count = 1
 
# AWS client setup
sagemaker_client = boto3.client('sagemaker', region_name=region)
s3_client = boto3.client('s3', region_name=region)
 
# Build the SageMaker container image if it doesn't already exist in ECR
ecr_client = boto3.client('ecr', region_name=region)
try:
    ecr_client.describe_repositories(repositoryNames=['mlflow-sagemaker'])
    print("Using existing image:", image_url)
except ecr_client.exceptions.RepositoryNotFoundException:
    print("Building SageMaker container image...")
    # This step typically happens in a CI/CD pipeline, e.g.:
    #   mlflow sagemaker build-and-push-container
except Exception as e:
    print(f"Error checking/building image: {e}")
 
# Deploy the model to SageMaker
try:
    # Load the model from MLflow
    print(f"Loading model from {model_uri}")
    client = MlflowClient()
    model_version = client.get_latest_versions(model_name, stages=["Production"])[0]
    run_id = model_version.run_id
    
    # Deploy to SageMaker
    print(f"Deploying model {model_name} (run_id={run_id}) to SageMaker endpoint {app_name}...")
    
    # Option 1: MLflow's built-in SageMaker deployment API (mlflow.sagemaker.deploy
    # in MLflow 1.x; newer releases expose the same functionality through
    # mlflow.deployments.get_deploy_client("sagemaker"))
    mfs.deploy(
        mode="create",
        app_name=app_name,
        model_uri=model_uri,
        image_url=image_url,
        execution_role_arn=execution_role_arn,
        instance_type=instance_type,
        instance_count=instance_count,
        region_name=region,
        vpc_config=None,  # Add if needed
        synchronous=True  # Wait for deployment to complete
    )
    
    # Option 2: Alternative deployment using AWS SDK
    """
    # First export model to S3
    mlflow.sagemaker.push_model_to_s3(
        model_uri=model_uri,
        bucket=bucket,
        prefix=f"{model_name}/{run_id}"
    )
    
    # Create model in SageMaker
    create_model_response = sagemaker_client.create_model(
        ModelName=f"{app_name}-model",
        PrimaryContainer={
            'Image': image_url,
            'ModelDataUrl': f"s3://{bucket}/{model_name}/{run_id}/model.tar.gz",
            'Environment': {
                'MLFLOW_MODEL_NAME': model_name
            }
        },
        ExecutionRoleArn=execution_role_arn
    )
    
    # Create endpoint configuration
    endpoint_config_response = sagemaker_client.create_endpoint_config(
        EndpointConfigName=f"{app_name}-config",
        ProductionVariants=[
            {
                'VariantName': 'production',
                'ModelName': f"{app_name}-model",
                'InitialInstanceCount': instance_count,
                'InstanceType': instance_type
            }
        ]
    )
    
    # Create or update endpoint
    try:
        # Try to update existing endpoint
        sagemaker_client.update_endpoint(
            EndpointName=app_name,
            EndpointConfigName=f"{app_name}-config"
        )
        print(f"Updating existing endpoint {app_name}")
    except sagemaker_client.exceptions.ClientError:
        # Create new endpoint if it doesn't exist
        sagemaker_client.create_endpoint(
            EndpointName=app_name,
            EndpointConfigName=f"{app_name}-config"
        )
        print(f"Creating new endpoint {app_name}")
    """
    
    print(f"Model successfully deployed to endpoint {app_name}")
    
    # Log deployment information in MLflow
    with mlflow.start_run(run_id=run_id):
        mlflow.log_param("sagemaker_endpoint", app_name)
        mlflow.log_param("deployment_timestamp", datetime.now().isoformat())
        mlflow.log_param("instance_type", instance_type)
        mlflow.log_param("instance_count", instance_count)
    
except Exception as e:
    print(f"Error deploying model: {e}")
    raise
 
# Example prediction using deployed endpoint
def predict(data):
    """Make a prediction using deployed SageMaker endpoint"""
    import boto3
    import json
    import numpy as np
    
    # Initialize SageMaker runtime client
    sagemaker_runtime = boto3.client('sagemaker-runtime', region_name=region)
    
    # Prepare data for prediction
    payload = json.dumps({
        "instances": data if isinstance(data, list) else [data]
    })
    
    # Invoke endpoint
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=app_name,
        ContentType='application/json',
        Body=payload
    )
    
    # Parse response
    result = json.loads(response['Body'].read().decode())
    return result
 
# Example usage
example_data = [
    [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
]
print(f"Example prediction: {predict(example_data)}")
 

Conclusion

MLflow provides a comprehensive platform for managing the entire machine learning lifecycle, from experimentation to production deployment. Its modular design makes it adaptable to various workflows and environments, while its framework-agnostic approach allows it to integrate with a wide range of ML libraries and tools.

Key takeaways from this guide:

  1. Experiment Tracking: MLflow provides a centralized way to record parameters, metrics, and artifacts, making it easier to compare experiments and reproduce results.

  2. Model Packaging: MLflow Projects and Models provide a standardized way to package ML code and models, ensuring reproducibility and portability.

  3. Model Registry: The Model Registry enables versioning, staging, and lifecycle management of models, facilitating collaboration and controlled deployment.

  4. Production Deployment: MLflow offers various deployment options, from simple local serving to integration with production platforms like Kubernetes and SageMaker.

  5. Integration: MLflow integrates with popular ML frameworks, orchestration tools, and cloud platforms, making it a versatile tool for any ML workflow.

As you implement MLflow in your organization, start with experiment tracking and gradually adopt more components as your ML workflow matures. This incremental approach ensures a smoother transition and allows your team to realize value at each step.