
How to Build CI/CD for ML with GitHub Actions and DVC

Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow

BlogIA Academy | May 23, 2026 | 11 min read | 2,157 words


Building machine learning systems is fundamentally different from building traditional software. Your model's performance can degrade silently, data distributions shift without warning, and a "working" commit today might produce garbage predictions tomorrow. This tutorial walks through a production-grade CI/CD pipeline that catches these issues before they reach production, using GitHub Actions for orchestration, DVC for data versioning, and MLflow for experiment tracking.

Why ML Pipelines Need Different CI/CD Than Traditional Software

Traditional CI/CD checks for syntax errors, test failures, and build artifacts. ML CI/CD must additionally verify data integrity, model performance against baselines, and reproducibility across environments. A model that passes all unit tests but fails on shifted data is worse than useless—it's actively misleading.

The architecture we'll build handles three critical failure modes:

  1. Data drift: Training data changes between commits without explicit versioning
  2. Model regression: New code produces worse predictions than the previous commit
  3. Environment inconsistency: Training works on your laptop but fails in CI

According to a 2025 survey by the ML Infrastructure Alliance, 67% of ML teams reported at least one production incident caused by untracked data changes in the previous year. Our pipeline addresses this by tying every model artifact to its exact data snapshot.
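
One way to picture "tying a model artifact to its data snapshot" is a small lineage record that stores a content hash of the data next to the commit and a model identifier. The sketch below is illustrative only: `LineageRecord`, its field names, and the helper functions are hypothetical and not part of DVC or MLflow.

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class LineageRecord:
    """Hypothetical record linking one model to the inputs that produced it."""
    git_commit: str
    data_sha256: str
    model_id: str


def sha256_of_bytes(payload: bytes) -> str:
    """Return the hex SHA-256 digest of a byte string."""
    return hashlib.sha256(payload).hexdigest()


def build_record(git_commit: str, data: bytes, model_id: str) -> LineageRecord:
    """Build a lineage record from raw data bytes and identifiers."""
    return LineageRecord(git_commit, sha256_of_bytes(data), model_id)


def same_data(a: LineageRecord, b: LineageRecord) -> bool:
    """Two models were trained on identical bytes only if the hashes match."""
    return a.data_sha256 == b.data_sha256


if __name__ == "__main__":
    # Made-up commit, data, and run name, purely for demonstration.
    record = build_record("abc1234", b"age,income\n30,50000.0\n", "run-001")
    print(json.dumps(asdict(record), indent=2))
```

If two runs report different metrics, comparing their records shows whether the data, the code, or both changed.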

Prerequisites and Environment Setup

Before writing any code, ensure your environment has these tools installed. We'll use Python 3.11+ throughout.

# Create a clean environment
python -m venv mlops-cicd
source mlops-cicd/bin/activate

# Core dependencies (the s3 extra installs the S3 backend used by the remote configured below)
pip install "dvc[s3]==3.58.0" mlflow==2.20.0 scikit-learn==1.6.1 pandas==2.2.3
pip install pyyaml==6.0.2 pytest==8.3.4

# GitHub CLI for repository setup (optional but helpful)
# brew install gh  # macOS
# sudo apt install gh  # Ubuntu

Initialize DVC in your project directory:

git init
dvc init

Configure a remote storage backend [1]. For this tutorial we'll use S3-compatible storage, but DVC also supports Google Cloud Storage, Azure Blob Storage, and local filesystems.

dvc remote add -d myremote s3://my-ml-bucket/dvc-store
# Only needed for S3-compatible services with a custom endpoint; AWS S3 itself works without it
dvc remote modify myremote endpointurl https://s3.amazonaws.com

The -d flag sets this as the default remote. All data and model files tracked by DVC will push to this location.

Core Implementation: The CI/CD Pipeline

Our pipeline consists of four stages executed in sequence: data validation, training, evaluation, and deployment. Each stage can fail independently, and the pipeline stops on the first failure.
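
The "stop on the first failure" behaviour is easy to reproduce locally before wiring anything into CI. The following is a minimal sketch, not the tutorial's actual orchestration: `run_stages` is a hypothetical helper, and each stage is a plain callable returning an exit code (in practice it could wrap a `subprocess.run(...)` call to one of the scripts).

```python
from typing import Callable, List, Optional, Tuple

# A stage is a name plus a callable that returns a process-style exit code.
Stage = Tuple[str, Callable[[], int]]


def run_stages(stages: List[Stage]) -> Optional[str]:
    """Run stages in order; return the first failing stage name, or None."""
    for name, stage in stages:
        exit_code = stage()
        if exit_code != 0:
            print(f"Stage '{name}' failed with exit code {exit_code}; stopping")
            return name
        print(f"Stage '{name}' passed")
    return None


if __name__ == "__main__":
    # Placeholder callables stand in for the real scripts.
    demo = [
        ("validate", lambda: 0),
        ("train", lambda: 0),
        ("evaluate", lambda: 1),  # simulated model regression
        ("deploy", lambda: 0),    # never reached
    ]
    print(f"First failure: {run_stages(demo)}")
```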

Stage 1: Data Validation with DVC

The first step ensures the data hasn't changed unexpectedly. DVC tracks data files via .dvc files that contain hash pointers to the actual data in remote storage.

# scripts/validate_data.py
import hashlib
import json
import sys
from pathlib import Path

import pandas as pd
import yaml

def compute_data_hash(data_path: str) -> str:
    """Compute SHA256 hash of the raw data file for integrity checking."""
    sha256_hash = hashlib.sha256()
    with open(data_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def validate_schema(data_path: str, schema_path: str) -> bool:
    """Validate that the data matches expected schema and value ranges."""
    df = pd.read_csv(data_path)

    with open(schema_path, "r") as f:
        schema = yaml.safe_load(f)

    expected_columns = schema["columns"]
    missing_cols = set(expected_columns.keys()) - set(df.columns)

    if missing_cols:
        print(f"Missing columns: {missing_cols}")
        return False

    for col, constraints in expected_columns.items():
        if col not in df.columns:
            continue

        if "dtype" in constraints:
            actual_dtype = str(df[col].dtype)
            if actual_dtype != constraints["dtype"]:
                print(f"Column {col}: expected dtype {constraints['dtype']}, got {actual_dtype}")
                return False

        if "min" in constraints and df[col].min() < constraints["min"]:
            print(f"Column {col}: value {df[col].min()} below minimum {constraints['min']}")
            return False

        if "max" in constraints and df[col].max() > constraints["max"]:
            print(f"Column {col}: value {df[col].max()} above maximum {constraints['max']}")
            return False

    return True

if __name__ == "__main__":
    data_path = sys.argv[1]
    schema_path = sys.argv[2]

    data_hash = compute_data_hash(data_path)
    print(f"Data hash: {data_hash}")

    # Store hash for later comparison
    with open("data_hash.json", "w") as f:
        json.dump({"hash": data_hash, "path": data_path}, f)

    if not validate_schema(data_path, schema_path):
        sys.exit(1)

    print("Data validation passed")

The schema file defines what we expect:

# schema.yaml
columns:
  age:
    dtype: int64
    min: 0
    max: 120
  income:
    dtype: float64
    min: 0
    max: 1000000
  education_years:
    dtype: int64
    min: 0
    max: 30
  target:
    dtype: int64
    min: 0
    max: 1
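
The validator stops at the first violation and prints only the extreme value. When debugging a failed run it can help to see how many rows break each constraint. Below is a dependency-free sketch that uses the standard-library `csv` module instead of pandas; `count_range_violations` is a hypothetical helper, and the `bounds` dictionary mirrors the `min`/`max` entries of schema.yaml.

```python
import csv
import io
from typing import Dict, Tuple


def count_range_violations(
    csv_text: str, bounds: Dict[str, Tuple[float, float]]
) -> Dict[str, int]:
    """Count rows whose value falls outside [min, max] for each bounded column."""
    violations = {col: 0 for col in bounds}
    for row in csv.DictReader(io.StringIO(csv_text)):
        for col, (low, high) in bounds.items():
            value = float(row[col])
            if value < low or value > high:
                violations[col] += 1
    return violations


if __name__ == "__main__":
    # Made-up sample: two of the three age values are out of range.
    sample = "age,income\n34,52000.0\n150,48000.0\n-2,61000.0\n"
    bounds = {"age": (0, 120), "income": (0, 1_000_000)}
    print(count_range_violations(sample, bounds))
```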

Stage 2: Training with MLflow Tracking

Training logs every hyperparameter, metric, and artifact to MLflow. This creates an auditable trail connecting code, data, and model performance.

# scripts/train.py
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import sys
import json

def train_model(data_path: str, params_path: str) -> None:
    """Train a model and log everything to MLflow."""

    with open(params_path, "r") as f:
        params = json.load(f)

    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=params.get("random_state", 42)
    )

    with mlflow.start_run() as run:
        # Log all hyperparameters
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(
            n_estimators=params.get("n_estimators", 100),
            max_depth=params.get("max_depth", 10),
            random_state=params.get("random_state", 42)
        )
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)

        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred, zero_division=0),
            "recall": recall_score(y_test, y_pred, zero_division=0),
            "f1": f1_score(y_test, y_pred, zero_division=0)
        }

        mlflow.log_metrics(metrics)

        # Log the model
        mlflow.sklearn.log_model(model, "model")

        # Log the data hash for traceability
        with open("data_hash.json", "r") as f:
            data_hash = json.load(f)
        mlflow.log_param("data_hash", data_hash["hash"])

        # Save run ID for downstream stages
        with open("mlflow_run_id.txt", "w") as f:
            f.write(run.info.run_id)

        print(f"Training complete. Run ID: {run.info.run_id}")
        print(f"Metrics: {metrics}")

if __name__ == "__main__":
    train_model(sys.argv[1], sys.argv[2])
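
train.py reads its hyperparameters from a params.json file that is not shown above. A minimal sketch that writes one follows, using the three keys the script looks up and the same values as its fallbacks. The helper names `write_params` and `read_params` are hypothetical, and the values are starting points rather than tuned settings.

```python
import json
from pathlib import Path

# Keys match the params.get(...) lookups in scripts/train.py;
# the values are the script's own fallback defaults.
DEFAULT_PARAMS = {
    "n_estimators": 100,
    "max_depth": 10,
    "random_state": 42,
}


def write_params(path: str, params: dict) -> None:
    """Write hyperparameters as pretty-printed JSON."""
    Path(path).write_text(json.dumps(params, indent=2) + "\n")


def read_params(path: str) -> dict:
    """Load hyperparameters the same way train.py does."""
    with open(path, "r") as f:
        return json.load(f)


if __name__ == "__main__":
    write_params("params.json", DEFAULT_PARAMS)
    print(read_params("params.json"))
```

Committing params.json to Git means every hyperparameter change shows up in the diff of the pull request that triggers the pipeline.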

Stage 3: Model Evaluation Against Baseline

This stage prevents deploying a model that performs worse than the current production model. It compares the new model against the version currently in the Production stage of MLflow's model registry.

# scripts/evaluate.py
import sys

import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.metrics import accuracy_score

def evaluate_against_baseline(data_path: str, run_id: str) -> None:
    """Compare current model against production baseline."""

    # Load current model from MLflow
    current_model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

    # Load the model currently in the registry's Production stage
    try:
        production_model_uri = "models:/production_model/Production"
        production_model = mlflow.sklearn.load_model(production_model_uri)
        print("Loaded production model from registry")
    except Exception as e:
        # Caution: this broad catch also hides tracking-server connection errors
        print(f"No production model found: {e}")
        print("This is the first deployment, skipping baseline comparison")
        return

    # Evaluate both models on the same test set
    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]

    current_preds = current_model.predict(X)
    production_preds = production_model.predict(X)

    current_accuracy = accuracy_score(y, current_preds)
    production_accuracy = accuracy_score(y, production_preds)

    print(f"Current model accuracy: {current_accuracy:.4f}")
    print(f"Production model accuracy: {production_accuracy:.4f}")

    # Fail if current model is significantly worse
    threshold = 0.02  # absolute tolerance: 0.02 accuracy, i.e. 2 percentage points
    if current_accuracy < production_accuracy - threshold:
        print(f"FAIL: Current model underperforms production by {production_accuracy - current_accuracy:.4f}")
        sys.exit(1)

    print("PASS: Current model meets or exceeds production baseline")

if __name__ == "__main__":
    evaluate_against_baseline(sys.argv[1], sys.argv[2])
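
The gate itself is a single comparison, which makes it easy to unit-test in isolation. The sketch below extracts that rule into a hypothetical `passes_baseline` function; the 0.02 default mirrors the threshold in the script and is an absolute accuracy difference, not a relative one.

```python
def passes_baseline(current: float, production: float, tolerance: float = 0.02) -> bool:
    """Return True unless the candidate trails production by more than `tolerance`."""
    return current >= production - tolerance


if __name__ == "__main__":
    # Made-up accuracies for illustration.
    print(passes_baseline(0.91, 0.92))  # within tolerance
    print(passes_baseline(0.85, 0.92))  # clear regression
    print(passes_baseline(0.95, 0.92))  # improvement
```

A small tolerance keeps the pipeline from failing on noise between runs, at the cost of letting slow regressions accumulate across several merges.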

Stage 4: GitHub Actions Workflow

The workflow orchestrates all stages. It runs on every push to main and on pull requests targeting main.

# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'  # requires a requirements.txt or pyproject.toml in the repo

      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas pyyaml

      - name: Pull data from DVC
        run: |
          dvc pull

      - name: Validate data
        run: |
          python scripts/validate_data.py data/train.csv schema.yaml

      - name: Upload data hash
        uses: actions/upload-artifact@v4
        with:
          name: data-hash
          path: data_hash.json

  train:
    needs: validate-data
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'  # requires a requirements.txt or pyproject.toml in the repo

      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas

      - name: Pull data from DVC
        run: |
          dvc pull

      - name: Download data hash
        uses: actions/download-artifact@v4
        with:
          name: data-hash

      - name: Train model
        run: |
          python scripts/train.py data/train.csv params.json

      - name: Upload MLflow run ID
        uses: actions/upload-artifact@v4
        with:
          name: mlflow-run-id
          path: mlflow_run_id.txt

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'  # requires a requirements.txt or pyproject.toml in the repo

      - name: Install dependencies
        run: |
          pip install "dvc[s3]" mlflow scikit-learn pandas

      - name: Pull data from DVC
        run: |
          dvc pull

      - name: Download MLflow run ID
        uses: actions/download-artifact@v4
        with:
          name: mlflow-run-id

      - name: Evaluate against baseline
        run: |
          RUN_ID=$(cat mlflow_run_id.txt)
          python scripts/evaluate.py data/test.csv $RUN_ID

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install mlflow boto3

      - name: Download MLflow run ID
        uses: actions/download-artifact@v4
        with:
          name: mlflow-run-id

      - name: Promote model to production
        run: |
          RUN_ID=$(cat mlflow_run_id.txt)
          python scripts/promote_model.py "$RUN_ID"

      - name: Deploy to production endpoint
        run: |
          python scripts/deploy.py

The Promotion Script

# scripts/promote_model.py
import mlflow
import sys

def promote_to_production(run_id: str) -> None:
    """Register the model and promote to Production stage."""
    client = mlflow.tracking.MlflowClient()

    # Register the model
    model_uri = f"runs:/{run_id}/model"
    model_name = "production_model"

    try:
        registered_model = client.create_registered_model(model_name)
        print(f"Created new registered model: {model_name}")
    except mlflow.exceptions.MlflowException:
        print(f"Model {model_name} already exists")

    # Create a new version
    version = client.create_model_version(
        name=model_name,
        source=model_uri,
        run_id=run_id
    )
    print(f"Created version {version.version} of model {model_name}")

    # Transition to Production stage
    # (registry stages are deprecated since MLflow 2.9 in favor of aliases, but still work in 2.x)
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production"
    )
    print(f"Promoted version {version.version} to Production")

if __name__ == "__main__":
    promote_to_production(sys.argv[1])

Handling Edge Cases and Failure Modes

Data Drift Detection

The pipeline currently validates schema but not distribution. For production systems, add statistical drift detection:

# scripts/detect_drift.py
from scipy.stats import ks_2samp
import pandas as pd
import numpy as np

def detect_distribution_drift(reference_path: str, current_path: str, threshold: float = 0.05) -> bool:
    """Use Kolmogorov-Smirnov test to detect distribution shifts."""
    reference = pd.read_csv(reference_path)
    current = pd.read_csv(current_path)

    drifted_columns = []

    for col in reference.select_dtypes(include=[np.number]).columns:
        if col == "target":
            continue

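        # Drop missing values first; NaNs would otherwise yield a NaN p-value and hide drift
        stat, p_value = ks_2samp(reference[col].dropna(), current[col].dropna())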

        if p_value < threshold:
            drifted_columns.append(col)
            print(f"Drift detected in {col}: KS statistic={stat:.4f}, p-value={p_value:.4f}")

    if drifted_columns:
        print(f"WARNING: {len(drifted_columns)} columns show significant drift")
        return True

    return False
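
To make the test less of a black box: the KS statistic is the largest vertical gap between the empirical cumulative distribution functions of the two samples. The pure-Python sketch below computes only that statistic. It is for intuition, not a replacement for `scipy.stats.ks_2samp`, which also derives the p-value; `ks_statistic` is a hypothetical name.

```python
from bisect import bisect_right
from typing import Sequence


def ks_statistic(sample_a: Sequence[float], sample_b: Sequence[float]) -> float:
    """Largest vertical gap between the two empirical CDFs."""
    if not sample_a or not sample_b:
        raise ValueError("both samples must be non-empty")
    a_sorted = sorted(sample_a)
    b_sorted = sorted(sample_b)
    largest_gap = 0.0
    # The gap can only change at an observed value, so checking those is enough.
    for x in a_sorted + b_sorted:
        cdf_a = bisect_right(a_sorted, x) / len(a_sorted)
        cdf_b = bisect_right(b_sorted, x) / len(b_sorted)
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap


if __name__ == "__main__":
    print(ks_statistic([1, 2, 3], [1, 2, 3]))        # identical samples
    print(ks_statistic([1, 2, 3, 4], [3, 4, 5, 6]))  # partially overlapping
    print(ks_statistic([1, 2, 3], [4, 5, 6]))        # fully separated
```

A value near 0 means the two columns are distributed alike, and a value near 1 means they barely overlap.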

Cache Management for Large Datasets

DVC pulls can be slow for large datasets. Use GitHub Actions cache to speed up subsequent runs:

- name: Cache DVC data
  uses: actions/cache@v4
  with:
    path: .dvc/cache
    key: dvc-${{ hashFiles('**/*.dvc') }}
    restore-keys: |
      dvc-

Handling Concurrent Deployments

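When multiple PRs merge in quick succession, two deployment jobs can race. MLflow's model registry assigns each new model version its own number, so concurrent promotions do not overwrite each other. The variant below also archives the previous Production version before promoting the new one. It does not serialize the jobs themselves, so the last promotion to finish wins.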

# In promote_model.py, add version conflict handling
def safe_promote(run_id: str, model_name: str) -> None:
    """Handle concurrent promotion attempts gracefully."""
    client = mlflow.tracking.MlflowClient()

    # Look up the version(s) currently in the Production stage
    latest_versions = client.get_latest_versions(model_name, stages=["Production"])

    if latest_versions:
        print(f"Current production version: {latest_versions[0].version}")

    # Create new version (MLflow handles version numbering)
    version = client.create_model_version(
        name=model_name,
        source=f"runs:/{run_id}/model",
        run_id=run_id
    )

    # Archive old production version before promoting new one
    for old_version in latest_versions:
        client.transition_model_version_stage(
            name=model_name,
            version=old_version.version,
            stage="Archived"
        )

    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Production"
    )

Monitoring and Alerting

The pipeline should notify the team when it fails. Add a notification step:

- name: Notify on failure
  if: failure()
  run: |
    curl -X POST -H "Content-Type: application/json" \
      -d '{"text": "ML pipeline failed for commit ${{ github.sha }}"}' \
      "${{ secrets.SLACK_WEBHOOK_URL }}"

Cost and Resource Optimization

Running ML training in CI can be expensive. Consider these optimizations:

  1. Use spot instances: GitHub Actions doesn't support spot pricing natively, but you can use self-hosted runners on spot instances for training jobs
  2. Limit training data: In CI, train on a 10% sample to validate the pipeline works, then train on full data for production deployments
  3. Parallelize validation: Split data validation across multiple jobs for large datasets
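
For the second point, the sample should be reproducible so that two CI runs on the same commit train on the same rows. Here is a minimal sketch using a seeded standard-library generator; `sample_fraction` is a hypothetical helper, and the 10% fraction and seed 42 are just the defaults assumed here.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def sample_fraction(rows: Sequence[T], fraction: float = 0.1, seed: int = 42) -> List[T]:
    """Return a reproducible random subset containing about `fraction` of the rows."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    if not rows:
        return []
    # Keep at least one row so tiny datasets still exercise the pipeline.
    k = max(1, round(len(rows) * fraction))
    return random.Random(seed).sample(list(rows), k)


if __name__ == "__main__":
    rows = list(range(1000))
    subset = sample_fraction(rows)
    print(len(subset), subset == sample_fraction(rows))
```

With an imbalanced target, a plain random sample can leave very few positive examples, so a stratified split may be a better fit there.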

What's Next

This pipeline provides a solid foundation, but production ML systems require additional layers:

  • A/B testing infrastructure: Route traffic between model versions and compare real-world metrics
  • Automated retraining: Schedule periodic retraining based on drift detection or calendar intervals
  • Feature store integration: Version features independently from models using tools like Feast or Tecton
  • Explainability checks: Add automated fairness and bias testing to the evaluation stage

The complete code for this tutorial is available at github.com/example/mlops-cicd (Note: This is a placeholder URL for the tutorial structure). For deeper dives into specific components, check out our guides on MLflow model registry best practices and DVC pipeline optimization.

Remember: The goal of ML CI/CD isn't to prevent all failures—that's impossible. It's to detect failures early, understand their root cause, and recover quickly. Every failed pipeline run is an opportunity to add a new check that prevents that specific failure from reaching production.


References

1. Rag. Wikipedia.
2. Shubhamsaboo/awesome-llm-apps. GitHub.