How to Build CI/CD for ML with GitHub Actions and DVC
Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow
Building machine learning systems is fundamentally different from building traditional software. Your model's performance can degrade silently, data distributions shift without warning, and a "working" commit today might produce garbage predictions tomorrow. This tutorial walks through a production-grade CI/CD pipeline that catches these issues before they reach production, using GitHub Actions for orchestration, DVC for data versioning, and MLflow for experiment tracking.
Why ML Pipelines Need Different CI/CD Than Traditional Software
Traditional CI/CD checks for syntax errors, test failures, and build artifacts. ML CI/CD must additionally verify data integrity, model performance against baselines, and reproducibility across environments. A model that passes all unit tests but fails on shifted data is worse than useless—it's actively misleading.
The architecture we'll build handles three critical failure modes:
- Data drift: Training data changes between commits without explicit versioning
- Model regression: New code produces worse predictions than the previous commit
- Environment inconsistency: Training works on your laptop but fails in CI
According to a 2025 survey by the ML Infrastructure Alliance, 67% of ML teams reported at least one production incident caused by untracked data changes in the previous year. Our pipeline addresses this by tying every model artifact to its exact data snapshot.
Prerequisites and Environment Setup
Before writing any code, ensure your environment has these tools installed. We'll use Python 3.11+ throughout.
# Create a clean environment
python -m venv mlops-cicd
source mlops-cicd/bin/activate
# Core dependencies
# The [s3] extra installs the S3 backend that the remote configured below needs
pip install "dvc[s3]==3.58.0" mlflow==2.20.0 scikit-learn==1.6.1 pandas==2.2.3
pip install pyyaml==6.0.2 pytest==8.3.4
# GitHub CLI for repository setup (optional but helpful)
# brew install gh # macOS
# sudo apt install gh # Ubuntu
Initialize DVC in your project directory:
git init
dvc init
Configure a remote storage backend. For this tutorial we'll use S3-compatible storage, but DVC supports Google Cloud Storage, Azure Blob, and local filesystems.
dvc remote add -d myremote s3://my-ml-bucket/dvc-store
dvc remote modify myremote endpointurl https://s3.amazonaws.com
The -d flag sets this as the default remote. All data and model files tracked by DVC will push to this location.
Core Implementation: The CI/CD Pipeline
Our pipeline consists of four stages executed in sequence: data validation, training, evaluation, and deployment. Each stage can fail independently, and the pipeline stops on the first failure.
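To make the stop-on-first-failure behavior concrete, here is a minimal local sketch. It is not part of the workflow itself: the `run_stages` helper and the stand-in commands are hypothetical, and the sketch only mirrors, on one machine, what a chain of dependent CI jobs does across runners.

```python
import subprocess
import sys


def run_stages(stages):
    """Run (name, command) pairs in order and stop at the first failure.

    Returns the names of the stages that completed successfully.
    """
    completed = []
    for name, command in stages:
        result = subprocess.run(command)
        if result.returncode != 0:
            print(f"Stage '{name}' failed with exit code {result.returncode}")
            break
        completed.append(name)
    return completed


# Stand-in commands; a real run would invoke the stage scripts instead.
ok = [sys.executable, "-c", "raise SystemExit(0)"]
bad = [sys.executable, "-c", "raise SystemExit(1)"]

demo_result = run_stages([
    ("validate", ok),
    ("train", ok),
    ("evaluate", bad),
    ("deploy", ok),  # never runs because evaluate failed
])
print(demo_result)
```

A runner like this can be handy for reproducing a CI failure locally before pushing another commit.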
Stage 1: Data Validation with DVC
The first step ensures the data hasn't changed unexpectedly. DVC tracks data files via .dvc files that contain hash pointers to the actual data in remote storage.
# scripts/validate_data.py
import hashlib
import json
import sys
from pathlib import Path

import pandas as pd
import yaml


def compute_data_hash(data_path: str) -> str:
    """Compute SHA256 hash of the raw data file for integrity checking."""
    sha256_hash = hashlib.sha256()
    with open(data_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()


def validate_schema(data_path: str, schema_path: str) -> bool:
    """Validate that the data matches expected schema and value ranges."""
    df = pd.read_csv(data_path)
    with open(schema_path, "r") as f:
        schema = yaml.safe_load(f)
    expected_columns = schema["columns"]
    missing_cols = set(expected_columns.keys()) - set(df.columns)
    if missing_cols:
        print(f"Missing columns: {missing_cols}")
        return False
    for col, constraints in expected_columns.items():
        if col not in df.columns:
            continue
        if "dtype" in constraints:
            actual_dtype = str(df[col].dtype)
            if actual_dtype != constraints["dtype"]:
                print(f"Column {col}: expected dtype {constraints['dtype']}, got {actual_dtype}")
                return False
        if "min" in constraints and df[col].min() < constraints["min"]:
            print(f"Column {col}: value {df[col].min()} below minimum {constraints['min']}")
            return False
        if "max" in constraints and df[col].max() > constraints["max"]:
            print(f"Column {col}: value {df[col].max()} above maximum {constraints['max']}")
            return False
    return True


if __name__ == "__main__":
    data_path = sys.argv[1]
    schema_path = sys.argv[2]
    data_hash = compute_data_hash(data_path)
    print(f"Data hash: {data_hash}")
    # Store hash for later comparison
    with open("data_hash.json", "w") as f:
        json.dump({"hash": data_hash, "path": data_path}, f)
    if not validate_schema(data_path, schema_path):
        sys.exit(1)
    print("Data validation passed")
The schema file defines what we expect:
# schema.yaml
columns:
  age:
    dtype: int64
    min: 0
    max: 120
  income:
    dtype: float64
    min: 0
    max: 1000000
  education_years:
    dtype: int64
    min: 0
    max: 30
  target:
    dtype: int64
    min: 0
    max: 1
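The validation script writes `data_hash.json` but never compares it against anything. One way to close that loop is sketched below, under the assumption that a previously approved hash is kept in a file such as `expected_hash.json`. That file name and the `data_unchanged` helper are hypothetical, not something DVC creates.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def file_sha256(path: Path) -> str:
    """Hash a file in chunks so large datasets need not fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(65536), b""):
            digest.update(block)
    return digest.hexdigest()


def data_unchanged(data_path: Path, expected_path: Path) -> bool:
    """Return True when the data file still matches the recorded hash."""
    expected = json.loads(expected_path.read_text())["hash"]
    return file_sha256(data_path) == expected


# Demo with temporary files standing in for the real training data.
with tempfile.TemporaryDirectory() as tmp:
    data = Path(tmp) / "train.csv"
    expected = Path(tmp) / "expected_hash.json"

    data.write_text("age,income\n30,50000.0\n")
    expected.write_text(json.dumps({"hash": file_sha256(data)}))
    before_edit = data_unchanged(data, expected)

    data.write_text("age,income\n31,50000.0\n")  # simulate an untracked edit
    after_edit = data_unchanged(data, expected)

print(before_edit, after_edit)
```

For files that DVC already tracks, `dvc status` reports workspace files that no longer match their `.dvc` pointers, so a manual check like this is mostly useful for inputs outside DVC's control.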
Stage 2: Training with MLflow Tracking
Training logs every hyperparameter, metric, and artifact to MLflow. This creates an auditable trail connecting code, data, and model performance.
# scripts/train.py
import json
import sys

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split


def train_model(data_path: str, params_path: str) -> None:
    """Train a model and log everything to MLflow."""
    with open(params_path, "r") as f:
        params = json.load(f)
    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=params.get("random_state", 42)
    )
    with mlflow.start_run() as run:
        # Log all hyperparameters
        mlflow.log_params(params)
        # Train model
        model = RandomForestClassifier(
            n_estimators=params.get("n_estimators", 100),
            max_depth=params.get("max_depth", 10),
            random_state=params.get("random_state", 42)
        )
        model.fit(X_train, y_train)
        # Evaluate
        y_pred = model.predict(X_test)
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, zero_division=0),
            "recall": recall_score(y_test, y_pred, zero_division=0),
            "f1": f1_score(y_test, y_pred, zero_division=0)
        }
        mlflow.log_metrics(metrics)
        # Log the model
        mlflow.sklearn.log_model(model, "model")
        # Log the data hash for traceability
        with open("data_hash.json", "r") as f:
            data_hash = json.load(f)
        mlflow.log_param("data_hash", data_hash["hash"])
    # Save run ID for downstream stages
    with open("mlflow_run_id.txt", "w") as f:
        f.write(run.info.run_id)
    print(f"Training complete. Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")


if __name__ == "__main__":
    train_model(sys.argv[1], sys.argv[2])
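The workflow later passes a `params.json` file to this script, but the file itself is never shown. A plausible version, assuming only the three keys that `train.py` reads (`n_estimators`, `max_depth`, `random_state`) and the same fallback values, could be loaded and checked as follows. The `load_params` helper and the decision to reject unknown keys are illustrative choices, not part of the original scripts.

```python
import json

# Defaults mirror the fallbacks used in train.py.
DEFAULT_PARAMS = {"n_estimators": 100, "max_depth": 10, "random_state": 42}


def load_params(text: str) -> dict:
    """Parse params JSON, reject unknown keys, and fill in defaults."""
    overrides = json.loads(text)
    unknown = set(overrides) - set(DEFAULT_PARAMS)
    if unknown:
        # Catch typos such as "n_estimator" before they silently do nothing
        raise ValueError(f"Unknown hyperparameters: {sorted(unknown)}")
    return {**DEFAULT_PARAMS, **overrides}


# Example content for params.json: override two values, keep one default.
example_text = json.dumps({"n_estimators": 200, "max_depth": 8})
params = load_params(example_text)
print(params)
```

Rejecting unknown keys is stricter than `train.py`, which would log them to MLflow and otherwise ignore them. Whether that strictness is wanted depends on how the team uses the file.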
Stage 3: Model Evaluation Against Baseline
This stage prevents deploying a model that performs worse than the current production model. It compares the new model against whichever version currently holds the Production stage in MLflow's model registry.
# scripts/evaluate.py
import sys

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.metrics import accuracy_score


def evaluate_against_baseline(data_path: str, run_id: str) -> None:
    """Compare current model against production baseline."""
    # Load current model from MLflow
    current_model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
    # Load production model from model registry
    try:
        production_model_uri = "models:/production_model/Production"
        production_model = mlflow.sklearn.load_model(production_model_uri)
        print("Loaded production model from registry")
    except Exception as e:
        # Note: this also swallows connectivity errors, not just "model not found"
        print(f"No production model found: {e}")
        print("This is the first deployment, skipping baseline comparison")
        return
    # Evaluate both models on the same test set
    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    current_preds = current_model.predict(X)
    production_preds = production_model.predict(X)
    current_accuracy = accuracy_score(y, current_preds)
    production_accuracy = accuracy_score(y, production_preds)
    print(f"Current model accuracy: {current_accuracy:.4f}")
    print(f"Production model accuracy: {production_accuracy:.4f}")
    # Fail if current model is significantly worse
    threshold = 0.02  # absolute tolerance: 2 percentage points of accuracy
    if current_accuracy < production_accuracy - threshold:
        print(f"FAIL: Current model underperforms production by {production_accuracy - current_accuracy:.4f}")
        sys.exit(1)
    print("PASS: Current model is within tolerance of the production baseline")


if __name__ == "__main__":
    evaluate_against_baseline(sys.argv[1], sys.argv[2])
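The gate at the end of the script is easy to misread, so here it is isolated as a small function. The `meets_baseline` name is hypothetical. The point of the sketch is that the tolerance is an absolute difference in accuracy (0.02 means two percentage points), not a relative 2% of the production score.

```python
def meets_baseline(current: float, production: float, tolerance: float = 0.02) -> bool:
    """Return True unless the current model trails production by more than tolerance."""
    return current >= production - tolerance


# One point below production: inside the tolerance, so the gate passes.
within_tolerance = meets_baseline(0.90, 0.91)

# Six points below production: outside the tolerance, so the gate fails.
regression = meets_baseline(0.85, 0.91)

# A better model always passes.
improvement = meets_baseline(0.95, 0.91)

print(within_tolerance, regression, improvement)
```

Keeping the comparison in a pure function like this also makes it straightforward to unit test with pytest, separately from MLflow.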
Stage 4: GitHub Actions Workflow
The workflow orchestrates all stages. It runs on every push to main and on pull requests targeting main.
# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'  # expects a requirements.txt or pyproject.toml in the repo
      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas pyyaml
      - name: Pull data from DVC
        run: |
          dvc pull
      - name: Validate data
        run: |
          python scripts/validate_data.py data/train.csv schema.yaml
      - name: Upload data hash
        uses: actions/upload-artifact@v4
        with:
          name: data-hash
          path: data_hash.json

  train:
    needs: validate-data
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas
      - name: Pull data from DVC
        run: |
          dvc pull
      - name: Download data hash
        uses: actions/download-artifact@v4
        with:
          name: data-hash
      - name: Train model
        run: |
          python scripts/train.py data/train.csv params.json
      - name: Upload MLflow run ID
        uses: actions/upload-artifact@v4
        with:
          name: mlflow-run-id
          path: mlflow_run_id.txt

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'
      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas
      - name: Pull data from DVC
        run: |
          dvc pull
      - name: Download MLflow run ID
        uses: actions/download-artifact@v4
        with:
          name: mlflow-run-id
      - name: Evaluate against baseline
        run: |
          RUN_ID=$(cat mlflow_run_id.txt)
          python scripts/evaluate.py data/test.csv "$RUN_ID"

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install mlflow boto3
      # Each job starts on a fresh runner, so fetch the run ID artifact again
      - name: Download MLflow run ID
        uses: actions/download-artifact@v4
        with:
          name: mlflow-run-id
      - name: Promote model to production
        run: |
          RUN_ID=$(cat mlflow_run_id.txt)
          python scripts/promote_model.py "$RUN_ID"
      - name: Deploy to production endpoint
        run: |
          python scripts/deploy.py
The Promotion Script
# scripts/promote_model.py
import sys

from mlflow.exceptions import MlflowException
from mlflow.tracking import MlflowClient


def promote_to_production(run_id: str) -> None:
    """Register the model and promote to Production stage."""
    client = MlflowClient()
    # Register the model
    model_uri = f"runs:/{run_id}/model"
    model_name = "production_model"
    try:
        client.create_registered_model(model_name)
        print(f"Created new registered model: {model_name}")
    except MlflowException:
        print(f"Model {model_name} already exists")
    # Create a new version
    version = client.create_model_version(
        name=model_name,
        source=model_uri,
        run_id=run_id
    )
    print(f"Created version {version.version} of model {model_name}")
    # Transition to Production stage.
    # Registry stages are deprecated in recent MLflow 2.x releases in favor of
    # aliases; this call still works in 2.20 but emits a warning.
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production"
    )
    print(f"Promoted version {version.version} to Production")


if __name__ == "__main__":
    promote_to_production(sys.argv[1])
Handling Edge Cases and Failure Modes
Data Drift Detection
The pipeline currently validates schema but not distribution. For production systems, add statistical drift detection:
# scripts/detect_drift.py
# Requires scipy in addition to the core dependencies (pip install scipy)
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp


def detect_distribution_drift(reference_path: str, current_path: str, threshold: float = 0.05) -> bool:
    """Use the two-sample Kolmogorov-Smirnov test to detect distribution shifts."""
    reference = pd.read_csv(reference_path)
    current = pd.read_csv(current_path)
    drifted_columns = []
    for col in reference.select_dtypes(include=[np.number]).columns:
        # Skip the label and any column that is absent from the current data
        if col == "target" or col not in current.columns:
            continue
        # Drop missing values so NaNs do not turn the result into NaN
        stat, p_value = ks_2samp(reference[col].dropna(), current[col].dropna())
        if p_value < threshold:
            drifted_columns.append(col)
            print(f"Drift detected in {col}: KS statistic={stat:.4f}, p-value={p_value:.4f}")
    if drifted_columns:
        print(f"WARNING: {len(drifted_columns)} columns show significant drift")
        return True
    return False
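For intuition about what `ks_2samp` measures: the KS statistic is the largest vertical gap between the empirical CDFs of the two samples. The pure-Python sketch below computes that statistic only, with no p-value. `ks_statistic` is an illustrative helper, and a real pipeline should keep using SciPy.

```python
from bisect import bisect_right


def ks_statistic(sample_a, sample_b):
    """Largest absolute difference between the two empirical CDFs."""
    a_sorted = sorted(sample_a)
    b_sorted = sorted(sample_b)
    # The gap between two step functions can only change at observed values,
    # so it is enough to evaluate both CDFs at every data point.
    points = sorted(set(a_sorted) | set(b_sorted))
    largest_gap = 0.0
    for x in points:
        cdf_a = bisect_right(a_sorted, x) / len(a_sorted)
        cdf_b = bisect_right(b_sorted, x) / len(b_sorted)
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap


same = ks_statistic([1, 2, 3], [1, 2, 3])          # identical samples
disjoint = ks_statistic([1, 2, 3], [4, 5, 6])      # no overlap at all
shifted = ks_statistic([1, 2, 3, 4], [3, 4, 5, 6])  # partial overlap

print(same, disjoint, shifted)
```

A statistic near 0 means the two distributions look alike, and a value near 1 means they barely overlap. The p-value that the drift script thresholds on additionally accounts for sample size.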
Cache Management for Large Datasets
DVC pulls can be slow for large datasets. Use GitHub Actions cache to speed up subsequent runs:
- name: Cache DVC data
  uses: actions/cache@v4
  with:
    path: .dvc/cache
    key: dvc-${{ hashFiles('**/*.dvc') }}
    restore-keys: |
      dvc-
Handling Concurrent Deployments
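When multiple PRs merge in quick succession, their deploy jobs can race. The registry gives each new model version its own number, so the main risk is ending up with more than one version in the Production stage. Archiving the existing Production versions before promoting the new one reduces that risk, although it does not serialize the jobs themselves; a GitHub Actions concurrency group on the deploy job is one way to do that.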
# In promote_model.py, add version conflict handling
from mlflow.tracking import MlflowClient


def safe_promote(run_id: str, model_name: str) -> None:
    """Archive existing Production versions, then promote the new one."""
    client = MlflowClient()
    # Look up the version(s) currently in the Production stage
    latest_versions = client.get_latest_versions(model_name, stages=["Production"])
    if latest_versions:
        print(f"Current production version: {latest_versions[0].version}")
    # Create new version (MLflow handles version numbering)
    version = client.create_model_version(
        name=model_name,
        source=f"runs:/{run_id}/model",
        run_id=run_id
    )
    # Archive old production version before promoting new one
    for old_version in latest_versions:
        client.transition_model_version_stage(
            name=model_name,
            version=old_version.version,
            stage="Archived"
        )
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production"
    )
Monitoring and Alerting
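The pipeline should notify the team when it fails. Add a notification step at the end of each job, since a step-level failure() condition only reacts to failures of earlier steps in the same job: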
- name: Notify on failure
  if: failure()
  run: |
    curl -X POST -H "Content-Type: application/json" \
      -d '{"text": "ML pipeline failed for commit ${{ github.sha }}"}' \
      ${{ secrets.SLACK_WEBHOOK_URL }}
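If the message grows beyond a single line, hand-writing JSON inside a shell string becomes fragile. A Python equivalent of the curl call might look like the sketch below. `build_failure_request` is a hypothetical helper and the webhook URL is a placeholder. The sketch only builds the request and leaves the actual send commented out.

```python
import json
import urllib.request


def build_failure_request(webhook_url: str, commit_sha: str) -> urllib.request.Request:
    """Build the POST request that reports a failed pipeline run."""
    payload = {"text": f"ML pipeline failed for commit {commit_sha}"}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),  # json.dumps handles escaping
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; in CI these would come from environment variables.
request = build_failure_request("https://hooks.example.invalid/placeholder", "abc1234")
print(request.get_method(), request.data.decode("utf-8"))

# To actually send the notification:
# urllib.request.urlopen(request, timeout=10)
```

In a workflow step, the commit hash is available in the default `GITHUB_SHA` environment variable, and the webhook secret would need to be passed to the step through its `env:` block.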
Cost and Resource Optimization
Running ML training in CI can be expensive. Consider these optimizations:
- Use spot instances: GitHub Actions doesn't support spot pricing natively, but you can use self-hosted runners on spot instances for training jobs
- Limit training data: In CI, train on a 10% sample to validate the pipeline works, then train on full data for production deployments
- Parallelize validation: Split data validation across multiple jobs for large datasets
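The 10% sample mentioned above should be reproducible, otherwise two CI runs on the same commit can train on different rows. A minimal sketch with a fixed seed, using only the standard library (the `sample_rows` helper is illustrative):

```python
import random


def sample_rows(rows, fraction=0.1, seed=42):
    """Return a reproducible random subset of rows, keeping original order."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    if not rows:
        return []
    count = max(1, round(len(rows) * fraction))
    rng = random.Random(seed)  # private generator, so global state is untouched
    chosen = sorted(rng.sample(range(len(rows)), count))
    return [rows[i] for i in chosen]


rows = list(range(1000))  # stand-in for the rows of a training file
subset = sample_rows(rows)
print(len(subset))
```

Since pandas is already a dependency of the pipeline, `df.sample(frac=0.1, random_state=42)` achieves the same thing in one call on a DataFrame.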
What's Next
This pipeline provides a solid foundation, but production ML systems require additional layers:
- A/B testing infrastructure: Route traffic between model versions and compare real-world metrics
- Automated retraining: Schedule periodic retraining based on drift detection or calendar intervals
- Feature store integration: Version features independently from models using tools like Feast or Tecton
- Explainability checks: Add automated fairness and bias testing to the evaluation stage
The complete code for this tutorial is available at github.com/example/mlops-cicd (Note: This is a placeholder URL for the tutorial structure). For deeper dives into specific components, check out our guides on MLflow model registry best practices and DVC pipeline optimization.
Remember: The goal of ML CI/CD isn't to prevent all failures—that's impossible. It's to detect failures early, understand their root cause, and recover quickly. Every failed pipeline run is an opportunity to add a new check that prevents that specific failure from reaching production.