
How to Set Up an MCP Server for AI Agent Communication


BlogIA Academy · June 6, 2026 · 12 min read · 2,288 words




Building production-grade AI systems often requires multiple agents to communicate efficiently. The Model Context Protocol (MCP) provides a standardized way for AI agents to share context, state, and results. In this tutorial, we'll build a complete MCP server implementation that handles agent-to-agent communication with proper error handling, authentication, and scalability considerations.

Understanding MCP Architecture and Production Requirements

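The Model Context Protocol is an open standard, introduced by Anthropic in late 2024, that defines how AI applications exchange context with external tools and data sources using JSON-RPC 2.0 messages. The specification does not itself provide versioned, conflict-resolving shared state, so the server built here is a simplified, REST-style context store inspired by MCP rather than a spec-compliant MCP implementation. It layers TTL-based expiry, per-agent indexing, JWT authentication, and optimistic locking on top of Redis.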

In production environments, an MCP server must handle:

  • Concurrent agent connections with proper resource management
  • Context serialization and deserialization with schema validation
  • Authentication and authorization for sensitive context data
  • Graceful degradation under load
  • Comprehensive logging and monitoring

A recurring challenge in multi-agent architectures is keeping state consistent across agents while keeping latency low. This server addresses it with one shared Redis-backed context store that every agent reads from and writes to, plus optimistic locking for concurrent updates.

Prerequisites and Environment Setup

Before implementing our MCP server, ensure you have the following installed:

# Python 3.10+ required for modern async features
python --version  # Should show 3.10 or higher

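# Install core dependencies
pip install fastapi==0.111.0 uvicorn==0.29.0 pydantic==2.7.0 pydantic-settings
pip install redis==5.0.0               # Context storage and caching
pip install python-jose==3.3.0         # JWT authentication
pip install prometheus-client==0.20.0  # Metrics

# Test dependencies
pip install pytest pytest-asyncio httpx

# A Redis 7 server must also be running (the Docker Compose file below starts one)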

Create a project structure that separates concerns:

mcp-server/
├── src/
│   ├── __init__.py
│   ├── server.py          # Main server implementation
│   ├── context_manager.py # Context storage and retrieval
│   ├── auth.py           # Authentication middleware
│   ├── models.py         # Pydantic models
│   └── config.py         # Configuration management
├── tests/
│   ├── test_server.py
│   └── test_context.py
├── requirements.txt
└── docker-compose.yml

Core Implementation: Building the MCP Server

Context Models and Schema Validation

First, we define our data models using Pydantic for automatic validation:

# src/models.py
from pydantic import BaseModel, Field, field_validator
from typing import Dict, Any, Optional, List
from datetime import datetime
import uuid

class ContextEntry(BaseModel):
    """Represents a single context entry in the MCP server."""
    context_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    agent_id: str
    context_type: str  # e.g., "task_result", "state_update", "error"
    payload: Dict[str, Any]
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    ttl_seconds: int = Field(default=3600, gt=0)  # Default 1 hour; SETEX needs a positive int

    # Pydantic v2 replaces the deprecated @validator with @field_validator
    @field_validator('context_type')
    @classmethod
    def validate_context_type(cls, v: str) -> str:
        allowed_types = ['task_result', 'state_update', 'error', 'metadata']
        if v not in allowed_types:
            raise ValueError(f'context_type must be one of {allowed_types}')
        return v

class AgentRegistration(BaseModel):
    """Model for agent registration requests."""
    agent_id: str
    capabilities: List[str]
    metadata: Dict[str, Any] = {}

class ContextQuery(BaseModel):
    """Model for querying context from the server."""
    agent_id: Optional[str] = None
    context_type: Optional[str] = None
    since_timestamp: Optional[datetime] = None
    limit: int = Field(default=100, ge=1, le=1000)
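If you want to see the validation rules in isolation, for example to unit-test them without Pydantic installed, here is a dependency-free sketch using a standard-library dataclass. `PlainContextEntry` is a hypothetical name; it mirrors the `context_type` whitelist and positive TTL of `ContextEntry` but is not a replacement for the Pydantic model, which also handles JSON parsing and serialization.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict
import uuid

# Same whitelist as ContextEntry.validate_context_type
ALLOWED_CONTEXT_TYPES = {"task_result", "state_update", "error", "metadata"}


@dataclass
class PlainContextEntry:
    """Hypothetical stdlib mirror of ContextEntry, for illustration only."""
    agent_id: str
    context_type: str
    payload: Dict[str, Any]
    ttl_seconds: int = 3600
    context_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def __post_init__(self) -> None:
        # Reject unknown context types, as the Pydantic validator does
        if self.context_type not in ALLOWED_CONTEXT_TYPES:
            raise ValueError(f"context_type must be one of {sorted(ALLOWED_CONTEXT_TYPES)}")
        # Redis SETEX needs a positive expiry, so zero or negative TTLs are invalid
        if self.ttl_seconds <= 0:
            raise ValueError("ttl_seconds must be positive")


entry = PlainContextEntry("agent-1", "task_result", {"result": "ok"})
print(entry.context_type, entry.ttl_seconds)  # task_result 3600
```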

Context Manager with Redis Backend

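The context manager stores each entry in Redis with its own TTL and maintains secondary index sets by agent and by context type, so agent-scoped queries avoid a full keyspace scan: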

# src/context_manager.py
import redis.asyncio as redis
import json
from typing import Optional, List
from datetime import datetime
from .models import ContextEntry

class ContextManager:
    """Manages context storage with Redis backend for production scalability."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.context_prefix = "mcp:context:"
        self.agent_prefix = "mcp:agent:"

    async def store_context(self, entry: ContextEntry) -> bool:
        """Store a context entry with TTL and indexing."""
        try:
            # Store the context data
            key = f"{self.context_prefix}{entry.context_id}"
            await self.redis_client.setex(
                key,
                entry.ttl_seconds,
                entry.model_dump_json()
            )

            # Index by agent for efficient queries
            agent_key = f"{self.agent_prefix}{entry.agent_id}:contexts"
            await self.redis_client.sadd(agent_key, entry.context_id)
            await self.redis_client.expire(agent_key, entry.ttl_seconds)

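            # Index by context type for filtering. The key deliberately lives
            # outside context_prefix so the SCAN in query_context never
            # mistakes the index set for a context entry.
            type_key = f"mcp:index:type:{entry.context_type}"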
            await self.redis_client.sadd(type_key, entry.context_id)
            await self.redis_client.expire(type_key, entry.ttl_seconds)

            return True
        except redis.RedisError as e:
            # Log error and return False for production error handling
            print(f"Redis error storing context: {e}")
            return False

    async def query_context(self, 
                           agent_id: Optional[str] = None,
                           context_type: Optional[str] = None,
                           since_timestamp: Optional[datetime] = None,
                           limit: int = 100) -> List[ContextEntry]:
        """Query context entries with optional filters."""
        try:
            # Start with all context IDs or filter by agent
            if agent_id:
                agent_key = f"{self.agent_prefix}{agent_id}:contexts"
                context_ids = await self.redis_client.smembers(agent_key)
            else:
                # Get all context IDs (expensive in production - use pagination)
                cursor = 0
                context_ids = set()
                while True:
                    cursor, keys = await self.redis_client.scan(
                        cursor, 
                        match=f"{self.context_prefix}*",
                        count=1000
                    )
                    context_ids.update(k.replace(self.context_prefix, '') for k in keys)
                    if cursor == 0:
                        break

            # Fetch and filter context entries
            results = []
            for cid in list(context_ids)[:limit * 2]:  # Fetch more for filtering
                key = f"{self.context_prefix}{cid}"
                data = await self.redis_client.get(key)
                if data:
                    entry = ContextEntry.model_validate_json(data)

                    # Apply filters
                    if context_type and entry.context_type != context_type:
                        continue
                    if since_timestamp and entry.timestamp < since_timestamp:
                        continue

                    results.append(entry)

                    if len(results) >= limit:
                        break

            return results[:limit]

        except redis.RedisError as e:
            print(f"Redis error querying context: {e}")
            return []

    async def delete_context(self, context_id: str) -> bool:
        """Delete a specific context entry."""
        try:
            key = f"{self.context_prefix}{context_id}"
            return bool(await self.redis_client.delete(key))
        except redis.RedisError as e:
            print(f"Redis error deleting context: {e}")
            return False
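Because every method above needs a live Redis, unit tests are easier with an in-memory stand-in. The sketch below is an assumption, not part of the tutorial's code: `InMemoryContextStore` and `StoredEntry` are hypothetical names, timestamps are plain floats, and expiry is checked lazily on read through an injectable clock. Unlike the Redis version, it sorts results newest first, since Redis sets are unordered and `limit` otherwise keeps an arbitrary subset.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Set, Tuple


@dataclass
class StoredEntry:
    """Hypothetical stand-in for ContextEntry with a float timestamp."""
    context_id: str
    agent_id: str
    context_type: str
    payload: dict
    timestamp: float


class InMemoryContextStore:
    """Hypothetical test double that mirrors ContextManager's indexes in plain dicts."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[StoredEntry, float]] = {}  # id -> (entry, expires_at)
        self._by_agent: Dict[str, Set[str]] = {}  # like mcp:agent:{id}:contexts
        self._by_type: Dict[str, Set[str]] = {}   # like the per-type index set

    def store(self, entry: StoredEntry, ttl_seconds: int) -> None:
        self._data[entry.context_id] = (entry, self._clock() + ttl_seconds)
        self._by_agent.setdefault(entry.agent_id, set()).add(entry.context_id)
        self._by_type.setdefault(entry.context_type, set()).add(entry.context_id)

    def _get(self, context_id: str) -> Optional[StoredEntry]:
        item = self._data.get(context_id)
        if item is None:
            return None
        entry, expires_at = item
        if self._clock() >= expires_at:
            # Lazy expiry; as in the Redis design, stale IDs can linger in index sets
            del self._data[context_id]
            return None
        return entry

    def query(self, agent_id: Optional[str] = None, context_type: Optional[str] = None,
              since: Optional[float] = None, limit: int = 100) -> List[StoredEntry]:
        # Pick the narrowest index available, else fall back to a full scan
        if agent_id is not None:
            candidates = set(self._by_agent.get(agent_id, set()))
        elif context_type is not None:
            candidates = set(self._by_type.get(context_type, set()))
        else:
            candidates = set(self._data)
        results = []
        for cid in candidates:
            entry = self._get(cid)
            if entry is None:
                continue
            if context_type is not None and entry.context_type != context_type:
                continue
            if since is not None and entry.timestamp < since:
                continue
            results.append(entry)
        # Sort newest first so `limit` keeps the most recent entries
        results.sort(key=lambda e: e.timestamp, reverse=True)
        return results[:limit]


now = [0.0]
store = InMemoryContextStore(clock=lambda: now[0])
store.store(StoredEntry("c1", "agent-1", "task_result", {"x": 1}, 1.0), ttl_seconds=10)
store.store(StoredEntry("c2", "agent-1", "error", {}, 2.0), ttl_seconds=100)
print([e.context_id for e in store.query(agent_id="agent-1")])  # ['c2', 'c1']
now[0] = 50.0  # c1's 10-second TTL has elapsed
print([e.context_id for e in store.query(agent_id="agent-1")])  # ['c2']
```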

Authentication Middleware

Implement JWT-based authentication for secure agent communication:

# src/auth.py
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional

class AuthMiddleware:
    """Handles JWT authentication for MCP server endpoints."""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.security = HTTPBearer()

    def create_token(self, agent_id: str, expires_delta: Optional[timedelta] = None) -> str:
        """Generate a JWT token for an agent."""
        to_encode = {
            "sub": agent_id,
            "iat": datetime.utcnow(),
            "exp": datetime.utcnow() + (expires_delta or timedelta(hours=24))
        }
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    async def verify_token(self, credentials: HTTPAuthorizationCredentials = Security(HTTPBearer())):
        """Verify and decode a JWT token."""
        try:
            payload = jwt.decode(
                credentials.credentials, 
                self.secret_key, 
                algorithms=[self.algorithm]
            )
            agent_id = payload.get("sub")
            if agent_id is None:
                raise HTTPException(status_code=401, detail="Invalid token")
            return agent_id
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid or expired token")
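To make the `jwt.encode` and `jwt.decode` calls less of a black box, here is a standard-library sketch of what HS256 signing and verification involve. It is for understanding only; keep using python-jose (or another maintained JWT library) in the server. `sign_hs256` and `verify_hs256` are hypothetical helpers, `exp` is assumed to be Unix seconds, and the `alg` check plays the same role as passing `algorithms=[self.algorithm]` to `jwt.decode`.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWT segments are base64url without "=" padding (RFC 7515)
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    # Restore the stripped padding before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _hs256(secret: str, signing_input: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(claims: dict, secret: str) -> str:
    """Hypothetical helper: build header.payload.signature for HS256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_hs256(secret, signing_input))}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Hypothetical helper: check alg, signature, and exp; return the claims."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, sig_b64 = parts
    # Only accept the algorithm we expect
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _hs256(secret, f"{header_b64}.{payload_b64}")
    # Constant-time comparison so timing does not reveal matching bytes
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in claims and current >= claims["exp"]:
        raise ValueError("token expired")
    return claims


token = sign_hs256({"sub": "agent-1", "exp": 2_000_000_000}, "dev-secret")
print(verify_hs256(token, "dev-secret", now=1_700_000_000)["sub"])  # agent-1
```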

Main Server Implementation

Now we combine everything into a production-ready FastAPI server:

# src/server.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from datetime import datetime  # Used by the /health endpoint
from typing import List
import logging

from .models import ContextEntry, AgentRegistration, ContextQuery
from .context_manager import ContextManager
from .auth import AuthMiddleware
from .config import Settings

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class MCPServer:
    """Production MCP server implementation."""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.app = FastAPI(
            title="MCP Server",
            version="1.0.0",
            docs_url="/docs" if settings.environment == "development" else None
        )
        self.context_manager = ContextManager(settings.redis_url)
        self.auth = AuthMiddleware(settings.secret_key)
        self.setup_middleware()
        self.setup_routes()

    def setup_middleware(self):
        """Configure server middleware."""
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=self.settings.allowed_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        @self.app.middleware("http")
        async def log_requests(request: Request, call_next):
            """Log all incoming requests for monitoring."""
            logger.info(f"Request: {request.method} {request.url.path}")
            response = await call_next(request)
            logger.info(f"Response: {response.status_code}")
            return response

    def setup_routes(self):
        """Define all API endpoints."""

        @self.app.post("/agents/register")
        async def register_agent(registration: AgentRegistration):
            """Register a new agent with the MCP server."""
            try:
                # Store agent metadata
                agent_key = f"mcp:agent:{registration.agent_id}:metadata"
                await self.context_manager.redis_client.setex(
                    agent_key,
                    86400,  # 24 hours
                    registration.model_dump_json()
                )

                # Generate authentication token
                token = self.auth.create_token(registration.agent_id)

                logger.info(f"Agent registered: {registration.agent_id}")
                return {
                    "status": "success",
                    "agent_id": registration.agent_id,
                    "token": token,
                    "token_expiry": "24h"
                }
            except Exception as e:
                logger.error(f"Registration failed: {e}")
                raise HTTPException(status_code=500, detail="Registration failed")

        @self.app.post("/context/store")
        async def store_context(
            entry: ContextEntry,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Store a new context entry."""
            # Ensure agent can only store its own context
            if entry.agent_id != agent_id:
                raise HTTPException(
                    status_code=403, 
                    detail="Cannot store context for other agents"
                )

            success = await self.context_manager.store_context(entry)
            if not success:
                raise HTTPException(status_code=500, detail="Failed to store context")

            return {
                "status": "success",
                "context_id": entry.context_id,
                "timestamp": entry.timestamp.isoformat()
            }

        @self.app.post("/context/query")
        async def query_context(
            query: ContextQuery,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Query context entries with filters."""
            results = await self.context_manager.query_context(
                agent_id=query.agent_id,
                context_type=query.context_type,
                since_timestamp=query.since_timestamp,
                limit=query.limit
            )

            return {
                "status": "success",
                "count": len(results),
                "results": [r.model_dump() for r in results]
            }

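        @self.app.delete("/context/{context_id}")
        async def delete_context(
            context_id: str,
            agent_id: str = Depends(self.auth.verify_token)
        ):
            """Delete a context entry owned by the calling agent."""
            key = f"{self.context_manager.context_prefix}{context_id}"
            data = await self.context_manager.redis_client.get(key)
            if data is None:
                raise HTTPException(status_code=404, detail="Context not found")

            # Mirror the store rule: agents may only delete their own context
            if ContextEntry.model_validate_json(data).agent_id != agent_id:
                raise HTTPException(
                    status_code=403,
                    detail="Cannot delete context owned by other agents"
                )

            await self.context_manager.delete_context(context_id)
            return {"status": "success", "deleted": context_id}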

        @self.app.get("/health")
        async def health_check():
            """Health check endpoint for monitoring."""
            try:
                # Check Redis connectivity
                await self.context_manager.redis_client.ping()
                return {
                    "status": "healthy",
                    "redis": "connected",
                    "timestamp": datetime.utcnow().isoformat()
                }
            except Exception as e:
                logger.error(f"Health check failed: {e}")
                return JSONResponse(
                    status_code=503,
                    content={"status": "unhealthy", "redis": "disconnected"}
                )

def create_server():
    """Factory function to create and configure the MCP server."""
    settings = Settings()
    server = MCPServer(settings)
    return server.app

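if __name__ == "__main__":
    # Run from the project root with `python -m src.server` so the relative
    # imports resolve. Uvicorn only honors `workers` when the app is given as
    # an import string; factory=True tells it to call create_server().
    uvicorn.run(
        "src.server:create_server",
        factory=True,
        host="0.0.0.0",
        port=8000,
        workers=4,  # Multiple worker processes for production
        log_level="info"
    )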
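From an agent's point of view, the flow is: call `/agents/register` once, keep the returned token, and send it as a Bearer header on every `/context/*` call. The `agent_id` in a stored entry must match the token's subject, or the server answers 403. The sketch below builds those requests with `urllib` from the standard library without sending them; `BASE_URL` and `build_request` are assumptions, and a real agent would pass each request to `urllib.request.urlopen` and parse the JSON response.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000"  # assumed local address of the MCP server


def build_request(path: str, body: dict, token: Optional[str] = None,
                  method: str = "POST") -> urllib.request.Request:
    """Hypothetical helper: JSON request with an optional Bearer token."""
    headers = {"Content-Type": "application/json"}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        f"{BASE_URL}{path}",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method=method,
    )


register = build_request("/agents/register", {"agent_id": "agent-1", "capabilities": ["summarize"]})
store = build_request(
    "/context/store",
    {"agent_id": "agent-1", "context_type": "task_result", "payload": {"summary": "done"}},
    token="<token from /agents/register>",
)
print(store.get_method(), store.full_url)  # POST http://localhost:8000/context/store
```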

Configuration Management

# src/config.py
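from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List

class Settings(BaseSettings):
    """Application configuration with environment variable support."""

    # Load overrides from .env; env var names match fields case-insensitively
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Server settings
    environment: str = "development"
    allowed_origins: List[str] = ["*"]  # From the environment, pass a JSON list

    # Redis configuration
    redis_url: str = "redis://localhost:6379/0"

    # Authentication (always override via SECRET_KEY outside development)
    secret_key: str = "your-secret-key-change-in-production"

    # Rate limiting (not yet enforced by the server code above)
    rate_limit_per_minute: int = 60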
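`BaseSettings` matches environment variables to field names case-insensitively, so `SECRET_KEY` in the Compose file overrides `secret_key`. One gap is that nothing stops the server from starting in production with the placeholder secret, and if `SECRET_KEY` is unset on the host, Compose substitutes an empty string. A fail-fast check is one option. The sketch below is a hypothetical stdlib-only version (`load_settings`, `AppSettings`, and `PLACEHOLDER_SECRET` are not part of the tutorial's code); you could express the same rule as a Pydantic `model_validator(mode="after")` on `Settings`.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

PLACEHOLDER_SECRET = "your-secret-key-change-in-production"


@dataclass(frozen=True)
class AppSettings:
    """Hypothetical stdlib stand-in for the pydantic Settings class."""
    environment: str
    redis_url: str
    secret_key: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> AppSettings:
    """Hypothetical loader: read env vars, then refuse unsafe production secrets."""
    source = os.environ if env is None else env
    settings = AppSettings(
        environment=source.get("ENVIRONMENT", "development"),
        redis_url=source.get("REDIS_URL", "redis://localhost:6379/0"),
        secret_key=source.get("SECRET_KEY", PLACEHOLDER_SECRET),
    )
    # Compose turns an unset ${SECRET_KEY} into "", so reject that as well
    if settings.environment == "production" and settings.secret_key in ("", PLACEHOLDER_SECRET):
        raise RuntimeError("SECRET_KEY must be set to a real secret in production")
    return settings


print(load_settings({}).environment)  # development
```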

Production Deployment and Docker Configuration

Create a Docker Compose file for easy deployment:

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  mcp-server:
    build: .
    ports:
      # A host port range, because replicas: 3 cannot all bind host port 8000
      - "8000-8002:8000"
    environment:
      - ENVIRONMENT=production
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      redis:
        condition: service_healthy
    volumes:
      - ./logs:/app/logs
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M

volumes:
  redis_data:

Testing the MCP Server

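The tests below exercise the API end to end. They need a Redis instance reachable at the configured REDIS_URL (for example, the one started by Docker Compose):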

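# tests/test_server.py
# Requires: pytest, pytest-asyncio, httpx, and a reachable Redis (REDIS_URL)
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from src.server import create_server

@pytest_asyncio.fixture  # Async fixtures need pytest_asyncio's decorator in strict mode
async def client():
    app = create_server()
    # httpx 0.28 removed AsyncClient(app=...); route requests through ASGITransport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac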

@pytest.mark.asyncio
async def test_agent_registration(client):
    response = await client.post("/agents/register", json={
        "agent_id": "test-agent-1",
        "capabilities": ["text_generation", "code_review"],
        "metadata": {"version": "1.0"}
    })
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "success"
    assert "token" in data

@pytest.mark.asyncio
async def test_context_store_and_query(client):
    # First register and get token
    reg_response = await client.post("/agents/register", json={
        "agent_id": "test-agent-2",
        "capabilities": ["data_analysis"]
    })
    token = reg_response.json()["token"]

    # Store context
    context_response = await client.post(
        "/context/store",
        json={
            "agent_id": "test-agent-2",
            "context_type": "task_result",
            "payload": {"result": "success", "data": [1, 2, 3]}
        },
        headers={"Authorization": f"Bearer {token}"}
    )
    assert context_response.status_code == 200

    # Query context
    query_response = await client.post(
        "/context/query",
        json={"agent_id": "test-agent-2", "limit": 10},
        headers={"Authorization": f"Bearer {token}"}
    )
    assert query_response.status_code == 200
    assert query_response.json()["count"] >= 1

@pytest.mark.asyncio
async def test_unauthorized_access(client):
    response = await client.post(
        "/context/store",
        json={
            "agent_id": "test-agent-3",
            "context_type": "task_result",
            "payload": {}
        }
    )
    assert response.status_code == 403

Edge Cases and Production Considerations

Handling Concurrent Access

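When multiple agents try to update the same context simultaneously, use optimistic locking with Redis WATCH/MULTI/EXEC: watch the key, read and modify the entry, and let the transaction abort and retry if another client changed the key in the meantime: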

# Method for ContextManager (src/context_manager.py), which already imports redis and ContextEntry
async def update_context_with_lock(self, context_id: str, update_func, max_retries=3):
    """Update context with optimistic locking to prevent race conditions."""
    for attempt in range(max_retries):
        key = f"{self.context_prefix}{context_id}"

        # Watch the key for changes
        async with self.redis_client.pipeline(transaction=True) as pipe:
            try:
                await pipe.watch(key)
                current_data = await pipe.get(key)
                if not current_data:
                    return None

                # Apply update function
                entry = ContextEntry.model_validate_json(current_data)
                updated_entry = update_func(entry)

                # Execute transaction
                pipe.multi()
                await pipe.setex(key, entry.ttl_seconds, updated_entry.model_dump_json())
                await pipe.execute()
                return updated_entry
            except redis.WatchError:
                if attempt == max_retries - 1:
                    raise
                continue
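WATCH/MULTI/EXEC is a form of optimistic concurrency control: read, compute, and commit only if nobody changed the key in between, otherwise retry. The dependency-free sketch below models the same idea with an explicit version counter, which Redis tracks for you when you WATCH a key. `VersionedStore`, `ConflictError`, and `update_with_retry` are hypothetical names, and the `interfere` callback exists only to simulate a concurrent writer.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class ConflictError(Exception):
    """Raised when the value changed between read and commit (like redis.WatchError)."""


class VersionedStore:
    def __init__(self) -> None:
        self._data: Dict[str, Tuple[int, Any]] = {}  # key -> (version, value)

    def read(self, key: str) -> Optional[Tuple[int, Any]]:
        return self._data.get(key)

    def write(self, key: str, value: Any) -> None:
        version = self._data.get(key, (0, None))[0]
        self._data[key] = (version + 1, value)

    def compare_and_set(self, key: str, expected_version: int, value: Any) -> None:
        # Commit only if the version is still the one we read
        current = self._data.get(key)
        if current is None or current[0] != expected_version:
            raise ConflictError(key)
        self._data[key] = (expected_version + 1, value)


def update_with_retry(store: VersionedStore, key: str, update_func: Callable[[Any], Any],
                      max_retries: int = 3,
                      interfere: Optional[Callable[[int], None]] = None) -> Any:
    for attempt in range(max_retries):
        snapshot = store.read(key)
        if snapshot is None:
            return None
        version, value = snapshot
        new_value = update_func(value)
        if interfere is not None:
            interfere(attempt)  # test hook: another agent writes right now
        try:
            store.compare_and_set(key, version, new_value)
            return new_value
        except ConflictError:
            if attempt == max_retries - 1:
                raise
    return None


store = VersionedStore()
store.write("ctx", {"steps": 0})


def bump(value: dict) -> dict:
    return {"steps": value["steps"] + 1}


def other_agent(attempt: int) -> None:
    if attempt == 0:  # only the first attempt collides
        store.write("ctx", {"steps": 10})


print(update_with_retry(store, "ctx", bump, interfere=other_agent))  # {'steps': 11}
```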

Memory Management and TTL Strategies

To keep memory bounded without evicting contexts that agents still use, extend TTLs only for frequently accessed entries:

class TTLManager:
    """Manages context TTL based on access patterns."""

    def __init__(self, redis_client, context_prefix: str = "mcp:context:"):
        self.redis_client = redis_client
        self.context_prefix = context_prefix  # Must match ContextManager's prefix
        self.access_count_key = "mcp:access_count:"

    async def update_ttl_based_on_access(self, context_id: str):
        """Extend TTL for frequently accessed contexts."""
        access_key = f"{self.access_count_key}{context_id}"

        # Increment access count; expire the counter too so it cannot leak memory
        count = await self.redis_client.incr(access_key)
        await self.redis_client.expire(access_key, 7200)

        # Extend TTL for popular contexts
        if count > 10:  # Accessed more than 10 times
            context_key = f"{self.context_prefix}{context_id}"
            await self.redis_client.expire(context_key, 7200)  # Reset TTL to 2 hours from now
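One weakness of `update_ttl_based_on_access` is that it can shorten a TTL: an entry stored with `ttl_seconds=86400` drops to 2 hours after its eleventh read. Pulling the decision into a pure function makes the rule explicit and easy to test. `next_ttl` is a hypothetical helper that reuses the thresholds above; inside `TTLManager` you would read `remaining_ttl` with `await self.redis_client.ttl(context_key)` and call `expire` only when the function returns a number.

```python
from typing import Optional

HOT_THRESHOLD = 10      # reads before a context counts as hot (TTLManager's value)
HOT_TTL_SECONDS = 7200  # TTL given to hot contexts (TTLManager's value)


def next_ttl(access_count: int, remaining_ttl: int) -> Optional[int]:
    """Return the TTL to apply with EXPIRE, or None to leave the key alone.

    remaining_ttl is what Redis TTL returns: seconds left, -1 for a key with
    no expiry, -2 for a missing key. Neither negative case is touched here.
    """
    if remaining_ttl < 0:
        return None
    # Only ever lengthen: a context stored with a longer ttl_seconds keeps it
    if access_count > HOT_THRESHOLD and remaining_ttl < HOT_TTL_SECONDS:
        return HOT_TTL_SECONDS
    return None


print(next_ttl(11, 600))  # 7200
```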

Monitoring and Observability

Add Prometheus metrics for production monitoring:

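from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response

# Define metrics once at module level (registering the same name twice raises an error).
# With several uvicorn workers, each process keeps its own counters; see
# prometheus_client's multiprocess mode if you need aggregated values.
context_operations = Counter(
    'mcp_context_operations_total',
    'Total context operations',
    ['operation_type', 'status']
)

operation_duration = Histogram(
    'mcp_operation_duration_seconds',
    'Duration of MCP operations',
    ['operation_type']
)

# Inside MCPServer.setup_routes() (the FastAPI instance is self.app there):
@self.app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)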
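The metrics above only become useful once operations record into them. A common pattern is a small wrapper that times each call and labels the outcome. The sketch below is stdlib-only so it runs anywhere: `record`, `OP_COUNTS`, and `OP_DURATIONS` are hypothetical stand-ins. With prometheus_client you would instead call `operation_duration.labels(operation_type=...).observe(elapsed)` and `context_operations.labels(operation_type=..., status=...).inc()`.

```python
import asyncio
import functools
import time
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

# Hypothetical in-process stand-ins for the two Prometheus metrics
OP_COUNTS: DefaultDict[Tuple[str, str], int] = defaultdict(int)  # (operation_type, status)
OP_DURATIONS: DefaultDict[str, List[float]] = defaultdict(list)  # operation_type -> seconds


def record(operation_type: str) -> Callable:
    """Hypothetical decorator: time an async call and label its outcome."""
    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            status = "error"  # stays "error" if func raises
            try:
                result = await func(*args, **kwargs)
                # ContextManager methods report Redis failures by returning False
                status = "error" if result is False else "success"
                return result
            finally:
                OP_DURATIONS[operation_type].append(time.perf_counter() - start)
                OP_COUNTS[(operation_type, status)] += 1
        return wrapper
    return decorator


@record("store")
async def fake_store(ok: bool) -> bool:
    """Stand-in for ContextManager.store_context."""
    return ok


asyncio.run(fake_store(True))
asyncio.run(fake_store(False))
print(dict(OP_COUNTS))  # {('store', 'success'): 1, ('store', 'error'): 1}
```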

What's Next

This MCP server implementation provides a solid foundation for agent-to-agent communication in production environments. To extend this further:

  1. Implement context versioning with conflict resolution strategies for concurrent updates
  2. Add WebSocket support for real-time context streaming between agents
  3. Integrate with message queues like RabbitMQ or Kafka for event-driven architectures
  4. Implement circuit breakers to handle Redis failures gracefully
  5. Add context compression for large payloads to reduce memory usage
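Item 4 changes failure behavior, so a sketch may help: a circuit breaker stops calling Redis after repeated failures and fails fast for a cool-down period, instead of letting every request wait on a timeout. The class below is a minimal, hypothetical implementation (names, thresholds, and the injectable clock are assumptions), not a specific library's API. It is synchronous for brevity; in the async server you would `await` the wrapped coroutine, and a production version would let only one half-open probe through at a time.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised instead of calling the backend while the breaker is open."""


class CircuitBreaker:
    """Hypothetical minimal breaker: closed, open after N failures, then half-open trial."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half_open"  # cool-down over: let a trial call through
        return "open"

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("backend unavailable, failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # Trip after too many failures, or re-open if a half-open trial failed
            if self._failures >= self.failure_threshold or self._opened_at is not None:
                self._opened_at = self._clock()
            raise
        # A success closes the breaker and clears the failure count
        self._failures = 0
        self._opened_at = None
        return result


now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])


def flaky() -> None:
    raise ConnectionError("redis down")


for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
print(breaker.state)  # open
now[0] = 11.0
print(breaker.state)  # half_open
print(breaker.call(lambda: "pong"), breaker.state)  # pong closed
```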

The complete source code for this tutorial is available at our GitHub repository. For more advanced patterns in multi-agent systems, check out our guide on building resilient AI agent architectures.

Remember that in production, you should always monitor your MCP server's performance metrics and set up alerts for unusual patterns. The combination of proper authentication, efficient caching, and comprehensive monitoring will ensure your AI agents communicate reliably at scale.

