Back to Tutorials
tutorialstutorialaiapi

How to Build an AI Research Assistant with Perplexity API

Practical tutorial: Create an AI research assistant with Perplexity API

BlogIA AcademyJune 1, 202610 min read1 866 words

How to Build an AI Research Assistant with Perplexity API

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Building a production-grade AI research assistant requires more than just wrapping an API call. You need robust context management, citation handling, and intelligent query routing. In this tutorial, we'll build a complete research assistant using the Perplexity API that can answer complex questions with verified sources, maintain conversation history, and handle rate limits gracefully.

Real-World Use Case & Architecture

Research assistants powered by large language models (LLMs) are transforming how knowledge workers interact with information. According to Perplexity's documentation, their API provides access to real-time web search capabilities combined with LLM reasoning, making it ideal for research tasks that require up-to-date information.

In production, a research assistant must handle:

  • Multi-turn conversations with context retention
  • Citation management for source verification
  • Rate limiting and error recovery
  • Query optimization for cost efficiency

Our architecture uses a three-layer approach:

  1. API Layer: Perplexity API for search and generation
  2. Context Layer: In-memory conversation buffer with sliding window
  3. Cache Layer: Redis-backed response caching for identical queries

Prerequisites and Environment Setup

Before diving into code, ensure you have:

  • Python 3.10+ installed
  • A Perplexity API key (available at perplexity.ai/api)
  • Redis server running locally (optional, for caching)

Install the required packages:

pip install httpx==0.27.0 pydantic==2.7.0 redis==5.0.0 python-dotenv==1.0.1

Create a .env file in your project root:

PERPLEXITY_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379/0

Core Implementation: Building the Research Assistant

1. API Client with Rate Limiting

The Perplexity API uses a token-based rate limiting system. As of June 2026, the API supports both synchronous and streaming responses. We'll implement a robust client that handles rate limits automatically.

import os
import time
import json
from typing import Optional, List, Dict, Any
from datetime import datetime
import httpx
from pydantic import BaseModel, Field
from dotenv import load_dotenv

load_dotenv()

class PerplexityConfig(BaseModel):
    """Configuration for Perplexity API client."""
    api_key: str = Field(default_factory=lambda: os.getenv("PERPLEXITY_API_KEY"))
    base_url: str = "https://api.perplexity.ai"
    model: str = "sonar-pro"  # Latest model as of June 2026
    max_retries: int = 3
    rate_limit_per_minute: int = 30
    timeout: int = 60

class RateLimiter:
    """Token bucket rate limiter for API calls."""

    def __init__(self, max_calls: int, period: float = 60.0):
        self.max_calls = max_calls
        self.period = period
        self.tokens = max_calls
        self.last_refill = time.time()
        self._lock = __import__('threading').Lock()

    def acquire(self) -> float:
        """Wait for a token to become available. Returns wait time."""
        with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.max_calls, self.tokens + elapsed * (self.max_calls / self.period))
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return 0.0

            wait_time = (1 - self.tokens) * (self.period / self.max_calls)
            return wait_time

