
How to Monitor OpenAI API Performance with Production Tools


BlogIA Academy · May 23, 2026 · 16 min read · 3,106 words



OpenAI has established itself as a leading force in enterprise AI solutions, with its generative pre-trained transformer (GPT) family of large language models powering critical business applications worldwide. As of May 2026, organizations deploying OpenAI's API at scale face a fundamental challenge: maintaining reliability and performance visibility across distributed systems. This tutorial will guide you through building a production-grade monitoring solution for OpenAI API endpoints, leveraging real-time metrics, alerting, and historical analysis.

According to Wikipedia, OpenAI is an American artificial intelligence research organization headquartered in San Francisco, consisting of a for-profit public benefit corporation partially controlled by a nonprofit foundation. The company developed the GPT family of models, DALL-E [2] text-to-image models, and Sora text-to-video models. For enterprise teams relying on these capabilities, downtime and latency fluctuations can directly impact revenue and user experience.

We'll build a comprehensive monitoring stack that tracks API uptime, response latencies, error rates, and model-specific performance. This system will help you detect issues before they affect end users, optimize cost by identifying underperforming endpoints, and maintain service level agreements (SLAs) with confidence.

Understanding the Monitoring Architecture

Before writing code, let's establish the architectural foundation. Our monitoring system will consist of three core components:

  1. Probe Service: A distributed health checker that sends synthetic requests to OpenAI API endpoints at configurable intervals
  2. Metrics Aggregator: A time-series database that stores latency percentiles, error counts, and status codes
  3. Alerting Engine: A rule-based system that triggers notifications when metrics breach defined thresholds

The OpenAI API, categorized as a code-assistant tool according to DND:Tools, provides access to GPT-3 and GPT-4 models for natural language tasks, and Codex for translating natural language to code. Monitoring this API requires understanding its rate limits, authentication patterns, and response structures.

For production deployments, we'll use the OpenAI Downtime Monitor, a free tool that tracks API uptime and latencies for various OpenAI models and other LLM providers. This tool, available at https://status.portkey.ai/, provides baseline metrics we can compare against our internal measurements.

Key Metrics to Track

  • p50, p95, p99 Latency: Response time percentiles for different model endpoints
  • Error Rate: Percentage of failed requests (4xx and 5xx status codes)
  • Throughput: Requests per minute per API key
  • Token Usage: Prompt and completion token counts for cost tracking
  • Rate Limit Headers: Remaining requests and tokens before throttling
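
As a rough illustration of how these metrics fall out of raw probe samples, the sketch below reduces one window of observations to percentiles, error rate, throughput, and token totals using only the standard library. The `Sample` shape and the `summarize` helper are hypothetical names introduced for this example, not part of the monitoring code that follows.

```python
from dataclasses import dataclass
from statistics import quantiles
from typing import Optional


@dataclass
class Sample:
    """One probe observation (hypothetical shape for this sketch)."""
    latency_ms: float
    status_code: int
    prompt_tokens: int = 0
    completion_tokens: int = 0


def summarize(samples: list[Sample], window_seconds: float) -> Optional[dict]:
    """Reduce a window of samples to the key metrics listed above."""
    if len(samples) < 2:
        return None  # quantiles() needs at least two data points

    # n=100 yields 99 cut points; index k-1 holds the k-th percentile
    cuts = quantiles([s.latency_ms for s in samples], n=100, method="inclusive")
    failed = sum(1 for s in samples if s.status_code >= 400)

    return {
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
        "error_rate": failed / len(samples),
        "requests_per_minute": len(samples) / (window_seconds / 60),
        "total_tokens": sum(
            s.prompt_tokens + s.completion_tokens for s in samples
        ),
    }
```

With only a handful of samples per window, p95 and p99 sit very close to the maximum, so tail percentiles become meaningful only once the window holds many observations.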

Prerequisites and Environment Setup

Let's set up our development environment with the necessary dependencies. We'll use Python 3.11+ and modern async libraries for maximum performance.

# Create a virtual environment
python -m venv openai-monitor
source openai-monitor/bin/activate

# Install core dependencies
pip install openai==1.30.0 httpx==0.27.0 prometheus-client==0.20.0
pip install python-dotenv==1.0.1 pydantic==2.7.0
pip install redis==5.0.0 celery==5.4.0
pip install fastapi==0.111.0 uvicorn==0.29.0

# For time-series storage
pip install influxdb-client==1.44.0

# For percentile calculations in the aggregator
pip install numpy

# For alerting
pip install slack-sdk==3.27.0 twilio==9.0.0

Create a .env file for configuration:

OPENAI_API_KEY=sk-your-key-here
OPENAI_ORG_ID=org-your-org-id
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=your-influxdb-token
INFLUXDB_ORG=openai-monitor
INFLUXDB_BUCKET=api_metrics
REDIS_URL=redis://localhost:6379/0
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/your/webhook
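
A missing setting is easier to diagnose at startup than halfway through a probe cycle. The sketch below shows one possible way to validate these variables before the monitor starts; `MonitorConfig`, `load_config`, and the choice of which keys count as required are assumptions made for this example.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Assumption: these settings are mandatory, the rest are optional
REQUIRED_KEYS = (
    "OPENAI_API_KEY",
    "INFLUXDB_URL",
    "INFLUXDB_TOKEN",
    "INFLUXDB_ORG",
    "INFLUXDB_BUCKET",
)


@dataclass(frozen=True)
class MonitorConfig:
    openai_api_key: str
    influxdb_url: str
    influxdb_token: str
    influxdb_org: str
    influxdb_bucket: str
    openai_org_id: Optional[str] = None
    redis_url: Optional[str] = None
    slack_webhook_url: Optional[str] = None


def load_config(env: Mapping[str, str] = os.environ) -> MonitorConfig:
    """Build a config object, failing fast if a required key is absent."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")

    return MonitorConfig(
        openai_api_key=env["OPENAI_API_KEY"],
        influxdb_url=env["INFLUXDB_URL"],
        influxdb_token=env["INFLUXDB_TOKEN"],
        influxdb_org=env["INFLUXDB_ORG"],
        influxdb_bucket=env["INFLUXDB_BUCKET"],
        # Empty strings are treated the same as unset values
        openai_org_id=env.get("OPENAI_ORG_ID") or None,
        redis_url=env.get("REDIS_URL") or None,
        slack_webhook_url=env.get("SLACK_WEBHOOK_URL") or None,
    )
```

Call `load_dotenv()` first so the values from the .env file are present in `os.environ` when `load_config()` runs.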

Building the Core Monitoring Probe

The probe service is the heart of our monitoring system. It sends periodic requests to OpenAI API endpoints and records response metrics. We'll implement this as an async service that can handle multiple concurrent probes.

# probes/openai_probe.py
import asyncio
import time
import logging
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from dataclasses import dataclass, field

import httpx
from openai import AsyncOpenAI
from prometheus_client import Histogram, Counter, Gauge

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Prometheus metrics for real-time monitoring
LATENCY_HISTOGRAM = Histogram(
    'openai_api_latency_seconds',
    'API response latency in seconds',
    ['model', 'endpoint', 'status_code'],
    buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0)
)

ERROR_COUNTER = Counter(
    'openai_api_errors_total',
    'Total API errors by type',
    ['model', 'error_type', 'status_code']
)

RATE_LIMIT_GAUGE = Gauge(
    'openai_api_rate_limit_remaining',
    'Remaining requests before rate limit',
    ['model', 'limit_type']
)

@dataclass
class ProbeResult:
    """Structured result from a single API probe."""
    model: str
    endpoint: str
    latency_ms: float
    status_code: int
    error: Optional[str] = None
    tokens_used: int = 0
    rate_limit_remaining: int = 0
    rate_limit_reset: int = 0
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

class OpenAIProbe:
    """Production-grade probe for OpenAI API endpoints."""

    def __init__(
        self,
        api_key: str,
        org_id: Optional[str] = None,
        base_url: str = "https://api.openai.com/v1",
        timeout: float = 30.0,
        max_retries: int = 3
    ):
        self.client = AsyncOpenAI(
            api_key=api_key,
            organization=org_id,
            base_url=base_url,
            timeout=timeout,
            max_retries=max_retries
        )
        self.http_client = httpx.AsyncClient(
            timeout=timeout,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
        )
        self.base_url = base_url

    async def probe_chat_completion(
        self,
        model: str = "gpt-4",
        max_tokens: int = 50
    ) -> ProbeResult:
        """
        Probe a chat completion endpoint with a minimal request.

        This method sends a lightweight prompt to measure baseline latency
        without consuming significant token quota.
        """
        start_time = time.monotonic()
        error = None
        status_code = 200
        tokens_used = 0
        rate_limit_remaining = 0
        rate_limit_reset = 0

        try:
            # with_raw_response exposes the HTTP headers alongside the parsed
            # body; the parsed completion object alone does not carry them
            raw = await self.client.chat.completions.with_raw_response.create(
                model=model,
                messages=[
                    {"role": "user", "content": "Respond with 'ok' only."}
                ],
                max_tokens=max_tokens,
                temperature=0.0  # Reduces output variance between probes
            )
            response = raw.parse()

            # Extract token usage from response
            if response.usage:
                tokens_used = response.usage.total_tokens

            # Parse rate limit headers from the raw HTTP response
            headers = raw.headers
            rate_limit_remaining = int(
                headers.get('x-ratelimit-remaining-requests', 0)
            )
            # The reset header is a duration string (e.g. "1s", "6m0s"), so
            # only store it when it is a plain integer; otherwise keep 0
            reset_raw = str(headers.get('x-ratelimit-reset-requests', '0'))
            rate_limit_reset = int(reset_raw) if reset_raw.isdigit() else 0

        except Exception as e:
            error = str(e)
            status_code = getattr(e, 'status_code', 500)
            ERROR_COUNTER.labels(
                model=model,
                error_type=type(e).__name__,
                status_code=status_code
            ).inc()
            logger.error(f"Probe failed for {model}: {error}")

        finally:
            latency_ms = (time.monotonic() - start_time) * 1000

            # Record latency histogram
            LATENCY_HISTOGRAM.labels(
                model=model,
                endpoint='chat/completions',
                status_code=status_code
            ).observe(latency_ms / 1000)

            # Update rate limit gauge
            RATE_LIMIT_GAUGE.labels(
                model=model,
                limit_type='requests'
            ).set(rate_limit_remaining)

        return ProbeResult(
            model=model,
            endpoint='chat/completions',
            latency_ms=latency_ms,
            status_code=status_code,
            error=error,
            tokens_used=tokens_used,
            rate_limit_remaining=rate_limit_remaining,
            rate_limit_reset=rate_limit_reset
        )

    async def probe_embedding(
        self,
        model: str = "text-embedding-3-small"
    ) -> ProbeResult:
        """
        Probe embedding endpoint for vector generation latency.

        Embeddings are critical for RAG applications and have different
        performance characteristics than chat completions.
        """
        start_time = time.monotonic()
        error = None
        status_code = 200
        tokens_used = 0

        try:
            response = await self.client.embeddings.create(
                model=model,
                input="Performance test vector.",
                dimensions=256  # Smaller dimension for faster response
            )

            # Extract token usage
            if response.usage:
                tokens_used = response.usage.total_tokens

        except Exception as e:
            error = str(e)
            status_code = getattr(e, 'status_code', 500)
            ERROR_COUNTER.labels(
                model=model,
                error_type=type(e).__name__,
                status_code=status_code
            ).inc()
            logger.error(f"Embedding probe failed for {model}: {error}")

        finally:
            latency_ms = (time.monotonic() - start_time) * 1000
            LATENCY_HISTOGRAM.labels(
                model=model,
                endpoint='embeddings',
                status_code=status_code
            ).observe(latency_ms / 1000)

        return ProbeResult(
            model=model,
            endpoint='embeddings',
            latency_ms=latency_ms,
            status_code=status_code,
            error=error,
            tokens_used=tokens_used
        )

    async def close(self):
        """Clean up HTTP connections."""
        await self.client.close()
        await self.http_client.aclose()
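
One detail worth handling explicitly: in OpenAI's documentation the x-ratelimit-reset-requests header appears as a duration string such as 1s or 6m0s rather than a plain number, so it cannot be passed straight to int(). The helper below is a minimal sketch of one way to convert such a value to seconds. `parse_reset_duration` is a hypothetical name, and the set of supported units (h, m, s, ms) is an assumption about what the header may contain.

```python
import re

# One "<number><unit>" component; "ms" is listed first so it wins over "m"
_COMPONENT = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}


def parse_reset_duration(value: str) -> float:
    """Convert a duration string such as "6m0s" into seconds.

    Returns 0.0 for empty or unrecognized input so that a malformed
    header never causes the probe itself to fail.
    """
    value = (value or "").strip()
    matches = _COMPONENT.findall(value)

    # Reject input containing characters the pattern did not consume
    if not matches or "".join(num + unit for num, unit in matches) != value:
        return 0.0

    return sum(float(num) * _UNIT_SECONDS[unit] for num, unit in matches)
```

The result could be rounded and stored in `ProbeResult.rate_limit_reset`, or the field could be widened to a float if sub-second resolution matters.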

Handling Edge Cases in Production

The probe service must handle several edge cases that commonly occur in production environments:

  1. Rate Limiting: OpenAI enforces rate limits per API key and organization. The probe records the remaining request budget from the x-ratelimit-remaining-requests response header. When that value approaches zero, the scheduler should lengthen the probe interval and back off exponentially so that monitoring traffic does not compete with production traffic.

  2. Connection Timeouts: Network issues can cause hanging requests. We set a 30-second timeout and pass max_retries=3 to the SDK client, which retries transient failures with exponential backoff. Keep in mind that a retried request inflates the latency the probe records.

  3. Model Deprecation: OpenAI occasionally deprecates older model versions. The probe above does not yet check for this; a useful extension is to log the model version reported in each response and alert when requests for a probed model begin to fail.

  4. Token Quota Exhaustion: For paid accounts, running out of credits returns a 429 status, the same code used for rate limiting. Track this case separately from ordinary throttling and route it to billing alerts.
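
The backoff and 429 handling described in this list can be sketched with three small helpers. This is a minimal illustration rather than part of the probe class: `retry_delays`, `next_probe_interval`, `classify_429`, and the `low_watermark` and `max_interval` parameters are hypothetical names, and `insufficient_quota` is assumed to be the error code the API reports when credits run out.

```python
from typing import Optional


def retry_delays(
    base: float = 1.0, factor: float = 2.0, attempts: int = 3
) -> list[float]:
    """Delays before each retry; the defaults give 1s, 2s, 4s."""
    return [base * factor ** i for i in range(attempts)]


def next_probe_interval(
    base_interval: float,
    remaining_requests: int,
    low_watermark: int = 10,
    max_interval: float = 900.0,
    consecutive_throttles: int = 0,
) -> float:
    """Stretch the probe interval while the request budget is low."""
    if remaining_requests > low_watermark and consecutive_throttles == 0:
        return base_interval

    # Double the interval once, then again for every consecutive
    # throttled cycle, without exceeding max_interval
    exponent = consecutive_throttles + 1
    return min(base_interval * 2 ** exponent, max_interval)


def classify_429(error_code: Optional[str]) -> str:
    """Separate exhausted credits from ordinary throttling."""
    if error_code == "insufficient_quota":
        return "billing"   # waiting will not help; page whoever owns billing
    return "throttle"      # transient; back off and retry
```

A scheduler would call `next_probe_interval` after each cycle with the latest `rate_limit_remaining` value and sleep for the returned number of seconds.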

Implementing the Metrics Aggregator

Raw probe results need to be aggregated into meaningful metrics. We'll use InfluxDB for time-series storage and implement aggregation logic that computes percentiles over sliding windows.

# aggregator/metrics_aggregator.py
import asyncio
from collections import defaultdict
from typing import Dict, List, Optional
from datetime import datetime, timedelta, timezone
import statistics

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import numpy as np

class MetricsAggregator:
    """
    Aggregates probe results into time-series metrics with configurable windows.

    Uses InfluxDB for persistent storage and NumPy for efficient percentile
    calculations on large datasets.
    """

    def __init__(
        self,
        url: str,
        token: str,
        org: str,
        bucket: str,
        window_seconds: int = 300  # 5-minute aggregation window
    ):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.org = org
        self.window = timedelta(seconds=window_seconds)

        # In-memory buffer for recent results
        self.buffer: Dict[str, List] = defaultdict(list)

    async def ingest_result(self, result):
        """Store a single probe result with full context."""
        point = Point("api_probe") \
            .tag("model", result.model) \
            .tag("endpoint", result.endpoint) \
            .tag("status_code", str(result.status_code)) \
            .field("latency_ms", result.latency_ms) \
            .field("tokens_used", result.tokens_used) \
            .field("rate_limit_remaining", result.rate_limit_remaining) \
            .time(result.timestamp)

        if result.error:
            point = point.field("error", result.error)

        self.write_api.write(bucket=self.bucket, record=point)

        # Update in-memory buffer for real-time aggregation
        key = f"{result.model}:{result.endpoint}"
        self.buffer[key].append(result)

        # Trim old entries from buffer
        cutoff = datetime.now(timezone.utc) - self.window
        self.buffer[key] = [
            r for r in self.buffer[key] 
            if r.timestamp > cutoff
        ]

    def compute_percentiles(self, model: str, endpoint: str) -> Dict:
        """
        Compute latency percentiles from the in-memory buffer.

        Returns p50, p95, p99, and p999 for the current window.
        """
        key = f"{model}:{endpoint}"
        results = self.buffer.get(key, [])

        if not results:
            return {
                'p50': 0, 'p95': 0, 'p99': 0, 'p999': 0,
                'count': 0, 'error_rate': 0.0
            }

        latencies = [r.latency_ms for r in results]
        errors = [r for r in results if r.error]

        # Use NumPy for efficient percentile calculation
        latencies_np = np.array(latencies)

        return {
            'p50': float(np.percentile(latencies_np, 50)),
            'p95': float(np.percentile(latencies_np, 95)),
            'p99': float(np.percentile(latencies_np, 99)),
            'p999': float(np.percentile(latencies_np, 99.9)),
            'count': len(results),
            'error_rate': len(errors) / len(results) if results else 0.0,
            'mean_latency': float(np.mean(latencies_np)),
            'std_latency': float(np.std(latencies_np))
        }

    async def get_historical_metrics(
        self,
        model: str,
        endpoint: str,
        duration_minutes: int = 60
    ) -> List[Dict]:
        """
        Query historical metrics from InfluxDB for trend analysis.

        This is useful for identifying performance regressions over time.
        """
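        # Restrict the query to the numeric latency field; taking the mean
        # over every field would also hit the string "error" field
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: -{duration_minutes}m)
            |> filter(fn: (r) => r["_measurement"] == "api_probe")
            |> filter(fn: (r) => r["_field"] == "latency_ms")
            |> filter(fn: (r) => r["model"] == "{model}")
            |> filter(fn: (r) => r["endpoint"] == "{endpoint}")
            |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
            |> yield(name: "mean")
        '''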

        tables = self.query_api.query(query, org=self.org)
        results = []

        for table in tables:
            for record in table.records:
                results.append({
                    'time': record.get_time(),
                    'latency_ms': record.get_value(),
                    'model': record.values.get('model'),
                    'endpoint': record.values.get('endpoint')
                })

        return results

    async def close(self):
        """Clean up InfluxDB connection."""
        self.client.close()
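
The in-memory buffer above rebuilds its list on every insert. Where results arrive in timestamp order, a deque can evict expired entries from the left instead. The sketch below illustrates that idea under the in-order assumption; `SlidingWindow` is a hypothetical class and is not used by `MetricsAggregator`.

```python
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Deque, List, Optional, Tuple


class SlidingWindow:
    """Keeps (timestamp, latency_ms) pairs no older than `window`.

    Assumes entries are added in non-decreasing timestamp order.
    """

    def __init__(self, window: timedelta):
        self.window = window
        self._items: Deque[Tuple[datetime, float]] = deque()

    def add(self, timestamp: datetime, latency_ms: float) -> None:
        self._items.append((timestamp, latency_ms))
        self._evict(now=timestamp)

    def _evict(self, now: datetime) -> None:
        cutoff = now - self.window
        # Oldest entries sit on the left, so eviction stops at the
        # first entry that is still inside the window
        while self._items and self._items[0][0] <= cutoff:
            self._items.popleft()

    def latencies(self, now: Optional[datetime] = None) -> List[float]:
        """Return the latencies still inside the window as of `now`."""
        self._evict(now or datetime.now(timezone.utc))
        return [latency for _, latency in self._items]
```

One window per `model:endpoint` key would mirror the structure of the existing buffer.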

Building the Alerting Engine

Alerting is critical for production monitoring. We'll implement a rule-based engine that evaluates metrics against thresholds and sends notifications through multiple channels.

# alerting/alert_engine.py
import asyncio
import json
import logging
from typing import Dict, List, Callable, Optional
from datetime import datetime, timezone
from dataclasses import dataclass, field

import httpx
from slack_sdk.web.async_client import AsyncWebClient

logger = logging.getLogger(__name__)

@dataclass
class AlertRule:
    """Defines a single alerting rule with threshold and conditions."""
    name: str
    model: str
    endpoint: str
    metric: str  # 'p95_latency', 'error_rate', 'throughput'
    operator: str  # 'gt', 'lt', 'gte', 'lte'
    threshold: float
    duration_seconds: int  # How long condition must persist
    severity: str  # 'critical', 'warning', 'info'
    enabled: bool = True

class AlertEngine:
    """
    Evaluates metrics against rules and triggers notifications.

    Supports multiple notification channels including Slack, email,
    and webhook-based integrations.
    """

    def __init__(
        self,
        slack_token: Optional[str] = None,
        slack_channel: str = "#alerts",
        webhook_url: Optional[str] = None
    ):
        self.rules: List[AlertRule] = []
        self.triggered_states: Dict[str, datetime] = {}

        # Initialize notification clients
        self.slack_client = (
            AsyncWebClient(token=slack_token) if slack_token else None
        )
        self.slack_channel = slack_channel
        self.webhook_url = webhook_url
        self.http_client = httpx.AsyncClient()

    def add_rule(self, rule: AlertRule):
        """Register a new alerting rule."""
        self.rules.append(rule)
        logger.info(f"Added alert rule: {rule.name}")

    async def evaluate_metrics(self, metrics: Dict) -> List[str]:
        """
        Evaluate current metrics against all active rules.

        Returns list of triggered alert messages.
        """
        triggered_alerts = []
        now = datetime.now(timezone.utc)

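        for rule in self.rules:
            if not rule.enabled:
                continue

            # When the caller tags the metrics with a model and endpoint,
            # skip rules that are scoped to a different series
            if (
                metrics.get('model', rule.model) != rule.model
                or metrics.get('endpoint', rule.endpoint) != rule.endpoint
            ):
                continue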

            # Extract the relevant metric value
            metric_value = self._extract_metric(metrics, rule)
            if metric_value is None:
                continue

            # Check if condition is met
            condition_met = self._evaluate_condition(
                metric_value, rule.operator, rule.threshold
            )

            rule_key = f"{rule.name}:{rule.model}:{rule.endpoint}"

            if condition_met:
                if rule_key not in self.triggered_states:
                    # First occurrence - start tracking duration
                    self.triggered_states[rule_key] = now

                # Check if condition has persisted long enough
                # total_seconds() covers the whole interval; .seconds wraps at one day
                elapsed = (now - self.triggered_states[rule_key]).total_seconds()
                if elapsed >= rule.duration_seconds:
                    alert_msg = (
                        f"[{rule.severity.upper()}] {rule.name}: "
                        f"{rule.metric} = {metric_value:.2f} "
                        f"({rule.operator} {rule.threshold}) "
                        f"for {rule.model}/{rule.endpoint}"
                    )
                    triggered_alerts.append(alert_msg)

                    # Send notifications
                    await self._send_notifications(alert_msg, rule.severity)

            else:
                # Condition no longer met - reset tracking
                self.triggered_states.pop(rule_key, None)

        return triggered_alerts

    def _extract_metric(self, metrics: Dict, rule: AlertRule) -> Optional[float]:
        """Extract specific metric value from aggregated data."""
        # Handle different metric types
        if rule.metric == 'p95_latency':
            return metrics.get('p95', None)
        elif rule.metric == 'error_rate':
            return metrics.get('error_rate', None)
        elif rule.metric == 'throughput':
            return metrics.get('count', None)
        elif rule.metric == 'mean_latency':
            return metrics.get('mean_latency', None)
        return None

    def _evaluate_condition(
        self, value: float, operator: str, threshold: float
    ) -> bool:
        """Evaluate a comparison condition."""
        operators = {
            'gt': lambda v, t: v > t,
            'lt': lambda v, t: v < t,
            'gte': lambda v, t: v >= t,
            'lte': lambda v, t: v <= t,
            'eq': lambda v, t: v == t
        }
        return operators.get(operator, lambda v, t: False)(value, threshold)

    async def _send_notifications(self, message: str, severity: str):
        """Send alert through configured channels."""
        tasks = []

        # Slack notification
        if self.slack_client:
            color = {
                'critical': '#FF0000',
                'warning': '#FFA500',
                'info': '#0000FF'
            }.get(severity, '#808080')

            tasks.append(
                self.slack_client.chat_postMessage(
                    channel=self.slack_channel,
                    attachments=[{
                        'color': color,
                        'text': message,
                        'ts': datetime.now().timestamp()
                    }]
                )
            )

        # Webhook notification
        if self.webhook_url:
            payload = {
                'text': message,
                'severity': severity,
                'timestamp': datetime.now().isoformat()
            }
            tasks.append(
                self.http_client.post(
                    self.webhook_url,
                    json=payload
                )
            )

        # Execute all notifications concurrently
        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    logger.error(f"Notification failed: {result}")

    async def close(self):
        """Clean up HTTP connections."""
        await self.http_client.aclose()
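
As written, evaluate_metrics sends a notification on every evaluation once a condition has persisted, which can flood a channel during a long incident. A cooldown is one common remedy. The sketch below shows the idea in isolation; `Debouncer`, `AlertState`, and the `hold` and `cooldown` parameters are hypothetical names and are not wired into `AlertEngine`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Optional


@dataclass
class AlertState:
    first_seen: datetime
    last_notified: Optional[datetime] = None


@dataclass
class Debouncer:
    """Fires once a condition has held for `hold`, then at most once
    per `cooldown` while the condition stays true."""
    hold: timedelta
    cooldown: timedelta
    states: Dict[str, AlertState] = field(default_factory=dict)

    def should_notify(
        self, key: str, condition_met: bool, now: datetime
    ) -> bool:
        if not condition_met:
            self.states.pop(key, None)  # condition cleared: reset tracking
            return False

        state = self.states.setdefault(key, AlertState(first_seen=now))
        if now - state.first_seen < self.hold:
            return False  # not sustained for long enough yet
        if (
            state.last_notified is not None
            and now - state.last_notified < self.cooldown
        ):
            return False  # a notification went out recently

        state.last_notified = now
        return True
```

The `key` argument plays the same role as `rule_key` in the engine, so each rule keeps its own hold and cooldown timers.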

Orchestrating the Complete Monitoring System

Now let's wire everything together into a production-ready monitoring service that runs on a schedule.

# main.py
import asyncio
import logging
import signal
import sys
from datetime import datetime, timezone
from typing import List

from probes.openai_probe import OpenAIProbe, ProbeResult
from aggregator.metrics_aggregator import MetricsAggregator
from alerting.alert_engine import AlertEngine, AlertRule

# Module-level logger used throughout this file
logger = logging.getLogger(__name__)

class OpenAIMonitor:
    """
    Orchestrates the complete monitoring pipeline.

    Runs periodic probes, aggregates metrics, and evaluates alert rules.
    Designed for production deployment with graceful shutdown.
    """

    def __init__(
        self,
        api_key: str,
        org_id: str,
        influx_config: dict,
        slack_token: str,
        probe_interval: int = 60  # seconds between probe cycles
    ):
        self.probe = OpenAIProbe(api_key=api_key, org_id=org_id)
        self.aggregator = MetricsAggregator(**influx_config)
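        # The value loaded from SLACK_WEBHOOK_URL is an incoming-webhook URL,
        # not a bot token, so route it to the engine's webhook channel
        self.alert_engine = AlertEngine(webhook_url=slack_token)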
        self.interval = probe_interval
        self.running = False

        # Configure default alert rules
        self._setup_alert_rules()

    def _setup_alert_rules(self):
        """Configure default alerting rules for common scenarios."""
        rules = [
            AlertRule(
                name="High Latency - GPT-4",
                model="gpt-4",
                endpoint="chat/completions",
                metric="p95_latency",
                operator="gt",
                threshold=5000.0,  # 5 seconds
                duration_seconds=120,  # 2 minutes
                severity="warning"
            ),
            AlertRule(
                name="Critical Latency - GPT-4",
                model="gpt-4",
                endpoint="chat/completions",
                metric="p95_latency",
                operator="gt",
                threshold=10000.0,  # 10 seconds
                duration_seconds=60,  # 1 minute
                severity="critical"
            ),
            AlertRule(
                name="Elevated Error Rate",
                model="gpt-4",
                endpoint="chat/completions",
                metric="error_rate",
                operator="gt",
                threshold=0.05,  # 5% error rate
                duration_seconds=300,  # 5 minutes
                severity="critical"
            ),
            AlertRule(
                name="Low Throughput",
                model="gpt-4",
                endpoint="chat/completions",
                metric="throughput",
                operator="lt",
                threshold=10,  # Less than 10 requests in window
                duration_seconds=600,  # 10 minutes
                severity="warning"
            )
        ]

        for rule in rules:
            self.alert_engine.add_rule(rule)

    async def probe_cycle(self):
        """
        Execute one complete probe cycle across all endpoints.

        Probes multiple models concurrently for efficiency.
        """
        models_to_probe = [
            ("gpt-4", "chat/completions"),
            ("gpt-3.5-turbo", "chat/completions"),
            ("text-embedding-3-small", "embeddings"),
            ("text-embedding-3-large", "embeddings")
        ]

        tasks = []
        for model, endpoint in models_to_probe:
            if endpoint == "chat/completions":
                tasks.append(self.probe.probe_chat_completion(model=model))
            elif endpoint == "embeddings":
                tasks.append(self.probe.probe_embedding(model=model))

        # Execute all probes concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for result in results:
            if isinstance(result, ProbeResult):
                await self.aggregator.ingest_result(result)
            elif isinstance(result, Exception):
                logger.error(f"Probe cycle error: {result}")

        # Compute aggregated metrics and evaluate alerts
        for model, endpoint in models_to_probe:
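            metrics = self.aggregator.compute_percentiles(model, endpoint)
            # Tag the metrics with the series they were computed from
            metrics.update(model=model, endpoint=endpoint)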
            triggered = await self.alert_engine.evaluate_metrics(metrics)

            if triggered:
                logger.warning(f"Alerts triggered: {triggered}")

    async def run(self):
        """Main monitoring loop with graceful shutdown support."""
        self.running = True

        # Setup signal handlers for graceful shutdown
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda: asyncio.create_task(self.shutdown())
            )

        logger.info("OpenAI Monitor started")

        while self.running:
            cycle_start = datetime.now(timezone.utc)

            try:
                await self.probe_cycle()
            except Exception as e:
                logger.error(f"Probe cycle failed: {e}", exc_info=True)

            # Calculate sleep time to maintain consistent interval
            elapsed = (datetime.now(timezone.utc) - cycle_start).total_seconds()
            sleep_time = max(0, self.interval - elapsed)

            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

    async def shutdown(self):
        """Gracefully shut down the monitor."""
        logger.info("Shutting down OpenAI Monitor..")
        self.running = False
        await self.probe.close()
        await self.aggregator.close()
        await self.alert_engine.close()
        logger.info("Shutdown complete")

async def main():
    """Entry point for the monitoring service."""
    import os
    from dotenv import load_dotenv

    load_dotenv()

    monitor = OpenAIMonitor(
        api_key=os.getenv("OPENAI_API_KEY"),
        org_id=os.getenv("OPENAI_ORG_ID"),
        influx_config={
            "url": os.getenv("INFLUXDB_URL"),
            "token": os.getenv("INFLUXDB_TOKEN"),
            "org": os.getenv("INFLUXDB_ORG"),
            "bucket": os.getenv("INFLUXDB_BUCKET")
        },
        slack_token=os.getenv("SLACK_WEBHOOK_URL"),
        probe_interval=60
    )

    await monitor.run()

if __name__ == "__main__":
    asyncio.run(main())
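
The loop in run() sleeps for the full remainder of the interval, so a shutdown signal may take up to probe_interval seconds to take effect. One possible refinement is to wait on an asyncio.Event with a timeout instead of sleeping. The sketch below shows that pattern on its own; `run_periodically` and its parameters are hypothetical and stand in for the body of run().

```python
import asyncio
import time
from typing import Awaitable, Callable


async def run_periodically(
    cycle: Callable[[], Awaitable[None]],
    interval: float,
    stop: asyncio.Event,
) -> int:
    """Run `cycle` every `interval` seconds until `stop` is set.

    Returns the number of cycles that were started.
    """
    cycles = 0
    while not stop.is_set():
        started = time.monotonic()
        try:
            await cycle()
        except Exception as exc:
            # A failed cycle should not end the monitoring loop
            print(f"cycle failed: {exc}")
        cycles += 1

        # Subtract the cycle's own runtime so the cadence does not drift
        remaining = max(0.0, interval - (time.monotonic() - started))
        try:
            # Wakes up as soon as stop is set rather than sleeping it out
            await asyncio.wait_for(stop.wait(), timeout=remaining)
        except asyncio.TimeoutError:
            pass  # interval elapsed normally; start the next cycle
    return cycles
```

A signal handler then only needs to call `stop.set()`; cleanup can run after `run_periodically` returns, once no probe is in flight.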

Deploying to Production

For production deployment, keep the following in mind:

Containerization with Docker

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run as non-root user
RUN useradd -m -u 1000 monitor
USER monitor

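# NOTE: main.py as shown above does not define a FastAPI `app`. This command
# assumes you add one (serving /health) that starts OpenAIMonitor on startup.
# To run the script directly instead, use: CMD ["python", "main.py"]
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]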

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: openai-monitor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: openai-monitor
  template:
    metadata:
      labels:
        app: openai-monitor
    spec:
      containers:
      - name: monitor
        image: openai-monitor:latest
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: openai-credentials
              key: api-key
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "200m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
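
The liveness probe above expects something to answer GET /health on port 8000. If the monitor runs as a plain script, one option is a small standard-library HTTP server that reports whether a probe cycle completed recently. The following is a sketch under that assumption; `Heartbeat`, `make_handler`, `serve_health`, and the 180-second staleness limit are all hypothetical choices.

```python
import json
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional, Tuple


class Heartbeat:
    """Tracks when the monitoring loop last completed a cycle."""

    def __init__(self, now: Optional[float] = None) -> None:
        self._lock = threading.Lock()
        self._last = time.monotonic() if now is None else now

    def beat(self, now: Optional[float] = None) -> None:
        with self._lock:
            self._last = time.monotonic() if now is None else now

    def status(
        self, max_age: float, now: Optional[float] = None
    ) -> Tuple[int, dict]:
        """Return (HTTP status, JSON body) for the health endpoint."""
        now = time.monotonic() if now is None else now
        with self._lock:
            age = now - self._last
        healthy = age <= max_age
        body = {
            "status": "ok" if healthy else "stale",
            "age_seconds": round(age, 1),
        }
        return (200 if healthy else 503), body


def make_handler(heartbeat: Heartbeat, max_age: float):
    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/health":
                self.send_error(404)
                return
            code, body = heartbeat.status(max_age)
            payload = json.dumps(body).encode()
            self.send_response(code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)

        def log_message(self, *args):
            pass  # keep per-request access logs out of the monitor's output

    return HealthHandler


def serve_health(
    heartbeat: Heartbeat, port: int = 8000, max_age: float = 180.0
) -> ThreadingHTTPServer:
    """Start the health endpoint on a daemon thread and return the server."""
    server = ThreadingHTTPServer(
        ("0.0.0.0", port), make_handler(heartbeat, max_age)
    )
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The monitor would call `heartbeat.beat()` at the end of each probe cycle, so a stalled loop turns the endpoint unhealthy and Kubernetes restarts the pod.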

What's Next

You now have a production-grade monitoring system for OpenAI API endpoints. This system provides real-time visibility into API performance, automated alerting for anomalies, and historical data for trend analysis.

To extend this system further:

  1. Add anomaly detection: Implement statistical models that automatically detect unusual patterns without manual threshold configuration
  2. Integrate with APM tools: Export metrics to Datadog, New Relic, or Grafana for unified dashboards
  3. Implement cost tracking: Monitor token usage across different models and API keys to optimize spending
  4. Build a web dashboard: Create a real-time UI using FastAPI and Chart.js for visual monitoring
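
As a starting point for the first item, a simple z-score check against recent history can flag unusual latencies without a hand-tuned threshold. This is only a sketch of one basic statistical approach; `is_anomalous`, the threshold of 3 standard deviations, and the minimum of 10 samples are assumptions to adjust for your workload.

```python
from statistics import mean, stdev
from typing import Sequence


def is_anomalous(
    history: Sequence[float],
    value: float,
    z_threshold: float = 3.0,
    min_samples: int = 10,
) -> bool:
    """Flag `value` if it lies far outside the spread of `history`."""
    if len(history) < min_samples:
        return False  # too little data to estimate normal behavior

    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # Perfectly flat history: any deviation at all is unusual
        return value != mu

    return abs(value - mu) / sigma > z_threshold
```

Latency distributions tend to have long tails, so a check like this is better suited to windowed aggregates such as p50 or mean latency than to individual requests.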

The OpenAI Downtime Monitor at https://status.portkey.ai/ provides a complementary external view of API status. Cross-reference your internal metrics with this external source to distinguish between provider-wide issues and local configuration problems.

Remember that monitoring is an iterative process. Start with the default alert rules provided here, then tune thresholds based on your specific workload patterns and SLA requirements. The key to effective monitoring is not just detecting failures, but understanding normal behavior so you can identify anomalies before they become incidents.


References

1. Wikipedia - Rag. Wikipedia.
2. Wikipedia - DALL-E. Wikipedia.
3. Wikipedia - Flux. Wikipedia.
4. GitHub - Shubhamsaboo/awesome-llm-apps. GitHub.
5. GitHub - danny-avila/LibreChat. GitHub.
6. GitHub - black-forest-labs/flux. GitHub.
7. GitHub - openai/openai-python. GitHub.