
How to Monitor LLM Apps with LangSmith and Weights & Biases

Practical tutorial: Monitor LLM apps with LangSmith and Weights & Biases

BlogIA Academy · May 29, 2026 · 13 min read · 2,429 words

Building production LLM applications is deceptively simple in development but notoriously difficult to maintain at scale. You've likely experienced the frustration: a chain that works perfectly in your notebook starts hallucinating, returning empty responses, or silently degrading in production. Without proper observability, debugging these failures is like finding a needle in a haystack while blindfolded.

This tutorial will walk you through implementing a comprehensive monitoring stack using LangSmith for LLM-specific tracing and evaluation, combined with Weights & Biases (W&B) for experiment tracking and performance visualization. By the end, you'll have a production-ready monitoring system that catches regressions before they impact users.

Why LangSmith and W&B Together?

LangSmith and W&B serve complementary roles in the LLM observability landscape. LangSmith provides granular, per-request tracing of LLM calls, including token usage, latency, and exact prompts/responses. W&B excels at aggregating these traces into dashboards, comparing experiments across model versions, and alerting on performance degradation.

According to LangChain's documentation [7], LangSmith captures "every step of your LLM application, from the initial user input to the final response, including all intermediate calls to LLMs, tools, and retrievers." W&B extends this by providing "experiment tracking, dataset versioning, and model evaluation" capabilities that LangSmith alone doesn't offer.

Prerequisites and Environment Setup

Before diving into implementation, ensure you have:

  • Python 3.10+
  • A LangSmith account (free tier available at smith.langchain.com)
  • A Weights & Biases account (free tier at wandb.ai)
  • An OpenAI [8] API key (or another LLM provider)

Install the required packages:

pip install langchain langchain-community langsmith wandb openai python-dotenv faiss-cpu pandas tiktoken

Create a .env file with your credentials:

LANGSMITH_API_KEY=your_langsmith_api_key
LANGSMITH_PROJECT=llm-monitoring-tutorial
WANDB_API_KEY=your_wandb_api_key
OPENAI_API_KEY=your_openai_api_key

Set the LangSmith environment variables to enable tracing:

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_ENDPOINT="https://api.smith.langchain.com"
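
Missing credentials tend to surface later as confusing authentication errors deep inside a chain. A minimal sketch of a startup check, assuming the variable names from the .env file above; the helper name missing_env_vars is hypothetical and not part of LangSmith or W&B:

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names taken from the .env example above.
REQUIRED_VARS = ("LANGSMITH_API_KEY", "WANDB_API_KEY", "OPENAI_API_KEY")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    source = os.environ if env is None else env
    return [name for name in required if not source.get(name, "").strip()]


# Demonstration with an explicit mapping instead of the real environment.
example_env = {"LANGSMITH_API_KEY": "abc", "WANDB_API_KEY": " "}
print(missing_env_vars(env=example_env))  # ['WANDB_API_KEY', 'OPENAI_API_KEY']
```

Calling missing_env_vars() with no arguments after load_dotenv() checks the real environment; failing fast on a non-empty result is usually easier to debug than a rejected API call.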

Architecture: The Monitoring Pipeline

Our monitoring system follows a three-tier architecture:

  1. Instrumentation Layer: LangSmith callbacks automatically capture every LLM call, chain execution, and tool invocation
  2. Evaluation Layer: Custom evaluators run on each trace to check for hallucinations, response quality, and latency
  3. Visualization Layer: W&B aggregates traces into dashboards with real-time metrics and historical comparisons

Here's the data flow:

User Input → LangChain Chain → LLM Call
                                  ↓
                         LangSmith Trace (captured)
                                  ↓
                         Evaluation Callbacks (quality checks)
                                  ↓
                         W&B Logging (aggregated metrics)
                                  ↓
                         Dashboard Visualization
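
The three layers can be pictured as plain functions before any vendor SDK is involved. The sketch below is illustrative only: TraceRecord, evaluate, and aggregate are hypothetical names, and the real system replaces them with LangSmith traces, evaluators, and W&B logging.

```python
from dataclasses import dataclass, field
from statistics import mean
from typing import Callable, Dict, List


@dataclass
class TraceRecord:
    """Hypothetical stand-in for one captured trace (instrumentation layer)."""
    query: str
    response: str
    latency_seconds: float
    scores: Dict[str, float] = field(default_factory=dict)


Evaluator = Callable[[TraceRecord], float]


def evaluate(trace: TraceRecord, evaluators: Dict[str, Evaluator]) -> TraceRecord:
    """Evaluation layer: attach one score per named evaluator."""
    for name, fn in evaluators.items():
        trace.scores[name] = fn(trace)
    return trace


def aggregate(traces: List[TraceRecord]) -> Dict[str, float]:
    """Visualization layer input: collapse traces into dashboard metrics."""
    if not traces:
        return {"trace_count": 0.0, "avg_latency": 0.0}
    return {
        "trace_count": float(len(traces)),
        "avg_latency": mean(t.latency_seconds for t in traces),
    }


evaluators = {"non_empty": lambda t: 1.0 if t.response.strip() else 0.0}
traces = [
    evaluate(TraceRecord("q1", "an answer", 1.0), evaluators),
    evaluate(TraceRecord("q2", "", 3.0), evaluators),
]
summary = aggregate(traces)
print(summary)
```

Keeping the layers separate like this means an evaluator can be tested without a live model, and the aggregation can be swapped without touching instrumentation.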

Implementing the Monitoring Stack

Step 1: Configure LangSmith Tracing

First, set up LangSmith to capture all LLM interactions. We'll create a custom callback handler that enriches traces with metadata:

import os
from typing import Any, Dict, List

from dotenv import load_dotenv
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.tracers import LangChainTracer
from langsmith import Client

load_dotenv()

class ProductionTracer(LangChainTracer):
    """Custom tracer that adds production metadata to every trace."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.environment = os.getenv("ENVIRONMENT", "production")
        self.version = os.getenv("APP_VERSION", "1.0.0")

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any
    ) -> Any:
        # Merge custom metadata into whatever the caller already supplied
        # (metadata may be passed as None, so fall back to an empty dict).
        # Note: chat models such as ChatOpenAI typically trigger
        # on_chat_model_start rather than on_llm_start.
        kwargs["metadata"] = {
            **(kwargs.get("metadata") or {}),
            "environment": self.environment,
            "version": self.version,
            "trace_type": "llm_call"
        }
        return super().on_llm_start(serialized, prompts, **kwargs)

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs: Any
    ) -> Any:
        # Track chain execution with custom tags (tags may be passed as None)
        kwargs["tags"] = (kwargs.get("tags") or []) + [self.environment]
        return super().on_chain_start(serialized, inputs, **kwargs)

# Initialize the tracer; the API key is supplied through a LangSmith client
tracer = ProductionTracer(
    project_name=os.getenv("LANGSMITH_PROJECT", "llm-monitoring"),
    client=Client(api_key=os.getenv("LANGSMITH_API_KEY"))
)

callback_manager = CallbackManager([tracer])
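
Callers can pass their own metadata and tags, and LangChain's callback signatures allow either one to arrive as None. A minimal sketch of merge helpers that keep caller-supplied values instead of overwriting them; the function names are hypothetical and nothing here touches the LangChain API:

```python
from typing import Any, Dict, List, Optional


def merge_metadata(
    existing: Optional[Dict[str, Any]], extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a new dict; keys in `extra` override matching keys in `existing`."""
    return {**(existing or {}), **extra}


def merge_tags(existing: Optional[List[str]], extra: List[str]) -> List[str]:
    """Append tags that are not already present, preserving order."""
    merged = list(existing or [])
    for tag in extra:
        if tag not in merged:
            merged.append(tag)
    return merged


print(merge_metadata({"user_id": "u1"}, {"environment": "production"}))
print(merge_tags(None, ["production"]))
print(merge_tags(["production", "beta"], ["production"]))
```

Both helpers return new objects, so the dict or list handed in by the caller is never mutated.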

Step 2: Build the LLM Application with Monitoring

Now create a production-grade question-answering chain with built-in monitoring:

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import time

# Initialize the LLM with monitoring
llm = ChatOpenAI(
    model_name="gpt-4",
    temperature=0.1,
    callbacks=callback_manager,
    metadata={
        "model_version": "gpt-4-0613",
        "deployment": "production"
    }
)

# Create a sample vector store (in production, use a persistent store)
documents = [
    Document(
        page_content="LangSmith is a platform for LLM application development, monitoring, and testing.",
        metadata={"source": "docs", "topic": "observability"}
    ),
    Document(
        page_content="Weights & Biases provides experiment tracking and model visualization tools.",
        metadata={"source": "docs", "topic": "mlops"}
    )
]

embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(documents, embeddings)

# Custom prompt template with monitoring hooks
QA_PROMPT = PromptTemplate(
    template="""You are a helpful assistant. Use the following context to answer the question.

Context: {context}

Question: {question}

Answer the question concisely and accurately. If you don't know the answer, say so.
""",
    input_variables=["context", "question"]
)

# Create the QA chain with monitoring
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 2}),
    chain_type_kwargs={"prompt": QA_PROMPT},
    callbacks=callback_manager,
    metadata={"chain_type": "retrieval_qa", "version": "1.0"}
)

Step 3: Implement Custom Evaluators

LangSmith allows you to run custom evaluators on each trace. Here's a comprehensive evaluation suite:

from langsmith.evaluation import EvaluationResult
from langsmith.schemas import Run, Example
import re

class ResponseQualityEvaluator:
    """Evaluates response quality metrics."""

    @staticmethod
    def check_hallucination(run: Run, example: Example) -> EvaluationResult:
        """Check if the response contains information not in the context."""
        # Extract context from the run inputs (assumes the retrieved context
        # is recorded under "context"); outputs can be None for failed runs
        context = (run.inputs or {}).get("context", "")
        response = (run.outputs or {}).get("result", "")

        # Simple heuristic: check for unsupported claims
        # In production, use a dedicated hallucination detection model
        context_lower = context.lower()
        response_lower = response.lower()

        # Collect terms in the response that never appear in the context
        key_terms = set(re.findall(r'\b\w+\b', response_lower)) - \
                    set(re.findall(r'\b\w+\b', context_lower))

        hallucination_score = len(key_terms) / max(len(response.split()), 1)

        # Higher score means more unsupported terms, matching the metric name
        return EvaluationResult(
            key="hallucination_score",
            score=min(hallucination_score, 1.0),
            comment=f"Found {len(key_terms)} terms not in context"
        )

    @staticmethod
    def check_response_length(run: Run, example: Example) -> EvaluationResult:
        """Ensure responses are within acceptable length bounds."""
        response = run.outputs.get("result", "")
        word_count = len(response.split())

        # Flag responses that are too short or too long
        if word_count < 5:
            return EvaluationResult(
                key="response_length",
                score=0.0,
                comment=f"Response too short: {word_count} words"
            )
        elif word_count > 500:
            return EvaluationResult(
                key="response_length",
                score=0.5,
                comment=f"Response too long: {word_count} words"
            )
        else:
            return EvaluationResult(
                key="response_length",
                score=1.0,
                comment=f"Response length OK: {word_count} words"
            )

    @staticmethod
    def check_latency(run: Run, example: Example) -> EvaluationResult:
        """Monitor response time against SLA."""
        # start_time and end_time are datetimes, so convert the difference to seconds
        latency = (run.end_time - run.start_time).total_seconds() if run.end_time else 0.0

        # SLA: response under 5 seconds
        if latency > 5.0:
            return EvaluationResult(
                key="latency_sla",
                score=0.0,
                comment=f"Latency {latency:.2f}s exceeds 5s SLA"
            )
        else:
            return EvaluationResult(
                key="latency_sla",
                score=1.0,
                comment=f"Latency {latency:.2f}s within SLA"
            )

# Register evaluators with LangSmith
from langsmith.evaluation import evaluate

evaluator = ResponseQualityEvaluator()

# In production, this would be called asynchronously on each trace
# For demonstration, we'll evaluate a sample run
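
The term-overlap heuristic can be tried without a LangSmith run object. The sketch below is a simplified variant, not the evaluator above: it divides by the number of distinct response terms rather than the raw word count, and the function names are hypothetical.

```python
import re
from typing import Set


def tokenize(text: str) -> Set[str]:
    """Lowercase the text and return its distinct word tokens."""
    return set(re.findall(r"\b\w+\b", text.lower()))


def unsupported_term_ratio(context: str, response: str) -> float:
    """Fraction of distinct response terms that never appear in the context."""
    response_terms = tokenize(response)
    if not response_terms:
        return 0.0
    unsupported = response_terms - tokenize(context)
    return len(unsupported) / len(response_terms)


context = (
    "LangSmith is a platform for LLM application development, "
    "monitoring, and testing."
)
print(unsupported_term_ratio(context, "LangSmith is a platform for monitoring."))  # 0.0
print(unsupported_term_ratio(context, "LangSmith is a database"))  # 0.25
```

Because the check is purely lexical, a correct paraphrase scores as badly as a fabricated claim, which is why it suits coarse trend monitoring better than per-request blocking.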

Step 4: Integrate Weights & Biases for Aggregated Monitoring

W&B provides powerful visualization and alerting capabilities. Here's how to log LangSmith traces to W&B:

import os
from datetime import datetime
from typing import Any, Dict

import pandas as pd
import wandb

class WBDashboardLogger:
    """Logs aggregated metrics from LangSmith to W&B."""

    def __init__(self, project_name: str = "llm-monitoring"):
        self.run = wandb.init(
            project=project_name,
            config={
                "model": "gpt-4",
                "environment": os.getenv("ENVIRONMENT", "production"),
                "version": os.getenv("APP_VERSION", "1.0.0")
            }
        )
        self.metrics_buffer = []

    def log_trace_metrics(self, trace_data: Dict[str, Any]):
        """Log individual trace metrics to W&B."""
        metrics = {
            "latency_seconds": trace_data.get("latency", 0),
            "total_tokens": trace_data.get("total_tokens", 0),
            "prompt_tokens": trace_data.get("prompt_tokens", 0),
            "completion_tokens": trace_data.get("completion_tokens", 0),
            "cost_usd": self._calculate_cost(trace_data),
            "hallucination_score": trace_data.get("hallucination_score", 0),
            "response_length": trace_data.get("response_length", 0)
        }

        self.metrics_buffer.append(metrics)

        # Log every 10 traces to avoid overwhelming W&B
        if len(self.metrics_buffer) >= 10:
            self._flush_buffer()

    def _calculate_cost(self, trace_data: Dict[str, Any]) -> float:
        """Calculate cost based on token usage."""
        # GPT-4 pricing: $0.03/1K prompt tokens, $0.06/1K completion tokens
        prompt_cost = trace_data.get("prompt_tokens", 0) * 0.03 / 1000
        completion_cost = trace_data.get("completion_tokens", 0) * 0.06 / 1000
        return prompt_cost + completion_cost

    def _flush_buffer(self):
        """Flush metrics buffer to W&B."""
        if self.metrics_buffer:
            df = pd.DataFrame(self.metrics_buffer)

            # Log summary statistics
            self.run.log({
                "avg_latency": df["latency_seconds"].mean(),
                "p95_latency": df["latency_seconds"].quantile(0.95),
                "avg_cost": df["cost_usd"].mean(),
                "total_cost": df["cost_usd"].sum(),
                "avg_hallucination_score": df["hallucination_score"].mean(),
                "total_tokens": df["total_tokens"].sum(),
                "trace_count": len(df)
            })

            # Log distribution histograms
            self.run.log({
                "latency_distribution": wandb.Histogram(df["latency_seconds"]),
                "token_distribution": wandb.Histogram(df["total_tokens"]),
                "cost_distribution": wandb.Histogram(df["cost_usd"])
            })

            self.metrics_buffer = []

    def log_alert(self, alert_type: str, message: str, severity: str = "warning"):
        """Log an alert for anomalous behavior."""
        self.run.log({
            f"alert_{alert_type}": wandb.Table(
                columns=["timestamp", "type", "message", "severity"],
                data=[[datetime.now().isoformat(), alert_type, message, severity]]
            )
        })

    def finish(self):
        """Clean up W&B run."""
        self._flush_buffer()
        self.run.finish()

# Initialize W&B logger
wb_logger = WBDashboardLogger()
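
The cost formula in _calculate_cost is easy to sanity-check by hand. A minimal sketch of the same arithmetic as a standalone function, assuming the per-1K-token rates quoted in the code comment above; provider prices change, so the rates are parameters rather than constants, and the function name is hypothetical:

```python
def estimate_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    prompt_rate_per_1k: float = 0.03,      # assumed rate from the comment above
    completion_rate_per_1k: float = 0.06,  # assumed rate from the comment above
) -> float:
    """Cost in USD for one request, given per-1K-token rates."""
    prompt_cost = prompt_tokens * prompt_rate_per_1k / 1000
    completion_cost = completion_tokens * completion_rate_per_1k / 1000
    return prompt_cost + completion_cost


# 1,200 prompt tokens cost 1.2 * 0.03 = 0.036 USD and 300 completion tokens
# cost 0.3 * 0.06 = 0.018 USD, for roughly 0.054 USD in total.
print(f"{estimate_cost_usd(1200, 300):.3f}")  # 0.054
```

Passing the rates in explicitly also makes it straightforward to track several models with different pricing in one dashboard.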

Step 5: Production Monitoring Pipeline

Combine everything into a production-ready monitoring pipeline:

import asyncio
from typing import AsyncGenerator, Optional
import json

class ProductionMonitor:
    """Main monitoring orchestrator for LLM applications."""

    def __init__(self):
        self.wb_logger = WBDashboardLogger()
        self.tracer = tracer
        self.evaluator = ResponseQualityEvaluator()
        self.alert_thresholds = {
            "max_latency": 5.0,  # seconds
            "max_hallucination": 0.3,  # score
            "max_cost_per_request": 0.05,  # USD
            "error_rate_threshold": 0.05  # 5% error rate
        }
        self.error_count = 0
        self.total_requests = 0

    async def monitor_request(
        self,
        query: str,
        chain: RetrievalQA
    ) -> Dict[str, Any]:
        """Monitor a single LLM request end-to-end."""
        start_time = time.time()
        self.total_requests += 1

        try:
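            # Execute the chain with tracing; run the blocking call in a worker
            # thread so it does not stall the event loop
            result = await asyncio.to_thread(chain.invoke, {"query": query})
            response = result["result"]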

            # Calculate metrics
            latency = time.time() - start_time

            # Create trace data for W&B
            trace_data = {
                "latency": latency,
                "query": query,
                "response": response,
                "total_tokens": self._extract_token_usage(),
                "prompt_tokens": self._extract_prompt_tokens(),
                "completion_tokens": self._extract_completion_tokens(),
                "hallucination_score": 0.0,  # Would be set by evaluator
                "response_length": len(response.split()),
                "timestamp": datetime.now().isoformat()
            }

            # Log to W&B
            self.wb_logger.log_trace_metrics(trace_data)

            # Check for alerts
            await self._check_alerts(trace_data)

            return {
                "success": True,
                "response": response,
                "latency": latency,
                "trace_id": self._get_current_trace_id()
            }

        except Exception as e:
            self.error_count += 1
            error_rate = self.error_count / self.total_requests

            # Log error to W&B
            self.wb_logger.log_alert(
                alert_type="request_error",
                message=f"Request failed: {str(e)}",
                severity="error"
            )

            # Check if error rate exceeds threshold
            if error_rate > self.alert_thresholds["error_rate_threshold"]:
                self._trigger_high_error_rate_alert(error_rate)

            return {
                "success": False,
                "error": str(e),
                "error_rate": error_rate
            }

    async def _check_alerts(self, trace_data: Dict[str, Any]):
        """Check metrics against alert thresholds."""
        alerts = []

        if trace_data["latency"] > self.alert_thresholds["max_latency"]:
            alerts.append({
                "alert_type": "high_latency",
                "message": f"Latency {trace_data['latency']:.2f}s exceeds threshold",
                "severity": "warning"
            })

        if trace_data.get("hallucination_score", 0) > self.alert_thresholds["max_hallucination"]:
            alerts.append({
                "alert_type": "high_hallucination",
                "message": f"Hallucination score {trace_data['hallucination_score']:.2f} exceeds threshold",
                "severity": "critical"
            })

        cost = self.wb_logger._calculate_cost(trace_data)
        if cost > self.alert_thresholds["max_cost_per_request"]:
            alerts.append({
                "alert_type": "high_cost",
                "message": f"Request cost ${cost:.4f} exceeds threshold",
                "severity": "warning"
            })

        # Keys match log_alert's parameter names so each dict can be unpacked
        for alert in alerts:
            self.wb_logger.log_alert(**alert)

    def _trigger_high_error_rate_alert(self, error_rate: float):
        """Trigger critical alert for high error rate."""
        self.wb_logger.log_alert(
            alert_type="high_error_rate",
            message=f"Error rate {error_rate:.2%} exceeds threshold of {self.alert_thresholds['error_rate_threshold']:.0%}",
            severity="critical"
        )

    def _extract_token_usage(self) -> int:
        """Extract token usage from the current trace."""
        # In production, this would read from LangSmith's trace API
        return 0

    def _extract_prompt_tokens(self) -> int:
        """Extract prompt token count."""
        return 0

    def _extract_completion_tokens(self) -> int:
        """Extract completion token count."""
        return 0

    def _get_current_trace_id(self) -> str:
        """Get the current trace ID from LangSmith."""
        # In production, this would be available from the tracer
        return "trace_id_placeholder"

    async def shutdown(self):
        """Gracefully shutdown the monitor."""
        self.wb_logger.finish()

# Usage example
async def main():
    monitor = ProductionMonitor()

    # Simulate production requests
    test_queries = [
        "What is LangSmith?",
        "How does W&B help with ML monitoring?",
        "Tell me about something not in the context"
    ]

    for query in test_queries:
        result = await monitor.monitor_request(query, qa_chain)
        print(f"Query: {query}")
        print(f"Result: {result['success']}")
        if result['success']:
            print(f"Response: {result['response'][:100]}...")
        print("---")

    await monitor.shutdown()

# Run the monitoring pipeline
if __name__ == "__main__":
    asyncio.run(main())
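
ProductionMonitor computes its error rate over the whole process lifetime (error_count / total_requests), so in a long-running service a fresh burst of failures is diluted by old successes. One possible alternative is a sliding window over the most recent outcomes. This is a sketch under that assumption, with a hypothetical class name, not part of the pipeline above:

```python
from collections import deque
from typing import Deque


class SlidingErrorRate:
    """Error rate over the most recent `window_size` requests."""

    def __init__(self, window_size: int = 100) -> None:
        # A bounded deque discards the oldest outcome automatically.
        self._outcomes: Deque[bool] = deque(maxlen=window_size)

    def record(self, success: bool) -> None:
        self._outcomes.append(success)

    @property
    def rate(self) -> float:
        if not self._outcomes:
            return 0.0
        failures = sum(1 for ok in self._outcomes if not ok)
        return failures / len(self._outcomes)


tracker = SlidingErrorRate(window_size=4)
for ok in [False, False, True, True]:
    tracker.record(ok)
print(tracker.rate)  # 0.5
for ok in [True, True]:
    tracker.record(ok)
print(tracker.rate)  # 0.0, the two early failures have left the window
```

The window size trades responsiveness for noise: a small window reacts quickly but can trip an alert on a couple of unlucky requests.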

Edge Cases and Production Considerations

Handling API Rate Limits

When monitoring at scale, you'll encounter API rate limits from both LangSmith and W&B. Implement exponential backoff:

import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1.0):
    """Decorator for retrying API calls with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        time.sleep(delay)
                    else:
                        raise
            return None
        return wrapper
    return decorator

# Apply to W&B logging (pass the active run explicitly, e.g. the value returned by wandb.init)
@retry_with_backoff(max_retries=3)
def safe_log_metrics(run, metrics):
    run.log(metrics)
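
It helps to see the delay schedule the decorator produces. With max_retries=3 it sleeps only between attempts, so there are two delays, and the third failure is re-raised. The sketch below reproduces that schedule and adds optional jitter, a common refinement that the decorator above does not include; both function names are hypothetical.

```python
import random
from typing import Callable, List


def backoff_delays(max_retries: int, base_delay: float = 1.0) -> List[float]:
    """Delays slept between attempts: one fewer than the number of attempts."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries - 1)]


def with_full_jitter(delay: float, rng: Callable[[], float] = random.random) -> float:
    """Scale a delay by a random factor in [0, 1) to spread out retries."""
    return delay * rng()


print(backoff_delays(3))        # [1.0, 2.0]
print(backoff_delays(5, 0.5))   # [0.5, 1.0, 2.0, 4.0]
print(with_full_jitter(4.0, rng=lambda: 0.5))  # 2.0
```

Jitter matters when many workers hit the same rate limit at once: without it they all retry on the same schedule and collide again.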

Memory Management for High-Throughput Systems

In production, you might process thousands of requests per minute. Buffer metrics and flush periodically:

import threading
import time
from typing import Any, Dict

import wandb

class BufferedMetricsLogger:
    """Thread-safe metrics buffer for high-throughput scenarios."""

    def __init__(self, buffer_size=100, flush_interval=10):
        self.buffer = []
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.last_flush = time.time()
        self.lock = threading.Lock()

    def add_metric(self, metric: Dict[str, Any]):
        with self.lock:
            self.buffer.append(metric)

            if len(self.buffer) >= self.buffer_size or \
               time.time() - self.last_flush >= self.flush_interval:
                self._flush()

    def _flush(self):
        """Flush buffer to W&B."""
        if self.buffer:
            # Log each buffered metric dict as its own W&B step
            for metric in self.buffer:
                wandb.log(metric)
            self.buffer = []
            self.last_flush = time.time()
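
The size-or-time flush rule is easier to test when the destination and the clock are injected. The sketch below is a variant of the buffer above under those assumptions: SinkBuffer is a hypothetical name, the sink is any callable that accepts a batch (a list's append method stands in for W&B here), and the sink is called outside the lock so slow I/O does not block other threads.

```python
import threading
import time
from typing import Any, Callable, Dict, List

Metric = Dict[str, Any]


class SinkBuffer:
    """Buffers metrics and hands batches to a caller-supplied sink."""

    def __init__(
        self,
        sink: Callable[[List[Metric]], None],
        buffer_size: int = 100,
        flush_interval: float = 10.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._sink = sink
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._buffer: List[Metric] = []
        self._last_flush = clock()
        self._lock = threading.Lock()

    def _take_batch(self) -> List[Metric]:
        """Swap out the buffer; the caller must hold the lock."""
        batch, self._buffer = self._buffer, []
        self._last_flush = self._clock()
        return batch

    def add(self, metric: Metric) -> None:
        with self._lock:
            self._buffer.append(metric)
            overdue = self._clock() - self._last_flush >= self._flush_interval
            if len(self._buffer) < self._buffer_size and not overdue:
                return
            batch = self._take_batch()
        # The sink runs outside the lock so slow I/O does not block producers.
        self._sink(batch)

    def flush(self) -> None:
        """Send whatever is buffered, for example at shutdown."""
        with self._lock:
            batch = self._take_batch()
        if batch:
            self._sink(batch)


batches: List[List[Metric]] = []
buffer = SinkBuffer(sink=batches.append, buffer_size=2)
for i in range(3):
    buffer.add({"latency_seconds": float(i)})
buffer.flush()
print(len(batches))  # 2: one full batch of two, then the remainder
```

time.monotonic is used for the interval because it is unaffected by system clock adjustments. Note that the time-based flush still only fires when add is called, so an idle service needs an explicit flush at shutdown.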

Handling Missing or Corrupted Traces

Not all traces will complete successfully. Implement graceful degradation:

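from typing import Any, Callable, Dict, Optional

import wandb

def safe_trace_evaluation(
    trace_data: Optional[Dict[str, Any]],
    evaluate_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Safely evaluate a trace, returning defaults on failure.

    evaluate_fn is whatever callable scores a trace dict in your pipeline;
    ResponseQualityEvaluator exposes per-check static methods rather than a
    single evaluate() method, so the scorer is passed in explicitly.
    """
    if not trace_data:
        return {
            "hallucination_score": 0.0,
            "latency": 0.0,
            "error": "missing_trace_data"
        }

    try:
        # Attempt evaluation
        return evaluate_fn(trace_data)
    except Exception as e:
        # Log the error but don't crash the pipeline
        # (wandb.alert requires an active run started with wandb.init)
        wandb.alert(
            title="Trace Evaluation Failed",
            text=f"Failed to evaluate trace: {str(e)}"
        )
        return {
            "hallucination_score": 0.0,
            "latency": trace_data.get("latency", 0.0),
            "error": str(e)
        }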
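
Before scoring, it can help to reject traces whose fields are missing or malformed instead of silently defaulting them. A minimal sketch of such a check, assuming the trace dict uses the field names from Step 5; the function name and the choice of fields are illustrative:

```python
from typing import Any, Dict, List, Optional

# Field names follow the trace_data dict built in Step 5.
NUMERIC_FIELDS = ("latency", "total_tokens", "prompt_tokens", "completion_tokens")


def trace_problems(trace_data: Optional[Dict[str, Any]]) -> List[str]:
    """List the reasons a trace should not be scored; empty means usable."""
    if not trace_data:
        return ["missing_trace_data"]
    problems: List[str] = []
    for name in NUMERIC_FIELDS:
        value = trace_data.get(name)
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            problems.append(f"{name}: expected a number, got {type(value).__name__}")
        elif value < 0:
            problems.append(f"{name}: negative value {value}")
    return problems


good = {"latency": 1.2, "total_tokens": 50, "prompt_tokens": 40, "completion_tokens": 10}
bad = {"latency": "fast", "total_tokens": -1, "prompt_tokens": 40}
print(trace_problems(good))  # []
print(trace_problems(bad))
```

Returning a list of problems rather than a default score keeps broken traces out of the averages, where a default of 0.0 would otherwise look like a perfect result.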

What's Next

You now have a production-ready monitoring stack for LLM applications. Here are the next steps to consider:

  1. Set up automated alerts: Configure W&B to send Slack or email alerts when metrics exceed thresholds
  2. Implement A/B testing: Use LangSmith's dataset features to compare model versions
  3. Add user feedback: Integrate thumbs up/down buttons that feed into your evaluation pipeline
  4. Scale with Kubernetes: Deploy the monitoring pipeline as a sidecar container in your Kubernetes pods

For deeper dives, check out our guides on LLM evaluation best practices and production MLOps patterns.

The key insight from this implementation is that monitoring isn't an afterthought—it's a core architectural component that should be designed alongside your LLM application. By combining LangSmith's tracing capabilities with W&B's visualization and alerting, you create a feedback loop that catches regressions early and provides the data needed for continuous improvement.

Remember: in production LLM applications, what you can't see can hurt you. Implement monitoring from day one, and your future self will thank you when that mysterious degradation appears at 2 AM.


References

1. Wikipedia: OpenAI.
2. Wikipedia: GPT.
3. Wikipedia: Embedding.
4. GitHub: openai/openai-python.
5. GitHub: Significant-Gravitas/AutoGPT.
6. GitHub: fighting41love/funNLP.
7. GitHub: langchain-ai/langchain.
8. OpenAI Pricing.