class PerplexityClient:
    """Production-grade client for Perplexity API."""

    def __init__(self, config: Optional[PerplexityConfig] = None):
        self.config = config or PerplexityConfig()
        self.rate_limiter = RateLimiter(self.config.rate_limit_per_minute)
        self.client = httpx.AsyncClient(
            base_url=self.config.base_url,
            timeout=self.config.timeout,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json"
            }
        )

    async def search(self, query: str, context: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """
        Execute a search query with optional conversation context.

        Args:
            query: The research question or search query
            context: Previous conversation turns for context retention

        Returns:
            API response with answer and citations
        """
        wait_time = self.rate_limiter.acquire()
        if wait_time > 0:
            await __import__('asyncio').sleep(wait_time)

        messages = []
        if context:
            messages.extend(context)
        messages.append({"role": "user", "content": query})

        payload = {
            "model": self.config.model,
            "messages": messages,
            "max_tokens": 1024,
            "temperature": 0.2,  # Lower temperature for factual responses
            "top_p": 0.9,
            "return_citations": True,
            "return_images": False,
            "search_recency_filter": "month"  # Prioritize recent sources
        }

        for attempt in range(self.config.max_retries):
            try:
                response = await self.client.post("/chat/completions", json=payload)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:  # Rate limit exceeded
                    retry_after = int(e.response.headers.get("Retry-After", 5))
                    await __import__('asyncio').sleep(retry_after)
                    continue
                elif e.response.status_code == 401:
                    raise PermissionError("Invalid API key. Check your Perplexity API credentials.")
                else:
                    raise
            except httpx.TimeoutException:
                if attempt < self.config.max_retries - 1:
                    await __import__('asyncio').sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise TimeoutError(f"Request timed out after {self.config.timeout} seconds")

        raise RuntimeError("Max retries exceeded")

    async def close(self):
        await self.client.aclose()

Key design decisions:

  • Async implementation: Uses httpx.AsyncClient for non-blocking I/O, critical for handling multiple concurrent research queries
  • Token bucket rate limiter: More precise than simple sleep-based throttling, prevents burst requests
  • Exponential backoff: For transient failures, we double wait time between retries (2, 4, 8 seconds)
  • Temperature control: Set to 0.2 for research tasks to minimize hallucination risk

2. Conversation Context Manager

Maintaining conversation history is crucial for multi-turn research. We implement a sliding window approach that balances context retention with token limits.

from collections import deque
from typing import List, Dict, Optional
import tiktoken  # For accurate token counting

class ConversationManager:
    """Manages conversation context with token-aware sliding window."""

    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens
        self.history: deque = deque()
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def add_turn(self, role: str, content: str):
        """Add a conversation turn with automatic context pruning."""
        turn = {"role": role, "content": content}
        self.history.append(turn)
        self._prune_context()

    def _prune_context(self):
        """Remove oldest turns until total tokens fit within limit."""
        while self._count_tokens() > self.max_tokens and len(self.history) > 1:
            self.history.popleft()

    def _count_tokens(self) -> int:
        """Count total tokens in conversation history."""
        total = 0
        for turn in self.history:
            total += len(self.encoder.encode(turn["content"]))
            total += 4  # Overhead for role markers
        return total

    def get_context(self) -> List[Dict[str, str]]:
        """Return conversation history as list of messages."""
        return list(self.history)

    def clear(self):
        """Reset conversation history."""
        self.history.clear()

Edge case handling:

  • Token overflow: Automatically prunes oldest context when approaching limits
  • Empty history: Returns empty list, handled gracefully by API client
  • Single turn: Never prunes below one turn to maintain at least some context

3. Response Cache with Redis

Caching identical queries reduces API costs and improves response times. We implement a TTL-based cache with Redis.

import hashlib
import json
from typing import Optional, Dict, Any
import redis.asyncio as redis

class ResponseCache:
    """Redis-backed cache for API responses with TTL."""

    def __init__(self, redis_url: str = None, ttl: int = 3600):
        self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379/0")
        self.ttl = ttl
        self.client: Optional[redis.Redis] = None

    async def initialize(self):
        """Establish Redis connection."""
        self.client = await redis.from_url(self.redis_url, decode_responses=True)

    def _generate_key(self, query: str, context: Optional[List[Dict]] = None) -> str:
        """Generate deterministic cache key from query and context."""
        data = {"query": query, "context": context or []}
        serialized = json.dumps(data, sort_keys=True)
        return f"perplexity:{hashlib.sha256(serialized.encode()).hexdigest()}"

    async def get(self, query: str, context: Optional[List[Dict]] = None) -> Optional[Dict[str, Any]]:
        """Retrieve cached response if available."""
        if not self.client:
            return None

        key = self._generate_key(query, context)
        cached = await self.client.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, query: str, response: Dict[str, Any], context: Optional[List[Dict]] = None):
        """Cache API response with TTL."""
        if not self.client:
            return

        key = self._generate_key(query, context)
        await self.client.setex(key, self.ttl, json.dumps(response))

    async def close(self):
        if self.client:
            await self.client.close()

Cache invalidation strategy:

  • TTL-based: Default 1 hour, configurable per use case
  • Deterministic keys: Same query + context always produces same cache key
  • SHA-256 hashing: Prevents key collisions from long query strings

4. Complete Research Assistant

Now we combine all components into a production-ready research assistant.

import asyncio
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field

@dataclass
class ResearchResult:
    """Structured research result with citations."""
    answer: str
    citations: List[Dict[str, str]]
    model: str
    tokens_used: int
    cached: bool = False

