
How to Build CI/CD for ML with GitHub Actions and DVC

Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow

BlogIA Academy · June 5, 2026 · 13 min read · 2,511 words


Building machine learning systems is hard enough without adding deployment complexity. When your model's performance degrades in production because of data drift, or when a teammate accidentally commits a 2GB CSV file to your repository, you need automated guardrails. This tutorial walks through a production-ready CI/CD pipeline for ML using GitHub Actions, DVC (Data Version Control), and MLflow.

Why ML Pipelines Need Different CI/CD

Traditional CI/CD pipelines test code changes against fixed datasets. In machine learning, your "code" includes data transformations, model architectures, and hyperparameters, all of which interact with datasets that change over time. A model that achieves 95% accuracy on last month's data might fail catastrophically on today's distribution.

According to a 2025 survey by the MLOps Community, 67% of ML teams reported that data versioning issues caused at least one production incident in the previous year. The core problem is that Git alone cannot handle large datasets efficiently, and standard CI/CD tools don't understand model metrics or data lineage.

This pipeline addresses three specific failure modes:

  • Data inconsistency: Different team members training on different versions of the dataset
  • Silent model degradation: Deploying models without comparing against current production performance
  • Reproducibility gaps: Inability to trace a production model back to its exact training data and code

Architecture Overview

The pipeline connects four components:

  1. GitHub Actions orchestrates the workflow triggers and execution
  2. DVC manages data and model versioning outside Git
  3. MLflow tracks experiments, metrics, and model registry
  4. Your training code runs on the CI runner, ideally inside a container so the environment stays reproducible

When a developer pushes code or data changes, GitHub Actions triggers a workflow that:

  1. Pulls the latest data from DVC remote storage
  2. Runs the training pipeline
  3. Logs metrics and artifacts to MLflow
  4. Compares new model performance against the current production model
  5. Promotes the model to staging or production based on performance thresholds

Prerequisites and Environment Setup

Before implementing the pipeline, set up your local environment and cloud resources.

Local Development Setup

# Create a Python virtual environment
python -m venv mlops-env
source mlops-env/bin/activate

# Install core dependencies
pip install dvc dvc-s3 mlflow scikit-learn pandas numpy

# Initialize DVC in your project
git init
dvc init

Cloud Storage Configuration

DVC requires remote storage for datasets and models. This example uses AWS S3, but DVC supports GCS, Azure Blob, and local storage.

# Configure S3 remote storage
dvc remote add -d myremote s3://your-ml-bucket/dvc-store

# Set AWS credentials as environment variables
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"

MLflow Tracking Server

Set up MLflow tracking for experiment logging. For production, use a hosted MLflow server or deploy one on a cloud VM.

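# MLflow is already installed with the core dependencies above

# Start a local tracking server (for development), bound to localhost only.
# A database-backed store (SQLite here) is the usual choice when the Model Registry is needed.
mlflow server --host 127.0.0.1 --port 5000 --backend-store-uri sqlite:///mlflow.db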

# For production, set the tracking URI
export MLFLOW_TRACKING_URI="http://your-mlflow-server:5000"

Implementing the Training Pipeline

The training pipeline must be reproducible and parameterized. Create a train.py script that takes all of its configuration from command-line arguments, so any run can be repeated from the command that launched it.

# train.py
import argparse
import json
import os
import pickle
from pathlib import Path

import mlflow
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

def load_data(data_path: str) -> tuple[pd.DataFrame, pd.Series]:
    """Load and validate input data.

    Args:
        data_path: Path to the CSV file containing features and target

    Returns:
        Tuple of (features DataFrame, target Series)
    """
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found: {data_path}")

    df = pd.read_csv(data_path)

    # Validate required columns
    required_cols = ["feature_1", "feature_2", "feature_3", "target"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    X = df[["feature_1", "feature_2", "feature_3"]]
    y = df["target"]

    # Check for data quality issues
    if X.isnull().any().any():
        raise ValueError("Input data contains null values")

    return X, y

def train_model(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    n_estimators: int = 100,
    max_depth: int = 10,
    random_state: int = 42,
) -> RandomForestClassifier:
    """Train a Random Forest classifier with specified hyperparameters.

    Args:
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest
        max_depth: Maximum depth of each tree
        random_state: Random seed for reproducibility

    Returns:
        Trained RandomForestClassifier instance
    """
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=random_state,
        n_jobs=-1,  # Use all available CPU cores
    )
    model.fit(X_train, y_train)
    return model

def evaluate_model(
    model: RandomForestClassifier,
    X_test: pd.DataFrame,
    y_test: pd.Series,
) -> dict[str, float]:
    """Calculate comprehensive evaluation metrics.

    Args:
        model: Trained classifier
        X_test: Test features
        y_test: Test labels

    Returns:
        Dictionary of metric names to values
    """
    y_pred = model.predict(X_test)

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average="weighted"),
        "precision": precision_score(y_test, y_pred, average="weighted"),
        "recall": recall_score(y_test, y_pred, average="weighted"),
    }

    return metrics

def main(config: dict):
    """Execute the full training pipeline.

    Args:
        config: Dictionary containing all configuration parameters
    """
    # Set MLflow experiment
    mlflow.set_experiment(config.get("experiment_name", "default"))

    with mlflow.start_run() as run:
        # Log all parameters
        mlflow.log_params({
            "n_estimators": config["n_estimators"],
            "max_depth": config["max_depth"],
            "test_size": config["test_size"],
            "random_state": config["random_state"],
        })

        # Load and split data
        X, y = load_data(config["data_path"])
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=config["test_size"],
            random_state=config["random_state"],
            stratify=y,  # Maintain class distribution
        )

        # Train model
        model = train_model(
            X_train, y_train,
            n_estimators=config["n_estimators"],
            max_depth=config["max_depth"],
            random_state=config["random_state"],
        )

        # Evaluate model
        metrics = evaluate_model(model, X_test, y_test)

        # Log metrics to MLflow
        mlflow.log_metrics(metrics)

        # Log the trained model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name=config.get("model_name", "classifier"),
        )

        # Save model locally for DVC tracking
        model_path = Path("models") / f"model_{run.info.run_id}.pkl"
        model_path.parent.mkdir(exist_ok=True)
        with open(model_path, "wb") as f:
            pickle.dump(model, f)

        # Log feature importance
        feature_importance = pd.DataFrame(
            {"feature": X.columns, "importance": model.feature_importances_}
        )
        feature_importance.to_csv("models/feature_importance.csv", index=False)
        mlflow.log_artifact("models/feature_importance.csv")

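        # Write the metrics file that dvc.yaml declares under `metrics`
        with open("metrics.json", "w") as f:
            json.dump(metrics, f, indent=2)

        print(f"Run ID: {run.info.run_id}")
        print(f"Metrics: {json.dumps(metrics, indent=2)}")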

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train ML model with MLflow tracking")
    parser.add_argument("--data-path", required=True, help="Path to training data CSV")
    parser.add_argument("--n-estimators", type=int, default=100, help="Number of trees")
    parser.add_argument("--max-depth", type=int, default=10, help="Maximum tree depth")
    parser.add_argument("--test-size", type=float, default=0.2, help="Test split ratio")
    parser.add_argument("--random-state", type=int, default=42, help="Random seed")
    parser.add_argument("--experiment-name", default="mlops-pipeline", help="MLflow experiment")
    parser.add_argument("--model-name", default="classifier", help="Registered model name")

    args = parser.parse_args()
    config = vars(args)
    main(config)

DVC Configuration

Track your data and models with DVC to enable versioning and reproducibility.

# Track the training data
dvc add data/training_data.csv

# Track the models directory
dvc add models/

# Commit the DVC pointer files and the .gitignore entries DVC generated
git add data/training_data.csv.dvc data/.gitignore models.dvc .gitignore
git commit -m "Add data and model tracking with DVC"

# Push data to remote storage
dvc push
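
For intuition about what the committed .dvc pointer file records: it stores a content hash (MD5) of the tracked file, so two teammates are training on the same dataset version exactly when those hashes match. The sketch below illustrates that idea with the standard library only. The helpers `file_md5` and `same_dataset` are hypothetical names for illustration and are not part of DVC's API.

```python
import hashlib


def file_md5(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of a file, reading in chunks to bound memory use."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps calling f.read until it returns b"" (end of file)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def same_dataset(path_a: str, path_b: str) -> bool:
    """Two files hold the same dataset version exactly when their digests match."""
    return file_md5(path_a) == file_md5(path_b)
```

Because the hash depends only on the bytes, renaming or copying the file does not change its identity, while changing a single row does.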

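Create a dvc.yaml file to define the pipeline stages. DVC does not allow the same path to be tracked by dvc add and also declared as a stage output, so if you adopt this stage, drop the standalone pointer first (dvc remove models.dvc) and let the stage own models/. The stage also expects train.py to write metrics.json: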

# dvc.yaml
stages:
  train:
    cmd: python train.py --data-path data/training_data.csv --n-estimators 100 --max-depth 10
    deps:
      - data/training_data.csv
      - train.py
    outs:
      - models/
    metrics:
      - metrics.json:
          cache: false

GitHub Actions Workflow Configuration

The CI/CD pipeline runs on every push to the main branch and on pull requests. Create .github/workflows/ml-pipeline.yml:

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
  workflow_dispatch:  # Allow manual triggering

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          lfs: true  # Handle large files

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"  # needs a requirements.txt or pyproject.toml in the repo to key the cache

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dvc dvc-s3 mlflow scikit-learn pandas numpy pytest pytest-cov

      - name: Validate code quality
        run: |
          pip install flake8 black
          flake8 train.py --max-line-length=100
          black --check train.py

      - name: Run unit tests
        run: |
          pytest tests/ -v --cov=.

  train:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dvc dvc-s3 mlflow scikit-learn pandas numpy

      - name: Pull data from DVC remote
        run: |
          dvc pull data/training_data.csv.dvc

      - name: Run training pipeline
        run: |
          python train.py \
            --data-path data/training_data.csv \
            --n-estimators 100 \
            --max-depth 10 \
            --experiment-name "ci-pipeline" \
            --model-name "classifier"

      - name: Push model artifacts to DVC
        run: |
          dvc add models/
          dvc push

      - name: Upload model artifacts
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: models/
          retention-days: 7

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install mlflow scikit-learn pandas numpy

      - name: Find the latest CI run
        run: |
          # Look up the most recent run in the ci-pipeline experiment via the MLflow Python API
          RUN_ID=$(python -c "import mlflow; print(mlflow.search_runs(experiment_names=['ci-pipeline'], order_by=['attributes.start_time DESC'], max_results=1)['run_id'].iloc[0])")
          echo "RUN_ID=$RUN_ID" >> "$GITHUB_ENV"

      - name: Compare with production model
        run: |
          python scripts/compare_models.py \
            --candidate-run-id "$RUN_ID" \
            --production-model-name "classifier" \
            --production-stage "Production"

      - name: Promote model if better
        if: success()
        run: |
          python scripts/promote_model.py \
            --run-id "$RUN_ID" \
            --metric-threshold 0.85

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to staging
        run: |
          echo "Deploying model to staging environment"
          # Add your deployment logic here
          # For example: kubectl apply -f deployment.yaml
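
The workflow above calls scripts/promote_model.py, which this tutorial does not list. What follows is one possible sketch of it, not the author's script. It assumes the gate is "the run's metric is at or above --metric-threshold" and that promotion means moving the run's registered version to Staging. The `--metric-name` flag and its `accuracy` default are assumptions added for illustration.

```python
import argparse


def meets_threshold(metrics: dict, metric_name: str, threshold: float) -> bool:
    """Return True when the named metric is present and at or above the threshold."""
    value = metrics.get(metric_name)
    return value is not None and value >= threshold


def promote_run(run_id: str, metric_name: str, threshold: float) -> bool:
    """Move the model version logged by `run_id` to Staging if it clears the gate."""
    # Imported lazily so the pure gate above can be unit-tested without MLflow installed.
    from mlflow.tracking import MlflowClient

    client = MlflowClient()
    metrics = client.get_run(run_id).data.metrics
    if not meets_threshold(metrics, metric_name, threshold):
        print(f"{metric_name} is below {threshold}; not promoting run {run_id}")
        return False

    # Find the registry version(s) that were created from this run.
    versions = client.search_model_versions(f"run_id='{run_id}'")
    if not versions:
        print(f"No registered model version found for run {run_id}")
        return False

    version = versions[0]
    client.transition_model_version_stage(
        name=version.name, version=version.version, stage="Staging"
    )
    print(f"Promoted {version.name} v{version.version} to Staging")
    return True


def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Promote a run's model if it clears a threshold")
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--metric-threshold", type=float, required=True)
    parser.add_argument("--metric-name", default="accuracy")  # assumed default, not from the article
    args = parser.parse_args(argv)
    return 0 if promote_run(args.run_id, args.metric_name, args.metric_threshold) else 1
```

To use it as a script, end the file with `if __name__ == "__main__": raise SystemExit(main())` so a failed gate fails the CI step. The stage-based registry calls match the ones used elsewhere in this tutorial. Recent MLflow releases mark stages as deprecated in favour of aliases, so check the version you run.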

Model Comparison Script

Create scripts/compare_models.py to automate model evaluation against production:

# scripts/compare_models.py
import argparse
import json
import mlflow
from mlflow.tracking import MlflowClient

def get_production_model_metrics(client: MlflowClient, model_name: str) -> dict:
    """Retrieve metrics from the current production model.

    Args:
        client: MLflow client instance
        model_name: Name of the registered model

    Returns:
        Dictionary of metric names to values
    """
    # Lookup errors are left to propagate: swallowing them would make an
    # unreachable registry look like "no production model" and promote the candidate.
    latest_version = client.get_latest_versions(model_name, stages=["Production"])
    if not latest_version:
        print(f"No production version found for model '{model_name}'")
        return {}

    run_id = latest_version[0].run_id
    run = client.get_run(run_id)
    return run.data.metrics

def compare_models(
    candidate_metrics: dict,
    production_metrics: dict,
    threshold: float = 0.05,
) -> bool:
    """Compare candidate model against production.

    Args:
        candidate_metrics: Metrics from the newly trained model
        production_metrics: Metrics from the current production model
        threshold: Minimum improvement required to promote

    Returns:
        True if candidate outperforms production
    """
    if not production_metrics:
        print("No production model to compare against. Promoting candidate.")
        return True

    candidate_accuracy = candidate_metrics.get("accuracy", 0)
    production_accuracy = production_metrics.get("accuracy", 0)

    improvement = candidate_accuracy - production_accuracy
    print(f"Candidate accuracy: {candidate_accuracy:.4f}")
    print(f"Production accuracy: {production_accuracy:.4f}")
    print(f"Improvement: {improvement:.4f}")

    return improvement >= threshold

def main():
    parser = argparse.ArgumentParser(description="Compare ML models")
    parser.add_argument("--candidate-run-id", required=True)
    parser.add_argument("--production-model-name", required=True)
    parser.add_argument("--production-stage", default="Production")
    parser.add_argument("--threshold", type=float, default=0.05)

    args = parser.parse_args()

    client = MlflowClient()

    # Get candidate metrics
    candidate_run = client.get_run(args.candidate_run_id)
    candidate_metrics = candidate_run.data.metrics

    # Get production metrics
    production_metrics = get_production_model_metrics(
        client, args.production_model_name
    )

    # Compare
    should_promote = compare_models(
        candidate_metrics, production_metrics, args.threshold
    )

    # Output result for GitHub Actions
    result = {"should_promote": should_promote}
    print(json.dumps(result))

    if not should_promote:
        # A non-zero exit code fails the GitHub Actions step
        raise SystemExit(1)

if __name__ == "__main__":
    main()
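
Note what the default threshold of 0.05 means in practice: the candidate must beat production by five accuracy points in absolute terms, so 0.91 against 0.88 is rejected. The sketch below reproduces that gate as a pure function and adds one optional extension that is not in the script above: a guard that blocks promotion when a secondary metric regresses. The function name, `guard_metrics`, and `max_regression` are hypothetical.

```python
def promotion_decision(
    candidate: dict,
    production: dict,
    primary: str = "accuracy",
    min_gain: float = 0.05,
    guard_metrics: tuple = ("f1_score",),
    max_regression: float = 0.01,
) -> tuple:
    """Return (should_promote, reason) for a candidate compared with production."""
    if not production:
        return True, "no production model to compare against"

    # Same rule as compare_models: absolute gain on the primary metric.
    gain = candidate.get(primary, 0.0) - production.get(primary, 0.0)
    if gain < min_gain:
        return False, f"{primary} gain {gain:.4f} is below the required {min_gain:.4f}"

    # Hypothetical extra guard: no secondary metric may drop by more than max_regression.
    for name in guard_metrics:
        drop = production.get(name, 0.0) - candidate.get(name, 0.0)
        if drop > max_regression:
            return False, f"{name} regressed by {drop:.4f}"

    return True, f"{primary} improved by {gain:.4f}"


if __name__ == "__main__":
    production = {"accuracy": 0.88, "f1_score": 0.87}
    print(promotion_decision({"accuracy": 0.91, "f1_score": 0.90}, production))
    print(promotion_decision({"accuracy": 0.95, "f1_score": 0.94}, production))
```

Returning the reason alongside the decision makes the CI log explain why a model was held back.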

Handling Edge Cases and Production Considerations

Data Drift Detection

Add a drift detection step before training to catch distribution shifts:

# scripts/detect_drift.py
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

def detect_feature_drift(
    reference_data: pd.DataFrame,
    current_data: pd.DataFrame,
    threshold: float = 0.05,
) -> dict:
    """Detect distribution drift using Kolmogorov-Smirnov test.

    Args:
        reference_data: Baseline dataset
        current_data: New dataset to check
        threshold: P-value threshold for drift detection

    Returns:
        Dictionary mapping feature names to drift status
    """
    drift_results = {}

    for column in reference_data.columns:
        if column == "target":
            continue

        stat, p_value = ks_2samp(
            reference_data[column].values,
            current_data[column].values,
        )

        drift_results[column] = {
            # Cast to a plain bool so the result can be serialized with json.dumps
            "drift_detected": bool(p_value < threshold),
            "ks_statistic": float(stat),
            "p_value": float(p_value),
        }

    return drift_results
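
To make the test less of a black box, the sketch below computes the two-sample KS statistic by hand: the largest vertical gap between the two empirical CDFs. It uses only the standard library and returns just the statistic, not the p-value that ks_2samp also reports. `ks_statistic` is an illustrative name.

```python
from bisect import bisect_right


def ks_statistic(sample_a: list, sample_b: list) -> float:
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    if not sample_a or not sample_b:
        raise ValueError("both samples must be non-empty")

    a, b = sorted(sample_a), sorted(sample_b)
    largest_gap = 0.0
    # The empirical CDFs only change at observed values, so checking those is enough.
    for x in a + b:
        cdf_a = bisect_right(a, x) / len(a)  # fraction of a that is <= x
        cdf_b = bisect_right(b, x) / len(b)  # fraction of b that is <= x
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap
```

A value of 0 means the samples have identical empirical distributions and 1 means they do not overlap at all. With very large samples even a small gap can give a p-value below 0.05, so it can be worth logging the statistic next to the drift flag.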

Resource Management

Large models and datasets can exhaust CI runner resources. Implement these safeguards:

# In GitHub Actions workflow
- name: Check available disk space
  run: |
    df -h
    if [ $(df --output=pcent / | tail -1 | tr -d '%') -gt 85 ]; then
      echo "Disk space critically low"
      exit 1
    fi

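- name: Limit model training resources
  run: |
    # Cap virtual memory for this shell and its child processes at 4 GiB (ulimit -v takes KiB).
    # Virtual address space can exceed resident memory, so tune the value for your workload.
    ulimit -v 4194304
    python train.py --data-path data/training_data.csv --n-estimators 50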
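
If you would rather run the disk check from Python, for example at the start of a training script, a rough equivalent using only the standard library could look like this. The function names and the 85% default are illustrative. The percentage may differ slightly from what df prints, because df accounts for reserved blocks.

```python
import shutil


def disk_used_percent(path: str = "/") -> float:
    """Percentage of the filesystem containing `path` that is in use."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (bytes)
    return 100.0 * usage.used / usage.total


def check_disk_headroom(path: str = "/", max_used_percent: float = 85.0) -> None:
    """Exit with an error message when the disk is fuller than the allowed percentage."""
    used = disk_used_percent(path)
    if used > max_used_percent:
        raise SystemExit(f"Disk space critically low: {used:.1f}% used on {path}")
    print(f"Disk usage OK: {used:.1f}% used on {path}")
```

Calling `check_disk_headroom()` before `dvc pull` fails the job early instead of midway through a large download.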

Model Registry Governance

Implement approval gates for production deployments:

# scripts/request_approval.py
import mlflow
from mlflow.tracking import MlflowClient

def request_production_approval(
    run_id: str,
    model_name: str,
    metrics: dict,
    requester: str,
) -> str:
    """Create an approval request for model promotion.

    Args:
        run_id: MLflow run ID
        model_name: Registered model name
        metrics: Model performance metrics
        requester: Email or username of requester

    Returns:
        Approval request ID
    """
    client = MlflowClient()

    # Create model version if not exists
    try:
        version = client.create_model_version(
            name=model_name,
            source=f"runs:/{run_id}/model",
            run_id=run_id,
        )
    except Exception as e:
        print(f"Model version already exists: {e}")
        version = client.get_latest_versions(model_name, stages=["None"])[0]

    # Transition to staging for review
    client.transition_model_version_stage(
        name=model_name,
        version=version.version,
        stage="Staging",
    )

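    # Log approval metadata, including the metrics the reviewer should check
    client.set_model_version_tag(
        name=model_name,
        version=version.version,
        key="approval_requester",
        value=requester,
    )
    for metric_name, metric_value in metrics.items():
        client.set_model_version_tag(
            name=model_name,
            version=version.version,
            key=f"approval_metric_{metric_name}",
            value=str(metric_value),
        )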

    return f"{model_name}:v{version.version}"

if __name__ == "__main__":
    # Example usage
    request_id = request_production_approval(
        run_id="abc123",
        model_name="classifier",
        metrics={"accuracy": 0.92},
        requester="ml-engineer@company.com",
    )
    print(f"Approval requested for {request_id}")

Monitoring and Alerting

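Add post-deployment monitoring to catch performance degradation. The script below compares the logged metric of the two most recent Production versions in the registry; it does not observe live traffic: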

# scripts/monitor_model.py
import mlflow
import numpy as np
from datetime import datetime, timedelta

def check_model_health(
    model_name: str,
    metric_name: str = "accuracy",
    degradation_threshold: float = 0.1,
    lookback_days: int = 30,
) -> dict:
    """Monitor model performance over time.

    Args:
        model_name: Registered model name
        metric_name: Metric to monitor
        degradation_threshold: Maximum acceptable degradation
        lookback_days: How far back to check

    Returns:
        Health status dictionary
    """
    client = mlflow.tracking.MlflowClient()

    # Get production model versions from last N days
    cutoff_date = datetime.now() - timedelta(days=lookback_days)

    # Filter by stage client-side; the search filter is only relied on for the model name
    versions = [
        v
        for v in client.search_model_versions(f"name='{model_name}'")
        if v.current_stage == "Production"
    ]

    metrics_over_time = []
    for version in versions:
        run = client.get_run(version.run_id)
        if run.info.end_time and datetime.fromtimestamp(run.info.end_time / 1000) > cutoff_date:
            metrics_over_time.append({
                "version": version.version,
                "timestamp": run.info.end_time,
                metric_name: run.data.metrics.get(metric_name, 0),
            })

    if len(metrics_over_time) < 2:
        return {"status": "insufficient_data", "message": "Need at least 2 versions to compare"}

    # Sort oldest to newest so the last two entries are the most recent versions
    metrics_over_time.sort(key=lambda entry: entry["timestamp"])

    # Check for degradation
    latest_metric = metrics_over_time[-1][metric_name]
    previous_metric = metrics_over_time[-2][metric_name]

    degradation = previous_metric - latest_metric

    if degradation > degradation_threshold:
        return {
            "status": "degraded",
            "current_metric": latest_metric,
            "previous_metric": previous_metric,
            "degradation": degradation,
            "alert": True,
        }

    return {
        "status": "healthy",
        "current_metric": latest_metric,
        "previous_metric": previous_metric,
        "degradation": degradation,
        "alert": False,
    }
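
The registry check above works from metrics logged at training time. Once ground-truth labels start arriving for live predictions, a rolling window over recent outcomes can flag degradation on real traffic. The class below is a minimal sketch of that idea using only the standard library. Its name, the window size, and the way labels reach it are all assumptions.

```python
from collections import deque


class RollingAccuracyMonitor:
    """Track accuracy over the most recent labeled predictions."""

    def __init__(self, baseline: float, window: int = 500, max_drop: float = 0.1):
        self.baseline = baseline  # accuracy measured when the model was promoted
        self.max_drop = max_drop  # largest acceptable drop below the baseline
        self._outcomes = deque(maxlen=window)  # old outcomes fall out automatically

    def record(self, prediction, actual) -> None:
        """Store whether one live prediction matched its ground-truth label."""
        self._outcomes.append(prediction == actual)

    def accuracy(self):
        """Accuracy over the current window, or None before any label arrives."""
        if not self._outcomes:
            return None
        return sum(self._outcomes) / len(self._outcomes)

    def is_degraded(self) -> bool:
        """True when windowed accuracy has fallen more than max_drop below baseline."""
        current = self.accuracy()
        return current is not None and (self.baseline - current) > self.max_drop
```

A scheduled job could feed labeled predictions into `record` and raise an alert, or trigger retraining, whenever `is_degraded()` returns True.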

What's Next

This pipeline provides a foundation for production ML CI/CD, but several enhancements can improve reliability:

  1. Add canary deployments: Route a percentage of traffic to new models before full rollout
  2. Implement A/B testing infrastructure: Compare model versions on live traffic
  3. Add automated rollback: Trigger rollback when monitoring detects degradation
  4. Integrate with feature stores: Use tools like Feast for consistent feature engineering
  5. Add cost tracking: Monitor compute costs per training run and model version

The key insight is that ML CI/CD is not just about automating deployment; it is about maintaining trust in your models as data and requirements evolve. By combining DVC for data versioning, MLflow for experiment tracking, and GitHub Actions for orchestration, you create a system where every model can be traced back to its exact training data, code, and configuration. This traceability is what separates experimental notebooks from production ML systems.

