How to Build CI/CD for ML with GitHub Actions, DVC, and MLflow
Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow
Building production machine learning pipelines requires more than just training accurate models. You need reproducible experiments, automated testing, and reliable deployment. This tutorial walks through implementing a complete CI/CD pipeline for ML using GitHub Actions for automation, DVC for data versioning, and MLflow for experiment tracking and model registry.
Why CI/CD Matters for Machine Learning
Traditional software CI/CD pipelines handle code changes, but ML pipelines introduce additional complexity: data versioning, model artifacts, experiment tracking, and environment reproducibility. According to the paper "Intent-Aware Authorization for Zero Trust CI/CD" (Source: ArXiv), modern CI/CD systems must handle dynamic authorization contexts where pipeline components interact across trust boundaries. This is especially relevant for ML pipelines where data scientists, engineers, and production systems all need controlled access to models and data.
A production ML CI/CD pipeline must address:
- Data versioning: Track which dataset version produced which model
- Experiment reproducibility: Recreate any past experiment exactly
- Model governance: Control which models move to production
- Automated testing: Validate data quality, model performance, and deployment readiness
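The first requirement, tying a model to the dataset version that produced it, can be illustrated with a plain content hash. This is a minimal sketch and not DVC's own mechanism (DVC records its own hashes in `.dvc` files); the function names `fingerprint_file` and `lineage_record` and the record layout are assumptions made for this example.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def fingerprint_file(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def lineage_record(data_path, model_name, params):
    """Bundle the dataset fingerprint with the settings that produced a model."""
    return {
        "model_name": model_name,
        "data_path": str(data_path),
        "data_sha256": fingerprint_file(data_path),
        "params": params,
    }


if __name__ == "__main__":
    # Hypothetical stand-in dataset so the sketch runs anywhere.
    with tempfile.TemporaryDirectory() as tmp:
        data_path = Path(tmp) / "dataset.csv"
        data_path.write_text("feature,target\n1.0,0\n2.0,1\n")
        record = lineage_record(
            data_path, "random_forest_classifier", {"n_estimators": 100}
        )
        print(json.dumps(record, indent=2))
```

Storing a record like this next to the model makes it possible to answer "which data trained this model?" later, provided the hash is computed on exactly the file that was used for training.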
Real-World Architecture Overview
The architecture we'll build connects three core components:
- GitHub Actions orchestrates the pipeline, triggering on code pushes, data updates, or scheduled retraining
- DVC manages data and model versioning, storing artifacts in cloud storage (S3/GCS/Azure)
- MLflow tracks experiments, metrics, and manages the model registry
The pipeline flow:
Code Push → GitHub Actions → DVC Pull Data → Train Model → MLflow Log → DVC Push Model → Deploy
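One way to read this flow: each stage runs only if every earlier stage succeeded, so a failed training run never reaches deployment. The sketch below shows that fail-fast ordering in plain Python. The stage names and the `run_pipeline` helper are illustrative placeholders, not part of GitHub Actions, DVC, or MLflow.

```python
def run_pipeline(stages):
    """Run (name, callable) stages in order and stop at the first failure.

    Returns the names of the stages that completed successfully.
    """
    completed = []
    for name, stage in stages:
        try:
            stage()
        except Exception as exc:
            print(f"Stage '{name}' failed: {exc}; skipping remaining stages")
            break
        completed.append(name)
    return completed


def failing_train():
    """Placeholder stage that simulates a training failure."""
    raise RuntimeError("training diverged")


if __name__ == "__main__":
    # Placeholder stages standing in for the real pipeline steps.
    stages = [
        ("dvc_pull", lambda: None),
        ("train", failing_train),
        ("mlflow_log", lambda: None),
        ("deploy", lambda: None),
    ]
    print(run_pipeline(stages))
```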
Prerequisites and Environment Setup
Before diving into implementation, ensure you have:
- Python 3.9+ installed
- A GitHub account with repository access
- AWS S3 bucket (or equivalent cloud storage) for DVC remote storage
- MLflow tracking server (can use local or Databricks Community Edition)
Install required packages:
pip install dvc dvc-s3 mlflow scikit-learn pandas numpy pytest
Configure DVC with your remote storage:
dvc init
dvc remote add -d myremote s3://your-bucket/dvc-store
dvc remote modify myremote region us-east-1
Setting Up the MLflow Tracking Server
MLflow provides experiment tracking, model registry, and deployment capabilities. For production, you'll want a persistent tracking backend rather than local files. The script below points the MLflow client at one and creates the experiment if it doesn't exist:
# mlflow_server.py
import os

import mlflow

# Configure tracking URI - use a persistent database backend.
# Read it from the environment so credentials stay out of source control;
# the fallback is a local placeholder for development only.
tracking_uri = os.environ.get(
    "MLFLOW_TRACKING_URI", "postgresql://user:pass@localhost/mlflow"
)
mlflow.set_tracking_uri(tracking_uri)

# Create experiment if it doesn't exist
experiment_name = "ci-cd-ml-pipeline"
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(
        name=experiment_name,
        artifact_location="s3://your-bucket/mlflow-artifacts",
    )
else:
    experiment_id = experiment.experiment_id

print(f"MLflow experiment '{experiment_name}' ready with ID: {experiment_id}")
The paper "Establishing Workload Identity for Zero Trust CI/CD: From Secrets to SPIFFE-Based Authentication" (Source: ArXiv) highlights the importance of secure credential management in CI/CD pipelines. For MLflow, use environment variables or a secrets manager rather than hardcoding credentials:
export MLFLOW_TRACKING_URI="postgresql://mlflow_user:${MLFLOW_DB_PASSWORD}@mlflow-db.internal/mlflow"
export AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}"
export AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}"
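If the database password can contain characters such as `@` or `/`, interpolating it directly into the URI will produce a malformed connection string. A small helper can assemble and escape the URI from separate variables. This is a sketch under assumptions: only `MLFLOW_DB_PASSWORD` appears in the exports above, while `MLFLOW_DB_USER`, `MLFLOW_DB_HOST`, and `MLFLOW_DB_NAME` are hypothetical names introduced for this example.

```python
import os
from urllib.parse import quote


def build_tracking_uri(env=None):
    """Assemble a PostgreSQL tracking URI from environment variables.

    Variable names other than MLFLOW_DB_PASSWORD are assumptions
    made for this sketch.
    """
    env = os.environ if env is None else env
    required = ["MLFLOW_DB_USER", "MLFLOW_DB_PASSWORD", "MLFLOW_DB_HOST"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Percent-encode user and password so special characters stay valid.
    user = quote(env["MLFLOW_DB_USER"], safe="")
    password = quote(env["MLFLOW_DB_PASSWORD"], safe="")
    host = env["MLFLOW_DB_HOST"]
    database = env.get("MLFLOW_DB_NAME", "mlflow")
    return f"postgresql://{user}:{password}@{host}/{database}"
```

Failing early on a missing variable tends to give a clearer CI error than a connection failure deep inside a training run.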
Core Implementation: The Training Pipeline
Now let's build the actual ML pipeline that our CI/CD system will execute. This example uses a classification model with scikit-learn, but the pattern applies to any ML framework.
# pipeline/train.py
import json
import os
import pickle
import subprocess

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler


def load_data_with_dvc():
    """
    Load data using DVC to ensure we're using the correct version.
    DVC handles pulling the right data version from remote storage.
    """
    # Pull the version pinned by the committed .dvc file; check=True makes
    # the run fail loudly instead of training on stale or missing data.
    subprocess.run(["dvc", "pull", "data/raw/dataset.csv.dvc"], check=True)
    # Read the data file tracked by DVC
    data_path = "data/raw/dataset.csv"
    df = pd.read_csv(data_path)
    return df


def preprocess_data(df, target_column="target"):
    """
    Preprocess data with a stratified train/test split and scaling.
    Drops rows with missing values and one-hot encodes categorical columns.
    """
    # Check for data quality issues
    missing = int(df.isnull().sum().sum())
    if missing > 0:
        print(f"Warning: Found {missing} missing values")
        df = df.dropna()
    # Separate features and target
    X = df.drop(columns=[target_column])
    y = df[target_column]
    # Handle categorical features
    categorical_cols = X.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        X = pd.get_dummies(X, columns=categorical_cols)
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    # Scale features (fit on the training split only to avoid leakage)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    return X_train_scaled, X_test_scaled, y_train, y_test, scaler


def train_model(X_train, y_train, params=None):
    """
    Train a RandomForest classifier with configurable hyperparameters.
    Uses cross-validation for robust performance estimation.
    """
    if params is None:
        params = {
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 5,
            "min_samples_leaf": 2,
            "random_state": 42
        }
    model = RandomForestClassifier(**params)
    # Cross-validation score
    cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
    print(f"Cross-validation accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
    model.fit(X_train, y_train)
    return model, cv_scores


def evaluate_model(model, X_test, y_test):
    """
    Comprehensive model evaluation with multiple metrics.
    Returns metrics dictionary for MLflow logging.
    """
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, average='weighted'),
        "recall": recall_score(y_test, y_pred, average='weighted'),
        "f1_score": f1_score(y_test, y_pred, average='weighted')
    }
    return metrics


def main():
    """
    Main training pipeline orchestrated by MLflow.
    """
    # Start MLflow run
    mlflow.set_experiment("ci-cd-ml-pipeline")
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        print(f"MLflow Run ID: {run_id}")
        # Log parameters (random_state included so the run is reproducible)
        params = {
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 5,
            "min_samples_leaf": 2,
            "random_state": 42
        }
        mlflow.log_params(params)
        # Load and preprocess data
        print("Loading data with DVC...")
        df = load_data_with_dvc()
        mlflow.log_param("data_shape", str(df.shape))
        print("Preprocessing data...")
        X_train, X_test, y_train, y_test, scaler = preprocess_data(df)
        # Train model
        print("Training model...")
        model, cv_scores = train_model(X_train, y_train, params)
        mlflow.log_metric("cv_accuracy_mean", cv_scores.mean())
        mlflow.log_metric("cv_accuracy_std", cv_scores.std())
        # Evaluate model
        print("Evaluating model...")
        metrics = evaluate_model(model, X_test, y_test)
        mlflow.log_metrics(metrics)
        # Log artifacts
        mlflow.log_artifact("data/raw/dataset.csv", artifact_path="data")
        # Save and log the model
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            registered_model_name="random_forest_classifier"
        )
        # Save model and scaler under models/ so the CI job's
        # `dvc add models/` step has something to version
        os.makedirs("models", exist_ok=True)
        with open("models/model.pkl", "wb") as f:
            pickle.dump(model, f)
        with open("models/scaler.pkl", "wb") as f:
            pickle.dump(scaler, f)
        mlflow.log_artifact("models/scaler.pkl")
        # Save metrics as JSON for downstream CI/CD steps
        with open("metrics.json", "w") as f:
            json.dump(metrics, f)
        print(f"Training complete. Metrics: {json.dumps(metrics, indent=2)}")
    return run_id


if __name__ == "__main__":
    main()
GitHub Actions Workflow Configuration
The GitHub Actions workflow orchestrates the entire CI/CD pipeline. It handles data versioning, model training, testing, and deployment decisions.
# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'pipeline/**'
      - 'data/**'
      - 'requirements.txt'
      - '.github/workflows/**'
  pull_request:
    branches: [main]
  schedule:
    # Weekly retraining on Sundays at midnight (UTC)
    - cron: '0 0 * * 0'
  workflow_dispatch:  # Manual trigger

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DVC_REMOTE: s3://your-bucket/dvc-store

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          lfs: true
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
          cache: 'pip'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install dvc dvc-s3
      - name: Configure DVC
        run: |
          dvc remote default myremote
          # --local writes credentials to .dvc/config.local, which is git-ignored
          dvc remote modify --local myremote access_key_id "$AWS_ACCESS_KEY_ID"
          dvc remote modify --local myremote secret_access_key "$AWS_SECRET_ACCESS_KEY"
      - name: Pull data from DVC
        run: dvc pull
      - name: Run data validation
        run: |
          python -m pytest tests/test_data_quality.py -v
      - name: Run unit tests
        run: |
          python -m pytest tests/ -v --cov=pipeline --cov-report=xml
      - name: Upload test coverage
        uses: codecov/codecov-action@v3

  train:
    needs: validate
    runs-on: ubuntu-latest
    outputs:
      # Values written to $GITHUB_ENV are visible only inside one job,
      # so the run ID is exposed as a job output for downstream jobs.
      run_id: ${{ steps.mlflow_run.outputs.run_id }}
    steps:
      - uses: actions/checkout@v3
        with:
          lfs: true
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install dvc dvc-s3
      - name: Configure DVC
        run: |
          dvc remote default myremote
          dvc remote modify --local myremote access_key_id "$AWS_ACCESS_KEY_ID"
          dvc remote modify --local myremote secret_access_key "$AWS_SECRET_ACCESS_KEY"
      - name: Pull latest data
        run: dvc pull
      - name: Train model
        run: python pipeline/train.py
      - name: Push model artifacts to DVC
        run: |
          dvc add models/
          dvc push
      - name: Record MLflow run ID
        id: mlflow_run
        run: |
          RUN_ID=$(python -c "import mlflow; runs = mlflow.search_runs(experiment_names=['ci-cd-ml-pipeline'], order_by=['start_time DESC'], max_results=1); print(runs['run_id'].iloc[0])")
          echo "run_id=$RUN_ID" >> "$GITHUB_OUTPUT"

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install mlflow scikit-learn pandas numpy
      - name: Evaluate model against baseline
        run: |
          python pipeline/evaluate_vs_baseline.py
      - name: Check model performance thresholds
        run: |
          python pipeline/check_thresholds.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          pip install mlflow boto3
      - name: Promote model to production
        run: |
          python pipeline/promote_to_production.py
      - name: Deploy model endpoint
        run: |
          python pipeline/deploy_model.py
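The workflow calls `pipeline/evaluate_vs_baseline.py`, which is not shown in this tutorial. As a rough idea of what such a script might contain, the sketch below compares candidate metrics against baseline metrics and reports regressions beyond a tolerance. The function name, the tolerance value, and the example numbers are all assumptions. A real script would likely fetch both metric sets from MLflow.

```python
def compare_to_baseline(candidate, baseline, tolerance=0.01):
    """Return metrics where the candidate trails the baseline by more than tolerance.

    Both arguments map metric names to values where higher is better.
    """
    regressions = {}
    for name, baseline_value in baseline.items():
        candidate_value = candidate.get(name)
        if candidate_value is None:
            regressions[name] = "missing from candidate"
        elif candidate_value < baseline_value - tolerance:
            regressions[name] = (
                f"{candidate_value:.4f} < baseline {baseline_value:.4f}"
            )
    return regressions


if __name__ == "__main__":
    # Illustrative numbers only, not results from a real run.
    baseline = {"accuracy": 0.86, "f1_score": 0.84}
    candidate = {"accuracy": 0.87, "f1_score": 0.80}
    for name, reason in compare_to_baseline(candidate, baseline).items():
        print(f"REGRESSION {name}: {reason}")
```

In CI, the script would exit with a non-zero status when the returned dictionary is non-empty, which stops the `deploy` job from running.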
Data Quality Testing
Data quality is critical for ML pipelines. Here's a comprehensive test suite:
# tests/test_data_quality.py
import pandas as pd
import numpy as np
import pytest
from scipy import stats


def test_data_shape():
    """Verify data has expected dimensions."""
    df = pd.read_csv("data/raw/dataset.csv")
    assert df.shape[0] >= 1000, f"Dataset too small: {df.shape[0]} rows"
    assert df.shape[1] >= 5, f"Too few features: {df.shape[1]} columns"


def test_missing_values():
    """Check for excessive missing values."""
    df = pd.read_csv("data/raw/dataset.csv")
    # Worst column's share of missing values, as a percentage
    missing_pct = df.isnull().sum().max() / len(df) * 100
    assert missing_pct < 10, f"Too many missing values: {missing_pct:.2f}%"


def test_target_distribution():
    """Verify target variable has reasonable class balance."""
    df = pd.read_csv("data/raw/dataset.csv")
    target = df["target"]
    class_counts = target.value_counts(normalize=True)
    min_class_pct = class_counts.min()
    assert min_class_pct > 0.1, f"Class imbalance: smallest class is {min_class_pct:.2%}"


def test_data_drift():
    """Statistical test for data drift compared to reference."""
    df_current = pd.read_csv("data/raw/dataset.csv")
    try:
        df_reference = pd.read_csv("data/reference/dataset.csv")
    except FileNotFoundError:
        pytest.skip("Reference dataset not available")
    # Kolmogorov-Smirnov test for each numeric column
    numeric_cols = df_current.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if col in df_reference.columns:
            statistic, p_value = stats.ks_2samp(
                df_current[col].dropna(),
                df_reference[col].dropna()
            )
            assert p_value > 0.05, f"Data drift detected in column {col} (p={p_value:.4f})"


def test_feature_correlations():
    """Check for unexpected feature correlations."""
    df = pd.read_csv("data/raw/dataset.csv")
    numeric_df = df.select_dtypes(include=[np.number])
    corr_matrix = numeric_df.corr().abs()
    # Keep only the upper triangle so each pair is counted once
    upper_tri = corr_matrix.where(
        np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    )
    pairs = upper_tri.stack()
    high_corr = pairs[pairs > 0.95]
    assert len(high_corr) == 0, f"Highly correlated features found: {high_corr.index.tolist()}"
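One caveat with `test_data_drift`: testing every numeric column at p > 0.05 means the chance of at least one false alarm grows with the number of columns. A common adjustment is the Bonferroni correction, which divides the significance level by the number of tests. The sketch below assumes the per-column p-values have already been computed. The helper name `drifted_columns` and the sample p-values are invented for illustration.

```python
def drifted_columns(p_values, alpha=0.05):
    """Return columns whose p-value falls below the Bonferroni-corrected level.

    p_values maps column names to the p-value of that column's drift test.
    """
    if not p_values:
        return []
    corrected_alpha = alpha / len(p_values)
    return sorted(col for col, p in p_values.items() if p < corrected_alpha)


if __name__ == "__main__":
    # Invented p-values for four columns; corrected level is 0.05 / 4 = 0.0125.
    example = {"age": 0.03, "income": 0.004, "tenure": 0.6, "score": 0.2}
    print(drifted_columns(example))
```

Bonferroni is conservative, so with many columns it may miss mild drift. It trades sensitivity for fewer spurious CI failures.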
Model Evaluation and Threshold Checking
Before deploying, we need to ensure the model meets performance thresholds:
# pipeline/check_thresholds.py
from mlflow.tracking import MlflowClient


def load_latest_metrics():
    """Fetch metrics from the latest MLflow run."""
    client = MlflowClient()
    # Get the latest run from our experiment
    experiment = client.get_experiment_by_name("ci-cd-ml-pipeline")
    if experiment is None:
        raise ValueError("Experiment 'ci-cd-ml-pipeline' not found")
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["start_time DESC"],
        max_results=1
    )
    if not runs:
        raise ValueError("No runs found in experiment")
    latest_run = runs[0]
    # A metric that was never logged defaults to 0 and so fails its threshold
    metrics = {
        "accuracy": latest_run.data.metrics.get("accuracy", 0),
        "precision": latest_run.data.metrics.get("precision", 0),
        "recall": latest_run.data.metrics.get("recall", 0),
        "f1_score": latest_run.data.metrics.get("f1_score", 0)
    }
    return metrics


def check_thresholds(metrics):
    """
    Verify model meets minimum performance thresholds.
    These thresholds should be defined based on business requirements.
    """
    thresholds = {
        "accuracy": 0.80,
        "precision": 0.75,
        "recall": 0.75,
        "f1_score": 0.75
    }
    all_passed = True
    # Iterate over the thresholds so every required metric is checked,
    # and a metric without a threshold is never reported as PASS
    for metric_name, threshold in thresholds.items():
        value = metrics.get(metric_name, 0)
        if value < threshold:
            print(f"FAIL: {metric_name} = {value:.4f} < {threshold}")
            all_passed = False
        else:
            print(f"PASS: {metric_name} = {value:.4f} >= {threshold}")
    if not all_passed:
        raise ValueError("Model failed performance thresholds")
    print("All thresholds passed!")
    return True


if __name__ == "__main__":
    metrics = load_latest_metrics()
    check_thresholds(metrics)
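Since the thresholds are meant to reflect business requirements, it can help to keep them in a version-controlled file rather than in the script, so changes go through review. The sketch below is one possible shape for that. The file name `thresholds.json` and both helper names are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def load_thresholds(path):
    """Read {metric_name: minimum_value} from a JSON file and validate it."""
    thresholds = json.loads(Path(path).read_text())
    for name, value in thresholds.items():
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or not 0 <= value <= 1:
            raise ValueError(
                f"Threshold for '{name}' must be a number in [0, 1], got {value!r}"
            )
    return thresholds


def failing_metrics(metrics, thresholds):
    """Return names of metrics that are missing or below their threshold."""
    return sorted(
        name
        for name, minimum in thresholds.items()
        if metrics.get(name) is None or metrics[name] < minimum
    )


if __name__ == "__main__":
    # Write a hypothetical thresholds file to a temp directory and check it.
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "thresholds.json"
        path.write_text(json.dumps({"accuracy": 0.80, "recall": 0.75}))
        limits = load_thresholds(path)
        print(failing_metrics({"accuracy": 0.91, "recall": 0.70}, limits))
```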
Model Promotion and Deployment
The final step promotes the model to production in MLflow's model registry:
# pipeline/promote_to_production.py
from datetime import datetime

from mlflow.tracking import MlflowClient


def promote_model_to_production():
    """
    Promote the latest model version to production stage.
    Uses MLflow's model registry for version management.
    Note: newer MLflow releases deprecate registry stages in favor of
    aliases, so check which approach your MLflow version recommends.
    """
    client = MlflowClient()
    model_name = "random_forest_classifier"
    # Get all versions of the model
    versions = client.search_model_versions(f"name='{model_name}'")
    if not versions:
        raise ValueError(f"No versions found for model '{model_name}'")
    # Find the latest version
    latest_version = max(versions, key=lambda v: int(v.version))
    # Check if there's already a production version
    production_versions = [
        v for v in versions
        if v.current_stage == "Production"
    ]
    if production_versions:
        # Archive the current production version
        for prod_version in production_versions:
            client.transition_model_version_stage(
                name=model_name,
                version=prod_version.version,
                stage="Archived"
            )
            print(f"Archived version {prod_version.version}")
    # Promote new version to production
    client.transition_model_version_stage(
        name=model_name,
        version=latest_version.version,
        stage="Production"
    )
    print(f"Promoted version {latest_version.version} to Production")
    # Add description with run information
    client.update_model_version(
        name=model_name,
        version=latest_version.version,
        description=f"Promoted to production on {datetime.now().isoformat()}"
    )


if __name__ == "__main__":
    promote_model_to_production()
Handling Edge Cases and Production Considerations
1. Data Version Conflicts
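When multiple team members push data changes simultaneously, the `.dvc` and `dvc.lock` files that Git tracks can end up with merge conflicts. One blunt option, which discards the other side's data change, is to keep the current branch's version:
# In your CI/CD pipeline, add conflict resolution
import subprocess


def resolve_dvc_conflicts():
    """Resolve conflicted DVC metafiles by keeping the current branch's version."""
    # DVC metafiles are plain text tracked by Git, so conflicts appear as
    # unmerged paths in Git
    result = subprocess.run(
        ["git", "diff", "--name-only", "--diff-filter=U"],
        capture_output=True,
        text=True,
        check=True
    )
    conflicted = [
        path for path in result.stdout.splitlines()
        if path.endswith(".dvc") or path.endswith("dvc.lock")
    ]
    if conflicted:
        # Accept the version from the current branch
        subprocess.run(["git", "checkout", "--ours", "--", *conflicted], check=True)
        subprocess.run(["git", "add", "--", *conflicted], check=True)
        # Sync workspace data with the chosen metafiles
        subprocess.run(["dvc", "checkout", "--force"], check=True)
        print("Resolved DVC conflicts by accepting current branch version")
    return conflicted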
2. MLflow Tracking Server Reliability
The paper "Decoupling Identity from Access: Credential Broker Patterns for Secure CI/CD" (Source: ArXiv) discusses patterns for managing credentials across pipeline components. Availability matters as well: a tracking server that is briefly unreachable should not fail the whole pipeline. For MLflow, implement retry logic:
# Requires: pip install tenacity
import mlflow
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def log_to_mlflow_with_retry(metrics):
    """Log metrics to MLflow with retry logic for transient failures."""
    with mlflow.start_run() as run:
        mlflow.log_metrics(metrics)
        return run.info.run_id
3. Model Registry Governance
Implement approval gates for production deployments:
from mlflow.tracking import MlflowClient


def check_deployment_approval(model_name, version):
    """
    Check if model version has required approvals before deployment.
    In production, this would integrate with your approval system.
    """
    client = MlflowClient()
    model_version = client.get_model_version(model_name, version)
    # Check for required tags indicating approval
    required_tags = ["qa_approved", "security_reviewed"]
    for tag in required_tags:
        if tag not in model_version.tags:
            raise PermissionError(
                f"Model version {version} missing required tag: {tag}"
            )
    print(f"All approvals present for version {version}")
    return True
Monitoring and Alerting
Add monitoring to detect issues in production:
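# pipeline/monitor.py
from mlflow.tracking import MlflowClient
from scipy import stats


def load_recent_predictions():
    """Placeholder: return recent prediction scores from your serving logs."""
    raise NotImplementedError("Implement based on your serving infrastructure")


def load_training_predictions():
    """Placeholder: return prediction scores recorded at training time."""
    raise NotImplementedError("Implement based on your training artifacts")


def send_alert(message):
    """Placeholder: forward the alert to email, Slack, PagerDuty, etc."""
    print(f"ALERT: {message}")


def check_model_drift():
    """
    Monitor for prediction drift in production.
    Compares recent predictions to training distribution.
    Returns True when no drift is detected.
    """
    client = MlflowClient()
    # Get production model. get_model_version expects a version number,
    # so look up the Production stage with get_latest_versions instead.
    production = client.get_latest_versions(
        "random_forest_classifier", stages=["Production"]
    )
    if not production:
        raise ValueError("No model version in the Production stage")
    model_version = production[0]
    # Load recent predictions (in production, from your serving logs)
    recent_predictions = load_recent_predictions()
    # Compare to training distribution
    training_predictions = load_training_predictions()
    ks_statistic, p_value = stats.ks_2samp(
        recent_predictions,
        training_predictions
    )
    drift_detected = p_value < 0.05
    if drift_detected:
        alert_message = (
            f"Prediction drift detected for version {model_version.version}: "
            f"KS={ks_statistic:.4f}, p={p_value:.4f}"
        )
        # Send alert (email, Slack, PagerDuty)
        send_alert(alert_message)
    return not drift_detected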
What's Next
This CI/CD pipeline provides a solid foundation for production ML workflows. To extend it:
- Add A/B testing: Deploy multiple model versions and compare performance
- Implement feature store integration: Use tools like Feast for feature management
- Add automated rollback: If performance degrades, automatically revert to previous model
- Integrate with monitoring tools: Connect to Prometheus/Grafana for real-time metrics
- Expand testing: Add integration tests, load tests, and security scanning
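As a starting point for the automated rollback idea, the sketch below picks which registry version to revert to. It assumes the promotion script archives the previous production version, so the highest-numbered archived version is the most recent former production model. The dictionary layout and the helper name `pick_rollback_version` are assumptions. A real implementation would read versions from the MLflow model registry.

```python
def pick_rollback_version(versions):
    """Return the highest archived version number, or None if there is none.

    versions is a list of dicts such as {"version": "3", "stage": "Archived"}.
    """
    archived = [int(v["version"]) for v in versions if v["stage"] == "Archived"]
    return max(archived) if archived else None


if __name__ == "__main__":
    # Hypothetical registry state: version 3 is live, 1 and 2 were archived.
    registry = [
        {"version": "1", "stage": "Archived"},
        {"version": "2", "stage": "Archived"},
        {"version": "3", "stage": "Production"},
    ]
    print(pick_rollback_version(registry))
```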
The combination of GitHub Actions, DVC, and MLflow creates a robust, reproducible ML pipeline that handles the unique challenges of machine learning in production. By implementing proper data versioning, experiment tracking, and automated testing, you ensure that your models are reliable, auditable, and ready for production deployment.
Remember that CI/CD for ML is an evolving practice. The patterns described here follow the principles of zero-trust CI/CD as discussed in recent research (Source: ArXiv), emphasizing secure credential management, workload identity, and decoupled access control. As your ML infrastructure grows, these security considerations become increasingly important for maintaining a production-grade pipeline.