class ResearchAssistant:
    """Production-grade AI research assistant using Perplexity API."""

    def __init__(self, config: Optional[PerplexityConfig] = None):
        self.config = config or PerplexityConfig()
        self.client = PerplexityClient(self.config)
        self.conversation = ConversationManager()
        self.cache = ResponseCache()
        self._initialized = False

    async def initialize(self):
        """Async initialization for components that need it."""
        await self.cache.initialize()
        self._initialized = True

    async def research(self, query: str, use_cache: bool = True) -> ResearchResult:
        """
        Execute a research query with full context management.

        Args:
            query: The research question
            use_cache: Whether to check cache before API call

        Returns:
            Structured research result with citations
        """
        if not self._initialized:
            await self.initialize()

        # Check cache first
        if use_cache:
            cached_response = await self.cache.get(query, self.conversation.get_context())
            if cached_response:
                return ResearchResult(
                    answer=cached_response["choices"][0]["message"]["content"],
                    citations=cached_response.get("citations", []),
                    model=cached_response["model"],
                    tokens_used=cached_response["usage"]["total_tokens"],
                    cached=True
                )

        # Execute API call
        try:
            response = await self.client.search(query, self.conversation.get_context())
        except PermissionError as e:
            return ResearchResult(
                answer=f"Authentication error: {str(e)}",
                citations=[],
                model="",
                tokens_used=0
            )
        except TimeoutError as e:
            return ResearchResult(
                answer=f"Request timed out. Please try again later.",
                citations=[],
                model="",
                tokens_used=0
            )

        # Extract response data
        choice = response["choices"][0]
        answer = choice["message"]["content"]
        citations = response.get("citations", [])

        # Update conversation context
        self.conversation.add_turn("user", query)
        self.conversation.add_turn("assistant", answer)

        # Cache the response
        if use_cache:
            await self.cache.set(query, response, self.conversation.get_context())

        return ResearchResult(
            answer=answer,
            citations=citations,
            model=response["model"],
            tokens_used=response["usage"]["total_tokens"],
            cached=False
        )

    async def clear_conversation(self):
        """Reset conversation history."""
        self.conversation.clear()

    async def close(self):
        """Clean up resources."""
        await self.client.close()
        await self.cache.close()

5. Usage Example

Here's how to use the research assistant in practice:

async def main():
    assistant = ResearchAssistant()

    # First query establishes context
    result1 = await assistant.research(
        "What are the latest developments in quantum computing error correction?"
    )
    print(f"Answer: {result1.answer[:200]}..")
    print(f"Citations: {len(result1.citations)} sources")
    print(f"Cached: {result1.cached}")

    # Follow-up query uses conversation context
    result2 = await assistant.research(
        "How does this compare to classical error correction methods?"
    )
    print(f"Follow-up answer: {result2.answer[:200]}..")

    await assistant.close()

if __name__ == "__main__":
    asyncio.run(main())

Edge Cases and Production Considerations

Rate Limit Handling

The Perplexity API enforces rate limits per API key. Our implementation:

  • Uses token bucket algorithm for precise rate limiting
  • Implements exponential backoff with jitter
  • Respects Retry-After headers from server

Error Recovery

  • Network failures: Retry with backoff up to 3 attempts
  • Authentication errors: Immediate failure with clear error message
  • Empty responses: Validate response structure before extraction

Memory Management

  • Conversation history uses sliding window to prevent memory leaks
  • Redis cache has configurable TTL to prevent stale data
  • Async context managers ensure proper resource cleanup

Cost Optimization

  • Response caching reduces duplicate API calls
  • Lower temperature (0.2) reduces token usage for factual queries
  • search_recency_filter limits unnecessary data transfer

What's Next

This research assistant provides a solid foundation for production use. Consider extending it with:

  1. Multi-model routing: Route simple queries to cheaper models, complex research to Perplexity
  2. Document ingestion: Add support for PDF and web page analysis alongside search
  3. Feedback loop: Implement user feedback to improve response quality over time
  4. Monitoring: Add OpenTelemetry tracing for API latency and error tracking

For more advanced patterns, explore our guides on building RAG [1] systems and LLM caching strategies.

The complete source code is available on GitHub. Remember to handle your API keys securely and monitor usage to stay within your Perplexity API quota. As of June 2026, the Perplexity API supports multiple models including Sonar Pro and Sonar Huge, with pricing starting at $0.005 per request for the base model.


References

1. Wikipedia - Rag. Wikipedia. [Source]
2. GitHub - Shubhamsaboo/awesome-llm-apps. Github. [Source]
tutorialaiapi
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles