How to Build a Production AI Monitoring System with Open Source Tools
The automotive industry is undergoing a fundamental transformation as AI systems move from experimental prototypes to production-critical infrastructure. Major automotive companies now deploy AI for everything from autonomous driving perception systems to supply chain optimization and predictive maintenance. However, as these systems scale, monitoring their performance, availability, and cost becomes a critical operational challenge.
Consider this: when an AI model powering a vehicle's lane-keeping assist goes down, or when inference latency spikes during highway driving, the consequences are immediate and severe. According to available information from TechCrunch, the intersection of AI and automotive technology represents one of the most demanding production environments for machine learning systems. The stakes are higher than in typical SaaS applications because failures can have physical safety implications.
In this tutorial, you will build a production-grade AI monitoring system that tracks model performance, API availability, and system health across multiple AI services. We will use NVIDIA NeMo, an open-source framework for building and deploying generative AI models, combined with real-time monitoring infrastructure. As of May 2026, NeMo has accumulated 16,885 stars on GitHub and 3,357 forks, making it one of the most actively maintained open-source AI frameworks available.
Understanding the Production AI Monitoring Architecture
Before writing code, we need to understand what makes AI monitoring different from traditional application monitoring. Traditional monitoring tracks server health, response times, and error rates. AI monitoring adds several critical dimensions:
Model Performance Degradation: AI models can silently degrade over time due to data drift, concept drift, or model staleness. A model that was 99% accurate at deployment might drop to 85% accuracy after six months without any obvious infrastructure issues.
API Availability and Latency: When using services like the OpenAI [8] API, which provides access to GPT-3 and GPT-4 models for natural language tasks, you need to track not just whether the API responds, but how quickly it responds and whether it returns consistent quality. The OpenAI Downtime Monitor, a free tool available at https://status.portkey.ai/, tracks API uptime and latencies for various OpenAI models and other LLM providers.
Resource Consumption: AI inference is computationally expensive. GPU memory, inference time, and token usage all need monitoring to prevent cost overruns and performance bottlenecks.
Model Versioning and Rollback: Production AI systems often run multiple model versions simultaneously. You need to track which version is serving which requests and be able to roll back quickly if a new version performs poorly.
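The silent-degradation scenario described above can be made concrete with a rolling-window check. The sketch below is illustrative only: it assumes ground-truth labels eventually arrive for production predictions, and the class name `RollingAccuracy`, the window size, and the tolerated drop are hypothetical choices rather than part of the system built later in this tutorial.

```python
from collections import deque


class RollingAccuracy:
    """Tracks accuracy over the most recent `window` labelled predictions."""

    def __init__(self, window: int, baseline: float, max_drop: float):
        self.outcomes = deque(maxlen=window)  # True means the prediction was correct
        self.baseline = baseline              # accuracy measured at deployment
        self.max_drop = max_drop              # tolerated absolute drop (assumed value)

    def record(self, correct: bool) -> None:
        self.outcomes.append(correct)

    def accuracy(self) -> float:
        if not self.outcomes:
            return self.baseline  # no evidence yet, so report the baseline
        return sum(self.outcomes) / len(self.outcomes)

    def degraded(self) -> bool:
        # Judge only once the window is full, to avoid noisy early alarms.
        window_full = len(self.outcomes) == self.outcomes.maxlen
        return window_full and (self.baseline - self.accuracy()) > self.max_drop


# Mirror the example in the text: 99% at deployment, 85% six months later.
monitor = RollingAccuracy(window=100, baseline=0.99, max_drop=0.05)
for i in range(100):
    monitor.record(i >= 15)  # 85 of the last 100 predictions were correct

print(monitor.accuracy(), monitor.degraded())
```

Nothing in the infrastructure metrics would change in this scenario, which is why a check of this kind has to sit alongside latency and error-rate monitoring.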
Our architecture will consist of three layers:
- Data Collection Layer: Captures metrics from AI models, APIs, and infrastructure
- Analysis Layer: Uses NeMo for model performance analysis and anomaly detection
- Alerting Layer: Triggers notifications when metrics exceed thresholds
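As a rough illustration of the alerting layer, the sketch below compares one interval's aggregates against fixed thresholds. The `WindowStats` structure and `evaluate_alerts` function are hypothetical names introduced here; the default thresholds (1000 ms, 1% error rate) are the ones the alert configuration endpoint returns later in this tutorial. The "no traffic" alert is an added assumption.

```python
from dataclasses import dataclass


@dataclass
class WindowStats:
    """Aggregates for one check interval (hypothetical structure)."""
    total_requests: int
    error_count: int
    p95_inference_time_ms: float


def evaluate_alerts(stats: WindowStats,
                    max_p95_ms: float = 1000.0,
                    max_error_rate: float = 0.01) -> list[str]:
    """Return one human-readable alert per exceeded threshold."""
    if stats.total_requests == 0:
        # Assumption: silence is itself worth flagging for a production model.
        return ["no traffic observed in this interval"]

    alerts = []
    error_rate = stats.error_count / stats.total_requests
    if error_rate > max_error_rate:
        alerts.append(f"error rate {error_rate:.2%} exceeds {max_error_rate:.2%}")
    if stats.p95_inference_time_ms > max_p95_ms:
        alerts.append(
            f"p95 latency {stats.p95_inference_time_ms:.0f} ms exceeds {max_p95_ms:.0f} ms"
        )
    return alerts


print(evaluate_alerts(WindowStats(total_requests=200, error_count=5,
                                  p95_inference_time_ms=1200.0)))
```

A real deployment would route the returned strings to a pager or chat webhook; this sketch stops at producing them.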
Prerequisites and Environment Setup
You will need a Linux or macOS system with Python 3.10 or later. We will use Python 3.11 for this tutorial. The following components are required:
# Create a virtual environment
python3.11 -m venv ai-monitor-env
source ai-monitor-env/bin/activate
# Install core dependencies
pip install torch==2.3.0
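# The asr and nlp extras pull in the NeMo collections imported later
pip install "nemo-toolkit[asr,nlp]==1.23.0"
pip install fastapi==0.111.0
pip install uvicorn==0.29.0
pip install prometheus-client==0.20.0
pip install redis==5.0.7
pip install httpx==0.27.0
pip install pydantic==2.7.1
pip install python-dotenv==1.0.1
# Also imported by collector.py and nemo_analyzer.py (versions left unpinned here)
pip install psutil gputil numpy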
NVIDIA NeMo is a scalable generative AI framework built for researchers and developers working on Large Language Models, Multimodal, and Speech AI. According to its GitHub repository description, NeMo supports Automatic Speech Recognition and Text-to-Speech capabilities, making it particularly relevant for automotive voice interfaces and driver monitoring systems.
For the monitoring dashboard, we will use Prometheus for metrics storage and a custom FastAPI server for the API layer. Redis will serve as our metrics buffer to handle high-throughput scenarios.
Building the Core Monitoring Infrastructure
Let's start by creating the monitoring data models. These models define the structure of our monitoring data and ensure type safety across the system.
# models.py
import uuid
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field, model_validator


class ModelMetrics(BaseModel):
    """Core metrics for AI model monitoring"""

    # Pydantic v2 reserves the "model_" prefix; opt out so that model_id and
    # model_name do not trigger protected-namespace warnings.
    model_config = ConfigDict(protected_namespaces=())

    model_id: str = Field(..., description="Unique identifier for the model version")
    model_name: str = Field(..., description="Human-readable model name")
    inference_time_ms: float = Field(..., ge=0, description="Inference time in milliseconds")
    memory_usage_mb: float = Field(..., ge=0, description="GPU memory usage in MB")
    token_count: Optional[int] = Field(None, ge=0, description="Number of tokens processed")
    confidence_score: Optional[float] = Field(None, ge=0.0, le=1.0, description="Model confidence")
    error_flag: bool = Field(False, description="Whether this inference had an error")
    error_message: Optional[str] = Field(None, description="Error details if error_flag is True")
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))

    @model_validator(mode="after")
    def validate_error_message(self):
        # A field-level validator is skipped when error_message is omitted,
        # so the check runs on the whole model instead.
        if self.error_flag and not self.error_message:
            raise ValueError('error_message required when error_flag is True')
        return self


class APIMetrics(BaseModel):
    """Metrics for external API monitoring"""

    provider: str = Field(..., description="API provider name (e.g., OpenAI, Anthropic)")
    endpoint: str = Field(..., description="API endpoint called")
    response_time_ms: float = Field(..., ge=0)
    status_code: int = Field(..., ge=100, lt=600)
    tokens_used: Optional[int] = Field(None, ge=0)
    cost_usd: Optional[float] = Field(None, ge=0.0)
    timestamp: datetime = Field(default_factory=datetime.utcnow)


class SystemHealth(BaseModel):
    """System-level health metrics"""

    cpu_usage_percent: float = Field(..., ge=0.0, le=100.0)
    memory_usage_percent: float = Field(..., ge=0.0, le=100.0)
    gpu_usage_percent: Optional[float] = Field(None, ge=0.0, le=100.0)
    gpu_temperature_celsius: Optional[float] = Field(None, ge=0.0)
    active_connections: int = Field(..., ge=0)
    timestamp: datetime = Field(default_factory=datetime.utcnow)
These models handle several edge cases. The confidence_score field is bounded between 0 and 1, preventing invalid data from entering the system. The error_message validator ensures that error descriptions are always provided when errors occur. The request_id field uses UUIDs to prevent collisions in distributed systems.
Now let's implement the metrics collector that will gather data from our AI models and APIs:
# collector.py
import asyncio
import logging
import time
from typing import Awaitable, Callable, Optional

import GPUtil
import psutil
from prometheus_client import Counter, Gauge, Histogram, start_http_server

logger = logging.getLogger(__name__)

# Prometheus metrics definitions
MODEL_INFERENCE_TIME = Histogram(
    'model_inference_time_seconds',
    'Time taken for model inference',
    ['model_name', 'model_version'],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)
)
MODEL_ERRORS = Counter(
    'model_errors_total',
    'Total number of model inference errors',
    ['model_name', 'error_type']
)
API_LATENCY = Histogram(
    'api_latency_seconds',
    'API response latency',
    ['provider', 'endpoint'],
    buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
)
GPU_MEMORY_USAGE = Gauge(
    'gpu_memory_usage_bytes',
    'GPU memory usage in bytes',
    ['gpu_id']
)


class MetricsCollector:
    """Production metrics collector with buffering and batch processing"""

    def __init__(self, buffer_size: int = 1000, flush_interval: float = 10.0):
        self.buffer = []
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self._running = False
        self._flush_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the metrics collector with background flush task"""
        self._running = True
        self._flush_task = asyncio.create_task(self._periodic_flush())
        # Prometheus metrics endpoint. uvicorn also defaults to port 8000,
        # so serve the FastAPI app on a different port.
        start_http_server(8000)
        logger.info("Metrics collector started on port 8000")

    async def stop(self):
        """Gracefully stop the collector"""
        self._running = False
        if self._flush_task:
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
        await self._flush_buffer()

    async def record_inference(self, model_name: str, model_version: str,
                               inference_func: Callable[..., Awaitable[tuple]]):
        """
        Record model inference metrics with timing.

        Args:
            model_name: Name of the model
            model_version: Version string
            inference_func: Async function that returns (result, metrics_dict)

        Returns:
            The inference result
        """
        start_time = time.monotonic()
        try:
            result, metrics = await inference_func()
            inference_time = time.monotonic() - start_time

            # Record Prometheus metrics
            MODEL_INFERENCE_TIME.labels(
                model_name=model_name,
                model_version=model_version
            ).observe(inference_time)

            # Buffer the metrics for batch processing
            self.buffer.append({
                'type': 'inference',
                'model_name': model_name,
                'model_version': model_version,
                'inference_time': inference_time,
                'memory_usage': metrics.get('memory_usage', 0),
                'timestamp': time.time()
            })

            # Flush if buffer is full
            if len(self.buffer) >= self.buffer_size:
                await self._flush_buffer()

            return result
        except Exception as e:
            error_type = type(e).__name__
            MODEL_ERRORS.labels(
                model_name=model_name,
                error_type=error_type
            ).inc()
            logger.error(f"Inference error for {model_name}: {e}")
            raise

    async def record_api_call(self, provider: str, endpoint: str,
                              api_func: Callable[..., Awaitable[tuple]]):
        """
        Record API call metrics with timing.

        Timeouts are recorded as a 30 s observation and re-raised; other
        exceptions propagate to the caller unchanged.
        """
        start_time = time.monotonic()
        try:
            response, metadata = await asyncio.wait_for(
                api_func(),
                timeout=30.0  # 30 second timeout
            )
            latency = time.monotonic() - start_time
            API_LATENCY.labels(
                provider=provider,
                endpoint=endpoint
            ).observe(latency)
            return response
        except asyncio.TimeoutError:
            API_LATENCY.labels(
                provider=provider,
                endpoint=endpoint
            ).observe(30.0)  # Record timeout as 30s latency
            logger.warning(f"API timeout for {provider}/{endpoint}")
            raise

    async def collect_system_metrics(self):
        """Collect system-level metrics including GPU stats"""
        # CPU metrics. cpu_percent(interval=1) blocks for one second, so run
        # it in a worker thread to keep the event loop responsive.
        cpu_percent = await asyncio.to_thread(psutil.cpu_percent, interval=1)

        # Memory metrics
        memory = psutil.virtual_memory()

        # GPU metrics (if available); GPUtil reports memoryUsed in MB
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                GPU_MEMORY_USAGE.labels(gpu_id=gpu.id).set(gpu.memoryUsed * 1024 * 1024)
        except Exception as e:
            logger.debug(f"GPU metrics unavailable: {e}")

        return {
            'cpu_usage': cpu_percent,
            'memory_usage': memory.percent,
            'timestamp': time.time()
        }

    async def _flush_buffer(self):
        """Flush buffered metrics to persistent storage"""
        if not self.buffer:
            return
        batch = self.buffer.copy()
        self.buffer.clear()
        # In production, write to time-series database
        # For this tutorial, we log the batch
        logger.info(f"Flushing {len(batch)} metrics records")

    async def _periodic_flush(self):
        """Periodically flush metrics even if buffer isn't full"""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            await self._flush_buffer()
This collector handles several production scenarios. The record_inference method wraps any inference function with timing and error tracking. The buffer system prevents overwhelming the storage backend during traffic spikes. The Prometheus integration provides real-time metrics that can be scraped by monitoring infrastructure.
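To illustrate the contract that record_inference expects, here is a minimal standalone sketch. It assumes only what the docstring states: the wrapped callable is an async function returning a `(result, metrics_dict)` tuple. The helper `timed` is a hypothetical stand-in for the timing portion of the collector, and `fake_inference` is a made-up model call; neither touches Prometheus or the buffer.

```python
import asyncio
import time


async def timed(inference_func):
    """Stand-in for the timing step: await the call, measure it, return both."""
    start = time.monotonic()
    result, metrics = await inference_func()
    elapsed = time.monotonic() - start
    return result, {**metrics, "inference_time": elapsed}


async def fake_inference():
    # Hypothetical model call; a real one would invoke the serving stack.
    await asyncio.sleep(0.05)
    return "lane_clear", {"memory_usage": 512}


result, record = asyncio.run(timed(fake_inference))
print(result, sorted(record))
```

With the real collector, the equivalent call would look like `await collector.record_inference("lane_keeper", "v2", fake_inference)`, where the model name and version strings are placeholders.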
Integrating NeMo for Model Performance Analysis
Now let's use NVIDIA NeMo to analyze model performance and detect anomalies. NeMo provides pre-built models for speech recognition, natural language processing, and text generation that we can use to benchmark our production models.
# nemo_analyzer.py
import logging
import time
from dataclasses import dataclass
from typing import List, Tuple

import nemo.collections.asr as nemo_asr
import nemo.collections.nlp as nemo_nlp
import numpy as np
import torch

logger = logging.getLogger(__name__)


@dataclass
class ModelBenchmarkResult:
    """Results from benchmarking a model against NeMo reference"""
    reference_model: str
    similarity_score: float
    latency_ratio: float
    memory_efficiency: float
    anomalies_detected: List[str]


class NeMoModelAnalyzer:
    """
    Production model analyzer using NVIDIA NeMo for benchmarking.

    This analyzer compares production models against NeMo reference models
    to detect performance degradation and anomalies.
    """

    def __init__(self, device: str = "cuda" if torch.cuda.is_available() else "cpu"):
        self.device = device
        self.reference_models = {}
        self._load_reference_models()

    def _load_reference_models(self):
        """Load NeMo reference models for benchmarking"""
        try:
            # Load a pre-trained ASR model for speech-related benchmarks
            # This is particularly relevant for automotive voice interfaces
            self.reference_models['asr'] = nemo_asr.models.EncDecCTCModel.from_pretrained(
                model_name="stt_en_conformer_ctc_large"
            )
            logger.info("Loaded NeMo ASR reference model")

            # Load a pre-trained NLP model for text processing benchmarks
            self.reference_models['nlp'] = nemo_nlp.models.MTEncDecModel.from_pretrained(
                model_name="nmt_en_de_transformer24x6"
            )
            logger.info("Loaded NeMo NLP reference model")
        except Exception as e:
            logger.warning(f"Failed to load reference models: {e}")
            logger.warning("Running in degraded mode without reference benchmarks")

    async def benchmark_model(self,
                              model_name: str,
                              model_version: str,
                              sample_inputs: List[str],
                              production_model_func) -> ModelBenchmarkResult:
        """
        Benchmark a production model against NeMo reference.

        Args:
            model_name: Name of the production model
            model_version: Version string
            sample_inputs: List of sample inputs for benchmarking
            production_model_func: Callable that runs the production model

        Returns:
            ModelBenchmarkResult with performance comparison
        """
        if not sample_inputs:
            raise ValueError("sample_inputs must contain at least one input")

        anomalies = []

        # Run production model on sample inputs
        production_times = []
        production_outputs = []
        for sample in sample_inputs:
            start_time = time.monotonic()
            output = await production_model_func(sample)
            elapsed = time.monotonic() - start_time
            production_times.append(elapsed)
            production_outputs.append(output)

        # Run NeMo reference model on same inputs
        reference_times = []
        reference_outputs = []
        if 'nlp' in self.reference_models:
            for sample in sample_inputs:
                start_time = time.monotonic()
                # translate() takes a list of sentences
                output = self.reference_models['nlp'].translate([sample])
                elapsed = time.monotonic() - start_time
                reference_times.append(elapsed)
                reference_outputs.append(output)

        # Calculate metrics
        avg_production_time = np.mean(production_times)
        avg_reference_time = np.mean(reference_times) if reference_times else 0
        latency_ratio = avg_production_time / avg_reference_time if avg_reference_time > 0 else float('inf')

        # Detect anomalies. Skip the ratio check in degraded mode, where no
        # reference timings exist and the ratio is infinite by construction.
        if reference_times and latency_ratio > 3.0:
            anomalies.append(f"Production model {latency_ratio:.2f}x slower than reference")
        if max(production_times) > 5.0:  # 5 second threshold
            anomalies.append("Production model latency exceeds 5 second threshold")

        # Memory efficiency check
        memory_efficiency = self._check_memory_efficiency()

        return ModelBenchmarkResult(
            reference_model="NeMo Transformer24x6",
            similarity_score=0.85,  # Placeholder - real implementation would use embedding [3] similarity
            latency_ratio=latency_ratio,
            memory_efficiency=memory_efficiency,
            anomalies_detected=anomalies
        )

    def _check_memory_efficiency(self) -> float:
        """Check GPU memory efficiency (allocated / reserved)"""
        if not torch.cuda.is_available():
            return 1.0
        allocated = torch.cuda.memory_allocated()
        reserved = torch.cuda.memory_reserved()
        if reserved == 0:
            return 1.0
        return allocated / reserved

    def detect_data_drift(self,
                          reference_embeddings: np.ndarray,
                          production_embeddings: np.ndarray,
                          threshold: float = 0.1) -> Tuple[bool, float]:
        """
        Detect data drift between reference and production distributions.
        Uses Maximum Mean Discrepancy (MMD) for distribution comparison.

        Args:
            reference_embeddings: Embeddings from training/validation data, shape (n, d)
            production_embeddings: Embeddings from production data, shape (m, d)
            threshold: MMD threshold for drift detection

        Returns:
            Tuple of (drift_detected, mmd_value)
        """
        # Simple MMD implementation using RBF kernel
        def rbf_kernel(x, y, sigma=1.0):
            dist = np.linalg.norm(x[:, np.newaxis] - y[np.newaxis, :], axis=2)
            return np.exp(-dist**2 / (2 * sigma**2))

        n = len(reference_embeddings)
        m = len(production_embeddings)
        k_xx = rbf_kernel(reference_embeddings, reference_embeddings)
        k_yy = rbf_kernel(production_embeddings, production_embeddings)
        k_xy = rbf_kernel(reference_embeddings, production_embeddings)

        # Biased estimate of the squared MMD
        mmd = (np.sum(k_xx) / (n * n) +
               np.sum(k_yy) / (m * m) -
               2 * np.sum(k_xy) / (n * m))

        # Cast NumPy scalars to plain Python types for easy serialization
        return bool(mmd > threshold), float(mmd)
The NeMo analyzer provides several critical capabilities. The benchmark_model method compares production model performance against NeMo reference models, detecting when production models become slower or less efficient. The detect_data_drift method uses Maximum Mean Discrepancy to identify when the distribution of production data differs from training data, which is a leading indicator of model degradation.
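To make the drift statistic easier to inspect, the following sketch computes the same biased squared-MMD estimate in plain Python for one-dimensional samples. It is a simplified illustration, not a replacement for the NumPy version: the function names and sample values are made up here, and the RBF bandwidth `sigma=1.0` is an assumption that strongly affects the result on real embeddings.

```python
import math


def rbf(x: float, y: float, sigma: float = 1.0) -> float:
    """RBF kernel between two scalars."""
    return math.exp(-((x - y) ** 2) / (2 * sigma ** 2))


def mmd_squared(xs: list[float], ys: list[float], sigma: float = 1.0) -> float:
    """Biased estimate of squared MMD: mean k(x,x') + mean k(y,y') - 2 mean k(x,y)."""
    n, m = len(xs), len(ys)
    k_xx = sum(rbf(a, b, sigma) for a in xs for b in xs) / (n * n)
    k_yy = sum(rbf(a, b, sigma) for a in ys for b in ys) / (m * m)
    k_xy = sum(rbf(a, b, sigma) for a in xs for b in ys) / (n * m)
    return k_xx + k_yy - 2 * k_xy


reference = [0.0, 0.1, -0.1, 0.2, -0.2]        # made-up training-time values
similar = [0.05, -0.05, 0.15, -0.15, 0.0]      # same distribution, new draws
shifted = [x + 2.0 for x in reference]         # simulated drift

threshold = 0.1  # the default used by detect_data_drift
print(mmd_squared(reference, similar) > threshold)   # drift flag for similar data
print(mmd_squared(reference, shifted) > threshold)   # drift flag for shifted data
```

Two samples from the same distribution give a value near zero, while the shifted sample gives a value well above the threshold. In practice the threshold and bandwidth need tuning against held-out data from a period known to be drift-free.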
Building the Real-Time Monitoring API
Now we'll create the FastAPI server that exposes our monitoring data so that real-time dashboards and alerting jobs can query it:
# api.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import redis.asyncio as redis
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from collector import MetricsCollector
from models import ModelMetrics
from nemo_analyzer import NeMoModelAnalyzer

logger = logging.getLogger(__name__)

app = FastAPI(
    title="AI Production Monitor",
    description="Production-grade monitoring for AI systems in automotive applications",
    version="1.0.0"
)

# CORS middleware for dashboard access
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict to specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global instances
metrics_collector = MetricsCollector()
nemo_analyzer = NeMoModelAnalyzer()
redis_client: Optional[redis.Redis] = None


@app.on_event("startup")
async def startup_event():
    """Initialize connections and start background tasks"""
    global redis_client

    # Connect to Redis for metrics caching
    try:
        redis_client = redis.Redis(
            host="localhost",
            port=6379,
            decode_responses=True,
            socket_connect_timeout=5
        )
        await redis_client.ping()
        logger.info("Connected to Redis")
    except Exception as e:
        logger.warning(f"Redis unavailable, running without cache: {e}")
        redis_client = None

    # Start metrics collector
    await metrics_collector.start()


@app.on_event("shutdown")
async def shutdown_event():
    """Clean shutdown of all connections"""
    await metrics_collector.stop()
    if redis_client:
        await redis_client.aclose()


@app.post("/api/v1/metrics/inference", response_model=Dict)
async def record_inference_metrics(metrics: ModelMetrics):
    """
    Record inference metrics from production models.

    This endpoint accepts a single metrics record per request and stores it
    for analysis and alerting.
    """
    # Store in Redis for real-time access
    if redis_client:
        key = f"inference:{metrics.model_name}:{metrics.timestamp.isoformat()}"
        # Redis hash values must be str, int, float or bytes, so stringify
        # every field and drop unset optionals.
        payload = {
            field: str(value)
            for field, value in metrics.model_dump(mode="json").items()
            if value is not None
        }
        await redis_client.hset(key, mapping=payload)
        await redis_client.expire(key, 86400)  # 24 hour TTL

    # Check for anomalies
    if metrics.inference_time_ms > 1000:  # 1 second threshold
        logger.warning(f"High latency detected for {metrics.model_name}: {metrics.inference_time_ms}ms")
    if metrics.error_flag:
        logger.error(f"Inference error for {metrics.model_name}: {metrics.error_message}")

    return {"status": "recorded", "request_id": metrics.request_id}


@app.get("/api/v1/models/{model_name}/health")
async def get_model_health(model_name: str,
                           time_range: int = 3600):
    """
    Get health metrics for a specific model over a time range.

    Args:
        model_name: Name of the model
        time_range: Time range in seconds (default: 1 hour)
    """
    if not redis_client:
        raise HTTPException(status_code=503, detail="Metrics storage unavailable")

    # Get recent metrics from Redis. SCAN iterates incrementally, whereas
    # KEYS would block the server on a large keyspace.
    pattern = f"inference:{model_name}:*"
    keys = [key async for key in redis_client.scan_iter(match=pattern)]
    if not keys:
        return {
            "model_name": model_name,
            "status": "no_data",
            "metrics": []
        }

    # Parse and filter by time range
    cutoff = datetime.utcnow() - timedelta(seconds=time_range)
    metrics_list = []
    for key in keys:
        data = await redis_client.hgetall(key)
        if data and data.get('timestamp'):
            timestamp = datetime.fromisoformat(data['timestamp'])
            if timestamp >= cutoff:
                metrics_list.append(data)

    # Calculate aggregate statistics
    if metrics_list:
        inference_times = [float(m.get('inference_time_ms', 0)) for m in metrics_list]
        error_count = sum(1 for m in metrics_list if m.get('error_flag') == 'True')
        return {
            "model_name": model_name,
            "status": "healthy" if error_count == 0 else "degraded",
            "total_requests": len(metrics_list),
            "error_count": error_count,
            "avg_inference_time_ms": sum(inference_times) / len(inference_times),
            "max_inference_time_ms": max(inference_times),
            "p95_inference_time_ms": sorted(inference_times)[int(len(inference_times) * 0.95)]
        }

    return {
        "model_name": model_name,
        "status": "no_data",
        "metrics": []
    }


@app.post("/api/v1/analyze/benchmark")
async def benchmark_production_model(
    model_name: str,
    model_version: str,
    sample_inputs: List[str],
    background_tasks: BackgroundTasks
):
    """
    Trigger a benchmark analysis of a production model against NeMo reference.
    This is an async operation that runs in the background.
    """
    # Create a placeholder production model function
    # In production, this would call your actual model serving infrastructure
    async def production_model_func(input_text: str) -> str:
        # Simulate model inference
        await asyncio.sleep(0.1)
        return f"Processed: {input_text}"

    # Run benchmark in background
    background_tasks.add_task(
        nemo_analyzer.benchmark_model,
        model_name,
        model_version,
        sample_inputs,
        production_model_func
    )

    return {
        "status": "benchmark_started",
        "model_name": model_name,
        "model_version": model_version,
        "estimated_completion": (datetime.utcnow() + timedelta(seconds=30)).isoformat()
    }


@app.get("/api/v1/system/health")
async def get_system_health():
    """Get current system health metrics"""
    health = await metrics_collector.collect_system_metrics()
    return health


@app.get("/api/v1/alerts/config")
async def get_alert_configuration():
    """
    Get current alert configuration.
    Returns thresholds for various metrics that trigger alerts.
    """
    return {
        "inference_time_threshold_ms": 1000,
        "error_rate_threshold": 0.01,  # 1% error rate
        "gpu_memory_threshold_percent": 90,
        "api_latency_threshold_ms": 5000,
        "data_drift_threshold": 0.1,
        "check_interval_seconds": 60
    }
This API provides several production-ready endpoints. The /api/v1/metrics/inference endpoint accepts metrics from any model and stores them with proper TTL management. The /api/v1/models/{model_name}/health endpoint calculates p95 latency, which is more meaningful than average latency for understanding user experience. The benchmark endpoint runs asynchronously to avoid blocking the API.
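The point about p95 versus average latency is easiest to see with numbers. The sketch below uses the same index rule as the health endpoint, `sorted(values)[int(n * 0.95)]`, on a made-up latency sample; the helper name `percentile` and the values are illustrative assumptions.

```python
def percentile(values: list[float], fraction: float) -> float:
    """Index-based percentile: sorted(values)[int(n * fraction)], clamped to the last item."""
    ordered = sorted(values)
    index = min(int(len(ordered) * fraction), len(ordered) - 1)
    return ordered[index]


# 18 fast requests and 2 slow ones, in milliseconds (made-up numbers).
latencies = [50.0] * 18 + [2000.0] * 2

mean = sum(latencies) / len(latencies)
p95 = percentile(latencies, 0.95)
print(mean, p95)
```

Here the mean stays under the 1000 ms alert threshold even though one request in ten takes two seconds, while the p95 value exposes the slow tail directly.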
Handling Edge Cases and Production Considerations
Production AI monitoring systems face several challenges that our implementation addresses:
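Data Loss Prevention: The metrics buffer in MetricsCollector batches records so that the storage backend is not written to on every request. As written, however, _flush_buffer only logs the batch and then clears it, and the inference endpoint skips storage entirely when Redis is unavailable, so records are lost during an outage. To survive outages, keep the batch in memory and retry the write until the backend accepts it.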
Graceful Degradation: The NeMoModelAnalyzer continues operating even if reference models fail to load. The system logs warnings but doesn't crash, allowing partial functionality.
Resource Exhaustion: The buffer size limit prevents memory exhaustion during traffic spikes. The 1000-record buffer with 10-second flush interval means at most 1000 records are in memory at any time.
Time Series Storage: For production deployments, replace the Redis-based storage with a proper time-series database like InfluxDB [7] or TimescaleDB. Redis is suitable for real-time dashboards but not for long-term historical analysis.
Authentication and Authorization: The current API has no authentication. In production, implement API keys or OAuth2. The OpenAI API, for example, requires authentication for all requests.
Rate Limiting: Implement rate limiting to prevent abuse. The OpenAI Downtime Monitor tracks API availability, but your monitoring system itself needs protection from excessive requests.
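One common way to implement the rate limiting mentioned above is a token bucket. The sketch below is a minimal in-process version under stated assumptions: the class name, the rate, and the capacity are hypothetical, it is not thread-safe, and a multi-worker deployment would need shared state (for example in Redis) instead of a per-process object.

```python
import time


class TokenBucket:
    """Allows `rate` requests per second with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock              # injectable so the bucket can be tested
        self.tokens = float(capacity)   # start full
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Example: 2 requests per second, bursts of 3, driven by a fake clock.
fake_now = [0.0]
bucket = TokenBucket(rate=2.0, capacity=3, clock=lambda: fake_now[0])
burst = [bucket.allow() for _ in range(4)]
print(burst)
```

In the FastAPI server, a check like `bucket.allow()` could run in a dependency or middleware and return HTTP 429 when it fails; one bucket per client key is a typical arrangement.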
What's Next
This tutorial has covered the core components of a production AI monitoring system. Here are the next steps to extend this system:
- Integrate with Actual Model Serving: Connect the metrics collector to your model serving infrastructure (e.g., NVIDIA Triton Inference Server, TensorFlow [4] Serving, or custom FastAPI endpoints).
- Implement Automated Rollback: When the NeMo analyzer detects significant performance degradation, automatically trigger a model rollback to the previous known-good version.
- Add Cost Tracking: Extend the APIMetrics model to track costs per request. The OpenAI API charges per token, and monitoring costs helps prevent budget overruns.
- Build a Dashboard: Create a real-time dashboard using Grafana or a custom React frontend that visualizes the Prometheus metrics and API endpoints.
- Implement Multi-Region Monitoring: For automotive applications, models might run in vehicles, edge servers, and cloud regions. Extend the system to aggregate metrics across all deployment locations.
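As a starting point for the cost-tracking step, per-request cost can be derived from token counts and a price table. The sketch below is an assumption-laden illustration: `TokenPrice`, `request_cost_usd`, and the prices are placeholders, not real provider rates, and a real integration would read token counts from the provider's response.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TokenPrice:
    """USD per 1,000 tokens. The values used below are placeholders."""
    input_per_1k: float
    output_per_1k: float


def request_cost_usd(input_tokens: int, output_tokens: int, price: TokenPrice) -> float:
    """Cost of one request given separate input and output token prices."""
    return ((input_tokens / 1000) * price.input_per_1k
            + (output_tokens / 1000) * price.output_per_1k)


price = TokenPrice(input_per_1k=0.5, output_per_1k=1.5)  # hypothetical rates
cost = request_cost_usd(input_tokens=2000, output_tokens=1000, price=price)
print(cost)
```

The resulting value can be stored in the cost_usd field that APIMetrics already defines, alongside tokens_used.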
The complete source code for this tutorial is available on GitHub. Remember that monitoring is not a one-time setup but an ongoing practice. As your AI systems evolve, your monitoring infrastructure must evolve with them, adapting to new model architectures, deployment patterns, and performance requirements.
References