How to Monitor LLM Apps with LangSmith and Weights & Biases
Practical tutorial: Monitor LLM apps with LangSmith and Weights & Biases
Building production LLM applications is deceptively simple in development but notoriously difficult to maintain at scale. You've likely experienced the frustration: a chain that works perfectly in your notebook starts hallucinating, returning empty responses, or silently degrading in production. Without proper observability, debugging these failures is like finding a needle in a haystack while blindfolded.
This tutorial will walk you through implementing a comprehensive monitoring stack using LangSmith for LLM-specific tracing and evaluation, combined with Weights & Biases (W&B) for experiment tracking and performance visualization. By the end, you'll have a production-ready monitoring system that catches regressions before they impact users.
Why LangSmith and W&B Together?
LangSmith and W&B serve complementary roles in the LLM observability landscape. LangSmith provides granular, per-request tracing of LLM calls, including token usage, latency, and exact prompts/responses. W&B excels at aggregating these traces into dashboards, comparing experiments across model versions, and alerting on performance degradation.
According to LangChain's documentation [7], LangSmith captures "every step of your LLM application, from the initial user input to the final response, including all intermediate calls to LLMs, tools, and retrievers." W&B extends this by providing "experiment tracking, dataset versioning, and model evaluation" capabilities that LangSmith alone doesn't offer.
Prerequisites and Environment Setup
Before diving into implementation, ensure you have:
- Python 3.10+
- A LangSmith account (free tier available at smith.langchain.com)
- A Weights & Biases account (free tier at wandb.ai)
- An OpenAI [8] API key (or another LLM provider)
Install the required packages:
pip install langchain langchain-community langsmith wandb openai python-dotenv faiss-cpu pandas
Create a .env file with your credentials:
LANGSMITH_API_KEY=your_langsmith_api_key
LANGSMITH_PROJECT=llm-monitoring-tutorial
WANDB_API_KEY=your_wandb_api_key
OPENAI_API_KEY=your_openai_api_key
Set the LangSmith environment variables to enable tracing:
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_ENDPOINT="https://api.smith.langchain.com"
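Before starting the app, it can help to fail fast when a credential is missing. The following is a minimal sketch, assuming the variable names from the .env file and export lines above; `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, not part of either SDK.

```python
import os
from typing import Iterable, List, Mapping

# Names taken from the .env file and export lines in this section.
REQUIRED_VARS = (
    "LANGSMITH_API_KEY",
    "WANDB_API_KEY",
    "OPENAI_API_KEY",
    "LANGCHAIN_TRACING_V2",
)


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    env: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the required variable names that are unset or blank."""
    return [name for name in required if not env.get(name, "").strip()]


missing = missing_env_vars()
if missing:
    print("Missing configuration:", ", ".join(missing))
else:
    print("All monitoring credentials are set.")
```

Passing the environment in as a mapping keeps the check easy to exercise in a unit test without touching the real process environment.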
Architecture: The Monitoring Pipeline
Our monitoring system follows a three-tier architecture:
- Instrumentation Layer: LangSmith callbacks automatically capture every LLM call, chain execution, and tool invocation
- Evaluation Layer: Custom evaluators run on each trace to check for hallucinations, response quality, and latency
- Visualization Layer: W&B aggregates traces into dashboards with real-time metrics and historical comparisons
Here's the data flow:
User Input → LangChain Chain → LLM Call
↓
LangSmith Trace (captured)
↓
Evaluation Callbacks (quality checks)
↓
W&B Logging (aggregated metrics)
↓
Dashboard Visualization
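To make the three tiers concrete, here is a toy sketch of the same flow in plain Python, with no LangSmith or W&B calls. The `Trace` shape and the `evaluate` and `aggregate` functions are illustrative assumptions standing in for the real layers.

```python
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, List


@dataclass
class Trace:
    """Instrumentation layer output: one captured request (hypothetical shape)."""
    query: str
    response: str
    latency_s: float
    scores: Dict[str, float] = field(default_factory=dict)


def evaluate(trace: Trace, max_latency_s: float = 5.0) -> Trace:
    """Evaluation layer: attach per-trace quality checks."""
    trace.scores["latency_sla"] = 1.0 if trace.latency_s <= max_latency_s else 0.0
    trace.scores["non_empty"] = 1.0 if trace.response.strip() else 0.0
    return trace


def aggregate(traces: List[Trace]) -> Dict[str, float]:
    """Visualization layer input: roll traces up into dashboard metrics."""
    return {
        "trace_count": float(len(traces)),
        "avg_latency_s": mean(t.latency_s for t in traces),
        "sla_pass_rate": mean(t.scores["latency_sla"] for t in traces),
    }


traces = [
    evaluate(Trace("What is LangSmith?", "A tracing platform.", 1.0)),
    evaluate(Trace("Slow question", "Eventually answered.", 7.0)),
]
summary = aggregate(traces)
print(summary)
```

Each layer only depends on the output of the one before it, which is the property that lets you swap the evaluation logic without touching instrumentation or dashboards.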
Implementing the Monitoring Stack
Step 1: Configure LangSmith Tracing
First, set up LangSmith to capture all LLM interactions. We'll create a custom callback handler that enriches traces with metadata:
import os
from typing import Any, Dict, List

from dotenv import load_dotenv
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.tracers import LangChainTracer
from langsmith import Client

load_dotenv()


class ProductionTracer(LangChainTracer):
    """Custom tracer that adds production metadata to every trace."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.environment = os.getenv("ENVIRONMENT", "production")
        self.version = os.getenv("APP_VERSION", "1.0.0")

    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any
    ) -> Any:
        # Merge custom metadata into whatever the caller already supplied
        # (chat models fire on_chat_model_start instead; override it the
        # same way if you need this metadata on chat calls)
        kwargs["metadata"] = {
            **(kwargs.get("metadata") or {}),
            "environment": self.environment,
            "version": self.version,
            "trace_type": "llm_call"
        }
        return super().on_llm_start(serialized, prompts, **kwargs)

    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs: Any
    ) -> Any:
        # Track chain execution with custom tags (tags may arrive as None)
        kwargs["tags"] = (kwargs.get("tags") or []) + [self.environment]
        return super().on_chain_start(serialized, inputs, **kwargs)


# Initialize the tracer; the API key is passed through a LangSmith client
tracer = ProductionTracer(
    project_name=os.getenv("LANGSMITH_PROJECT", "llm-monitoring"),
    client=Client(api_key=os.getenv("LANGSMITH_API_KEY"))
)
callback_manager = CallbackManager([tracer])
Step 2: Build the LLM Application with Monitoring
Now create a production-grade question-answering chain with built-in monitoring:
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import time

# Initialize the LLM with monitoring
llm = ChatOpenAI(
    model_name="gpt-4",
    temperature=0.1,
    callbacks=callback_manager,
    metadata={
        "model_version": "gpt-4-0613",
        "deployment": "production"
    }
)

# Create a sample vector store (in production, use a persistent store)
documents = [
    Document(
        page_content="LangSmith is a platform for LLM application development, monitoring, and testing.",
        metadata={"source": "docs", "topic": "observability"}
    ),
    Document(
        page_content="Weights & Biases provides experiment tracking and model visualization tools.",
        metadata={"source": "docs", "topic": "mlops"}
    )
]
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(documents, embeddings)

# Custom prompt template with monitoring hooks
QA_PROMPT = PromptTemplate(
    template="""You are a helpful assistant. Use the following context to answer the question.
Context: {context}
Question: {question}
Answer the question concisely and accurately. If you don't know the answer, say so.
""",
    input_variables=["context", "question"]
)

# Create the QA chain with monitoring
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 2}),
    chain_type_kwargs={"prompt": QA_PROMPT},
    callbacks=callback_manager,
    metadata={"chain_type": "retrieval_qa", "version": "1.0"}
)
Step 3: Implement Custom Evaluators
LangSmith allows you to run custom evaluators on each trace. Here's a comprehensive evaluation suite:
import re

from langsmith.evaluation import EvaluationResult
from langsmith.schemas import Run, Example


class ResponseQualityEvaluator:
    """Evaluates response quality metrics."""

    @staticmethod
    def check_hallucination(run: Run, example: Example) -> EvaluationResult:
        """Check if the response contains information not in the context."""
        # Extract context from the run inputs. This assumes the evaluated run
        # exposes the retrieved context under "context"; adjust the key to
        # match the inputs your chain actually records.
        context = (run.inputs or {}).get("context", "")
        response = (run.outputs or {}).get("result", "")
        # Simple heuristic: check for unsupported claims
        # In production, use a dedicated hallucination detection model
        context_lower = context.lower()
        response_lower = response.lower()
        # Collect response terms that never appear in the context
        key_terms = set(re.findall(r'\b\w+\b', response_lower)) - \
            set(re.findall(r'\b\w+\b', context_lower))
        hallucination_score = len(key_terms) / max(len(response.split()), 1)
        # The reported score is inverted: 1.0 means fully supported by the context
        return EvaluationResult(
            key="hallucination_score",
            score=1.0 - min(hallucination_score, 1.0),
            comment=f"Found {len(key_terms)} terms not in context"
        )

    @staticmethod
    def check_response_length(run: Run, example: Example) -> EvaluationResult:
        """Ensure responses are within acceptable length bounds."""
        response = (run.outputs or {}).get("result", "")
        word_count = len(response.split())
        # Flag responses that are too short or too long
        if word_count < 5:
            return EvaluationResult(
                key="response_length",
                score=0.0,
                comment=f"Response too short: {word_count} words"
            )
        elif word_count > 500:
            return EvaluationResult(
                key="response_length",
                score=0.5,
                comment=f"Response too long: {word_count} words"
            )
        else:
            return EvaluationResult(
                key="response_length",
                score=1.0,
                comment=f"Response length OK: {word_count} words"
            )

    @staticmethod
    def check_latency(run: Run, example: Example) -> EvaluationResult:
        """Monitor response time against SLA."""
        # start_time and end_time are datetimes, so convert the difference to seconds
        latency = (run.end_time - run.start_time).total_seconds() if run.end_time else 0.0
        # SLA: response under 5 seconds
        if latency > 5.0:
            return EvaluationResult(
                key="latency_sla",
                score=0.0,
                comment=f"Latency {latency:.2f}s exceeds 5s SLA"
            )
        else:
            return EvaluationResult(
                key="latency_sla",
                score=1.0,
                comment=f"Latency {latency:.2f}s within SLA"
            )


# Instantiate the evaluator suite
evaluator = ResponseQualityEvaluator()
# In production, this would be called asynchronously on each trace
# For demonstration, we'll evaluate a sample run
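To get a feel for how the word-overlap heuristic in check_hallucination behaves, you can try the same arithmetic on plain strings. This is a sketch only: `grounding_score` is a hypothetical standalone function that mirrors the scoring logic without needing a LangSmith Run object.

```python
import re


def grounding_score(context: str, response: str) -> float:
    """1.0 when every response word appears in the context, lower otherwise."""
    context_words = set(re.findall(r"\b\w+\b", context.lower()))
    response_words = set(re.findall(r"\b\w+\b", response.lower()))
    unsupported = response_words - context_words
    total_words = max(len(response.split()), 1)
    return 1.0 - min(len(unsupported) / total_words, 1.0)


context = "LangSmith is a platform for LLM application monitoring."
supported = grounding_score(context, "LangSmith is a platform")
unsupported = grounding_score(context, "LangSmith was founded in Paris")
print(supported)    # 1.0: every word is in the context
print(unsupported)  # about 0.2: four of five words are not in the context
```

Note that common words such as "was" and "in" count against the score just like "Paris" does, which is why the tutorial recommends a dedicated detection model for production use.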
Step 4: Integrate Weights & Biases for Aggregated Monitoring
W&B provides powerful visualization and alerting capabilities. Here's how to log LangSmith traces to W&B:
import os
from datetime import datetime
from typing import Any, Dict

import pandas as pd
import wandb


class WBDashboardLogger:
    """Logs aggregated metrics from LangSmith to W&B."""

    def __init__(self, project_name: str = "llm-monitoring"):
        self.run = wandb.init(
            project=project_name,
            config={
                "model": "gpt-4",
                "environment": os.getenv("ENVIRONMENT", "production"),
                "version": os.getenv("APP_VERSION", "1.0.0")
            }
        )
        self.metrics_buffer = []

    def log_trace_metrics(self, trace_data: Dict[str, Any]):
        """Log individual trace metrics to W&B."""
        metrics = {
            "latency_seconds": trace_data.get("latency", 0),
            "total_tokens": trace_data.get("total_tokens", 0),
            "prompt_tokens": trace_data.get("prompt_tokens", 0),
            "completion_tokens": trace_data.get("completion_tokens", 0),
            "cost_usd": self._calculate_cost(trace_data),
            "hallucination_score": trace_data.get("hallucination_score", 0),
            "response_length": trace_data.get("response_length", 0)
        }
        self.metrics_buffer.append(metrics)
        # Log every 10 traces to avoid overwhelming W&B
        if len(self.metrics_buffer) >= 10:
            self._flush_buffer()

    def _calculate_cost(self, trace_data: Dict[str, Any]) -> float:
        """Calculate cost based on token usage."""
        # GPT-4 pricing: $0.03/1K prompt tokens, $0.06/1K completion tokens
        # (check your provider's current rates before relying on these numbers)
        prompt_cost = trace_data.get("prompt_tokens", 0) * 0.03 / 1000
        completion_cost = trace_data.get("completion_tokens", 0) * 0.06 / 1000
        return prompt_cost + completion_cost

    def _flush_buffer(self):
        """Flush metrics buffer to W&B."""
        if self.metrics_buffer:
            df = pd.DataFrame(self.metrics_buffer)
            # Log summary statistics and distribution histograms in a single
            # call so they share the same W&B step
            self.run.log({
                "avg_latency": df["latency_seconds"].mean(),
                "p95_latency": df["latency_seconds"].quantile(0.95),
                "avg_cost": df["cost_usd"].mean(),
                "total_cost": df["cost_usd"].sum(),
                "avg_hallucination_score": df["hallucination_score"].mean(),
                "total_tokens": df["total_tokens"].sum(),
                "trace_count": len(df),
                "latency_distribution": wandb.Histogram(df["latency_seconds"].tolist()),
                "token_distribution": wandb.Histogram(df["total_tokens"].tolist()),
                "cost_distribution": wandb.Histogram(df["cost_usd"].tolist())
            })
            self.metrics_buffer = []

    def log_alert(self, alert_type: str, message: str, severity: str = "warning"):
        """Log an alert for anomalous behavior."""
        self.run.log({
            f"alert_{alert_type}": wandb.Table(
                columns=["timestamp", "type", "message", "severity"],
                data=[[datetime.now().isoformat(), alert_type, message, severity]]
            )
        })

    def finish(self):
        """Clean up W&B run."""
        self._flush_buffer()
        self.run.finish()


# Initialize W&B logger
wb_logger = WBDashboardLogger()
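As a quick worked example of the cost formula in _calculate_cost, the sketch below computes the cost of a single request. The per-1K rates are the ones quoted in that method's comment and are an assumption here; `request_cost_usd` is a hypothetical standalone helper.

```python
def request_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    prompt_rate_per_1k: float = 0.03,      # assumed rate, as quoted in _calculate_cost
    completion_rate_per_1k: float = 0.06,  # assumed rate, as quoted in _calculate_cost
) -> float:
    """Cost of one request given per-1K-token rates."""
    return (
        prompt_tokens * prompt_rate_per_1k / 1000
        + completion_tokens * completion_rate_per_1k / 1000
    )


cost = request_cost_usd(prompt_tokens=1200, completion_tokens=300)
print(f"${cost:.4f}")  # $0.0540
```

A request with 1,200 prompt tokens and 300 completion tokens costs $0.036 + $0.018 = $0.054 at these rates, which is the kind of number a per-request cost threshold is meant to catch.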
Step 5: Production Monitoring Pipeline
Combine everything into a production-ready monitoring pipeline:
import asyncio
import time
from datetime import datetime
from typing import Any, Dict


class ProductionMonitor:
    """Main monitoring orchestrator for LLM applications."""

    def __init__(self):
        # Reuse the logger from Step 4 so only one W&B run is opened
        self.wb_logger = wb_logger
        self.tracer = tracer
        self.evaluator = ResponseQualityEvaluator()
        self.alert_thresholds = {
            "max_latency": 5.0,  # seconds
            "max_hallucination": 0.3,  # score
            "max_cost_per_request": 0.05,  # USD
            "error_rate_threshold": 0.05  # 5% error rate
        }
        self.error_count = 0
        self.total_requests = 0

    async def monitor_request(
        self,
        query: str,
        chain: RetrievalQA
    ) -> Dict[str, Any]:
        """Monitor a single LLM request end-to-end."""
        start_time = time.time()
        self.total_requests += 1
        try:
            # Execute the chain with tracing, without blocking the event loop
            result = await chain.ainvoke({"query": query})
            response = result["result"]
            # Calculate metrics
            latency = time.time() - start_time
            # Create trace data for W&B
            trace_data = {
                "latency": latency,
                "query": query,
                "response": response,
                "total_tokens": self._extract_token_usage(),
                "prompt_tokens": self._extract_prompt_tokens(),
                "completion_tokens": self._extract_completion_tokens(),
                # Placeholder. check_hallucination returns 1.0 for a fully
                # grounded answer, so store 1.0 - score here if you wire it in.
                "hallucination_score": 0.0,
                "response_length": len(response.split()),
                "timestamp": datetime.now().isoformat()
            }
            # Log to W&B
            self.wb_logger.log_trace_metrics(trace_data)
            # Check for alerts
            await self._check_alerts(trace_data)
            return {
                "success": True,
                "response": response,
                "latency": latency,
                "trace_id": self._get_current_trace_id()
            }
        except Exception as e:
            self.error_count += 1
            error_rate = self.error_count / self.total_requests
            # Log error to W&B
            self.wb_logger.log_alert(
                alert_type="request_error",
                message=f"Request failed: {str(e)}",
                severity="error"
            )
            # Check if error rate exceeds threshold
            if error_rate > self.alert_thresholds["error_rate_threshold"]:
                self._trigger_high_error_rate_alert(error_rate)
            return {
                "success": False,
                "error": str(e),
                "error_rate": error_rate
            }

    async def _check_alerts(self, trace_data: Dict[str, Any]):
        """Check metrics against alert thresholds."""
        # Keys match the log_alert(alert_type, message, severity) signature
        alerts = []
        if trace_data["latency"] > self.alert_thresholds["max_latency"]:
            alerts.append({
                "alert_type": "high_latency",
                "message": f"Latency {trace_data['latency']:.2f}s exceeds threshold",
                "severity": "warning"
            })
        if trace_data.get("hallucination_score", 0) > self.alert_thresholds["max_hallucination"]:
            alerts.append({
                "alert_type": "high_hallucination",
                "message": f"Hallucination score {trace_data['hallucination_score']:.2f} exceeds threshold",
                "severity": "critical"
            })
        cost = self.wb_logger._calculate_cost(trace_data)
        if cost > self.alert_thresholds["max_cost_per_request"]:
            alerts.append({
                "alert_type": "high_cost",
                "message": f"Request cost ${cost:.4f} exceeds threshold",
                "severity": "warning"
            })
        for alert in alerts:
            self.wb_logger.log_alert(**alert)

    def _trigger_high_error_rate_alert(self, error_rate: float):
        """Trigger critical alert for high error rate."""
        self.wb_logger.log_alert(
            alert_type="high_error_rate",
            message=f"Error rate {error_rate:.2%} exceeds threshold of {self.alert_thresholds['error_rate_threshold']:.0%}",
            severity="critical"
        )

    def _extract_token_usage(self) -> int:
        """Extract token usage from the current trace."""
        # In production, this would read from LangSmith's trace API
        return 0

    def _extract_prompt_tokens(self) -> int:
        """Extract prompt token count."""
        return 0

    def _extract_completion_tokens(self) -> int:
        """Extract completion token count."""
        return 0

    def _get_current_trace_id(self) -> str:
        """Get the current trace ID from LangSmith."""
        # In production, this would be available from the tracer
        return "trace_id_placeholder"

    async def shutdown(self):
        """Gracefully shutdown the monitor."""
        self.wb_logger.finish()


# Usage example
async def main():
    monitor = ProductionMonitor()
    # Simulate production requests
    test_queries = [
        "What is LangSmith?",
        "How does W&B help with ML monitoring?",
        "Tell me about something not in the context"
    ]
    for query in test_queries:
        result = await monitor.monitor_request(query, qa_chain)
        print(f"Query: {query}")
        print(f"Result: {result['success']}")
        if result['success']:
            print(f"Response: {result['response'][:100]}...")
        print("---")
    await monitor.shutdown()


# Run the monitoring pipeline
if __name__ == "__main__":
    asyncio.run(main())
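The monitor above computes its error rate over every request since startup, so a burst of recent failures is diluted once the process has served many requests. One possible refinement, sketched here under that assumption, is to track only the most recent outcomes. `RollingErrorRate` is a hypothetical helper, not part of LangSmith or W&B.

```python
from collections import deque


class RollingErrorRate:
    """Error rate over the most recent `window` requests (hypothetical helper)."""

    def __init__(self, window: int = 100):
        self._outcomes = deque(maxlen=window)  # True means the request failed

    def record(self, failed: bool) -> float:
        """Record one outcome and return the current windowed error rate."""
        self._outcomes.append(failed)
        return sum(self._outcomes) / len(self._outcomes)


tracker = RollingErrorRate(window=4)
for failed in [False, False, False, False, True, True]:
    rate = tracker.record(failed)
print(rate)  # 0.5: two failures among the last four requests
```

With a cumulative count, the same six requests would report 2/6, or about 33%. The windowed rate reacts faster, at the cost of being noisier when the window is small.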
Edge Cases and Production Considerations
Handling API Rate Limits
When monitoring at scale, you'll encounter API rate limits from both LangSmith and W&B. Implement exponential backoff:
import time
from functools import wraps


def retry_with_backoff(max_retries=3, base_delay=1.0):
    """Decorator for retrying API calls with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Retry only rate-limit errors, and only while attempts remain
                    if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        time.sleep(delay)
                    else:
                        raise
            return None
        return wrapper
    return decorator


# Apply to W&B logging; pass the active W&B run explicitly
@retry_with_backoff(max_retries=3)
def safe_log_metrics(run, metrics):
    run.log(metrics)
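To see what the decorator's defaults mean in wall-clock terms, the sketch below lists the delays it would sleep between attempts. `backoff_delays` is a hypothetical helper that reproduces the `base_delay * (2 ** attempt)` formula from the decorator.

```python
from typing import List


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> List[float]:
    """Delays slept between attempts; the final attempt is not followed by a sleep."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries - 1)]


print(backoff_delays())                   # [1.0, 2.0]
print(backoff_delays(5, base_delay=0.5))  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults, a call that keeps hitting the rate limit waits 3 seconds in total before the third failure is raised to the caller.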
Memory Management for High-Throughput Systems
In production, you might process thousands of requests per minute. Buffer metrics and flush periodically:
import threading
import time
from typing import Any, Dict

import wandb


class BufferedMetricsLogger:
    """Thread-safe metrics buffer for high-throughput scenarios."""

    def __init__(self, buffer_size=100, flush_interval=10):
        self.buffer = []
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval  # seconds
        self.last_flush = time.time()
        self.lock = threading.Lock()

    def add_metric(self, metric: Dict[str, Any]):
        with self.lock:
            self.buffer.append(metric)
            # Flush when the buffer is full or the interval has elapsed
            if len(self.buffer) >= self.buffer_size or \
                    time.time() - self.last_flush >= self.flush_interval:
                self._flush()

    def _flush(self):
        """Flush buffer to W&B. Called with the lock held."""
        if self.buffer:
            # Log each buffered metric dict; requires an active W&B run
            for metric in self.buffer:
                wandb.log(metric)
            self.buffer = []
        self.last_flush = time.time()
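The flush rules are easier to verify when the destination and the clock are passed in rather than hard-coded. The following sketch applies the same size-or-interval rule with an injectable sink and clock; `InjectableBuffer` and its parameters are hypothetical names introduced for illustration.

```python
import threading
import time
from typing import Any, Callable, Dict, List


class InjectableBuffer:
    """Size-or-interval flushing with the sink and clock passed in (hypothetical)."""

    def __init__(
        self,
        sink: Callable[[List[Dict[str, Any]]], None],
        buffer_size: int = 100,
        flush_interval: float = 10.0,
        clock: Callable[[], float] = time.time,
    ):
        self._sink = sink
        self._buffer: List[Dict[str, Any]] = []
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._last_flush = clock()
        self._lock = threading.Lock()

    def add_metric(self, metric: Dict[str, Any]) -> None:
        with self._lock:
            self._buffer.append(metric)
            full = len(self._buffer) >= self._buffer_size
            stale = self._clock() - self._last_flush >= self._flush_interval
            if full or stale:
                # Swap the buffer out before handing the batch to the sink
                batch, self._buffer = self._buffer, []
                self._last_flush = self._clock()
                self._sink(batch)


batches: List[List[Dict[str, Any]]] = []
metrics_buffer = InjectableBuffer(sink=batches.append, buffer_size=2, flush_interval=60.0)
for latency in (0.8, 1.1, 0.9):
    metrics_buffer.add_metric({"latency_seconds": latency})
print(len(batches), len(batches[0]))  # 1 2: one batch of two, third metric still buffered
```

In a real deployment the sink would be a function that writes the batch to W&B, and a fake clock in tests lets you check the interval-based flush without sleeping.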
Handling Missing or Corrupted Traces
Not all traces will complete successfully. Implement graceful degradation:
from typing import Any, Dict, Optional

import wandb


def safe_trace_evaluation(trace_data: Optional[Dict]) -> Dict[str, Any]:
    """Safely evaluate a trace, returning defaults on failure."""
    if not trace_data:
        return {
            "hallucination_score": 0.0,
            "latency": 0.0,
            "error": "missing_trace_data"
        }
    try:
        # Attempt evaluation: a missing or non-numeric field raises here
        return {
            "hallucination_score": float(trace_data["hallucination_score"]),
            "latency": float(trace_data["latency"]),
            "error": None
        }
    except Exception as e:
        # Log the error but don't crash the pipeline
        # (wandb.alert requires an active W&B run)
        wandb.alert(
            title="Trace Evaluation Failed",
            text=f"Failed to evaluate trace: {str(e)}"
        )
        return {
            "hallucination_score": 0.0,
            "latency": trace_data.get("latency", 0.0),
            "error": str(e)
        }
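One thing to watch with zero defaults is that they flow into dashboard averages as if they were real measurements. A possible alternative, sketched here as an assumption rather than part of the pipeline above, is to mark missing values as None and skip them when aggregating. `mean_ignoring_missing` is a hypothetical helper.

```python
from statistics import mean
from typing import List, Optional


def mean_ignoring_missing(values: List[Optional[float]]) -> Optional[float]:
    """Average only the values that were actually measured."""
    measured = [v for v in values if v is not None]
    return mean(measured) if measured else None


latencies = [2.0, None, 4.0]  # None marks a trace that never completed
honest = mean_ignoring_missing(latencies)
skewed = mean([v if v is not None else 0.0 for v in latencies])
print(honest)  # 3.0
print(skewed)  # 2.0, pulled down by the 0.0 default
```

Whether to exclude or zero-fill depends on the metric: for latency, exclusion usually gives the more honest picture, while a separate count of missing traces keeps the failures visible.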
What's Next
You now have a production-ready monitoring stack for LLM applications. Here are the next steps to consider:
- Set up automated alerts: Configure W&B to send Slack or email alerts when metrics exceed thresholds
- Implement A/B testing: Use LangSmith's dataset features to compare model versions
- Add user feedback: Integrate thumbs up/down buttons that feed into your evaluation pipeline
- Scale with Kubernetes: Deploy the monitoring pipeline as a sidecar container in your Kubernetes pods
For deeper dives, check out our guides on LLM evaluation best practices and production MLOps patterns.
The key insight from this implementation is that monitoring isn't an afterthought—it's a core architectural component that should be designed alongside your LLM application. By combining LangSmith's tracing capabilities with W&B's visualization and alerting, you create a feedback loop that catches regressions early and provides the data needed for continuous improvement.
Remember: in production LLM applications, what you can't see can hurt you. Implement monitoring from day one, and your future self will thank you when that mysterious degradation appears at 2 AM.
References