How to Build CI/CD for ML with GitHub Actions and DVC
Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow
Building machine learning systems is hard enough without adding deployment complexity. When your model's performance degrades in production because of data drift, or when a teammate accidentally commits a 2GB CSV file to your repository, you need automated guardrails. This tutorial walks through a production-ready CI/CD pipeline for ML using GitHub Actions, DVC (Data Version Control), and MLflow.
Why ML Pipelines Need Different CI/CD
Traditional CI/CD pipelines test code changes against fixed datasets. In machine learning, your "code" includes data transformations, model architectures, and hyperparameters—all of which interact with datasets that change over time. A model that achieves 95% accuracy on last month's data might fail catastrophically on today's distribution.
According to a 2025 survey by the MLOps Community, 67% of ML teams reported that data versioning issues caused at least one production incident in the previous year. The core problem is that Git alone cannot handle large datasets efficiently, and standard CI/CD tools don't understand model metrics or data lineage.
This pipeline addresses three specific failure modes:
- Data inconsistency: Different team members training on different versions of the dataset
- Silent model degradation: Deploying models without comparing against current production performance
- Reproducibility gaps: Inability to trace a production model back to its exact training data and code
Architecture Overview
The pipeline connects four components:
- GitHub Actions orchestrates the workflow triggers and execution
- DVC manages data and model versioning outside Git
- MLflow tracks experiments, metrics, and model registry
- Your training code runs on a GitHub-hosted runner (the workflow below uses ubuntu-latest); add a container image if you need a pinned environment
When a developer pushes code or data changes, GitHub Actions triggers a workflow that:
- Pulls the latest data from DVC remote storage
- Runs the training pipeline
- Logs metrics and artifacts to MLflow
- Compares new model performance against the current production model
- Promotes the model to staging or production based on performance thresholds
Prerequisites and Environment Setup
Before implementing the pipeline, set up your local environment and cloud resources.
Local Development Setup
# Create a Python virtual environment
python -m venv mlops-env
source mlops-env/bin/activate
# Install core dependencies
pip install dvc dvc-s3 mlflow scikit-learn pandas numpy
# Initialize DVC in your project
git init
dvc init
Cloud Storage Configuration
DVC requires remote storage for datasets and models. This example uses AWS S3, but DVC supports GCS, Azure Blob, and local storage.
# Configure S3 remote storage
dvc remote add -d myremote s3://your-ml-bucket/dvc-store
# Set AWS credentials as environment variables
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
MLflow Tracking Server
Set up MLflow tracking for experiment logging. For production, use a hosted MLflow server or deploy one on a cloud VM.
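# MLflow was already installed with the core dependencies; rerun this only in a fresh environment
pip install mlflow
# Start a local tracking server (for development); binding to 127.0.0.1 keeps it off the network
mlflow server --host 127.0.0.1 --port 5000
# Point clients at the tracking server (use your hosted server's URL in production)
export MLFLOW_TRACKING_URI="http://your-mlflow-server:5000"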
Implementing the Training Pipeline
The training pipeline must be reproducible and parameterized. Create a train.py script that accepts configuration via command-line arguments or a config file.
# train.py
import argparse
import json
import os
import pickle
from pathlib import Path

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split


def load_data(data_path: str) -> tuple[pd.DataFrame, pd.Series]:
    """Load and validate input data.

    Args:
        data_path: Path to the CSV file containing features and target

    Returns:
        Tuple of (features DataFrame, target Series)
    """
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found: {data_path}")

    df = pd.read_csv(data_path)

    # Validate required columns
    required_cols = ["feature_1", "feature_2", "feature_3", "target"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    X = df[["feature_1", "feature_2", "feature_3"]]
    y = df["target"]

    # Check for data quality issues
    if X.isnull().any().any():
        raise ValueError("Input data contains null values")

    return X, y


def train_model(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    n_estimators: int = 100,
    max_depth: int = 10,
    random_state: int = 42,
) -> RandomForestClassifier:
    """Train a Random Forest classifier with specified hyperparameters.

    Args:
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest
        max_depth: Maximum depth of each tree
        random_state: Random seed for reproducibility

    Returns:
        Trained RandomForestClassifier instance
    """
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=random_state,
        n_jobs=-1,  # Use all available CPU cores
    )
    model.fit(X_train, y_train)
    return model


def evaluate_model(
    model: RandomForestClassifier,
    X_test: pd.DataFrame,
    y_test: pd.Series,
) -> dict[str, float]:
    """Calculate evaluation metrics on the held-out test split.

    Args:
        model: Trained classifier
        X_test: Test features
        y_test: Test labels

    Returns:
        Dictionary of metric names to values
    """
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
        "precision": precision_score(y_test, y_pred, average="weighted"),
        "recall": recall_score(y_test, y_pred, average="weighted"),
    }
    return metrics


def main(config: dict):
    """Execute the full training pipeline.

    Args:
        config: Dictionary containing all configuration parameters
    """
    # Set MLflow experiment
    mlflow.set_experiment(config.get("experiment_name", "default"))

    with mlflow.start_run() as run:
        # Log all parameters
        mlflow.log_params({
            "n_estimators": config["n_estimators"],
            "max_depth": config["max_depth"],
            "test_size": config["test_size"],
            "random_state": config["random_state"],
        })

        # Load and split data
        X, y = load_data(config["data_path"])
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=config["test_size"],
            random_state=config["random_state"],
            stratify=y,  # Maintain class distribution
        )

        # Train model
        model = train_model(
            X_train, y_train,
            n_estimators=config["n_estimators"],
            max_depth=config["max_depth"],
            random_state=config["random_state"],
        )

        # Evaluate model
        metrics = evaluate_model(model, X_test, y_test)

        # Log metrics to MLflow
        mlflow.log_metrics(metrics)

        # Write metrics to metrics.json so the DVC stage in dvc.yaml can track them
        with open("metrics.json", "w") as f:
            json.dump(metrics, f, indent=2)

        # Log the trained model (this also registers a new model version)
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name=config.get("model_name", "classifier"),
        )

        # Save model locally for DVC tracking
        model_path = Path("models") / f"model_{run.info.run_id}.pkl"
        model_path.parent.mkdir(parents=True, exist_ok=True)
        with open(model_path, "wb") as f:
            pickle.dump(model, f)

        # Log feature importance
        feature_importance = pd.DataFrame(
            {"feature": X.columns, "importance": model.feature_importances_}
        )
        feature_importance.to_csv("models/feature_importance.csv", index=False)
        mlflow.log_artifact("models/feature_importance.csv")

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {json.dumps(metrics, indent=2)}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train ML model with MLflow tracking")
    parser.add_argument("--data-path", required=True, help="Path to training data CSV")
    parser.add_argument("--n-estimators", type=int, default=100, help="Number of trees")
    parser.add_argument("--max-depth", type=int, default=10, help="Maximum tree depth")
    parser.add_argument("--test-size", type=float, default=0.2, help="Test split ratio")
    parser.add_argument("--random-state", type=int, default=42, help="Random seed")
    parser.add_argument("--experiment-name", default="mlops-pipeline", help="MLflow experiment")
    parser.add_argument("--model-name", default="classifier", help="Registered model name")
    args = parser.parse_args()
    config = vars(args)
    main(config)
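The script above takes its settings from command-line flags only. If you also want the config-file option mentioned earlier, one possible approach is to layer three sources: built-in defaults, an optional JSON file, and any flags the user passed. The sketch below is an assumption, not part of the original script: the --config flag, the DEFAULTS dictionary, and load_config are hypothetical names.

```python
import argparse
import json
from pathlib import Path

# Hypothetical defaults mirroring the flags train.py already exposes
DEFAULTS = {"n_estimators": 100, "max_depth": 10, "test_size": 0.2, "random_state": 42}


def load_config(argv: list[str]) -> dict:
    """Merge defaults, an optional JSON config file, and CLI overrides (highest priority)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="Optional path to a JSON config file")
    parser.add_argument("--data-path", required=True)
    # default=None lets us tell "flag not given" apart from "flag given"
    parser.add_argument("--n-estimators", type=int, default=None)
    parser.add_argument("--max-depth", type=int, default=None)
    parser.add_argument("--test-size", type=float, default=None)
    parser.add_argument("--random-state", type=int, default=None)
    args = vars(parser.parse_args(argv))

    config = dict(DEFAULTS)
    config_file = args.pop("config")
    if config_file:
        config.update(json.loads(Path(config_file).read_text()))
    # Only flags the user actually passed override values from the file
    config.update({k: v for k, v in args.items() if v is not None})
    return config
```

With this layering, a committed JSON file can hold the team's agreed hyperparameters while a CI job or a developer overrides a single value on the command line.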
DVC Configuration
Track your data and models with DVC to enable versioning and reproducibility.
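# Track the training data (creates data/training_data.csv.dvc and data/.gitignore)
dvc add data/training_data.csv
# Track the models directory (creates models.dvc and adds /models to .gitignore)
dvc add models/
# Commit the DVC pointer files and the .gitignore entries DVC generated
git add data/training_data.csv.dvc data/.gitignore models.dvc .gitignore
git commit -m "Add data and model tracking with DVC"
# Push data to remote storage
dvc push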
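The .dvc files committed above are small pointers that record a content hash of the tracked data, which is how two teammates can confirm they are training on identical bytes. A minimal sketch of the same idea, assuming you want to log a dataset fingerprint yourself (for example as an MLflow tag). file_md5 is a hypothetical helper, and its output is not guaranteed to match the hash DVC records in every DVC version.

```python
import hashlib
from pathlib import Path


def file_md5(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.md5()
    with Path(path).open("rb") as f:
        # iter() with a sentinel keeps reading until f.read returns b"" at EOF
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Reading in fixed-size chunks matters here because training datasets are often larger than available memory on a CI runner.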
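Create a dvc.yaml file to define the pipeline stages. DVC does not allow the same path to be both tracked with dvc add and declared as a stage output, so if you ran dvc add models/ earlier, remove that tracking first (dvc remove models.dvc) before adding this stage:
# dvc.yaml
stages:
  train:
    cmd: python train.py --data-path data/training_data.csv --n-estimators 100 --max-depth 10
    deps:
      - data/training_data.csv
      - train.py
    outs:
      - models/
    # Assumes train.py writes its evaluation metrics to metrics.json
    metrics:
      - metrics.json:
          cache: false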
GitHub Actions Workflow Configuration
The CI/CD pipeline runs on every push to the main branch and on pull requests. Create .github/workflows/ml-pipeline.yml:
# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  workflow_dispatch: # Allow manual triggering

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true # Only needed if the repo also uses Git LFS; DVC data is pulled separately
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip" # Needs a requirements.txt or pyproject.toml in the repo; remove otherwise
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dvc dvc-s3 mlflow scikit-learn pandas numpy pytest pytest-cov
      - name: Validate code quality
        run: |
          pip install flake8 black
          flake8 train.py --max-line-length=100
          black --check train.py
      - name: Run unit tests
        run: |
          # --cov requires the pytest-cov plugin installed above
          pytest tests/ -v --cov=.

  train:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dvc dvc-s3 mlflow scikit-learn pandas numpy
      - name: Pull data from DVC remote
        run: |
          dvc pull data/training_data.csv.dvc
      - name: Run training pipeline
        run: |
          python train.py \
            --data-path data/training_data.csv \
            --n-estimators 100 \
            --max-depth 10 \
            --experiment-name "ci-pipeline" \
            --model-name "classifier"
      - name: Push model artifacts to DVC
        run: |
          # Note: the updated models.dvc is not committed back to Git by this step
          dvc add models/
          dvc push
      - name: Upload model artifacts
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: models/
          retention-days: 7

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install mlflow scikit-learn pandas numpy
      - name: Resolve candidate run ID
        run: |
          # Look up the most recent run through the Python API
          # (the mlflow runs list CLI takes an experiment ID, not a name)
          RUN_ID=$(python -c "import mlflow; runs = mlflow.search_runs(experiment_names=['ci-pipeline'], order_by=['attributes.start_time DESC'], max_results=1); print(runs.iloc[0]['run_id'])")
          echo "CANDIDATE_RUN_ID=$RUN_ID" >> "$GITHUB_ENV"
      - name: Compare with production model
        run: |
          python scripts/compare_models.py \
            --candidate-run-id "$CANDIDATE_RUN_ID" \
            --production-model-name "classifier" \
            --production-stage "Production"
      - name: Promote model if better
        if: success()
        run: |
          python scripts/promote_model.py \
            --run-id "$CANDIDATE_RUN_ID" \
            --metric-threshold 0.85

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: |
          echo "Deploying model to staging environment"
          # Add your deployment logic here
          # For example: kubectl apply -f deployment.yaml
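The evaluate job calls scripts/promote_model.py, which this tutorial does not list. The following is only a sketch of what such a script might contain, under several assumptions: the --run-id and --metric-threshold flags come from the workflow, while --model-name, --metric-name, the Staging target, and all function names are hypothetical. It also assumes the model version was registered by the training run.

```python
import argparse


def meets_threshold(metrics: dict, metric_name: str, threshold: float) -> bool:
    """Return True when the named metric exists and is at least the threshold."""
    value = metrics.get(metric_name)
    return value is not None and value >= threshold


def promote(run_id: str, model_name: str, metric_name: str, threshold: float) -> bool:
    """Move the model version logged by run_id to Staging if it clears the threshold."""
    # Imported here so the pure check above can be tested without MLflow installed
    from mlflow.tracking import MlflowClient

    client = MlflowClient()
    metrics = client.get_run(run_id).data.metrics
    if not meets_threshold(metrics, metric_name, threshold):
        print(f"{metric_name} is missing or below {threshold}; not promoting")
        return False

    # Find the registered version that the training run created
    versions = [
        v for v in client.search_model_versions(f"name='{model_name}'")
        if v.run_id == run_id
    ]
    if not versions:
        print(f"No registered version of '{model_name}' for run {run_id}")
        return False

    client.transition_model_version_stage(
        name=model_name, version=versions[0].version, stage="Staging"
    )
    return True


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the flags the workflow passes, plus two hypothetical optional ones."""
    parser = argparse.ArgumentParser(description="Promote a model version")
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--metric-threshold", type=float, required=True)
    parser.add_argument("--model-name", default="classifier")
    parser.add_argument("--metric-name", default="accuracy")
    return parser.parse_args(argv)
```

In a real script you would call parse_args() and promote(...) under an if __name__ == "__main__": guard and exit non-zero when promote returns False, so the deploy job is skipped.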
Model Comparison Script
Create scripts/compare_models.py to automate model evaluation against production:
# scripts/compare_models.py
import argparse
import json
import sys

from mlflow.tracking import MlflowClient


def get_production_model_metrics(
    client: MlflowClient, model_name: str, stage: str = "Production"
) -> dict:
    """Retrieve metrics from the model version currently in the given stage.

    Args:
        client: MLflow client instance
        model_name: Name of the registered model
        stage: Registry stage to read from (model stages have been
            deprecated since MLflow 2.9 in favor of aliases)

    Returns:
        Dictionary of metric names to values, or {} if no version is found
    """
    try:
        # Get latest version in the requested stage
        latest_version = client.get_latest_versions(model_name, stages=[stage])
        if not latest_version:
            print(f"No {stage} version found for model '{model_name}'")
            return {}
        run_id = latest_version[0].run_id
        run = client.get_run(run_id)
        return run.data.metrics
    except Exception as e:
        # Note: any lookup error is treated the same as "no production model"
        print(f"Error fetching production model metrics: {e}")
        return {}


def compare_models(
    candidate_metrics: dict,
    production_metrics: dict,
    threshold: float = 0.05,
) -> bool:
    """Compare candidate model against production.

    Args:
        candidate_metrics: Metrics from the newly trained model
        production_metrics: Metrics from the current production model
        threshold: Minimum absolute accuracy improvement required to promote

    Returns:
        True if candidate accuracy exceeds production accuracy by at least
        the threshold, or if there is no production model
    """
    if not production_metrics:
        print("No production model to compare against. Promoting candidate.")
        return True

    candidate_accuracy = candidate_metrics.get("accuracy", 0)
    production_accuracy = production_metrics.get("accuracy", 0)
    improvement = candidate_accuracy - production_accuracy

    print(f"Candidate accuracy: {candidate_accuracy:.4f}")
    print(f"Production accuracy: {production_accuracy:.4f}")
    print(f"Improvement: {improvement:.4f}")

    return improvement >= threshold


def main():
    parser = argparse.ArgumentParser(description="Compare ML models")
    parser.add_argument("--candidate-run-id", required=True)
    parser.add_argument("--production-model-name", required=True)
    parser.add_argument("--production-stage", default="Production")
    parser.add_argument("--threshold", type=float, default=0.05)
    args = parser.parse_args()

    client = MlflowClient()

    # Get candidate metrics
    candidate_run = client.get_run(args.candidate_run_id)
    candidate_metrics = candidate_run.data.metrics

    # Get production metrics from the requested stage
    production_metrics = get_production_model_metrics(
        client, args.production_model_name, args.production_stage
    )

    # Compare
    should_promote = compare_models(
        candidate_metrics, production_metrics, args.threshold
    )

    # Output result for GitHub Actions
    result = {"should_promote": should_promote}
    print(json.dumps(result))

    # A non-zero exit code fails the step and blocks the later jobs
    if not should_promote:
        sys.exit(1)


if __name__ == "__main__":
    main()
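The default threshold of 0.05 is an absolute difference in accuracy, that is, five percentage points, which is a high bar once production is already near 0.90. The script also looks at accuracy alone, so a candidate could pass while recall drops sharply. A possible refinement, sketched here with hypothetical names and tolerances and assuming every metric is "higher is better", is to require a small gain on the primary metric and cap regressions on the rest:

```python
def passes_gate(
    candidate: dict,
    production: dict,
    primary: str = "accuracy",
    min_gain: float = 0.01,
    max_regression: float = 0.02,
) -> bool:
    """Promote only if the primary metric improves and no shared metric drops too far.

    min_gain and max_regression are absolute differences, not percentages.
    """
    if candidate[primary] - production[primary] < min_gain:
        return False
    # Only compare metrics that both runs actually logged
    shared = set(candidate) & set(production)
    return all(production[m] - candidate[m] <= max_regression for m in shared)
```

For example, a candidate at accuracy 0.93 and recall 0.80 would be rejected against production at accuracy 0.90 and recall 0.88, because recall fell by more than the allowed 0.02.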
Handling Edge Cases and Production Considerations
Data Drift Detection
Add a drift detection step before training to catch distribution shifts:
# scripts/detect_drift.py
import pandas as pd
from scipy.stats import ks_2samp


def detect_feature_drift(
    reference_data: pd.DataFrame,
    current_data: pd.DataFrame,
    threshold: float = 0.05,
) -> dict:
    """Detect distribution drift using the two-sample Kolmogorov-Smirnov test.

    The test applies to numeric features; encode or skip categorical columns.

    Args:
        reference_data: Baseline dataset
        current_data: New dataset to check
        threshold: P-value threshold for drift detection

    Returns:
        Dictionary mapping feature names to drift status
    """
    drift_results = {}
    for column in reference_data.columns:
        # Skip the label and any column missing from the new dataset
        if column == "target" or column not in current_data.columns:
            continue
        stat, p_value = ks_2samp(
            reference_data[column].values,
            current_data[column].values,
        )
        drift_results[column] = {
            # Cast to built-in types so the result can be serialized to JSON
            "drift_detected": bool(p_value < threshold),
            "ks_statistic": float(stat),
            "p_value": float(p_value),
        }
    return drift_results
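To make the ks_statistic field easier to interpret: the two-sample KS statistic is the largest vertical gap between the empirical cumulative distribution functions of the two samples, so 0 means the samples look identical and 1 means they do not overlap at all. The following is a standard-library sketch of that definition for illustration only. ks_statistic is a hypothetical helper, it does not compute a p-value, and scipy's ks_2samp remains the function to use in the pipeline.

```python
from bisect import bisect_right


def ks_statistic(sample_a: list[float], sample_b: list[float]) -> float:
    """Largest absolute gap between the empirical CDFs of two non-empty samples."""
    a_sorted, b_sorted = sorted(sample_a), sorted(sample_b)
    n, m = len(a_sorted), len(b_sorted)
    largest_gap = 0.0
    # The gap can only change at observed values, so checking those is enough
    for x in a_sorted + b_sorted:
        cdf_a = bisect_right(a_sorted, x) / n  # fraction of sample_a <= x
        cdf_b = bisect_right(b_sorted, x) / m  # fraction of sample_b <= x
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap
```

Because the p-value shrinks as sample size grows, very large datasets can flag drift for tiny gaps, so it is often useful to check the statistic itself alongside drift_detected.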
Resource Management
Large models and datasets can exhaust CI runner resources. Implement these safeguards:
# In GitHub Actions workflow
- name: Check available disk space
  run: |
    df -h
    if [ "$(df --output=pcent / | tail -1 | tr -d ' %')" -gt 85 ]; then
      echo "Disk space critically low"
      exit 1
    fi
- name: Limit model training resources
  run: |
    # Cap virtual memory for this shell and its child processes at 4 GiB (value is in KiB).
    # Raise the cap if libraries reserve large address ranges at import time.
    ulimit -v 4194304
    python train.py --data-path data/training_data.csv --n-estimators 50
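The same disk check can also live in Python, for instance at the top of a training script, so it runs locally as well as in CI. A minimal sketch using the standard library, where the helper names are hypothetical and the 85% default mirrors the shell step above. The percentage may differ slightly from what df reports, since df accounts for reserved blocks differently.

```python
import shutil


def disk_percent_used(path: str = "/") -> float:
    """Return the percentage of the filesystem containing path that is in use."""
    usage = shutil.disk_usage(path)
    return 100.0 * usage.used / usage.total


def require_free_disk(path: str = "/", max_percent_used: float = 85.0) -> None:
    """Raise RuntimeError when the filesystem is fuller than the allowed percentage."""
    used = disk_percent_used(path)
    if used > max_percent_used:
        raise RuntimeError(
            f"Disk is {used:.1f}% full at {path}; limit is {max_percent_used}%"
        )
```

Failing before training starts is cheaper than failing while writing the model file after a long run.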
Model Registry Governance
Implement approval gates for production deployments:
# scripts/request_approval.py
import json

from mlflow.tracking import MlflowClient


def request_production_approval(
    run_id: str,
    model_name: str,
    metrics: dict,
    requester: str,
) -> str:
    """Create an approval request for model promotion.

    Args:
        run_id: MLflow run ID
        model_name: Registered model name
        metrics: Model performance metrics
        requester: Email or username of requester

    Returns:
        Approval request ID
    """
    client = MlflowClient()

    # train.py registers a version when it logs the model, so reuse it if present
    existing = [
        v for v in client.search_model_versions(f"name='{model_name}'")
        if v.run_id == run_id
    ]
    if existing:
        version = existing[0]
    else:
        # Otherwise create a version from the run's logged model artifact
        version = client.create_model_version(
            name=model_name,
            source=f"runs:/{run_id}/model",
            run_id=run_id,
        )

    # Transition to staging for review
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Staging",
    )

    # Log approval metadata, including the metrics the reviewer should check
    client.set_model_version_tag(
        name=model_name,
        version=version.version,
        key="approval_requester",
        value=requester,
    )
    client.set_model_version_tag(
        name=model_name,
        version=version.version,
        key="approval_metrics",
        value=json.dumps(metrics),
    )

    return f"{model_name}:v{version.version}"


if __name__ == "__main__":
    # Example usage (replace run_id with a real MLflow run ID)
    request_id = request_production_approval(
        run_id="abc123",
        model_name="classifier",
        metrics={"accuracy": 0.92},
        requester="ml-engineer@company.com",
    )
    print(f"Approval requested for {request_id}")
Monitoring and Alerting
Add a post-deployment check to catch performance degradation. The script below compares the metric logged at training time for the two most recent Production versions; it does not measure accuracy on live traffic:
# scripts/monitor_model.py
from datetime import datetime, timedelta

from mlflow.tracking import MlflowClient


def check_model_health(
    model_name: str,
    metric_name: str = "accuracy",
    degradation_threshold: float = 0.1,
    lookback_days: int = 30,
) -> dict:
    """Monitor model performance over time.

    Args:
        model_name: Registered model name
        metric_name: Metric to monitor
        degradation_threshold: Maximum acceptable degradation
        lookback_days: How far back to check

    Returns:
        Health status dictionary
    """
    client = MlflowClient()

    # Get production model versions from last N days.
    # The stage filter is applied in Python; the search filter only selects by name.
    cutoff_date = datetime.now() - timedelta(days=lookback_days)
    versions = [
        v for v in client.search_model_versions(f"name='{model_name}'")
        if v.current_stage == "Production"
    ]

    metrics_over_time = []
    for version in versions:
        run = client.get_run(version.run_id)
        if run.info.end_time and datetime.fromtimestamp(run.info.end_time / 1000) > cutoff_date:
            metrics_over_time.append({
                "version": version.version,
                "timestamp": run.info.end_time,
                metric_name: run.data.metrics.get(metric_name, 0),
            })

    if len(metrics_over_time) < 2:
        return {"status": "insufficient_data", "message": "Need at least 2 versions to compare"}

    # Sort oldest to newest so the last two entries are the most recent runs
    metrics_over_time.sort(key=lambda m: m["timestamp"])

    # Check for degradation
    latest_metric = metrics_over_time[-1][metric_name]
    previous_metric = metrics_over_time[-2][metric_name]
    degradation = previous_metric - latest_metric

    if degradation > degradation_threshold:
        return {
            "status": "degraded",
            "current_metric": latest_metric,
            "previous_metric": previous_metric,
            "degradation": degradation,
            "alert": True,
        }

    return {
        "status": "healthy",
        "current_metric": latest_metric,
        "previous_metric": previous_metric,
        "degradation": degradation,
        "alert": False,
    }
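The comparison at the end of check_model_health is easier to unit test when it is separated from the MLflow calls. A minimal sketch of that split, assuming the history is passed in as (timestamp, metric) pairs; check_degradation is a hypothetical helper, not part of the script above.

```python
def check_degradation(
    history: list[tuple[int, float]],
    threshold: float = 0.1,
) -> dict:
    """Compare the two most recent metric values.

    history holds (timestamp_ms, metric_value) pairs in any order.
    """
    if len(history) < 2:
        return {"status": "insufficient_data"}
    # Tuples sort by their first element, so this orders oldest to newest
    ordered = sorted(history)
    previous, latest = ordered[-2][1], ordered[-1][1]
    degradation = previous - latest
    degraded = degradation > threshold
    return {
        "status": "degraded" if degraded else "healthy",
        "degradation": degradation,
        "alert": degraded,
    }
```

Sorting inside the function means callers do not have to rely on the order in which the registry returns versions.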
What's Next
This pipeline provides a foundation for production ML CI/CD, but several enhancements can improve reliability:
- Add canary deployments: Route a percentage of traffic to new models before full rollout
- Implement A/B testing infrastructure: Compare model versions on live traffic
- Add automated rollback: Trigger rollback when monitoring detects degradation
- Integrate with feature stores: Use tools like Feast for consistent feature engineering
- Add cost tracking: Monitor compute costs per training run and model version
The key insight is that ML CI/CD is not just about automating deployment; it is about maintaining trust in your models as data and requirements evolve. By combining DVC for data versioning, MLflow for experiment tracking, and GitHub Actions for orchestration, you create a system where every model can be traced back to its exact training data, code, and configuration. This traceability is what separates experimental notebooks from production ML systems.