
How to Build CI/CD for ML with GitHub Actions, DVC, and MLflow

Practical tutorial: CI/CD for ML: GitHub Actions + DVC + MLflow

BlogIA Academy | June 10, 2026 | 14 min read | 2,647 words


Building production machine learning pipelines requires more than just training accurate models. You need reproducible experiments, automated testing, and reliable deployment. This tutorial walks through implementing a complete CI/CD pipeline for ML using GitHub Actions for automation, DVC for data versioning, and MLflow for experiment tracking and model registry.

Why CI/CD Matters for Machine Learning

Traditional software CI/CD pipelines handle code changes, but ML pipelines introduce additional complexity: data versioning, model artifacts, experiment tracking, and environment reproducibility. According to the paper "Intent-Aware Authorization for Zero Trust CI/CD" (Source: ArXiv), modern CI/CD systems must handle dynamic authorization contexts where pipeline components interact across trust boundaries. This is especially relevant for ML pipelines where data scientists, engineers, and production systems all need controlled access to models and data.

A production ML CI/CD pipeline must address:

  • Data versioning: Track which dataset version produced which model
  • Experiment reproducibility: Recreate any past experiment exactly
  • Model governance: Control which models move to production
  • Automated testing: Validate data quality, model performance, and deployment readiness
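
To make the data versioning point concrete: DVC records each tracked file's content hash in a small `.dvc` pointer file, so one way to link a model to its dataset is to read that hash and attach it to the training run. The sketch below is a minimal, standard-library-only illustration. The helper name `dvc_md5` and the example hash are assumptions for illustration, not part of DVC or MLflow.

```python
import re
from typing import Optional


def dvc_md5(pointer_text: str) -> Optional[str]:
    """Return the first md5 hash listed in a .dvc pointer file, or None.

    Hypothetical helper: it scans the text with a regex instead of parsing
    YAML, which is enough for simple single-output pointer files.
    """
    match = re.search(
        r"^\s*-?\s*md5:\s*([0-9a-f]{32}(?:\.dir)?)\s*$",
        pointer_text,
        re.MULTILINE,
    )
    return match.group(1) if match else None


# Illustrative contents of data/raw/dataset.csv.dvc (the hash is made up)
example_pointer = """\
outs:
- md5: a304afb96060aad90176268345e10355
  size: 14445097
  path: dataset.csv
"""

data_version = dvc_md5(example_pointer)
print(data_version)
```

Inside a training run, the value could then be recorded with `mlflow.set_tag("dvc_data_md5", data_version)` (the tag name is an assumption), so every registered model can be traced back to a dataset hash.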

Real-World Architecture Overview

The architecture we'll build connects three core components:

  1. GitHub Actions orchestrates the pipeline, triggering on code pushes, data updates, or scheduled retraining
  2. DVC manages data and model versioning, storing artifacts in cloud storage [1] (S3/GCS/Azure)
  3. MLflow tracks experiments, metrics, and manages the model registry

The pipeline flow:

Code Push → GitHub Actions → DVC Pull Data → Train Model → MLflow Log → DVC Push Model → Deploy

Prerequisites and Environment Setup

Before diving into implementation, ensure you have:

  • Python 3.9+ installed
  • A GitHub account with repository access
  • AWS S3 bucket (or equivalent cloud storage) for DVC remote storage
  • MLflow tracking server (can use local or Databricks Community Edition)

Install required packages:

pip install dvc dvc-s3 mlflow scikit-learn pandas numpy scipy pytest pytest-cov tenacity

Configure DVC with your remote storage:

dvc init
dvc remote add -d myremote s3://your-bucket/dvc-store
dvc remote modify myremote region us-east-1

Setting Up the MLflow Tracking Server

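MLflow provides experiment tracking, model registry, and deployment capabilities. For production, you'll want a persistent backend rather than local files. The script below points the MLflow client at a database-backed store and creates the experiment if it does not exist yet: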

# mlflow_server.py
import os
import mlflow

# Configure tracking URI - use a persistent database backend.
# The fallback below is a local placeholder; read the real URI from the environment.
tracking_uri = os.environ.get(
    "MLFLOW_TRACKING_URI", "postgresql://user:pass@localhost/mlflow"
)
mlflow.set_tracking_uri(tracking_uri)

# Create experiment if it doesn't exist
experiment_name = "ci-cd-ml-pipeline"
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(
        name=experiment_name,
        artifact_location="s3://your-bucket/mlflow-artifacts"
    )
else:
    experiment_id = experiment.experiment_id

print(f"MLflow experiment '{experiment_name}' ready with ID: {experiment_id}")

The paper "Establishing Workload Identity for Zero Trust CI/CD: From Secrets to SPIFFE-Based Authentication" (Source: ArXiv) highlights the importance of secure credential management in CI/CD pipelines. For MLflow, use environment variables or a secrets manager rather than hardcoding credentials:

export MLFLOW_TRACKING_URI="postgresql://mlflow_user:${MLFLOW_DB_PASSWORD}@mlflow-db.internal/mlflow"
export AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY}"
export AWS_SECRET_ACCESS_KEY="${AWS_SECRET_KEY}"
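
On the Python side, the same idea can be sketched as a small helper that assembles the tracking URI from environment variables and fails fast when one is missing. This is an illustrative sketch only: the function name `build_tracking_uri` and the variable names `MLFLOW_DB_USER`, `MLFLOW_DB_HOST`, and `MLFLOW_DB_NAME` are assumptions, not MLflow conventions.

```python
import os
from typing import Mapping
from urllib.parse import quote


def build_tracking_uri(env: Mapping[str, str] = os.environ) -> str:
    """Assemble a PostgreSQL tracking URI from environment variables.

    The variable names are illustrative assumptions; use whatever your
    secrets manager or CI system actually exports.
    """
    required = [
        "MLFLOW_DB_USER",
        "MLFLOW_DB_PASSWORD",
        "MLFLOW_DB_HOST",
        "MLFLOW_DB_NAME",
    ]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    # Percent-encode the password so characters like '@' or '/' do not break the URI
    password = quote(env["MLFLOW_DB_PASSWORD"], safe="")
    return (
        f"postgresql://{env['MLFLOW_DB_USER']}:{password}"
        f"@{env['MLFLOW_DB_HOST']}/{env['MLFLOW_DB_NAME']}"
    )
```

The result can be passed to `mlflow.set_tracking_uri(...)`. Failing early with a clear message tends to be easier to debug in CI than a connection error halfway through training.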

Core Implementation: The Training Pipeline

Now let's build the actual ML pipeline that our CI/CD system will execute. This example uses a classification model with scikit-learn, but the pattern applies to any ML framework.

# pipeline/train.py
import os
import json
import pickle
import subprocess
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import StandardScaler
import mlflow
import mlflow.sklearn

def load_data_with_dvc():
    """
    Load data using DVC to ensure we're using the correct version.
    DVC pulls the data version recorded in the .dvc file at the current commit.
    """
    # check=True stops the pipeline if the pull fails instead of training on stale data
    subprocess.run(["dvc", "pull", "data/raw/dataset.csv.dvc"], check=True)

    # Read the data file tracked by DVC
    data_path = "data/raw/dataset.csv"
    df = pd.read_csv(data_path)
    return df

def preprocess_data(df, target_column="target"):
    """
    Preprocess data with a stratified train/test split and scaling.
    Drops rows with missing values and one-hot encodes categorical columns.
    """
    # Check for data quality issues
    if df.isnull().sum().sum() > 0:
        print(f"Warning: Found {df.isnull().sum().sum()} missing values")
        df = df.dropna()

    # Separate features and target
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Handle categorical features
    categorical_cols = X.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        X = pd.get_dummies(X, columns=categorical_cols)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return X_train_scaled, X_test_scaled, y_train, y_test, scaler

def train_model(X_train, y_train, params=None):
    """
    Train a RandomForest classifier with configurable hyperparameters.
    Uses cross-validation for robust performance estimation.
    """
    if params is None:
        params = {
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 5,
            "min_samples_leaf": 2,
            "random_state": 42
        }

    model = RandomForestClassifier(**params)

    # Cross-validation score
    cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
    print(f"Cross-validation accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")

    model.fit(X_train, y_train)
    return model, cv_scores

def evaluate_model(model, X_test, y_test):
    """
    Comprehensive model evaluation with multiple metrics.
    Returns metrics dictionary for MLflow logging.
    """
    y_pred = model.predict(X_test)

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, average='weighted'),
        "recall": recall_score(y_test, y_pred, average='weighted'),
        "f1_score": f1_score(y_test, y_pred, average='weighted')
    }

    return metrics

def main():
    """
    Main training pipeline orchestrated by MLflow.
    """
    # Start MLflow run
    mlflow.set_experiment("ci-cd-ml-pipeline")

    with mlflow.start_run() as run:
        run_id = run.info.run_id
        print(f"MLflow Run ID: {run_id}")

        # Log parameters (random_state is fixed so reruns are reproducible)
        params = {
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 5,
            "min_samples_leaf": 2,
            "random_state": 42
        }
        mlflow.log_params(params)

        # Load and preprocess data
        print("Loading data with DVC...")
        df = load_data_with_dvc()
        mlflow.log_param("data_shape", str(df.shape))

        print("Preprocessing data...")
        X_train, X_test, y_train, y_test, scaler = preprocess_data(df)

        # Train model
        print("Training model...")
        model, cv_scores = train_model(X_train, y_train, params)
        mlflow.log_metric("cv_accuracy_mean", cv_scores.mean())
        mlflow.log_metric("cv_accuracy_std", cv_scores.std())

        # Evaluate model
        print("Evaluating model...")
        metrics = evaluate_model(model, X_test, y_test)
        mlflow.log_metrics(metrics)

        # Log artifacts
        mlflow.log_artifact("data/raw/dataset.csv", artifact_path="data")

        # Save and log the model
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            registered_model_name="random_forest_classifier"
        )

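        # Save model and scaler under models/ so the workflow's
        # "dvc add models/" step has something to version
        os.makedirs("models", exist_ok=True)
        with open("models/model.pkl", "wb") as f:
            pickle.dump(model, f)
        with open("models/scaler.pkl", "wb") as f:
            pickle.dump(scaler, f)
        mlflow.log_artifact("models/scaler.pkl")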

        # Save metrics as JSON for downstream CI/CD steps
        with open("metrics.json", "w") as f:
            json.dump(metrics, f)

        print(f"Training complete. Metrics: {json.dumps(metrics, indent=2)}")

        return run_id

if __name__ == "__main__":
    main()

GitHub Actions Workflow Configuration

The GitHub Actions workflow orchestrates the entire CI/CD pipeline. It handles data versioning, model training, testing, and deployment decisions.

# .github/workflows/ml-pipeline.yml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'pipeline/**'
      - 'data/**'
      - 'requirements.txt'
      - '.github/workflows/**'
  pull_request:
    branches: [main]
  schedule:
    # Weekly retraining on Sundays at midnight
    - cron: '0 0 * * 0'
  workflow_dispatch:  # Manual trigger

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  DVC_REMOTE: s3://your-bucket/dvc-store

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          lfs: true

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install dvc dvc-s3

      - name: Configure DVC
        run: |
          dvc remote default myremote
          # --local writes credentials to .dvc/config.local, which is not tracked by Git
          dvc remote modify --local myremote access_key_id "$AWS_ACCESS_KEY_ID"
          dvc remote modify --local myremote secret_access_key "$AWS_SECRET_ACCESS_KEY"

      - name: Pull data from DVC
        run: dvc pull

      - name: Run data validation
        run: |
          python -m pytest tests/test_data_quality.py -v

      - name: Run unit tests
        run: |
          python -m pytest tests/ -v --cov=pipeline --cov-report=xml

      - name: Upload test coverage
        uses: codecov/codecov-action@v3

  train:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          lfs: true

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install dvc dvc-s3

      - name: Configure DVC and MLflow
        run: |
          dvc remote default myremote
          # --local writes credentials to .dvc/config.local, which is not tracked by Git
          dvc remote modify --local myremote access_key_id "$AWS_ACCESS_KEY_ID"
          dvc remote modify --local myremote secret_access_key "$AWS_SECRET_ACCESS_KEY"
          echo "MLFLOW_TRACKING_URI=$MLFLOW_TRACKING_URI" >> $GITHUB_ENV

      - name: Pull latest data
        run: dvc pull

      - name: Train model
        run: python pipeline/train.py

      - name: Push model artifacts to DVC
        run: |
          dvc add models/
          dvc push

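      - name: Export MLflow run ID
        id: mlflow_run
        run: |
          # GITHUB_ENV values do not carry over to other jobs; write a step output
          # and expose it through the job's `outputs` if later jobs need it.
          RUN_ID=$(python -c "import mlflow; runs = mlflow.search_runs(experiment_names=['ci-cd-ml-pipeline'], order_by=['start_time DESC'], max_results=1); print(runs['run_id'].iloc[0])")
          echo "run_id=$RUN_ID" >> "$GITHUB_OUTPUT"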

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install mlflow scikit-learn pandas numpy

      - name: Evaluate model against baseline
        run: |
          python pipeline/evaluate_vs_baseline.py

      - name: Check model performance thresholds
        run: |
          python pipeline/check_thresholds.py

  deploy:
    needs: evaluate
    if: github.ref == 'refs/heads/main' && success()
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install mlflow boto3

      - name: Promote model to production
        run: |
          python pipeline/promote_to_production.py

      - name: Deploy model endpoint
        run: |
          python pipeline/deploy_model.py

Data Quality Testing

Data quality is critical for ML pipelines. Here's a comprehensive test suite:

# tests/test_data_quality.py
import pandas as pd
import numpy as np
import pytest
from scipy import stats

def test_data_shape():
    """Verify data has expected dimensions."""
    df = pd.read_csv("data/raw/dataset.csv")
    assert df.shape[0] >= 1000, f"Dataset too small: {df.shape[0]} rows"
    assert df.shape[1] >= 5, f"Too few features: {df.shape[1]} columns"

def test_missing_values():
    """Check for excessive missing values."""
    df = pd.read_csv("data/raw/dataset.csv")
    missing_pct = df.isnull().sum().max() / len(df) * 100
    assert missing_pct < 10, f"Too many missing values: {missing_pct:.2f}%"

def test_target_distribution():
    """Verify target variable has reasonable class balance."""
    df = pd.read_csv("data/raw/dataset.csv")
    target = df["target"]
    class_counts = target.value_counts(normalize=True)
    min_class_pct = class_counts.min()
    assert min_class_pct > 0.1, f"Class imbalance: smallest class is {min_class_pct:.2%}"

def test_data_drift():
    """Statistical test for data drift compared to reference."""
    df_current = pd.read_csv("data/raw/dataset.csv")

    try:
        df_reference = pd.read_csv("data/reference/dataset.csv")
    except FileNotFoundError:
        pytest.skip("Reference dataset not available")

    # Kolmogorov-Smirnov test for each numeric column
    numeric_cols = df_current.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if col in df_reference.columns:
            statistic, p_value = stats.ks_2samp(
                df_current[col].dropna(),
                df_reference[col].dropna()
            )
            assert p_value > 0.05, f"Data drift detected in column {col} (p={p_value:.4f})"

def test_feature_correlations():
    """Check for unexpected feature correlations."""
    df = pd.read_csv("data/raw/dataset.csv")
    numeric_df = df.select_dtypes(include=[np.number])
    corr_matrix = numeric_df.corr().abs()
    upper_tri = corr_matrix.where(
        np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    )
    # Stack first, then filter, so NaN cells are never counted as matches
    stacked = upper_tri.stack()
    high_corr = stacked[stacked > 0.95]
    assert len(high_corr) == 0, f"Highly correlated features found: {high_corr.index.tolist()}"
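
One caveat about the drift test above: it runs a separate KS test for every numeric column at a 0.05 threshold, so with many columns some test is likely to fail by chance even when nothing has drifted. The arithmetic below illustrates this under the simplifying assumption that the tests are independent, and shows the Bonferroni correction as one common, conservative remedy. Neither function is part of the original test suite, and the column count is hypothetical.

```python
def family_wise_error(alpha: float, n_tests: int) -> float:
    """Probability of at least one false alarm across n independent tests."""
    return 1.0 - (1.0 - alpha) ** n_tests


def bonferroni_alpha(alpha: float, n_tests: int) -> float:
    """Per-test significance level that keeps the overall rate at or below alpha."""
    return alpha / n_tests


n_columns = 20  # hypothetical number of numeric columns
print(f"{family_wise_error(0.05, n_columns):.3f}")  # → 0.642
print(f"{bonferroni_alpha(0.05, n_columns):.4f}")   # → 0.0025
```

With 20 columns, the chance of at least one spurious failure is roughly 64%, so comparing each p-value against `bonferroni_alpha(0.05, len(numeric_cols))` instead of 0.05 may reduce flaky CI runs, at the cost of missing smaller shifts.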

Model Evaluation and Threshold Checking

Before deploying, we need to ensure the model meets performance thresholds:

# pipeline/check_thresholds.py
import json
import mlflow
from mlflow.tracking import MlflowClient

def load_latest_metrics():
    """Fetch metrics from the latest MLflow run."""
    client = MlflowClient()

    # Get the latest run from our experiment
    experiment = client.get_experiment_by_name("ci-cd-ml-pipeline")
    runs = client.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["start_time DESC"],
        max_results=1
    )

    if not runs:
        raise ValueError("No runs found in experiment")

    latest_run = runs[0]
    metrics = {
        "accuracy": latest_run.data.metrics.get("accuracy", 0),
        "precision": latest_run.data.metrics.get("precision", 0),
        "recall": latest_run.data.metrics.get("recall", 0),
        "f1_score": latest_run.data.metrics.get("f1_score", 0)
    }

    return metrics

def check_thresholds(metrics):
    """
    Verify model meets minimum performance thresholds.
    These thresholds should be defined based on business requirements.
    """
    thresholds = {
        "accuracy": 0.80,
        "precision": 0.75,
        "recall": 0.75,
        "f1_score": 0.75
    }

    all_passed = True
    for metric_name, threshold in thresholds.items():
        # A metric missing from the run counts as 0 and therefore fails
        value = metrics.get(metric_name, 0)
        if value < threshold:
            print(f"FAIL: {metric_name} = {value:.4f} < {threshold}")
            all_passed = False
        else:
            print(f"PASS: {metric_name} = {value:.4f} >= {threshold}")

    if not all_passed:
        raise ValueError("Model failed performance thresholds")

    print("All thresholds passed!")
    return True

if __name__ == "__main__":
    metrics = load_latest_metrics()
    check_thresholds(metrics)
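
The workflow also calls `pipeline/evaluate_vs_baseline.py`, which is not shown in this tutorial. A minimal sketch of the comparison it might perform is given below. The function name, the tolerance value, and the example metric values are all assumptions for illustration, and every metric is assumed to be "higher is better".

```python
from typing import Dict, List


def regressions_vs_baseline(
    candidate: Dict[str, float],
    baseline: Dict[str, float],
    tolerance: float = 0.01,
) -> List[str]:
    """List metrics where the candidate trails the baseline by more than tolerance.

    Assumes every metric is "higher is better", as with accuracy or F1.
    """
    failures = []
    for name, base_value in baseline.items():
        cand_value = candidate.get(name)
        if cand_value is None:
            failures.append(f"{name}: missing from candidate")
        elif cand_value < base_value - tolerance:
            failures.append(
                f"{name}: {cand_value:.4f} < baseline {base_value:.4f} - {tolerance}"
            )
    return failures


# Made-up numbers: accuracy improved, F1 regressed beyond the tolerance
baseline = {"accuracy": 0.86, "f1_score": 0.84}
candidate = {"accuracy": 0.87, "f1_score": 0.80}
problems = regressions_vs_baseline(candidate, baseline)
print(problems)
```

In a CI script, exiting with a non-zero status when `problems` is non-empty would fail the `evaluate` job and therefore skip the `deploy` job that depends on it.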

Model Promotion and Deployment

The final step promotes the model to production in MLflow's model registry:

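# pipeline/promote_to_production.py
from datetime import datetime

import mlflow
from mlflow.tracking import MlflowClient

def promote_model_to_production():
    """
    Promote the latest model version to production stage.
    Uses MLflow's model registry for version management.
    Note: registry stages are deprecated as of MLflow 2.9 in favor of
    aliases; this script targets the stage-based API.
    """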
    client = MlflowClient()
    model_name = "random_forest_classifier"

    # Get all versions of the model
    versions = client.search_model_versions(f"name='{model_name}'")

    if not versions:
        raise ValueError(f"No versions found for model '{model_name}'")

    # Find the latest version
    latest_version = max(versions, key=lambda v: int(v.version))

    # Check if there's already a production version
    production_versions = [
        v for v in versions 
        if v.current_stage == "Production"
    ]

    if production_versions:
        # Archive the current production version
        for prod_version in production_versions:
            client.transition_model_version_stage(
                name=model_name,
                version=prod_version.version,
                stage="Archived"
            )
            print(f"Archived version {prod_version.version}")

    # Promote new version to production
    client.transition_model_version_stage(
        name=model_name,
        version=latest_version.version,
        stage="Production"
    )

    print(f"Promoted version {latest_version.version} to Production")

    # Add description with run information
    client.update_model_version(
        name=model_name,
        version=latest_version.version,
        description=f"Promoted to production on {datetime.now().isoformat()}"
    )

if __name__ == "__main__":
    promote_model_to_production()
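
Newer MLflow releases favor model version aliases over stages. As a hedged alternative to the stage-based script above, the sketch below moves an alias to the new version instead. It assumes `client` is an `mlflow.tracking.MlflowClient` from a release that supports aliases (MLflow 2.3 or later); the alias name `champion` and the `promoted_by` tag are conventions chosen for this example, not requirements.

```python
def promote_with_alias(
    client, model_name: str, version: str, alias: str = "champion"
) -> None:
    """Point `alias` at `version` of a registered model.

    An alias refers to one version at a time, so reassigning it moves it
    off whichever version held it before. `client` is expected to be an
    mlflow.tracking.MlflowClient; it is passed in so the function can be
    exercised with a stand-in object in tests.
    """
    client.set_registered_model_alias(name=model_name, alias=alias, version=version)
    # Record who performed the promotion (tag key and value are assumptions)
    client.set_model_version_tag(
        name=model_name, version=version, key="promoted_by", value="ci"
    )
```

Serving code can then load the model through the URI `models:/random_forest_classifier@champion`, so a promotion or rollback only needs to move the alias.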

Handling Edge Cases and Production Considerations

1. Data Version Conflicts

When multiple team members push data changes simultaneously, DVC can encounter merge conflicts. Handle this by:

# In your CI/CD pipeline, add conflict resolution
import subprocess

def resolve_dvc_conflicts():
    """Resolve DVC conflicts by keeping the data version of the current branch."""
    # Check for DVC conflicts
    result = subprocess.run(
        ["dvc", "status"],
        capture_output=True,
        text=True
    )

    if "conflict" in result.stdout.lower():
        # --force overwrites local data files so they match the .dvc
        # metadata checked out on the current branch
        subprocess.run(["dvc", "checkout", "--force"], check=True)
        print("Resolved DVC conflicts by accepting current branch version")

2. MLflow Tracking Server Reliability

The paper "Decoupling Identity from Access: Credential Broker Patterns for Secure CI/CD" (Source: ArXiv) discusses patterns for managing credentials across pipeline components. For MLflow, implement retry logic:

import mlflow
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def log_to_mlflow_with_retry(metrics):
    """Log metrics to MLflow with retry logic for transient failures."""
    with mlflow.start_run() as run:
        mlflow.log_metrics(metrics)
        return run.info.run_id
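
For readers who prefer not to add a dependency, the same retry idea can be sketched with the standard library alone. This is an illustrative approximation, not a reimplementation of `tenacity`: the function name and default delays are assumptions, and it retries on any exception, which you may want to narrow in practice.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds, doubling the wait after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Waits grow as base_delay * 1, 2, 4, ... capped at max_delay
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            sleep(delay)
    raise RuntimeError("attempts must be at least 1")
```

A logging call could be wrapped as `retry_with_backoff(lambda: mlflow.log_metrics(metrics))`. The `sleep` parameter exists so tests can substitute a no-op instead of actually waiting.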

3. Model Registry Governance

Implement approval gates for production deployments:

from mlflow.tracking import MlflowClient

def check_deployment_approval(model_name, version):
    """
    Check if model version has required approvals before deployment.
    In production, this would integrate with your approval system.
    """
    client = MlflowClient()
    model_version = client.get_model_version(model_name, version)

    # Check for required tags indicating approval
    required_tags = ["qa_approved", "security_reviewed"]

    for tag in required_tags:
        if tag not in model_version.tags:
            raise PermissionError(
                f"Model version {version} missing required tag: {tag}"
            )

    print(f"All approvals present for version {version}")
    return True

Monitoring and Alerting

Add monitoring to detect issues in production:

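# pipeline/monitor.py
from mlflow.tracking import MlflowClient
from scipy import stats

MODEL_NAME = "random_forest_classifier"

def load_recent_predictions():
    """Placeholder: return recent predictions from your serving logs."""
    raise NotImplementedError

def load_training_predictions():
    """Placeholder: return the model's predictions on its training data."""
    raise NotImplementedError

def send_alert(message):
    """Placeholder: forward the message to email, Slack, or PagerDuty."""
    raise NotImplementedError

def check_model_drift():
    """
    Monitor for prediction drift in production.
    Compares recent predictions to training distribution.
    Returns True when no drift is detected.
    """
    client = MlflowClient()

    # Look up the production model by stage (a stage name is not a version number)
    production_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
    if not production_versions:
        raise ValueError(f"No Production version found for '{MODEL_NAME}'")
    print(f"Monitoring version {production_versions[0].version}")

    # Load recent predictions (in production, from your serving logs)
    recent_predictions = load_recent_predictions()

    # Compare to training distribution
    training_predictions = load_training_predictions()

    # Two-sample Kolmogorov-Smirnov test for drift
    ks_statistic, p_value = stats.ks_2samp(
        recent_predictions,
        training_predictions
    )

    drift_detected = p_value < 0.05
    if drift_detected:
        alert_message = f"Prediction drift detected: KS={ks_statistic:.4f}, p={p_value:.4f}"
        print(f"ALERT: {alert_message}")
        # Send alert (email, Slack, PagerDuty)
        send_alert(alert_message)

    return not drift_detected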
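
For intuition about what the KS test measures, the statistic is the largest vertical gap between the two samples' empirical cumulative distribution functions. The pure-Python sketch below computes only that statistic, with no p-value, and is meant as an illustration rather than a replacement for `scipy.stats.ks_2samp`. The function name is an assumption.

```python
from bisect import bisect_right
from typing import Sequence


def ks_statistic(sample_a: Sequence[float], sample_b: Sequence[float]) -> float:
    """Largest gap between the empirical CDFs of two samples."""
    a, b = sorted(sample_a), sorted(sample_b)
    if not a or not b:
        raise ValueError("both samples must be non-empty")

    largest_gap = 0.0
    # Both ECDFs only change at observed values, so checking those is enough
    for x in a + b:
        cdf_a = bisect_right(a, x) / len(a)  # fraction of a that is <= x
        cdf_b = bisect_right(b, x) / len(b)  # fraction of b that is <= x
        largest_gap = max(largest_gap, abs(cdf_a - cdf_b))
    return largest_gap


print(ks_statistic([1, 2, 3, 4], [3, 4, 5, 6]))  # → 0.5
```

A value near 0 means the two prediction distributions overlap almost completely, and a value of 1 means they do not overlap at all.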

What's Next

This CI/CD pipeline provides a solid foundation for production ML workflows. To extend it:

  1. Add A/B testing: Deploy multiple model versions and compare performance
  2. Implement feature store integration: Use tools like Feast for feature management
  3. Add automated rollback: If performance degrades, automatically revert to previous model
  4. Integrate with monitoring tools: Connect to Prometheus/Grafana for real-time metrics
  5. Expand testing: Add integration tests, load tests, and security scanning

The combination of GitHub Actions, DVC, and MLflow creates a robust, reproducible ML pipeline that handles the unique challenges of machine learning in production. By implementing proper data versioning, experiment tracking, and automated testing, you ensure that your models are reliable, auditable, and ready for production deployment.

Remember that CI/CD for ML is an evolving practice. The patterns described here follow the principles of zero-trust CI/CD as discussed in recent research (Source: ArXiv), emphasizing secure credential management, workload identity, and decoupled access control. As your ML infrastructure grows, these security considerations become increasingly important for maintaining a production-grade pipeline.


References

1. Wikipedia - Rag. Wikipedia.
2. arXiv - How Developers Adopt, Use, and Evolve CI/CD Caching: An Empi. arXiv.
3. arXiv - Intent-Aware Authorization for Zero Trust CI/CD. arXiv.
4. GitHub - Shubhamsaboo/awesome-llm-apps. GitHub.