How to Build an AI Research Assistant with Perplexity API
Practical tutorial: Create an AI research assistant with Perplexity API
Building a production-grade AI research assistant requires more than just wrapping an API call. You need robust context management, citation handling, and intelligent query routing. In this tutorial, we'll build a complete research assistant using the Perplexity API that can answer complex questions with verified sources, maintain conversation history, and handle rate limits gracefully.
Real-World Use Case & Architecture
Research assistants powered by large language models (LLMs) are transforming how knowledge workers interact with information. According to Perplexity's documentation, their API provides access to real-time web search capabilities combined with LLM reasoning, making it ideal for research tasks that require up-to-date information.
In production, a research assistant must handle:
- Multi-turn conversations with context retention
- Citation management for source verification
- Rate limiting and error recovery
- Query optimization for cost efficiency
Our architecture uses a three-layer approach:
- API Layer: Perplexity API for search and generation
- Context Layer: In-memory conversation buffer with sliding window
- Cache Layer: Redis-backed response caching for identical queries
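To make the layering concrete, here is a minimal sketch of the request flow, assuming the lookup order is cache first, then context assembly, then the API call. The `LayeredAssistant` class and `fake_api` function are hypothetical stand-ins invented for this illustration: a plain dict plays the role of Redis and no network call is made.

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable, Deque, Dict, List, Tuple

Message = Dict[str, str]


class LayeredAssistant:
    """Hypothetical wiring of the three layers: cache -> context -> API."""

    def __init__(self, call_api: Callable[[List[Message]], Awaitable[str]], max_turns: int = 6):
        self.call_api = call_api                                  # API layer (injected)
        self.history: Deque[Message] = deque(maxlen=max_turns)    # context layer
        self.cache: Dict[Tuple[str, ...], str] = {}               # cache layer (dict stands in for Redis)

    async def ask(self, query: str) -> Tuple[str, bool]:
        """Return (answer, served_from_cache)."""
        context = list(self.history)
        # Key on the query plus the context that was visible when it was asked
        key = tuple(m["content"] for m in context) + (query,)
        cached = key in self.cache
        if cached:
            answer = self.cache[key]
        else:
            answer = await self.call_api(context + [{"role": "user", "content": query}])
            self.cache[key] = answer
        self.history.append({"role": "user", "content": query})
        self.history.append({"role": "assistant", "content": answer})
        return answer, cached


async def fake_api(messages: List[Message]) -> str:
    # Stand-in for the real HTTP call; reports how many messages it received
    return f"answer based on {len(messages)} message(s)"


async def demo() -> List[Tuple[str, bool]]:
    assistant = LayeredAssistant(fake_api)
    first = await assistant.ask("What is a qubit?")
    assistant.history.clear()  # ask the same query again with the same (empty) context
    second = await assistant.ask("What is a qubit?")
    return [first, second]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second call is served from the cache because both the query and the visible context match the first call.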
Prerequisites and Environment Setup
Before diving into code, ensure you have:
- Python 3.10+ installed
- A Perplexity API key (available at perplexity.ai/api)
- Redis server running locally (optional, for caching)
Install the required packages:
pip install httpx==0.27.0 pydantic==2.7.0 redis==5.0.0 python-dotenv==1.0.1 tiktoken
Create a .env file in your project root:
PERPLEXITY_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379/0
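A missing or placeholder key otherwise only surfaces at the first API call, so it can help to check the environment at startup. The following is a rough sketch; `load_settings` is a hypothetical helper, not part of any library, and it assumes the two variable names from the .env file above.

```python
import os
from typing import Dict, Mapping, Optional


def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Read the two variables from the .env example and fail fast if the key is missing."""
    env = os.environ if env is None else env
    api_key = env.get("PERPLEXITY_API_KEY", "").strip()
    if not api_key or api_key == "your_api_key_here":
        raise RuntimeError("PERPLEXITY_API_KEY is not set (or still holds the placeholder)")
    return {
        "api_key": api_key,
        # Redis is optional, so fall back to the local default from the .env example
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
    }
```

Calling `load_settings()` after `load_dotenv()` reads the real environment; passing a dict makes the function easy to test.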
Core Implementation: Building the Research Assistant
1. API Client with Rate Limiting
The Perplexity API uses a token-based rate limiting system. As of June 2026, the API supports both synchronous and streaming responses. We'll implement a robust client that handles rate limits automatically.
import asyncio
import os
import threading
import time
from typing import Any, Dict, List, Optional

import httpx
from dotenv import load_dotenv
from pydantic import BaseModel, Field

load_dotenv()


class PerplexityConfig(BaseModel):
    """Configuration for Perplexity API client."""
    # Falls back to an empty string so the field stays a str when the variable is unset
    api_key: str = Field(default_factory=lambda: os.getenv("PERPLEXITY_API_KEY", ""))
    base_url: str = "https://api.perplexity.ai"
    model: str = "sonar-pro"  # Latest model as of June 2026
    max_retries: int = 3
    rate_limit_per_minute: int = 30
    timeout: int = 60


class RateLimiter:
    """Token bucket rate limiter for API calls."""

    def __init__(self, max_calls: int, period: float = 60.0):
        self.max_calls = max_calls
        self.period = period
        self.tokens = float(max_calls)
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> float:
        """Reserve a token. Returns how long the caller should wait before sending."""
        with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.max_calls, self.tokens + elapsed * (self.max_calls / self.period))
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return 0.0
            wait_time = (1 - self.tokens) * (self.period / self.max_calls)
            # Reserve the token now (the balance may go negative) so that
            # concurrent callers queue behind this one instead of sharing its slot
            self.tokens -= 1
            return wait_time


class PerplexityClient:
    """Production-grade client for Perplexity API."""

    def __init__(self, config: Optional[PerplexityConfig] = None):
        self.config = config or PerplexityConfig()
        self.rate_limiter = RateLimiter(self.config.rate_limit_per_minute)
        self.client = httpx.AsyncClient(
            base_url=self.config.base_url,
            timeout=self.config.timeout,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }
        )

    async def search(self, query: str, context: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """
        Execute a search query with optional conversation context.

        Args:
            query: The research question or search query
            context: Previous conversation turns for context retention

        Returns:
            API response with answer and citations
        """
        wait_time = self.rate_limiter.acquire()
        if wait_time > 0:
            await asyncio.sleep(wait_time)

        messages = []
        if context:
            messages.extend(context)
        messages.append({"role": "user", "content": query})

        payload = {
            "model": self.config.model,
            "messages": messages,
            "max_tokens": 1024,
            "temperature": 0.2,  # Lower temperature for factual responses
            "top_p": 0.9,
            "return_citations": True,
            "return_images": False,
            "search_recency_filter": "month"  # Prioritize recent sources
        }

        for attempt in range(self.config.max_retries):
            try:
                response = await self.client.post("/chat/completions", json=payload)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:  # Rate limit exceeded
                    retry_after = int(e.response.headers.get("Retry-After", 5))
                    await asyncio.sleep(retry_after)
                    continue
                elif e.response.status_code == 401:
                    raise PermissionError("Invalid API key. Check your Perplexity API credentials.")
                else:
                    raise
            except httpx.TimeoutException:
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
                    continue
                raise TimeoutError(f"Request timed out after {self.config.timeout} seconds")
        raise RuntimeError("Max retries exceeded")

    async def close(self):
        await self.client.aclose()
Key design decisions:
- Async implementation: Uses httpx.AsyncClient for non-blocking I/O, critical for handling multiple concurrent research queries
- Token bucket rate limiter: Allows short bursts up to the bucket size while capping the sustained request rate, which a fixed sleep between calls cannot do
- Exponential backoff: For timeouts, the wait doubles between retries (2 ** attempt seconds, so 1 and then 2 seconds with the default three attempts)
- Temperature control: Set to 0.2 for research tasks to minimize hallucination risk
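The refill arithmetic behind the token bucket is easier to follow with a controllable clock. The sketch below is an illustration only: `TokenBucket` and its injectable `clock` parameter are hypothetical names, and the numbers assume the default of 30 calls per 60 seconds.

```python
from typing import Callable


class TokenBucket:
    """Illustrative token bucket; `clock` is injectable so the refill math can be checked."""

    def __init__(self, max_calls: int, period: float, clock: Callable[[], float]):
        self.capacity = float(max_calls)
        self.rate = max_calls / period  # tokens added per second
        self.tokens = self.capacity
        self.clock = clock
        self.last = clock()

    def acquire(self) -> float:
        """Reserve one token and return the seconds to wait before using it."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= 1  # reserve; a negative balance means the caller is queued
        return max(0.0, -self.tokens / self.rate)


fake_now = [0.0]
bucket = TokenBucket(max_calls=30, period=60.0, clock=lambda: fake_now[0])
burst = [bucket.acquire() for _ in range(30)]  # the full bucket is served immediately
queued = bucket.acquire()                      # the 31st call waits one refill interval (2 s)
fake_now[0] = 10.0                             # 10 s later, 5 tokens have been refilled
after_pause = bucket.acquire()                 # served without waiting
```

With 30 calls per minute, one token is refilled every 2 seconds, which is why the first call after an exhausted bucket waits exactly that long.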
2. Conversation Context Manager
Maintaining conversation history is crucial for multi-turn research. We implement a sliding window approach that balances context retention with token limits.
from collections import deque
from typing import Deque, Dict, List

# cl100k_base is an OpenAI encoding, so treat the counts as estimates for Sonar models
import tiktoken


class ConversationManager:
    """Manages conversation context with token-aware sliding window."""

    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens
        self.history: Deque[Dict[str, str]] = deque()
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def add_turn(self, role: str, content: str):
        """Add a conversation turn with automatic context pruning."""
        turn = {"role": role, "content": content}
        self.history.append(turn)
        self._prune_context()

    def _prune_context(self):
        """Remove oldest turns until total tokens fit within limit."""
        while self._count_tokens() > self.max_tokens and len(self.history) > 1:
            self.history.popleft()

    def _count_tokens(self) -> int:
        """Count total tokens in conversation history."""
        total = 0
        for turn in self.history:
            total += len(self.encoder.encode(turn["content"]))
            total += 4  # Overhead for role markers
        return total

    def get_context(self) -> List[Dict[str, str]]:
        """Return conversation history as list of messages."""
        return list(self.history)

    def clear(self):
        """Reset conversation history."""
        self.history.clear()
Edge case handling:
- Token overflow: Automatically prunes oldest context when approaching limits
- Empty history: Returns empty list, handled gracefully by API client
- Single turn: Never prunes below one turn to maintain at least some context
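The pruning rule can be seen on a tiny history. This is a dependency-free sketch: `prune` and `word_count` are hypothetical helpers, and whitespace word counts stand in for real token counts, so the numbers are illustrative only.

```python
from collections import deque
from typing import Callable, Deque, Dict


def prune(history: Deque[Dict[str, str]], max_tokens: int,
          count: Callable[[str], int], overhead: int = 4) -> None:
    """Drop the oldest turns in place until the total fits, keeping at least one turn."""
    def total() -> int:
        return sum(count(turn["content"]) + overhead for turn in history)

    while total() > max_tokens and len(history) > 1:
        history.popleft()


def word_count(text: str) -> int:
    # Crude stand-in for a tokenizer, used only to keep the example dependency-free
    return len(text.split())


history = deque([
    {"role": "user", "content": "one two three"},             # 3 + 4 = 7
    {"role": "assistant", "content": "four five six seven"},  # 4 + 4 = 8
    {"role": "user", "content": "eight nine"},                # 2 + 4 = 6
])
prune(history, max_tokens=15, count=word_count)  # 21 > 15, so the oldest turn is dropped

oversized = deque([{"role": "user", "content": "a b c d e f g h i j"}])
prune(oversized, max_tokens=5, count=word_count)  # a single turn is never pruned
```

After pruning, the window starts with an assistant turn. Some chat APIs expect the history to begin with a user turn, so a real implementation may prefer to drop turns in user/assistant pairs.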
3. Response Cache with Redis
Caching identical queries reduces API costs and improves response times. We implement a TTL-based cache with Redis.
import hashlib
import json
import os
from typing import Any, Dict, List, Optional

import redis.asyncio as redis


class ResponseCache:
    """Redis-backed cache for API responses with TTL."""

    def __init__(self, redis_url: Optional[str] = None, ttl: int = 3600):
        self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
        self.ttl = ttl
        self.client: Optional[redis.Redis] = None

    async def initialize(self):
        """Establish Redis connection."""
        self.client = await redis.from_url(self.redis_url, decode_responses=True)

    def _generate_key(self, query: str, context: Optional[List[Dict]] = None) -> str:
        """Generate deterministic cache key from query and context."""
        data = {"query": query, "context": context or []}
        serialized = json.dumps(data, sort_keys=True)
        return f"perplexity:{hashlib.sha256(serialized.encode()).hexdigest()}"

    async def get(self, query: str, context: Optional[List[Dict]] = None) -> Optional[Dict[str, Any]]:
        """Retrieve cached response if available."""
        if not self.client:
            return None
        key = self._generate_key(query, context)
        cached = await self.client.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, query: str, response: Dict[str, Any], context: Optional[List[Dict]] = None):
        """Cache API response with TTL."""
        if not self.client:
            return
        key = self._generate_key(query, context)
        await self.client.setex(key, self.ttl, json.dumps(response))

    async def close(self):
        if self.client:
            await self.client.close()
Cache invalidation strategy:
- TTL-based: Default 1 hour, configurable per use case
- Deterministic keys: Same query + context always produces same cache key
- SHA-256 hashing: Keeps keys at a fixed length regardless of query size, with a negligible chance of collisions
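The determinism claim can be checked in isolation. The sketch below repeats the same key recipe (JSON with sorted keys, then SHA-256) in a standalone function; `cache_key` is a name chosen for this illustration.

```python
import hashlib
import json
from typing import Dict, List, Optional


def cache_key(query: str, context: Optional[List[Dict[str, str]]] = None) -> str:
    """Build a fixed-length key from the query and the conversation context."""
    payload = json.dumps({"query": query, "context": context or []}, sort_keys=True)
    return "perplexity:" + hashlib.sha256(payload.encode()).hexdigest()


a = cache_key("q", [{"role": "user", "content": "hi"}])
b = cache_key("q", [{"content": "hi", "role": "user"}])  # same turn, dict keys in another order
c = cache_key("q")                                       # same query, no context
```

Here `a` and `b` are identical because `sort_keys=True` normalizes dict key order, while `c` differs because the context is part of the key. The same question asked at a different point in a conversation is therefore a cache miss.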
4. Complete Research Assistant
Now we combine all components into a production-ready research assistant.
import asyncio
from typing import Optional, List, Dict, Any
from dataclasses import dataclass


@dataclass
class ResearchResult:
    """Structured research result with citations."""
    answer: str
    citations: List[Dict[str, str]]
    model: str
    tokens_used: int
    cached: bool = False


class ResearchAssistant:
    """Production-grade AI research assistant using Perplexity API."""

    def __init__(self, config: Optional[PerplexityConfig] = None):
        self.config = config or PerplexityConfig()
        self.client = PerplexityClient(self.config)
        self.conversation = ConversationManager()
        self.cache = ResponseCache()
        self._initialized = False

    async def initialize(self):
        """Async initialization for components that need it."""
        await self.cache.initialize()
        self._initialized = True

    async def research(self, query: str, use_cache: bool = True) -> ResearchResult:
        """
        Execute a research query with full context management.

        Args:
            query: The research question
            use_cache: Whether to check cache before API call

        Returns:
            Structured research result with citations
        """
        if not self._initialized:
            await self.initialize()

        # Snapshot the context once so the cache lookup and the cache write use the same key
        context = self.conversation.get_context()

        # Check cache first
        if use_cache:
            cached_response = await self.cache.get(query, context)
            if cached_response:
                answer = cached_response["choices"][0]["message"]["content"]
                # Record the turn so follow-up questions still see this exchange
                self.conversation.add_turn("user", query)
                self.conversation.add_turn("assistant", answer)
                return ResearchResult(
                    answer=answer,
                    citations=cached_response.get("citations", []),
                    model=cached_response["model"],
                    tokens_used=cached_response["usage"]["total_tokens"],
                    cached=True
                )

        # Execute API call
        try:
            response = await self.client.search(query, context)
        except PermissionError as e:
            return ResearchResult(
                answer=f"Authentication error: {str(e)}",
                citations=[],
                model="",
                tokens_used=0
            )
        except TimeoutError:
            return ResearchResult(
                answer="Request timed out. Please try again later.",
                citations=[],
                model="",
                tokens_used=0
            )

        # Extract response data
        choice = response["choices"][0]
        answer = choice["message"]["content"]
        citations = response.get("citations", [])

        # Update conversation context
        self.conversation.add_turn("user", query)
        self.conversation.add_turn("assistant", answer)

        # Cache the response under the pre-call context, matching the lookup above
        if use_cache:
            await self.cache.set(query, response, context)

        return ResearchResult(
            answer=answer,
            citations=citations,
            model=response["model"],
            tokens_used=response["usage"]["total_tokens"],
            cached=False
        )

    async def clear_conversation(self):
        """Reset conversation history."""
        self.conversation.clear()

    async def close(self):
        """Clean up resources."""
        await self.client.close()
        await self.cache.close()
5. Usage Example
Here's how to use the research assistant in practice:
async def main():
    assistant = ResearchAssistant()

    # First query establishes context
    result1 = await assistant.research(
        "What are the latest developments in quantum computing error correction?"
    )
    print(f"Answer: {result1.answer[:200]}...")
    print(f"Citations: {len(result1.citations)} sources")
    print(f"Cached: {result1.cached}")

    # Follow-up query uses conversation context
    result2 = await assistant.research(
        "How does this compare to classical error correction methods?"
    )
    print(f"Follow-up answer: {result2.answer[:200]}...")

    await assistant.close()


if __name__ == "__main__":
    asyncio.run(main())
Edge Cases and Production Considerations
Rate Limit Handling
The Perplexity API enforces rate limits per API key. Our implementation:
- Uses token bucket algorithm for precise rate limiting
- Implements exponential backoff for timed-out requests
- Respects Retry-After headers from the server on 429 responses
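The client code in this tutorial sleeps for a fixed 2 ** attempt seconds and does not randomize the delay. If many workers retry at once, adding jitter spreads them out. The following is one possible sketch using the common "full jitter" variant; `backoff_delay`, `base`, and `cap` are hypothetical names chosen here.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    rng = rng if rng is not None else random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0.0, ceiling)
```

In the retry loop, `await asyncio.sleep(backoff_delay(attempt))` would replace the fixed `2 ** attempt` sleep. Passing a seeded `random.Random` makes the delays reproducible in tests.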
Error Recovery
- Network failures: Retry with backoff up to 3 attempts
- Authentication errors: Immediate failure with clear error message
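- Empty responses: Validate the response structure before extraction; the research method above indexes response["choices"][0] directly, so this check needs to be added before relying on it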
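A minimal sketch of such a validation step is shown below. It assumes the response shape used throughout this tutorial (a `choices` list with a `message.content` string and an optional top-level `citations` list); `extract_answer` is a hypothetical helper, not part of the Perplexity API.

```python
from typing import Any, Dict, List, Tuple


def extract_answer(response: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Return (answer, citations) or raise ValueError if the payload is unusable."""
    choices = response.get("choices")
    if not isinstance(choices, list) or not choices:
        raise ValueError("response has no choices")
    message = choices[0].get("message") if isinstance(choices[0], dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, str) or not content.strip():
        raise ValueError("response has an empty answer")
    citations = response.get("citations")
    # Citations are optional, so a missing or malformed field becomes an empty list
    return content, (citations if isinstance(citations, list) else [])
```

Catching `ValueError` alongside the existing `PermissionError` and `TimeoutError` handlers would let the assistant return a clear message instead of failing with a `KeyError` or `IndexError`.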
Memory Management
- Conversation history uses a sliding window so memory use stays bounded
- Redis cache has configurable TTL to prevent stale data
- Explicit close() methods release the HTTP client and the Redis connection; call them in a finally block so cleanup also runs when a query fails
Cost Optimization
- Response caching reduces duplicate API calls
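- Lower temperature (0.2) keeps factual answers more deterministic; response length is capped separately by max_tokens (1024 in the payload above)
- search_recency_filter restricts search to recent sources (the past month in this configuration)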
What's Next
This research assistant provides a solid foundation for production use. Consider extending it with:
- Multi-model routing: Route simple queries to cheaper models, complex research to Perplexity
- Document ingestion: Add support for PDF and web page analysis alongside search
- Feedback loop: Implement user feedback to improve response quality over time
- Monitoring: Add OpenTelemetry tracing for API latency and error tracking
For more advanced patterns, explore our guides on building RAG [1] systems and LLM caching strategies.
The complete source code is available on GitHub. Remember to handle your API keys securely and monitor usage to stay within your Perplexity API quota. As of June 2026, the Perplexity API supports multiple models including Sonar Pro and Sonar Huge, with pricing starting at $0.005 per request for the base model.