How to Set Up an MCP Server for AI Agent Communication
Building production-grade AI systems often requires multiple agents to communicate efficiently. The Model Context Protocol (MCP) provides a standardized way for AI agents to share context, state, and results. In this tutorial, we'll build a complete MCP server implementation that handles agent-to-agent communication with proper error handling, authentication, and scalability considerations.
Understanding MCP Architecture and Production Requirements
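The Model Context Protocol (MCP) is an open standard, built on JSON-RPC 2.0, for connecting AI applications to external tools and data sources. The server in this tutorial applies the same idea of structured context sharing to agent-to-agent communication and exposes it over a REST-style HTTP API, so it is a context-sharing service in the spirit of MCP rather than an implementation of the official wire protocol. State management and conflict handling therefore live in the application code shown in this tutorial.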
In production environments, an MCP server must handle:
- Concurrent agent connections with proper resource management
- Context serialization and deserialization with schema validation
- Authentication and authorization for sensitive context data
- Graceful degradation under load
- Comprehensive logging and monitoring
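Graceful degradation under load, from the list above, can be approximated by capping in-flight work and rejecting the excess quickly instead of queueing it. The following is a minimal sketch using only the standard library; `LoadShedder` and `max_in_flight` are hypothetical names introduced for illustration and are not part of the server built in this tutorial.

```python
import asyncio


class LoadShedder:
    """Caps concurrent work; excess callers are rejected instead of queued."""

    def __init__(self, max_in_flight: int):
        self.max_in_flight = max_in_flight
        self.in_flight = 0

    async def run(self, coro_func, *args):
        """Run coro_func(*args) if capacity allows, else return None at once."""
        if self.in_flight >= self.max_in_flight:
            return None  # a caller could translate this into an HTTP 503
        self.in_flight += 1
        try:
            return await coro_func(*args)
        finally:
            self.in_flight -= 1


async def _demo():
    shedder = LoadShedder(max_in_flight=2)

    async def slow_echo(value):
        await asyncio.sleep(0.01)
        return value

    # Three requests arrive together; only two fit under the cap.
    return await asyncio.gather(*(shedder.run(slow_echo, i) for i in range(3)))


demo_results = asyncio.run(_demo())
```

Because asyncio runs on one thread, the plain counter is safe here. With several worker processes each process would keep its own cap.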
A central challenge in multi-agent architectures is keeping state consistent across agents while keeping latency low. The server built here addresses it by routing every read and write through a single Redis-backed context store.
Prerequisites and Environment Setup
Before implementing our MCP server, ensure you have the following installed:
# Python 3.10+ required for modern async features
python --version  # Should show 3.10 or higher

# Install core dependencies
pip install fastapi==0.111.0 uvicorn==0.29.0 pydantic==2.7.0
pip install pydantic-settings            # settings loader imported by src/config.py
pip install redis==5.0.0                 # context caching
pip install python-jose==3.3.0           # JWT authentication
pip install prometheus-client==0.20.0    # metrics
pip install pytest pytest-asyncio httpx  # test suite
Create a project structure that separates concerns:
mcp-server/
├── src/
│   ├── __init__.py
│   ├── server.py            # Main server implementation
│   ├── context_manager.py   # Context storage and retrieval
│   ├── auth.py              # Authentication middleware
│   ├── models.py            # Pydantic models
│   └── config.py            # Configuration management
├── tests/
│   ├── test_server.py
│   └── test_context.py
├── requirements.txt
└── docker-compose.yml
Core Implementation: Building the MCP Server
Context Models and Schema Validation
First, we define our data models using Pydantic for automatic validation:
# src/models.py
from pydantic import BaseModel, Field, field_validator
from typing import Dict, Any, Optional, List
from datetime import datetime
import uuid


class ContextEntry(BaseModel):
    """Represents a single context entry in the MCP server."""
    context_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    agent_id: str
    context_type: str  # e.g., "task_result", "state_update", "error"
    payload: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # Default 1 hour TTL; Redis SETEX needs a positive integer, so None is not allowed
    ttl_seconds: int = Field(default=3600, ge=1)

    # Pydantic 2 style validator (replaces the deprecated @validator)
    @field_validator('context_type')
    @classmethod
    def validate_context_type(cls, v: str) -> str:
        allowed_types = ['task_result', 'state_update', 'error', 'metadata']
        if v not in allowed_types:
            raise ValueError(f'context_type must be one of {allowed_types}')
        return v


class AgentRegistration(BaseModel):
    """Model for agent registration requests."""
    agent_id: str
    capabilities: List[str]
    metadata: Dict[str, Any] = Field(default_factory=dict)


class ContextQuery(BaseModel):
    """Model for querying context from the server."""
    agent_id: Optional[str] = None
    context_type: Optional[str] = None
    since_timestamp: Optional[datetime] = None
    limit: int = Field(default=100, ge=1, le=1000)
Context Manager with Redis Backend
The context manager handles storage and retrieval with proper caching:
# src/context_manager.py
import logging
import redis.asyncio as redis
from typing import Optional, List
from datetime import datetime
from .models import ContextEntry

logger = logging.getLogger(__name__)


class ContextManager:
    """Manages context storage with Redis backend for production scalability."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.context_prefix = "mcp:context:"
        self.agent_prefix = "mcp:agent:"

    async def store_context(self, entry: ContextEntry) -> bool:
        """Store a context entry with TTL and indexing."""
        try:
            # Store the context data
            key = f"{self.context_prefix}{entry.context_id}"
            await self.redis_client.setex(
                key,
                entry.ttl_seconds,
                entry.model_dump_json()
            )
            # Index by agent for efficient queries
            agent_key = f"{self.agent_prefix}{entry.agent_id}:contexts"
            await self.redis_client.sadd(agent_key, entry.context_id)
            await self.redis_client.expire(agent_key, entry.ttl_seconds)
            # Index by context type for filtering
            type_key = f"{self.context_prefix}type:{entry.context_type}"
            await self.redis_client.sadd(type_key, entry.context_id)
            await self.redis_client.expire(type_key, entry.ttl_seconds)
            return True
        except redis.RedisError as e:
            # Log the error and report failure to the caller
            logger.error(f"Redis error storing context: {e}")
            return False

    async def query_context(self,
                            agent_id: Optional[str] = None,
                            context_type: Optional[str] = None,
                            since_timestamp: Optional[datetime] = None,
                            limit: int = 100) -> List[ContextEntry]:
        """Query context entries with optional filters."""
        try:
            # Start with all context IDs or filter by agent
            if agent_id:
                agent_key = f"{self.agent_prefix}{agent_id}:contexts"
                context_ids = await self.redis_client.smembers(agent_key)
            else:
                # Get all context IDs (expensive in production - use pagination)
                type_prefix = f"{self.context_prefix}type:"
                cursor = 0
                context_ids = set()
                while True:
                    cursor, keys = await self.redis_client.scan(
                        cursor,
                        match=f"{self.context_prefix}*",
                        count=1000
                    )
                    # The type-index sets share the context prefix; skip them,
                    # because GET on a set key raises a WRONGTYPE error.
                    context_ids.update(
                        k.removeprefix(self.context_prefix)
                        for k in keys
                        if not k.startswith(type_prefix)
                    )
                    if cursor == 0:
                        break
            # Fetch and filter context entries
            results = []
            for cid in list(context_ids)[:limit * 2]:  # Fetch more for filtering
                key = f"{self.context_prefix}{cid}"
                data = await self.redis_client.get(key)
                if data:  # Index entries can outlive the context they point to
                    entry = ContextEntry.model_validate_json(data)
                    # Apply filters
                    if context_type and entry.context_type != context_type:
                        continue
                    if since_timestamp and entry.timestamp < since_timestamp:
                        continue
                    results.append(entry)
                    if len(results) >= limit:
                        break
            return results[:limit]
        except redis.RedisError as e:
            logger.error(f"Redis error querying context: {e}")
            return []

    async def delete_context(self, context_id: str) -> bool:
        """Delete a specific context entry."""
        try:
            key = f"{self.context_prefix}{context_id}"
            return bool(await self.redis_client.delete(key))
        except redis.RedisError as e:
            logger.error(f"Redis error deleting context: {e}")
            return False
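The storage pattern above (one record per context ID with a TTL, plus a per-agent index that may contain stale IDs) can be illustrated without Redis. The sketch below is a hypothetical in-memory stand-in, `InMemoryContextStore`, which is not part of the tutorial's code; it may be useful for reasoning about expiry or for unit tests that should not depend on a Redis instance.

```python
import time
from typing import Dict, List, Optional, Set, Tuple


class InMemoryContextStore:
    """Dict-based stand-in mirroring the record + agent-index layout."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        # context_id -> (expires_at, entry)
        self._entries: Dict[str, Tuple[float, dict]] = {}
        # agent_id -> set of context_ids (may contain expired IDs)
        self._by_agent: Dict[str, Set[str]] = {}

    def store(self, context_id: str, agent_id: str, entry: dict, ttl_seconds: int) -> None:
        self._entries[context_id] = (self._clock() + ttl_seconds, entry)
        self._by_agent.setdefault(agent_id, set()).add(context_id)

    def get(self, context_id: str) -> Optional[dict]:
        item = self._entries.get(context_id)
        if item is None:
            return None
        expires_at, entry = item
        if self._clock() >= expires_at:
            del self._entries[context_id]  # lazy expiry, like a key that timed out
            return None
        return entry

    def query_by_agent(self, agent_id: str) -> List[dict]:
        # Stale index entries are filtered out by get(), as in the Redis version.
        found = (self.get(cid) for cid in sorted(self._by_agent.get(agent_id, set())))
        return [e for e in found if e is not None]


# Usage with a fake clock so expiry can be shown without sleeping.
now = [0.0]
store = InMemoryContextStore(clock=lambda: now[0])
store.store("c1", "agent-a", {"context_type": "task_result"}, ttl_seconds=10)
store.store("c2", "agent-a", {"context_type": "error"}, ttl_seconds=100)
before_expiry = store.query_by_agent("agent-a")
now[0] = 50.0  # c1 has expired by now, c2 has not
after_expiry = store.query_by_agent("agent-a")
```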
Authentication Middleware
Implement JWT-based authentication for secure agent communication:
# src/auth.py
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional


class AuthMiddleware:
    """Handles JWT authentication for MCP server endpoints."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.security = HTTPBearer()

    def create_token(self, agent_id: str, expires_delta: Optional[timedelta] = None) -> str:
        """Generate a JWT token for an agent."""
        to_encode = {
            "sub": agent_id,
            "iat": datetime.utcnow(),
            "exp": datetime.utcnow() + (expires_delta or timedelta(hours=24))
        }
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    async def verify_token(self, credentials: HTTPAuthorizationCredentials = Security(HTTPBearer())):
        """Verify and decode a JWT token."""
        try:
            payload = jwt.decode(
                credentials.credentials,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            agent_id = payload.get("sub")
            if agent_id is None:
                raise HTTPException(status_code=401, detail="Invalid token")
            return agent_id
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid or expired token")
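To make the token handling less of a black box, here is a sketch of what an HS256 token consists of, written with the standard library only. It is an illustration of the signing scheme, not a replacement for `python-jose`; the function names `sign_hs256` and `verify_hs256` are hypothetical, and the sketch checks only the signature and the `exp` claim.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(claims: dict, secret: str) -> str:
    """Build header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str, now: float) -> dict:
    """Return the claims, or raise ValueError if malformed, forged, or expired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking how many leading bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("exp", 0) <= now:
        raise ValueError("token expired")
    return claims


issued_at = 1_700_000_000  # fixed timestamp so the example is deterministic
token = sign_hs256({"sub": "agent-1", "iat": issued_at, "exp": issued_at + 3600}, "demo-secret")
claims = verify_hs256(token, "demo-secret", now=issued_at + 60)
```

The payload is only encoded, not encrypted, so anything placed in the claims is readable by whoever holds the token.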
Main Server Implementation
Now we combine everything into a production-ready FastAPI server:
# src/server.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from datetime import datetime  # used by the health check
import logging
from .models import ContextEntry, AgentRegistration, ContextQuery
from .context_manager import ContextManager
from .auth import AuthMiddleware
from .config import Settings

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MCPServer:
    """Production MCP server implementation."""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.app = FastAPI(
            title="MCP Server",
            version="1.0.0",
            docs_url="/docs" if settings.environment == "development" else None
        )
        self.context_manager = ContextManager(settings.redis_url)
        self.auth = AuthMiddleware(settings.secret_key)
        self.setup_middleware()
        self.setup_routes()

    def setup_middleware(self):
        """Configure server middleware."""
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=self.settings.allowed_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        @self.app.middleware("http")
        async def log_requests(request: Request, call_next):
            """Log all incoming requests for monitoring."""
            logger.info(f"Request: {request.method} {request.url.path}")
            response = await call_next(request)
            logger.info(f"Response: {response.status_code}")
            return response

    def setup_routes(self):
        """Define all API endpoints."""

        @self.app.post("/agents/register")
        async def register_agent(registration: AgentRegistration):
            """Register a new agent with the MCP server."""
            try:
                # Store agent metadata
                agent_key = f"mcp:agent:{registration.agent_id}:metadata"
                await self.context_manager.redis_client.setex(
                    agent_key,
                    86400,  # 24 hours
                    registration.model_dump_json()
                )
                # Generate authentication token
                token = self.auth.create_token(registration.agent_id)
                logger.info(f"Agent registered: {registration.agent_id}")
                return {
                    "status": "success",
                    "agent_id": registration.agent_id,
                    "token": token,
                    "token_expiry": "24h"
                }
            except Exception as e:
                logger.error(f"Registration failed: {e}")
                raise HTTPException(status_code=500, detail="Registration failed")

        @self.app.post("/context/store")
        async def store_context(
            entry: ContextEntry,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Store a new context entry."""
            # Ensure agent can only store its own context
            if entry.agent_id != agent_id:
                raise HTTPException(
                    status_code=403,
                    detail="Cannot store context for other agents"
                )
            success = await self.context_manager.store_context(entry)
            if not success:
                raise HTTPException(status_code=500, detail="Failed to store context")
            return {
                "status": "success",
                "context_id": entry.context_id,
                "timestamp": entry.timestamp.isoformat()
            }

        @self.app.post("/context/query")
        async def query_context(
            query: ContextQuery,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Query context entries with filters."""
            results = await self.context_manager.query_context(
                agent_id=query.agent_id,
                context_type=query.context_type,
                since_timestamp=query.since_timestamp,
                limit=query.limit
            )
            return {
                "status": "success",
                "count": len(results),
                "results": [r.model_dump() for r in results]
            }

        @self.app.delete("/context/{context_id}")
        async def delete_context(
            context_id: str,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Delete a specific context entry."""
            # NOTE: any authenticated agent can delete any entry here;
            # add an ownership check if that is not intended.
            success = await self.context_manager.delete_context(context_id)
            if not success:
                raise HTTPException(status_code=404, detail="Context not found")
            return {"status": "success", "deleted": context_id}

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint for monitoring."""
            try:
                # Check Redis connectivity
                await self.context_manager.redis_client.ping()
                return {
                    "status": "healthy",
                    "redis": "connected",
                    "timestamp": datetime.utcnow().isoformat()
                }
            except Exception as e:
                logger.error(f"Health check failed: {e}")
                return JSONResponse(
                    status_code=503,
                    content={"status": "unhealthy", "redis": "disconnected"}
                )


def create_server():
    """Factory function to create and configure the MCP server."""
    settings = Settings()
    server = MCPServer(settings)
    return server.app


if __name__ == "__main__":
    # Run as `python -m src.server` so the relative imports resolve.
    # uvicorn needs an import string (not an app object) when workers > 1;
    # factory=True tells it to call create_server() in each worker.
    uvicorn.run(
        "src.server:create_server",
        factory=True,
        host="0.0.0.0",
        port=8000,
        workers=4,  # Multiple workers for production
        log_level="info"
    )
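From the agent's side, storing context is a single authenticated POST. The sketch below builds that request with the standard library; `build_store_request` is a hypothetical helper, and the base URL and token are placeholders for whatever your deployment and the `/agents/register` response provide.

```python
import json
import urllib.request


def build_store_request(base_url: str, token: str, agent_id: str,
                        context_type: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to /context/store."""
    body = json.dumps({
        "agent_id": agent_id,
        "context_type": context_type,
        "payload": payload,
    }).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/context/store",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


request = build_store_request(
    "http://localhost:8000",   # assumed local deployment
    "TOKEN_FROM_REGISTER",     # placeholder, not a real token
    agent_id="agent-1",
    context_type="task_result",
    payload={"result": "success"},
)

# Sending it requires a running server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```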
Configuration Management
# src/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List


class Settings(BaseSettings):
    """Application configuration with environment variable support."""
    # Pydantic 2 style configuration (replaces the nested `class Config`)
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Server settings
    environment: str = "development"
    allowed_origins: List[str] = ["*"]
    # Redis configuration
    redis_url: str = "redis://localhost:6379/0"
    # Authentication
    secret_key: str = "your-secret-key-change-in-production"
    # Rate limiting (declared here; the server code above does not enforce it yet)
    rate_limit_per_minute: int = 60
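The `rate_limit_per_minute` setting is declared in the configuration but nothing in the server code shown here enforces it. One way it could be applied is a per-agent sliding window, sketched below with the standard library. `SlidingWindowLimiter` is a hypothetical name, and the limiter is in-process only, so several workers or replicas would each count separately unless the counters were moved to a shared store such as Redis.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allows at most limit_per_minute requests per agent in any 60 s window."""

    def __init__(self, limit_per_minute: int, clock=time.monotonic):
        self.limit = limit_per_minute
        self.window_seconds = 60.0
        self._clock = clock
        self._hits = defaultdict(deque)  # agent_id -> timestamps of recent requests

    def allow(self, agent_id: str) -> bool:
        now = self._clock()
        hits = self._hits[agent_id]
        # Drop timestamps that have left the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # a route could answer this with HTTP 429
        hits.append(now)
        return True


# Usage with a fake clock so the window can be advanced without sleeping.
now = [0.0]
limiter = SlidingWindowLimiter(limit_per_minute=2, clock=lambda: now[0])
first_three = [limiter.allow("agent-a") for _ in range(3)]
other_agent = limiter.allow("agent-b")
now[0] = 61.0  # the earlier hits have aged out
after_window = limiter.allow("agent-a")
```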
Production Deployment and Docker Configuration
Create a Docker Compose file for easy deployment:
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
  mcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENVIRONMENT=production
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      redis:
        condition: service_healthy
    volumes:
      - ./logs:/app/logs
    deploy:
      # NOTE: three replicas cannot all bind host port 8000 on one host;
      # put a load balancer in front or drop the fixed host port before scaling.
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
volumes:
  redis_data:
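The Compose file reads `SECRET_KEY` from the environment, while the settings class falls back to a placeholder value when the variable is missing. A small sketch of how a key could be generated and how a production start-up could refuse the placeholder follows; `generate_secret_key` and `check_secret_key` are hypothetical helpers, and the 32-character minimum is an assumption rather than a requirement stated by any library.

```python
import secrets

# The default value from src/config.py
PLACEHOLDER_SECRET = "your-secret-key-change-in-production"


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a URL-safe random string suitable for an HMAC signing key."""
    return secrets.token_urlsafe(num_bytes)


def check_secret_key(environment: str, secret_key: str) -> None:
    """Raise ValueError if production runs with a missing, placeholder, or short key."""
    if environment != "production":
        return
    if not secret_key or secret_key == PLACEHOLDER_SECRET or len(secret_key) < 32:
        raise ValueError("SECRET_KEY must be set to a long random value in production")


new_key = generate_secret_key()
check_secret_key("production", new_key)  # passes silently
```

The generated value can be placed in the `.env` file or exported as `SECRET_KEY` before running Compose.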
Testing the MCP Server
Create comprehensive tests to ensure reliability:
# tests/test_server.py
# Integration tests: they expect a Redis instance reachable at the configured REDIS_URL.
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from src.server import create_server


# Async fixtures need pytest_asyncio.fixture in pytest-asyncio's strict mode
@pytest_asyncio.fixture
async def client():
    app = create_server()
    # Newer httpx versions take the ASGI app via a transport, not app=...
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.mark.asyncio
async def test_agent_registration(client):
    response = await client.post("/agents/register", json={
        "agent_id": "test-agent-1",
        "capabilities": ["text_generation", "code_review"],
        "metadata": {"version": "1.0"}
    })
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "success"
    assert "token" in data


@pytest.mark.asyncio
async def test_context_store_and_query(client):
    # First register and get token
    reg_response = await client.post("/agents/register", json={
        "agent_id": "test-agent-2",
        "capabilities": ["data_analysis"]
    })
    token = reg_response.json()["token"]
    # Store context
    context_response = await client.post(
        "/context/store",
        json={
            "agent_id": "test-agent-2",
            "context_type": "task_result",
            "payload": {"result": "success", "data": [1, 2, 3]}
        },
        headers={"Authorization": f"Bearer {token}"}
    )
    assert context_response.status_code == 200
    # Query context
    query_response = await client.post(
        "/context/query",
        json={"agent_id": "test-agent-2", "limit": 10},
        headers={"Authorization": f"Bearer {token}"}
    )
    assert query_response.status_code == 200
    assert query_response.json()["count"] >= 1


@pytest.mark.asyncio
async def test_unauthorized_access(client):
    response = await client.post(
        "/context/store",
        json={
            "agent_id": "test-agent-3",
            "context_type": "task_result",
            "payload": {}
        }
    )
    # HTTPBearer rejects a missing Authorization header with 403 in the pinned
    # FastAPI version; the exact code can differ between FastAPI releases.
    assert response.status_code in (401, 403)
Edge Cases and Production Considerations
Handling Concurrent Access
When multiple agents try to update the same context simultaneously, implement optimistic locking:
# Method of ContextManager (src/context_manager.py)
async def update_context_with_lock(self, context_id: str, update_func, max_retries=3):
    """Update context with optimistic locking to prevent race conditions."""
    for attempt in range(max_retries):
        key = f"{self.context_prefix}{context_id}"
        async with self.redis_client.pipeline(transaction=True) as pipe:
            try:
                # Watch the key for changes; reads run immediately while watching
                await pipe.watch(key)
                current_data = await pipe.get(key)
                if not current_data:
                    return None
                # Apply update function
                entry = ContextEntry.model_validate_json(current_data)
                updated_entry = update_func(entry)
                # Queue the write and execute it as a transaction;
                # execute() raises WatchError if the key changed since watch()
                pipe.multi()
                pipe.setex(key, entry.ttl_seconds, updated_entry.model_dump_json())
                await pipe.execute()
                return updated_entry
            except redis.WatchError:
                # Another writer got in first: retry, or give up on the last attempt
                if attempt == max_retries - 1:
                    raise
                continue
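The retry logic is easier to follow when the storage is stripped away. The sketch below shows the same optimistic pattern (read a version, compute, write only if the version is unchanged, otherwise retry) against a hypothetical in-memory `VersionedStore`; none of these names come from the tutorial's code, and the conflicting write is simulated inside the update function.

```python
from typing import Callable, Dict, Optional, Tuple


class VersionedStore:
    """Keeps a version counter per key; writes succeed only on a version match."""

    def __init__(self):
        self._data: Dict[str, Tuple[int, dict]] = {}

    def put(self, key: str, value: dict) -> None:
        version = self._data[key][0] if key in self._data else 0
        self._data[key] = (version + 1, value)

    def read(self, key: str) -> Optional[Tuple[int, dict]]:
        return self._data.get(key)

    def compare_and_set(self, key: str, expected_version: int, value: dict) -> bool:
        current = self._data.get(key)
        if current is None or current[0] != expected_version:
            return False  # someone else wrote in between
        self._data[key] = (expected_version + 1, value)
        return True


def update_with_retry(store: VersionedStore, key: str,
                      update_func: Callable[[dict], dict], max_retries: int = 3):
    for _ in range(max_retries):
        current = store.read(key)
        if current is None:
            return None
        version, value = current
        new_value = update_func(dict(value))  # work on a copy
        if store.compare_and_set(key, version, new_value):
            return new_value
    raise RuntimeError("too many concurrent modifications")


store = VersionedStore()
store.put("ctx-1", {"count": 0})
attempts = []


def increment(value: dict) -> dict:
    attempts.append(1)
    if len(attempts) == 1:
        # Simulate another agent writing while the first attempt is in progress
        store.put("ctx-1", {"count": 10})
    value["count"] += 1
    return value


result = update_with_retry(store, "ctx-1", increment)
```

The first attempt is rejected because the version moved, and the second attempt recomputes from the newer value instead of overwriting it.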
Memory Management and TTL Strategies
Implement intelligent TTL management to prevent memory exhaustion:
class TTLManager:
    """Manages context TTL based on access patterns."""

    def __init__(self, redis_client, context_prefix: str = "mcp:context:"):
        self.redis_client = redis_client
        self.context_prefix = context_prefix  # must match ContextManager.context_prefix
        self.access_count_key = "mcp:access_count:"

    async def update_ttl_based_on_access(self, context_id: str):
        """Extend TTL for frequently accessed contexts."""
        access_key = f"{self.access_count_key}{context_id}"
        # Increment access count; expire the counter so it cannot leak memory
        count = await self.redis_client.incr(access_key)
        await self.redis_client.expire(access_key, 7200)
        # Extend TTL for popular contexts
        if count > 10:  # Accessed more than 10 times
            context_key = f"{self.context_prefix}{context_id}"
            await self.redis_client.expire(context_key, 7200)  # Extend to 2 hours
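The fixed "more than 10 accesses means 2 hours" rule is one possible policy. As an illustration of how the policy could be kept separate from the Redis calls, here is a hypothetical pure function, `next_ttl`, that doubles the TTL for every ten accesses up to a cap. The thresholds and the cap are assumptions chosen for the example, not values from the tutorial.

```python
def next_ttl(access_count: int, base_ttl: int = 3600, max_ttl: int = 86400) -> int:
    """Double the TTL for every 10 accesses, never exceeding max_ttl."""
    doublings = max(access_count, 0) // 10
    return min(base_ttl * (2 ** doublings), max_ttl)


ttl_cold = next_ttl(3)       # below the first threshold
ttl_warm = next_ttl(10)      # one doubling
ttl_hot = next_ttl(25)       # two doublings
ttl_capped = next_ttl(1000)  # limited by max_ttl
```

A function like this can be unit-tested without Redis, and its result passed to `expire`.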
Monitoring and Observability
Add Prometheus metrics for production monitoring:
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response

# Define metrics
context_operations = Counter(
    'mcp_context_operations_total',
    'Total context operations',
    ['operation_type', 'status']
)
operation_duration = Histogram(
    'mcp_operation_duration_seconds',
    'Duration of MCP operations',
    ['operation_type']
)

# `app` is the FastAPI instance, e.g. the object returned by create_server()
@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
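Defining the metrics is only half of the work; each operation also has to record into them. The sketch below shows the recording pattern with a context manager, using plain dictionaries as a stand-in so that it runs without `prometheus_client`. The names `track`, `durations`, and `outcomes` are hypothetical.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

durations = defaultdict(list)  # operation_type -> observed seconds
outcomes = defaultdict(int)    # (operation_type, status) -> count


@contextmanager
def track(operation_type: str, clock=time.perf_counter):
    """Time the wrapped block and count it as success or error."""
    start = clock()
    status = "success"
    try:
        yield
    except Exception:
        status = "error"
        raise  # the caller still sees the original exception
    finally:
        durations[operation_type].append(clock() - start)
        outcomes[(operation_type, status)] += 1


with track("store"):
    sum(range(1000))  # stand-in for real work

try:
    with track("query"):
        raise RuntimeError("simulated backend failure")
except RuntimeError:
    pass
```

With the real metrics, the two lines in the `finally` block would become `operation_duration.labels(operation_type).observe(...)` and `context_operations.labels(operation_type, status).inc()`.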
What's Next
This MCP server implementation provides a solid foundation for agent-to-agent communication in production environments. To extend this further:
- Implement context versioning with conflict resolution strategies for concurrent updates
- Add WebSocket support for real-time context streaming between agents
- Integrate with message queues like RabbitMQ or Kafka for event-driven architectures
- Implement circuit breakers to handle Redis failures gracefully
- Add context compression for large payloads to reduce memory usage
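As a starting point for the circuit breaker item in the list above, here is a minimal sketch of the state machine: closed while calls succeed, open after a run of failures, and half-open (one trial call allowed) once a cool-down has passed. `CircuitBreaker` and its parameters are hypothetical names, and wiring it around the Redis calls is left out.

```python
import time


class CircuitBreaker:
    """Stops calling a failing dependency until a cool-down has elapsed."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def allow(self) -> bool:
        if self._opened_at is None:
            return True
        # Open: permit a trial call only after the cool-down (half-open)
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Usage with a fake clock so the cool-down can be skipped without sleeping.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_after=30.0, clock=lambda: now[0])
breaker.record_failure()
closed_after_one = breaker.allow()
breaker.record_failure()
open_after_two = breaker.allow()
now[0] = 31.0
half_open = breaker.allow()
breaker.record_success()
closed_again = breaker.allow()
```

A caller would check `allow()` before each Redis operation, return a fallback or a 503 when it is false, and report the outcome with `record_success()` or `record_failure()`.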
The complete source code for this tutorial is available at our GitHub repository. For more advanced patterns in multi-agent systems, check out our guide on building resilient AI agent architectures.
Remember that in production, you should always monitor your MCP server's performance metrics and set up alerts for unusual patterns. The combination of proper authentication, efficient caching, and comprehensive monitoring will ensure your AI agents communicate reliably at scale.