This guide provides an in-depth exploration of MLflow, a platform for managing the complete machine learning lifecycle. We'll cover essential concepts, practical implementation examples, and best practices for data scientists and ML engineers.
Table of Contents
- Introduction to MLflow
- Setting Up MLflow
- MLflow Components
- Practical MLflow Python Programming
- Advanced MLflow Usage
- MLflow Deployment Scenarios
- Best Practices
- Integration with Other Tools
- Conclusion
Introduction to MLflow
MLflow is an open-source platform designed to manage the machine learning lifecycle, including experimentation, reproducibility, deployment, and a central model registry. Created by Databricks in 2018, it addresses common challenges in machine learning workflows, such as tracking experiments, packaging code into reproducible runs, and sharing and deploying models.
MLflow works with any machine learning library and language (with primary support for Python), and can be used in any environment, from local machines to cloud platforms.
The key problems MLflow solves include (a minimal tracking sketch follows this list):
- Experiment tracking: Keeping track of parameters, metrics, and artifacts across multiple experiments
- Reproducibility: Packaging ML code in a way that it can be reliably rerun
- Model management: Versioning, staging, and deploying models
- Collaborative workflows: Enabling teams to share experiments and models
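In practice, experiment tracking comes down to a handful of API calls. Here is a minimal sketch, assuming the default local ./mlruns store and illustrative parameter and metric names:
import mlflow
# Minimal tracking sketch: log one parameter and one metric to the default local ./mlruns store
with mlflow.start_run(run_name="quickstart"):
    mlflow.log_param("learning_rate", 0.01)  # input parameter
    mlflow.log_metric("rmse", 0.42)          # result metric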
Setting Up MLflow
Let's start with installation and basic setup.
Installation
pip install mlflow
For additional features:
# SQL backends: SQLAlchemy ships with the base package, so no extra install is needed
# With AWS S3 artifact storage
pip install boto3
# With Azure Blob Storage artifact storage
pip install azure-storage-blob
# With Google Cloud Storage artifact storage
pip install google-cloud-storage
# Or pull in MLflow's bundled optional dependencies
pip install mlflow[extras]
Basic Configuration
MLflow can be configured using environment variables or directly in your Python code:
import os
# Configure tracking URI using environment variables
os.environ["MLFLOW_TRACKING_URI"] = "http://localhost:5000"
# Or configure in code
import mlflow
mlflow.set_tracking_uri("http://localhost:5000")
Starting the Tracking Server
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0 --port 5000
For production environments, consider using more robust database backends like PostgreSQL or MySQL, and cloud storage for artifacts.
MLflow Components
MLflow consists of four main components, each addressing a different aspect of the ML lifecycle.
MLflow Tracking
MLflow Tracking is a logging API that records parameters, code versions, metrics, and output files when running machine learning code. It works equally well with any ML library and can log results to local files or to a server, for sharing across a team.
Key Concepts
- Runs: Individual executions of your code
- Experiments: Groups of runs
- Parameters: Key-value input parameters for your runs
- Metrics: Numerical values to track (e.g., accuracy, loss)
- Artifacts: Files associated with a run (e.g., models, plots)
Basic Usage
import mlflow
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
# Load data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Set experiment name
mlflow.set_experiment("iris_classification")
# Define hyperparameters to try
regularization_params = [0.01, 0.1, 1.0]
for reg_param in regularization_params:
# Start a new run
with mlflow.start_run(run_name=f"logreg_C={reg_param}"):
# Log the hyperparameter
mlflow.log_param("C", reg_param)
# Train model
model = LogisticRegression(C=reg_param, solver="lbfgs", max_iter=1000, multi_class="auto")
model.fit(X_train, y_train)
# Evaluate model
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
# Log metrics
mlflow.log_metric("accuracy", accuracy)
# Log model
mlflow.sklearn.log_model(model, "model")
# Log a simple plot as an artifact
import matplotlib.pyplot as plt
plt.figure()
plt.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis')
plt.title(f'Iris Classification (C={reg_param})')
plt.savefig("scatter_plot.png")
mlflow.log_artifact("scatter_plot.png")
print(f"Logged model with C={reg_param}, accuracy={accuracy}")
Viewing Results
After running experiments, you can view them using the MLflow UI:
mlflow ui
Or query programmatically:
from mlflow.tracking import MlflowClient
client = MlflowClient()
experiment = client.get_experiment_by_name("iris_classification")
runs = client.search_runs(experiment_ids=[experiment.experiment_id])
for run in runs:
print(f"Run ID: {run.info.run_id}")
print(f"C parameter: {run.data.params['C']}")
print(f"Accuracy: {run.data.metrics['accuracy']}")
print("-" * 40)
MLflow Projects
MLflow Projects provide a standard format for packaging ML code to make it reusable and reproducible. A project is defined by an MLproject file (in YAML format) that specifies dependencies, entry points, and parameters.
Project Structure
my_project/
├── MLproject
├── conda.yaml
└── train.py
MLproject File
name: iris_classification
conda_env: conda.yaml
entry_points:
main:
parameters:
C: {type: float, default: 1.0}
max_iter: {type: int, default: 1000}
test_size: {type: float, default: 0.2}
command: "python train.py --C {C} --max_iter {max_iter} --test_size {test_size}"
Conda Environment File (conda.yaml)
name: iris_proj
channels:
- defaults
- conda-forge
dependencies:
- python=3.8
- pip
- scikit-learn=1.0
- matplotlib
- pip:
- mlflow>=2.0.0
Running Projects
# Run locally
mlflow run . -P C=0.5
# Run from GitHub
mlflow run https://github.com/username/iris_project -P C=0.5
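Projects can also be launched from Python instead of the CLI; a sketch against the local project above:
import mlflow
# Launch the project programmatically; parameters mirror the CLI -P flags
submitted = mlflow.projects.run(uri=".", parameters={"C": 0.5})
print(f"Project run finished: {submitted.run_id}")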
MLflow Models
MLflow Models provide a standard format for packaging machine learning models that can be used in a variety of downstream tools. MLflow includes built-in support for many model types and frameworks.
Saving Models
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.datasets import load_diabetes
from sklearn.model_selection import train_test_split
# Load data
diabetes = load_diabetes()
X = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
y = diabetes.target
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train model
model = RandomForestRegressor(n_estimators=100, max_depth=6, random_state=42)
model.fit(X_train, y_train)
# Log model with signature and input example
signature = mlflow.models.infer_signature(X_train, model.predict(X_train))
input_example = X_train.iloc[:5]
with mlflow.start_run() as run:
mlflow.sklearn.log_model(
model,
"random_forest_model",
signature=signature,
input_example=input_example
)
# Get the logged model URI
model_uri = f"runs:/{run.info.run_id}/random_forest_model"
print(f"Model saved at: {model_uri}")
# Load the model
loaded_model = mlflow.sklearn.load_model(model_uri)
# Make predictions
predictions = loaded_model.predict(X_test)
print(f"Predictions: {predictions[:5]}")
Model Flavors
MLflow supports multiple model "flavors," enabling the same model to be used with different downstream tools (a loading sketch follows this list):
- Python function flavor (universal)
- Framework-specific flavors (scikit-learn, TensorFlow, PyTorch, etc.)
- R function flavors
- ONNX flavor
- Custom flavors
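The python_function flavor is what makes this portability work: any logged model can be loaded and scored through the same interface, regardless of the framework that produced it. A sketch, reusing the model_uri from the Saving Models example above:
import mlflow.pyfunc
# Load the model through the universal python_function flavor
pyfunc_model = mlflow.pyfunc.load_model(model_uri)  # e.g. "runs:/<run_id>/random_forest_model"
print(pyfunc_model.predict(X_test)[:5])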
Serving Models
# Start a local REST server
mlflow models serve -m "runs:/your-run-id/model" -p 5001
Then make predictions:
import requests
import pandas as pd
import json
# Prepare data
data = X_test.iloc[:5].values.tolist()
payload = {"instances": data}
# Make prediction request
response = requests.post("http://localhost:5001/invocations",
json=payload,
headers={"Content-Type": "application/json"})
predictions = json.loads(response.text)
print(f"Predictions: {predictions}")
MLflow Registry
The MLflow Model Registry is a centralized repository for managing the full lifecycle of MLflow Models. It provides model lineage, versioning, stage transitions, and annotations.
Key Concepts
- Registered Model: A logical collection of models with the same name
- Model Version: A specific instance of a registered model
- Model Stage: The deployment stage of the model (e.g., Staging, Production)
- Aliases: Named pointers to specific model versions
- Annotations: Key-value pairs for adding metadata to models
Using the Model Registry
import mlflow
from mlflow.tracking import MlflowClient
# Initialize client
client = MlflowClient()
# Register the model
model_name = "diabetes_predictor"
run_id = "your-run-id" # Replace with actual run ID
model_uri = f"runs:/{run_id}/random_forest_model"
# Register the model if it doesn't exist
try:
registered_model = client.get_registered_model(model_name)
print(f"Model {model_name} already exists")
except Exception:
registered_model = client.create_registered_model(model_name)
print(f"Created new model: {model_name}")
# Add a new version to the model
model_version = client.create_model_version(
name=model_name,
source=model_uri,
run_id=run_id
)
print(f"Created version {model_version.version} of model {model_name}")
# Transition the model to staging
client.transition_model_version_stage(
name=model_name,
version=model_version.version,
stage="Staging"
)
print(f"Transitioned model {model_name} version {model_version.version} to Staging")
# Add metadata
client.update_model_version(
name=model_name,
version=model_version.version,
description="Random Forest model trained on diabetes dataset"
)
# Set model alias
client.set_registered_model_alias(
name=model_name,
alias="latest",
version=model_version.version
)
print(f"Set alias 'latest' to version {model_version.version}")
# Load the model by alias
model_uri = f"models:/{model_name}@latest"
model = mlflow.sklearn.load_model(model_uri)
print(f"Loaded model from {model_uri}")
Practical MLflow Python Programming
Let's explore more practical examples of using MLflow in real-world scenarios.
End-to-End ML Pipeline with MLflow
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_breast_cancer
# Set experiment
mlflow.set_experiment("breast_cancer_classification")
# Load data
data = load_breast_cancer()
X = pd.DataFrame(data.data, columns=data.feature_names)
y = pd.Series(data.target)
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Define preprocessing and model pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(random_state=42))
])
# Define parameter grid
param_grid = {
'classifier__n_estimators': [50, 100],
'classifier__max_depth': [5, 10, 15],
'classifier__min_samples_split': [2, 5]
}
# Create grid search
grid_search = GridSearchCV(
pipeline,
param_grid=param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1
)
# Start MLflow run
with mlflow.start_run(run_name="rf_grid_search") as run:
# Log dataset info
mlflow.log_param("dataset_size", len(X))
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("test_size", len(X_test))
mlflow.log_param("feature_count", X.shape[1])
# Log data summary statistics
for col in X.columns:
mlflow.log_param(f"feature_mean_{col}", X[col].mean())
mlflow.log_param(f"feature_std_{col}", X[col].std())
# Log parameter grid
for param_name, param_values in param_grid.items():
mlflow.log_param(param_name, str(param_values))
# Fit grid search
print("Training models with grid search...")
grid_search.fit(X_train, y_train)
# Log best parameters
best_params = grid_search.best_params_
for param, value in best_params.items():
mlflow.log_param(f"best_{param}", value)
# Get best model
best_model = grid_search.best_estimator_
# Make predictions
y_pred = best_model.predict(X_test)
y_prob = best_model.predict_proba(X_test)[:, 1]
# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_prob)
# Log metrics
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("f1_score", f1)
mlflow.log_metric("roc_auc", roc_auc)
# Log cross-validation results
cv_results = grid_search.cv_results_
for i, (params, mean_score, std_score) in enumerate(zip(
cv_results['params'],
cv_results['mean_test_score'],
cv_results['std_test_score'])):
run_name = '_'.join(f"{k.split('__')[-1]}_{v}" for k, v in params.items())
mlflow.log_metric(f"cv_mean_{run_name}", mean_score)
mlflow.log_metric(f"cv_std_{run_name}", std_score)
# Create feature importance plot
rf_model = best_model.named_steps['classifier']
feature_names = X.columns
importances = rf_model.feature_importances_
indices = np.argsort(importances)[::-1]
plt.figure(figsize=(12, 8))
plt.title('Feature Importances')
plt.bar(range(len(indices[:15])), importances[indices[:15]], align='center')
plt.xticks(range(len(indices[:15])), [feature_names[i] for i in indices[:15]], rotation=90)
plt.tight_layout()
plt.savefig("feature_importances.png")
mlflow.log_artifact("feature_importances.png")
# Create confusion matrix
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.savefig("confusion_matrix.png")
mlflow.log_artifact("confusion_matrix.png")
# Create ROC curve
from sklearn.metrics import roc_curve
fpr, tpr, _ = roc_curve(y_test, y_prob)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC (AUC = {roc_auc:.3f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.savefig("roc_curve.png")
mlflow.log_artifact("roc_curve.png")
# Log model with signature and input example
signature = mlflow.models.infer_signature(X_train, best_model.predict(X_train))
mlflow.sklearn.log_model(
best_model,
"best_model",
signature=signature,
input_example=X_train.iloc[:5]
)
print(f"Best model: {best_params}")
print(f"Test accuracy: {accuracy:.4f}")
print(f"Run ID: {run.info.run_id}")
# Register the model
client = mlflow.tracking.MlflowClient()
model_name = "breast_cancer_classifier"
try:
client.create_registered_model(model_name)
except Exception:
pass
model_details = mlflow.register_model(
f"runs:/{run.info.run_id}/best_model",
model_name
)
print(f"Registered model version: {model_details.version}")
Hyperparameter Tuning with MLflow
import mlflow
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.datasets import load_boston  # removed in scikit-learn 1.2; this example requires scikit-learn < 1.2
import optuna
import joblib
import matplotlib.pyplot as plt
# Load dataset
boston = load_boston()
X, y = boston.data, boston.target
# Set experiment
mlflow.set_experiment("boston_housing_optuna")
# Define objective function for Optuna
def objective(trial):
# Define hyperparameters to optimize
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 500),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
'max_depth': trial.suggest_int('max_depth', 3, 10),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'subsample': trial.suggest_float('subsample', 0.5, 1.0)
}
# Use cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)
rmse_scores = []
r2_scores = []
# Start nested MLflow run
with mlflow.start_run(nested=True) as child_run:
# Log parameters
for param_name, param_value in params.items():
mlflow.log_param(param_name, param_value)
# Cross-validation
for train_idx, val_idx in kf.split(X):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
# Train model
model = GradientBoostingRegressor(**params, random_state=42)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_val)
rmse = np.sqrt(mean_squared_error(y_val, y_pred))
r2 = r2_score(y_val, y_pred)
rmse_scores.append(rmse)
r2_scores.append(r2)
# Calculate average metrics
mean_rmse = np.mean(rmse_scores)
mean_r2 = np.mean(r2_scores)
std_rmse = np.std(rmse_scores)
# Log metrics
mlflow.log_metric("mean_rmse", mean_rmse)
mlflow.log_metric("mean_r2", mean_r2)
mlflow.log_metric("std_rmse", std_rmse)
# Log trial number
mlflow.log_param("trial_number", trial.number)
# Return objective metric for Optuna to minimize
return mean_rmse
# Run optimization
with mlflow.start_run(run_name="optuna_optimization") as parent_run:
mlflow.log_param("optimization_algorithm", "TPE")
mlflow.log_param("n_trials", 20)
mlflow.log_param("dataset", "boston_housing")
# Create and run the study
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=20)
# Get best parameters and metrics
best_params = study.best_params
best_value = study.best_value
# Log best results
mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
mlflow.log_metric("best_rmse", best_value)
# Train final model on full dataset
final_model = GradientBoostingRegressor(**best_params, random_state=42)
final_model.fit(X, y)
# Save importance plot
feature_importance = final_model.feature_importances_
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0])
plt.figure(figsize=(12, 6))
plt.barh(pos, feature_importance[sorted_idx], align='center')
plt.yticks(pos, np.array(boston.feature_names)[sorted_idx])
plt.title('Feature Importance (MDI)')
plt.tight_layout()
plt.savefig("feature_importance.png")
mlflow.log_artifact("feature_importance.png")
# Save optimization history plot
plt.figure(figsize=(10, 6))
optuna.visualization.matplotlib.plot_optimization_history(study)
plt.tight_layout()
plt.savefig("optimization_history.png")
mlflow.log_artifact("optimization_history.png")
# Save parameter importance plot
plt.figure(figsize=(10, 6))
optuna.visualization.matplotlib.plot_param_importances(study)
plt.tight_layout()
plt.savefig("param_importances.png")
mlflow.log_artifact("param_importances.png")
# Log final model
mlflow.sklearn.log_model(final_model, "final_model")
# Save study for later analysis
joblib.dump(study, "optuna_study.pkl")
mlflow.log_artifact("optuna_study.pkl")
print(f"Best parameters: {best_params}")
print(f"Best RMSE: {best_value:.4f}")
print(f"Parent run ID: {parent_run.info.run_id}")
Advanced MLflow Usage
Custom Flavors
MLflow allows you to create custom flavors for specialized models or workflows. This is useful for:
- Supporting frameworks not directly supported by MLflow
- Custom preprocessing logic
- Complex model ensembles
- Special deployment requirements
import mlflow
import mlflow.pyfunc
import pandas as pd
import numpy as np
import pickle
# Define a custom model class
class TextPreprocessorModel(mlflow.pyfunc.PythonModel):
def __init__(self, steps=None):
self.steps = steps or []
def fit(self, texts):
"""Fit the preprocessing steps on the training data"""
# Example: Build vocabulary, learn frequencies, etc.
self.vocabulary = set()
for text in texts:
words = text.lower().split()
self.vocabulary.update(words)
return self
def add_step(self, step_func):
"""Add a preprocessing step"""
self.steps.append(step_func)
return self
def predict(self, context, model_input):
"""Apply all preprocessing steps to the input data"""
if isinstance(model_input, pd.DataFrame):
text_col = model_input.columns[0] # Assume first column is text
texts = model_input[text_col].tolist()
else:
texts = model_input
processed_texts = []
for text in texts:
# Apply each preprocessing step sequentially
processed_text = text
for step in self.steps:
processed_text = step(processed_text)
processed_texts.append(processed_text)
# Return as DataFrame
return pd.DataFrame({"processed_text": processed_texts})
# Example preprocessing steps
def lowercase(text):
return text.lower()
def remove_punctuation(text):
import string
return ''.join(c for c in text if c not in string.punctuation)
def tokenize(text):
return ' '.join(text.split())
# Create and train the model
texts = [
"Hello, world! This is a sample text.",
"MLflow is a great tool for ML experiments.",
"Custom flavors let you extend MLflow's capabilities."
]
# Create and configure the model
preproc_model = TextPreprocessorModel()
preproc_model.add_step(lowercase)
preproc_model.add_step(remove_punctuation)
preproc_model.add_step(tokenize)
preproc_model.fit(texts)
# Define artifact paths
artifacts = {
"preprocessor": "preprocessor.pkl"
}
# Serialize the model
with open(artifacts["preprocessor"], "wb") as f:
pickle.dump(preproc_model, f)
# Define a wrapper class that loads the pickled preprocessor at serving time
class TextPreprocessorWrapper(mlflow.pyfunc.PythonModel):
def load_context(self, context):
with open(context.artifacts["preprocessor"], "rb") as f:
self.model = pickle.load(f)
def predict(self, context, model_input):
return self.model.predict(context, model_input)
# Log the model
with mlflow.start_run() as run:
mlflow.pyfunc.log_model(
artifact_path="text_preprocessor",
python_model=TextPreprocessorWrapper(),
artifacts=artifacts,
conda_env={
'channels': ['defaults'],
'dependencies': [
'python=3.8.0',
'pip',
{'pip': ['mlflow', 'pandas', 'numpy']}
],
'name': 'text_preprocessor_env'
}
)
# Log example inputs
example_input = pd.DataFrame({"text": ["This is a TEST example with punctuation!!!"]})
mlflow.log_input(mlflow.data.from_pandas(example_input), context="example_input")
print(f"Model logged with run_id: {run.info.run_id}")
# Load and use the model
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run.info.run_id}/text_preprocessor")
test_input = pd.DataFrame({"text": ["Hello, WORLD! Testing the custom model."]})
result = loaded_model.predict(test_input)
print(f"Original: {test_input['text'][0]}")
print(f"Processed: {result['processed_text'][0]}")
Autologging
MLflow provides autologging capabilities for popular ML frameworks, automatically capturing parameters, metrics, and models without requiring explicit logging calls:
import mlflow
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
# Enable autologging for scikit-learn
mlflow.sklearn.autolog(log_models=True,
log_input_examples=True,
log_model_signatures=True,
log_post_training_metrics=True)
# Load California housing dataset
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = housing.target
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Create experiment
mlflow.set_experiment("california_housing_autolog")
# Start run
with mlflow.start_run(run_name="rf_pipeline_autolog"):
# Create pipeline
pipeline = Pipeline([
('scaler', StandardScaler()),
('regressor', RandomForestRegressor(n_estimators=100, random_state=42))
])
# Train model (mlflow will automatically log parameters, metrics, and model)
pipeline.fit(X_train, y_train)
# Make predictions
y_pred = pipeline.predict(X_test)
# Calculate additional metrics (these will be automatically logged)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
# Log feature importances manually
feature_importances = pipeline.named_steps['regressor'].feature_importances_
for i, col in enumerate(X.columns):
mlflow.log_metric(f"feature_importance_{col}", feature_importances[i])
print(f"Training complete - metrics automatically logged")
print(f"MAE: {mae:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"RΒ²: {r2:.4f}")
MLflow with PyTorch
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# Enable PyTorch autologging (note: this hooks into PyTorch Lightning's Trainer;
# for the plain PyTorch training loop below, metrics are logged manually)
mlflow.pytorch.autolog()
# Set experiment
mlflow.set_experiment("pytorch_digits_classification")
# Load digits dataset
digits = load_digits()
X, y = digits.data, digits.target
# Preprocess data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Split data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test)
# Create datasets and dataloaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
# Define neural network
class DigitsClassifier(nn.Module):
def __init__(self, input_size=64, hidden_size=128, num_classes=10, dropout_rate=0.2):
super(DigitsClassifier, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout_rate),
nn.Linear(hidden_size // 2, num_classes)
)
def forward(self, x):
return self.model(x)
# Training function
def train_model(model, train_loader, test_loader, criterion, optimizer, epochs=10, device="cpu"):
# Move model to device
model.to(device)
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
for epoch in range(epochs):
# Training phase
model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
# Zero the parameter gradients
optimizer.zero_grad()
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Backward pass and optimize
loss.backward()
optimizer.step()
# Statistics
running_loss += loss.item() * inputs.size(0)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
epoch_loss = running_loss / len(train_loader.dataset)
epoch_acc = correct / total
train_losses.append(epoch_loss)
train_accuracies.append(epoch_acc)
# Evaluation phase
model.eval()
running_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Statistics
running_loss += loss.item() * inputs.size(0)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
epoch_loss = running_loss / len(test_loader.dataset)
epoch_acc = correct / total
test_losses.append(epoch_loss)
test_accuracies.append(epoch_acc)
# Log metrics for each epoch
mlflow.log_metric("train_loss", train_losses[-1], step=epoch)
mlflow.log_metric("train_accuracy", train_accuracies[-1], step=epoch)
mlflow.log_metric("test_loss", test_losses[-1], step=epoch)
mlflow.log_metric("test_accuracy", test_accuracies[-1], step=epoch)
print(f"Epoch {epoch+1}/{epochs} - "
f"Train Loss: {train_losses[-1]:.4f}, Train Acc: {train_accuracies[-1]:.4f}, "
f"Test Loss: {test_losses[-1]:.4f}, Test Acc: {test_accuracies[-1]:.4f}")
# Create and log plots
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(range(1, epochs+1), train_losses, label='Train')
plt.plot(range(1, epochs+1), test_losses, label='Test')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Loss Curves')
plt.subplot(1, 2, 2)
plt.plot(range(1, epochs+1), train_accuracies, label='Train')
plt.plot(range(1, epochs+1), test_accuracies, label='Test')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Accuracy Curves')
plt.tight_layout()
plt.savefig("training_curves.png")
mlflow.log_artifact("training_curves.png")
return model
# Main training loop with MLflow
with mlflow.start_run(run_name="pytorch_digits_nn"):
# Set hyperparameters
input_size = 64 # Number of features in digits dataset
hidden_size = 128
num_classes = 10 # Digits 0-9
dropout_rate = 0.2
learning_rate = 0.001
epochs = 15
# Log hyperparameters
mlflow.log_param("input_size", input_size)
mlflow.log_param("hidden_size", hidden_size)
mlflow.log_param("num_classes", num_classes)
mlflow.log_param("dropout_rate", dropout_rate)
mlflow.log_param("learning_rate", learning_rate)
mlflow.log_param("batch_size", batch_size)
mlflow.log_param("epochs", epochs)
# Define model, loss function, and optimizer
model = DigitsClassifier(input_size, hidden_size, num_classes, dropout_rate)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Train model
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
mlflow.log_param("device", device.type)
model = train_model(model, train_loader, test_loader, criterion, optimizer, epochs, device)
# Final evaluation
model.eval()
with torch.no_grad():
all_preds = []
all_labels = []
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
# Log confusion matrix
from sklearn.metrics import confusion_matrix, classification_report
cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(8, 6))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(num_classes)
plt.xticks(tick_marks, range(num_classes))
plt.yticks(tick_marks, range(num_classes))
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
# Add text annotations to confusion matrix
thresh = cm.max() / 2
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
plt.text(j, i, format(cm[i, j], 'd'),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.savefig("confusion_matrix.png")
mlflow.log_artifact("confusion_matrix.png")
# Log classification report
report = classification_report(all_labels, all_preds, output_dict=True)
for digit in range(10):
mlflow.log_metric(f"precision_digit_{digit}", report[str(digit)]['precision'])
mlflow.log_metric(f"recall_digit_{digit}", report[str(digit)]['recall'])
mlflow.log_metric(f"f1_score_digit_{digit}", report[str(digit)]['f1-score'])
# Log example predictions
fig, axes = plt.subplots(3, 5, figsize=(15, 9))
axes = axes.flatten()
for i, ax in enumerate(axes):
if i < len(X_test):
# Get a random test example
idx = np.random.randint(0, len(X_test))
image = X_test[idx].reshape(8, 8)
# Get prediction
with torch.no_grad():
input_tensor = torch.FloatTensor(X_test[idx]).to(device)
output = model(input_tensor)
_, prediction = torch.max(output, 0)
# Display image and prediction
ax.imshow(image, cmap='gray')
ax.set_title(f"True: {y_test[idx]}, Pred: {prediction.item()}")
ax.axis('off')
plt.tight_layout()
plt.savefig("example_predictions.png")
mlflow.log_artifact("example_predictions.png")
# Log PyTorch model
mlflow.pytorch.log_model(model, "pytorch_model")
print(f"Training complete. Model saved in run {mlflow.active_run().info.run_id}")
MLflow Deployment Scenarios
MLflow supports various deployment scenarios, from simple local testing to production systems.
Local Deployment
# Start MLflow tracking server locally
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlflow-artifacts \
--host 0.0.0.0 \
--port 5000
# Serve a model locally
mlflow models serve -m "models:/my_model/production" -p 5001
# Batch inference using MLflow CLI
mlflow models predict \
-m "models:/my_model/production" \
-i input_data.csv \
-t csv \
-o predictions.csv
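A locally served model accepts JSON over REST; with MLflow 2.x the dataframe_split format is the most explicit way to send tabular input. A sketch against the server started above, with illustrative column names:
import requests
# Score two rows against the locally served model; columns are illustrative
payload = {
    "dataframe_split": {
        "columns": ["feature1", "feature2"],
        "data": [[1.0, 2.0], [3.0, 4.0]],
    }
}
response = requests.post("http://localhost:5001/invocations", json=payload)
print(response.json())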
Docker-based Deployment
# Build a Docker image for the model
mlflow models build-docker \
-m "models:/my_model/production" \
-n "my-model-image" \
--enable-mlserver
# Run the Docker image
docker run -p 5001:8080 my-model-image
# Use the model REST API
curl -X POST -H "Content-Type:application/json" \
--data '{"dataframe_split": {"columns":["feature1", "feature2"], "data":[[1.0, 2.0]]}}' \
http://localhost:5001/invocations
Kubernetes Deployment
# mlflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mlflow-server
labels:
app: mlflow
spec:
replicas: 1
selector:
matchLabels:
app: mlflow
template:
metadata:
labels:
app: mlflow
spec:
containers:
- name: mlflow
image: my-model-image:latest
ports:
- containerPort: 8080
env:
- name: GUNICORN_CMD_ARGS
value: "--timeout 60 --workers 2"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: mlflow-service
spec:
selector:
app: mlflow
ports:
- port: 80
targetPort: 8080
type: LoadBalancer
Best Practices
Organizing Experiments
# Hierarchical experiment organization
project_name = "fraud_detection"
dataset_version = "v2"
model_type = "gradient_boosting"
# Create a structured experiment name
experiment_name = f"{project_name}/{dataset_version}/{model_type}"
mlflow.set_experiment(experiment_name)
# Or use tags for better organization
with mlflow.start_run(run_name="hyperparameter_tuning_run"):
mlflow.set_tags({
"project": project_name,
"dataset_version": dataset_version,
"model_type": model_type,
"stage": "development",
"engineer": "jane_doe",
"priority": "high"
})
# Rest of your experiment code...
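Tags pay off when you later need to slice runs across experiments; a sketch that queries by the tags set above:
import mlflow
# Find all high-priority development runs for the project, newest first
runs_df = mlflow.search_runs(
    experiment_names=[experiment_name],
    filter_string="tags.project = 'fraud_detection' AND tags.priority = 'high'",
    order_by=["attributes.start_time DESC"],
)
print(runs_df[["run_id", "tags.stage"]].head())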
Production-Ready MLflow Setup
# Database setup (PostgreSQL)
export MLFLOW_TRACKING_URI=postgresql://mlflow_user:mlflow_password@localhost/mlflow_db
# Object storage for artifacts (S3)
export MLFLOW_ARTIFACT_ROOT=s3://mlflow-artifacts/
# Set default experiment
export MLFLOW_EXPERIMENT_NAME=my_project/production
# Start tracking server with proper authentication
mlflow server \
--backend-store-uri ${MLFLOW_TRACKING_URI} \
--default-artifact-root ${MLFLOW_ARTIFACT_ROOT} \
--host 0.0.0.0 \
--port 5000 \
--workers 4 \
--gunicorn-opts "--timeout 120 --log-level info" \
--expose-prometheus ./prometheus-metrics
# Example systemd service file (mlflow.service)
# [Unit]
# Description=MLflow Tracking Server
# After=network.target postgresql.service
#
# [Service]
# User=mlflow
# Environment="MLFLOW_TRACKING_URI=postgresql://mlflow_user:mlflow_password@localhost/mlflow_db"
# Environment="MLFLOW_ARTIFACT_ROOT=s3://mlflow-artifacts/"
# ExecStart=/usr/local/bin/mlflow server --backend-store-uri ${MLFLOW_TRACKING_URI} --default-artifact-root ${MLFLOW_ARTIFACT_ROOT} --host 0.0.0.0 --port 5000
# Restart=on-failure
#
# [Install]
# WantedBy=multi-user.target
CI/CD for MLflow Models
# .github/workflows/mlflow-pipeline.yml
name: MLflow Model Pipeline
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
schedule:
- cron: '0 0 * * 0' # Weekly retraining
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
MODEL_NAME: fraud_detection_model
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests
run: |
pytest tests/ --cov=src/
train:
needs: test
runs-on: ubuntu-latest
outputs:
run_id: ${{ steps.save_run_id.outputs.run_id }}
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Train model
run: |
python src/train.py --experiment_name "fraud_detection/prod" --run_name "github_action_${GITHUB_SHA}"
- name: Save run ID
id: save_run_id
run: |
RUN_ID=$(cat run_id.txt)
echo "::set-output name=run_id::$RUN_ID"
evaluate:
needs: train
runs-on: ubuntu-latest
outputs:
passed: ${{ steps.check_performance.outputs.passed }}
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Evaluate model
run: |
python src/evaluate.py --run_id ${{ needs.train.outputs.run_id }} --test_data data/test.csv
- name: Check performance
id: check_performance
run: |
PASSED=$(cat eval_results.json | jq -r '.passed')
echo "::set-output name=passed::$PASSED"
register:
needs: [train, evaluate]
if: ${{ needs.evaluate.outputs.passed == 'true' }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Register model
run: |
python src/register_model.py --run_id ${{ needs.train.outputs.run_id }} --model_name $MODEL_NAME
- name: Promote to staging
run: |
python src/promote_model.py --model_name $MODEL_NAME --stage "Staging"
deploy:
needs: register
runs-on: ubuntu-latest
environment: staging
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt awscli
- name: Create SageMaker endpoint
run: |
python src/deploy_sagemaker.py --model_name $MODEL_NAME --stage "Staging" --endpoint_name "fraud-detection-staging"
- name: Run integration tests
run: |
python tests/integration/test_endpoint.py --endpoint_name "fraud-detection-staging"
promote_to_production:
needs: deploy
runs-on: ubuntu-latest
environment: production
# Manual approval step through GitHub environments
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Promote to production
run: |
python src/promote_model.py --model_name $MODEL_NAME --stage "Production"
- name: Update production endpoint
run: |
python src/deploy_sagemaker.py --model_name $MODEL_NAME --stage "Production" --endpoint_name "fraud-detection-prod"
Integration with Other Tools
MLflow integrates well with many other tools in the ML ecosystem:
MLflow with Databricks
# In a Databricks notebook, MLflow is already configured
# Example using Databricks managed MLflow
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
# MLflow experiment is automatically set to the notebook's path
# You can optionally set it manually
mlflow.set_experiment("/Users/username@company.com/mlflow-examples/sklearn-iris")
# Load data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Define parameters
params = {
"n_estimators": 100,
"max_depth": 6,
"min_samples_split": 2
}
# Train and log with MLflow
with mlflow.start_run() as run:
# Log parameters
mlflow.log_params(params)
# Train model
rf = RandomForestClassifier(**params)
rf.fit(X_train, y_train)
# Evaluate
y_pred = rf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
# Log metrics
mlflow.log_metric("accuracy", accuracy)
# Log model
mlflow.sklearn.log_model(rf, "model")
# Register model in Databricks Model Registry
model_uri = f"runs:/{run.info.run_id}/model"
model_details = mlflow.register_model(model_uri, "iris_classifier")
# The model is now in the Databricks Model Registry and can be moved
# through stages (Staging, Production) via the Databricks UI or APIs
MLflow with Kubernetes and Kubeflow
# Example Kubeflow Pipeline using MLflow
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: mlflow-pipeline-
spec:
entrypoint: mlflow-pipeline
templates:
- name: mlflow-pipeline
dag:
tasks:
- name: prepare-data
template: prepare-data
- name: train-model
dependencies: [prepare-data]
template: train-model
- name: evaluate-model
dependencies: [train-model]
template: evaluate-model
- name: register-model
dependencies: [evaluate-model]
template: register-model
- name: deploy-model
dependencies: [register-model]
template: deploy-model
- name: prepare-data
container:
image: mlflow-pipeline:latest
command: ["python", "/app/prepare_data.py"]
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service.kubeflow:5000"
- name: MLFLOW_EXPERIMENT_NAME
value: "kubeflow-pipeline-demo"
volumeMounts:
- name: data-volume
mountPath: /data
- name: train-model
container:
image: mlflow-pipeline:latest
command: ["python", "/app/train_model.py"]
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service.kubeflow:5000"
- name: MLFLOW_EXPERIMENT_NAME
value: "kubeflow-pipeline-demo"
volumeMounts:
- name: data-volume
mountPath: /data
- name: evaluate-model
container:
image: mlflow-pipeline:latest
command: ["python", "/app/evaluate_model.py"]
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service.kubeflow:5000"
volumeMounts:
- name: data-volume
mountPath: /data
- name: register-model
container:
image: mlflow-pipeline:latest
command: ["python", "/app/register_model.py"]
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service.kubeflow:5000"
- name: MODEL_NAME
value: "kubeflow-demo-model"
- name: deploy-model
container:
image: mlflow-pipeline:latest
command: ["python", "/app/deploy_model.py"]
env:
- name: MLFLOW_TRACKING_URI
value: "http://mlflow-service.kubeflow:5000"
- name: MODEL_NAME
value: "kubeflow-demo-model"
- name: DEPLOYMENT_NAMESPACE
value: "serving"
volumes:
- name: data-volume
persistentVolumeClaim:
claimName: pipeline-data-pvc
MLflow with Apache Airflow
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import mlflow
import json
import requests
# Default arguments
default_args = {
'owner': 'data_scientist',
'depends_on_past': False,
'start_date': datetime(2025, 4, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Define MLflow tracking URI
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
# Define functions
def prepare_data(**context):
# Set experiment
mlflow.set_experiment("airflow_pipeline")
# Start run and get run ID
with mlflow.start_run() as run:
run_id = run.info.run_id
# Log dataset info
mlflow.log_param("data_source", "s3://data/customer_churn.csv")
mlflow.log_param("data_version", "1.2.0")
# Push run_id to XCom for downstream tasks
context['ti'].xcom_push(key='run_id', value=run_id)
return run_id
def train_model(**context):
# Get run_id from upstream task
run_id = context['ti'].xcom_pull(task_ids='prepare_data', key='run_id')
# Resume the run
with mlflow.start_run(run_id=run_id) as run:
# Log training parameters
mlflow.log_params({
"algorithm": "RandomForest",
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5
})
# Simulate training process
import time
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Load data
data = load_breast_cancer()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, min_samples_split=5, random_state=42)
model.fit(X_train, y_train)
# Log metrics
for i in range(10):
mlflow.log_metric("training_loss", 0.9 - i * 0.05, step=i)
mlflow.log_metric("training_accuracy", 0.7 + i * 0.02, step=i)
time.sleep(1)
# Log model
mlflow.sklearn.log_model(model, "model")
# Log feature importances
feature_importance = model.feature_importances_
for i, importance in enumerate(feature_importance):
mlflow.log_metric(f"feature_importance_{i}", importance)
return run_id
def evaluate_model(**context):
# Get run_id from upstream task
run_id = context['ti'].xcom_pull(task_ids='train_model', key='run_id')
# Resume the run
with mlflow.start_run(run_id=run_id) as run:
# Load model
model_uri = f"runs:/{run_id}/model"
model = mlflow.sklearn.load_model(model_uri)
# Evaluate on test data
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
# Load data
data = load_breast_cancer()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Make predictions
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
# Calculate metrics
metrics = {
"test_accuracy": accuracy_score(y_test, y_pred),
"test_precision": precision_score(y_test, y_pred),
"test_recall": recall_score(y_test, y_pred),
"test_f1": f1_score(y_test, y_pred),
"test_roc_auc": roc_auc_score(y_test, y_prob)
}
# Log metrics
mlflow.log_metrics(metrics)
# Determine if model passes quality threshold
passed_threshold = metrics["test_accuracy"] > 0.9
mlflow.log_param("passed_evaluation", passed_threshold)
# Push evaluation result to XCom
context['ti'].xcom_push(key='evaluation_passed', value=passed_threshold)
context['ti'].xcom_push(key='model_metrics', value=metrics)
return run_id, passed_threshold
def register_model(**context):
# Get run_id and evaluation result from upstream tasks
run_id = context['ti'].xcom_pull(task_ids='evaluate_model', key='run_id')
passed = context['ti'].xcom_pull(task_ids='evaluate_model', key='evaluation_passed')
if not passed:
print("Model did not pass evaluation. Skipping registration.")
return None
# Register model
model_uri = f"runs:/{run_id}/model"
model_name = "breast_cancer_classifier"
# Register the model
result = mlflow.register_model(model_uri, model_name)
# Log registration info
with mlflow.start_run(run_id=run_id) as run:
mlflow.log_param("registered_model_name", model_name)
mlflow.log_param("registered_model_version", result.version)
# Push model details to XCom
context['ti'].xcom_push(key='model_name', value=model_name)
context['ti'].xcom_push(key='model_version', value=result.version)
return result.version
def deploy_model(**context):
# Get model details from upstream task
model_name = context['ti'].xcom_pull(task_ids='register_model', key='model_name')
model_version = context['ti'].xcom_pull(task_ids='register_model', key='model_version')
if not model_name or not model_version:
print("No model to deploy. Skipping deployment.")
return None
# Transition model to staging
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name=model_name,
version=model_version,
stage="Staging"
)
# Log deployment details
run_id = context['ti'].xcom_pull(task_ids='evaluate_model', key='run_id')
with mlflow.start_run(run_id=run_id) as run:
mlflow.log_param("deployment_stage", "Staging")
mlflow.log_param("deployment_timestamp", datetime.now().isoformat())
# For this example, we'll just log that deployment was successful
print(f"Model {model_name} version {model_version} deployed to Staging")
return f"{model_name}/{model_version}"
# Create DAG
dag = DAG(
'mlflow_end_to_end_pipeline',
default_args=default_args,
description='End-to-end ML pipeline with MLflow integration',
schedule_interval=timedelta(days=1),
catchup=False
)
# Define tasks
prepare_task = PythonOperator(
task_id='prepare_data',
python_callable=prepare_data,
dag=dag
)
train_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag
)
evaluate_task = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
dag=dag
)
register_task = PythonOperator(
task_id='register_model',
python_callable=register_model,
dag=dag
)
deploy_task = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
dag=dag
)
# Define task dependencies
prepare_task >> train_task >> evaluate_task >> register_task >> deploy_task
MLflow with AWS SageMaker
import mlflow.sagemaker as mfs
import boto3
import mlflow
from mlflow.tracking import MlflowClient
from datetime import datetime
# Set MLflow tracking URI
mlflow.set_tracking_uri("http://mlflow-server:5000")
# Define model and deployment parameters
model_name = "fraud_detection_model"
model_uri = f"models:/{model_name}/Production"
region = "us-west-2"
app_name = "fraud-detection-prod"
image_url = "123456789012.dkr.ecr.us-west-2.amazonaws.com/mlflow-sagemaker:latest"
model_s3_path = f"s3://mlflow-models/{model_name}"
execution_role_arn = "arn:aws:iam::123456789012:role/SageMakerExecutionRole"
bucket = "mlflow-models"
instance_type = "ml.m5.xlarge"
instance_count = 1
# AWS client setup
sagemaker_client = boto3.client('sagemaker', region_name=region)
s3_client = boto3.client('s3', region_name=region)
# Build the SageMaker image if it doesn't exist
try:
image_exists = False
for image in sagemaker_client.list_images()['Images']:
if image['ImageName'] == 'mlflow-sagemaker':
image_exists = True
break
if not image_exists:
print("Building SageMaker container image...")
# This step typically happens in a CI/CD pipeline
# mfs.build_and_push_container(
# image_name="mlflow-sagemaker",
# region=region
# )
print("Using existing image:", image_url)
except Exception as e:
print(f"Error checking/building image: {e}")
# Deploy the model to SageMaker
try:
# Load the model from MLflow
print(f"Loading model from {model_uri}")
client = MlflowClient()
model_version = client.get_latest_versions(model_name, stages=["Production"])[0]
run_id = model_version.run_id
# Deploy to SageMaker
print(f"Deploying model {model_name} (run_id={run_id}) to SageMaker endpoint {app_name}...")
# Option 1: Using MLflow's built-in SageMaker deployment
mfs.deploy(
mode="create",
app_name=app_name,
model_uri=model_uri,
image_url=image_url,
execution_role_arn=execution_role_arn,
instance_type=instance_type,
instance_count=instance_count,
region_name=region,
vpc_config=None, # Add if needed
synchronous=True # Wait for deployment to complete
)
# Option 2: Alternative deployment using AWS SDK
"""
# First export model to S3
mlflow.sagemaker.push_model_to_s3(
model_uri=model_uri,
bucket=bucket,
prefix=f"{model_name}/{run_id}"
)
# Create model in SageMaker
create_model_response = sagemaker_client.create_model(
ModelName=f"{app_name}-model",
PrimaryContainer={
'Image': image_url,
'ModelDataUrl': f"s3://{bucket}/{model_name}/{run_id}/model.tar.gz",
'Environment': {
'MLFLOW_MODEL_NAME': model_name
}
},
ExecutionRoleArn=execution_role_arn
)
# Create endpoint configuration
endpoint_config_response = sagemaker_client.create_endpoint_config(
EndpointConfigName=f"{app_name}-config",
ProductionVariants=[
{
'VariantName': 'production',
'ModelName': f"{app_name}-model",
'InitialInstanceCount': instance_count,
'InstanceType': instance_type
}
]
)
# Create or update endpoint
try:
# Try to update existing endpoint
sagemaker_client.update_endpoint(
EndpointName=app_name,
EndpointConfigName=f"{app_name}-config"
)
print(f"Updating existing endpoint {app_name}")
except sagemaker_client.exceptions.ClientError:
# Create new endpoint if it doesn't exist
sagemaker_client.create_endpoint(
EndpointName=app_name,
EndpointConfigName=f"{app_name}-config"
)
print(f"Creating new endpoint {app_name}")
"""
print(f"Model successfully deployed to endpoint {app_name}")
# Log deployment information in MLflow
with mlflow.start_run(run_id=run_id):
mlflow.log_param("sagemaker_endpoint", app_name)
mlflow.log_param("deployment_timestamp", datetime.now().isoformat())
mlflow.log_param("instance_type", instance_type)
mlflow.log_param("instance_count", instance_count)
except Exception as e:
print(f"Error deploying model: {e}")
raise
# Example prediction using deployed endpoint
def predict(data):
"""Make a prediction using deployed SageMaker endpoint"""
import boto3
import json
import numpy as np
# Initialize SageMaker runtime client
sagemaker_runtime = boto3.client('sagemaker-runtime', region_name=region)
# Prepare data for prediction
payload = json.dumps({
"instances": data if isinstance(data, list) else [data]
})
# Invoke endpoint
response = sagemaker_runtime.invoke_endpoint(
EndpointName=app_name,
ContentType='application/json',
Body=payload
)
# Parse response
result = json.loads(response['Body'].read().decode())
return result
# Example usage
example_data = [
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
]
print(f"Example prediction: {predict(example_data)}")
Conclusion
MLflow provides a comprehensive platform for managing the entire machine learning lifecycle, from experimentation to production deployment. Its modular design makes it adaptable to various workflows and environments, while its framework-agnostic approach allows it to integrate with a wide range of ML libraries and tools.
Key takeaways from this guide:
- Experiment Tracking: MLflow provides a centralized way to record parameters, metrics, and artifacts, making it easier to compare experiments and reproduce results.
- Model Packaging: MLflow Projects and Models provide a standardized way to package ML code and models, ensuring reproducibility and portability.
- Model Registry: The Model Registry enables versioning, staging, and lifecycle management of models, facilitating collaboration and controlled deployment.
- Production Deployment: MLflow offers various deployment options, from simple local serving to integration with production platforms like Kubernetes and SageMaker.
- Integration: MLflow integrates with popular ML frameworks, orchestration tools, and cloud platforms, making it a versatile tool for any ML workflow.
As you implement MLflow in your organization, start with experiment tracking and gradually adopt more components as your ML workflow matures. This incremental approach ensures a smoother transition and allows your team to realize value at each step.