How to Use Google AI Mode for Complex Search Queries
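Table of Contents
- Understanding the AI Mode Architecture and API Design
- Prerequisites and Environment Setup
- Building the Core AI Mode Client
- Building the Query Orchestrator for Complex Queries
- Handling Edge Cases and Production Considerations
- Production Deployment and Monitoring
- What's Next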
In March 2025, Google introduced an experimental "AI Mode" within its search platform, changing how users interact with search engines. According to Wikipedia, this feature lets users enter complex, multi-part queries and receive comprehensive, AI-generated responses from Google's Gemini model, which strengthens the system's reasoning and supports multimodal inputs including text, images, and voice.
This shift represents a significant change in user behavior: instead of typing fragmented keywords and manually synthesizing results from multiple searches, users can now ask nuanced, multi-faceted questions in natural language and receive synthesized answers. For developers, data scientists, and power users, understanding how to leverage this capability programmatically is essential for building next-generation search applications.
In this tutorial, we'll build a production-ready Python application that interacts with Google's AI Mode, handles complex multi-part queries, processes multimodal inputs, and manages the response pipeline efficiently. We'll cover architecture decisions, edge cases, and real-world deployment considerations.
Understanding the AI Mode Architecture and API Design
Before diving into code, it's critical to understand the architectural implications of AI Mode. Traditional search APIs return ranked lists of URLs with snippets. AI Mode, by contrast, returns synthesized responses that may include text, structured data, and references to source materials.
Key Architectural Considerations
- Query Complexity: AI Mode excels at multi-part queries like "Compare the energy efficiency of Tesla Powerwall vs LG Chem RESU, considering installation costs in California and available tax incentives for 2025." Traditional search would require 3-4 separate queries.
- Multimodal Support: The Gemini model underlying AI Mode supports text, images, and voice inputs. This means your application can accept image uploads alongside text queries.
- Response Streaming: AI-generated responses can be lengthy. Production applications should implement streaming to provide real-time feedback to users.
- Rate Limiting and Cost Management: As of May 2026, Google's AI Mode is still experimental. Production deployments must handle rate limits, quota management, and cost tracking.
Production Architecture Pattern
We'll implement a microservice architecture with the following components:
- Query Orchestrator: Handles complex query decomposition and routing
- Multimodal Processor: Converts various input types to API-compatible formats
- Response Streamer: Manages streaming responses with backpressure
- Cache Layer: Reduces API calls for similar queries
- Monitoring Stack: Tracks latency, error rates, and token usage
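To make the "streaming with backpressure" idea in the component list concrete, here is a minimal sketch using a bounded asyncio.Queue. The function names (produce, consume, stream_with_backpressure) and the max_buffered parameter are illustrative assumptions, not part of any Google API; the point is only that a full queue makes the producer wait until the consumer catches up.

```python
import asyncio
from typing import AsyncIterator, Optional


async def produce(chunks: list[str], queue: "asyncio.Queue[Optional[str]]") -> None:
    """Push chunks into a bounded queue; put() waits while the queue is full."""
    for chunk in chunks:
        await queue.put(chunk)
    await queue.put(None)  # Sentinel: no more chunks


async def consume(queue: "asyncio.Queue[Optional[str]]") -> AsyncIterator[str]:
    """Yield chunks until the sentinel arrives."""
    while True:
        chunk = await queue.get()
        if chunk is None:
            return
        yield chunk


async def stream_with_backpressure(
    chunks: list[str], max_buffered: int = 2
) -> list[str]:
    """Run producer and consumer together with at most max_buffered chunks queued."""
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=max_buffered)
    producer = asyncio.create_task(produce(chunks, queue))
    received = [chunk async for chunk in consume(queue)]
    await producer
    return received
```

In a real service the producer would read from the upstream response and the consumer would write to the client connection; a slow client then slows the upstream read instead of letting memory grow.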
Prerequisites and Environment Setup
Let's set up a robust development environment. We'll use Python 3.11+ with modern async patterns.
# Create project directory
mkdir ai-mode-search-engine
cd ai-mode-search-engine
# Create virtual environment
python3.11 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Core dependencies
pip install httpx==0.27.0 # Async HTTP client
pip install pydantic==2.7.0 # Data validation
pip install python-multipart==0.0.9 # File upload handling
pip install redis==5.0.0 # Caching
pip install prometheus-client==0.20.0 # Metrics
pip install structlog==24.1.0 # Structured logging
pip install tenacity==8.2.3 # Retry logic
pip install pillow==10.3.0 # Image processing
pip install python-dotenv==1.0.1 # Environment management
pip install fastapi uvicorn # Web framework and ASGI server used by main.py
# Development dependencies
pip install pytest==8.1.0
pip install pytest-asyncio==0.23.0
pip install black==24.3.0
pip install mypy==1.9.0
Create a .env file for configuration:
# .env
GOOGLE_API_KEY=your_api_key_here
AI_MODE_ENDPOINT=https://experimental.google.com/ai-mode/v1
REDIS_URL=redis://localhost:6379/0
MAX_TOKENS=4096
TEMPERATURE=0.7
RATE_LIMIT_RPM=60
CACHE_TTL_SECONDS=3600
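One way to turn these variables into typed settings is sketched below. The Settings class and load_settings function are hypothetical helpers, not part of the tutorial's modules; the defaults mirror the .env values above, and the sketch takes any mapping so it can be exercised without touching the real environment.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Typed view of the .env values (hypothetical helper)."""
    api_key: str
    endpoint: str
    max_tokens: int
    temperature: float
    rate_limit_rpm: int
    cache_ttl_seconds: int


def load_settings(env: Mapping[str, str]) -> Settings:
    """Parse settings from a mapping such as os.environ, failing fast on gaps."""
    api_key = env.get("GOOGLE_API_KEY", "")
    if not api_key or api_key == "your_api_key_here":
        raise ValueError("GOOGLE_API_KEY is missing or still the placeholder")
    endpoint = env.get("AI_MODE_ENDPOINT", "")
    if not endpoint:
        raise ValueError("AI_MODE_ENDPOINT is missing")
    return Settings(
        api_key=api_key,
        endpoint=endpoint,
        max_tokens=int(env.get("MAX_TOKENS", "4096")),
        temperature=float(env.get("TEMPERATURE", "0.7")),
        rate_limit_rpm=int(env.get("RATE_LIMIT_RPM", "60")),
        cache_ttl_seconds=int(env.get("CACHE_TTL_SECONDS", "3600")),
    )
```

In an application you would typically call load_dotenv() from python-dotenv first and then pass os.environ to load_settings.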
Building the Core AI Mode Client
Now we'll implement the core client that interacts with Google's AI Mode API. This client handles authentication, request formatting, streaming, and error recovery.
# ai_mode_client.py
import asyncio
import base64
import hashlib
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import AsyncGenerator, Optional, Union
from urllib.parse import urljoin
import httpx
from pydantic import BaseModel, Field, field_validator
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from PIL import Image
class QueryComplexity(Enum):
    """Enum for query complexity classification."""
    SIMPLE = "simple"      # Single intent, few keywords
    MODERATE = "moderate"  # Multiple intents, requires synthesis
    COMPLEX = "complex"    # Multi-part with conditions, comparisons


class MultimodalInput(BaseModel):
    """Represents multimodal input to AI Mode."""
    text: str = Field(..., min_length=1, max_length=10000)
    images: list[str] = Field(default_factory=list, max_length=5)
    voice_data: Optional[str] = None  # Base64 encoded audio

    @field_validator('images')
    @classmethod
    def validate_images(cls, v: list[str]) -> list[str]:
        """Validate image data is proper base64."""
        for img in v:
            try:
                base64.b64decode(img, validate=True)
            except Exception:
                raise ValueError("Invalid base64 image data")
        return v


class AIResponse(BaseModel):
    """Structured response from AI Mode."""
    query_id: str
    content: str
    sources: list[dict] = Field(default_factory=list)
    processing_time_ms: int
    token_count: int
    complexity: QueryComplexity


@dataclass
class AIClientConfig:
    """Configuration for the AI Mode client."""
    api_key: str
    endpoint: str = "https://experimental.google.com/ai-mode/v1"
    max_retries: int = 3
    timeout_seconds: int = 60
    max_tokens: int = 4096
    temperature: float = 0.7
    enable_streaming: bool = True
    cache_enabled: bool = True
    cache_ttl: int = 3600
class AIModeClient:
    """
    Production-grade client for Google's AI Mode.
    Handles authentication, request formatting, streaming,
    caching, rate limiting, and error recovery.
    """

    def __init__(self, config: AIClientConfig):
        self.config = config
        self._client = httpx.AsyncClient(
            base_url=config.endpoint,
            timeout=config.timeout_seconds,
            headers={
                "Authorization": f"Bearer {config.api_key}",
                "Content-Type": "application/json",
                "User-Agent": "AIModeSearchEngine/1.0",
            },
        )
        self._rate_limiter = asyncio.Semaphore(10)  # Max 10 concurrent requests
        self._cache = {} if config.cache_enabled else None
        self._metrics = {
            "total_requests": 0,
            "cache_hits": 0,
            "errors": 0,
            "total_tokens": 0,
        }

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._client.aclose()

    def _generate_cache_key(self, query: MultimodalInput) -> str:
        """Generate deterministic cache key from query."""
        content = query.text + "".join(query.images[:2])  # Use first 2 images
        return hashlib.sha256(content.encode()).hexdigest()
    def _classify_complexity(self, text: str) -> QueryComplexity:
        """
        Classify query complexity for routing decisions.
        Simple: Single question, < 10 words
        Moderate: Multiple questions, 10-30 words
        Complex: Multi-part with conditions, > 30 words
        """
        word_count = len(text.split())
        has_conditions = any(word in text.lower() for word in
                             ["compare", "vs", "versus", "difference", "better"])
        has_multiple_questions = text.count("?") > 1
        if word_count > 30 or (has_conditions and word_count > 15):
            return QueryComplexity.COMPLEX
        elif word_count > 10 or has_multiple_questions:
            return QueryComplexity.MODERATE
        else:
            return QueryComplexity.SIMPLE
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(
            (httpx.HTTPStatusError, httpx.TimeoutException)
        ),
        reraise=True,  # Re-raise the original exception so search() can handle it
    )
    async def _make_request(
        self, query: MultimodalInput
    ) -> dict:
        """
        Make the actual API request with retry logic.
        Uses exponential backoff for transient failures.
        """
        async with self._rate_limiter:
            payload = {
                "query": query.text,
                "images": query.images,
                "voice_data": query.voice_data,
                "max_tokens": self.config.max_tokens,
                "temperature": self.config.temperature,
                # This path parses a single JSON body; streaming goes
                # through stream_search() instead.
                "stream": False,
            }
            response = await self._client.post(
                "/search",
                json=payload,
            )
            response.raise_for_status()
            return response.json()
    async def search(
        self, query: MultimodalInput
    ) -> AIResponse:
        """
        Execute a search query against AI Mode.
        Handles caching, complexity classification, and metrics.
        """
        start_time = time.time()
        self._metrics["total_requests"] += 1
        # Check cache
        if self._cache is not None:
            cache_key = self._generate_cache_key(query)
            cached = self._cache.get(cache_key)
            if cached and (time.time() - cached["timestamp"]) < self.config.cache_ttl:
                self._metrics["cache_hits"] += 1
                return cached["response"]
        try:
            # Classify complexity for potential routing
            complexity = self._classify_complexity(query.text)
            # Make API request
            raw_response = await self._make_request(query)
            # Parse response
            response = AIResponse(
                query_id=raw_response.get("query_id", ""),
                content=raw_response.get("content", ""),
                sources=raw_response.get("sources", []),
                processing_time_ms=int((time.time() - start_time) * 1000),
                token_count=raw_response.get("token_count", 0),
                complexity=complexity,
            )
            # Update metrics
            self._metrics["total_tokens"] += response.token_count
            # Cache response
            if self._cache is not None:
                self._cache[cache_key] = {
                    "response": response,
                    "timestamp": time.time(),
                }
            return response
        except httpx.HTTPStatusError as e:
            self._metrics["errors"] += 1
            if e.response.status_code == 429:
                # Rate limited - wait for the server's Retry-After hint, then retry
                retry_after = int(e.response.headers.get("Retry-After", 60))
                await asyncio.sleep(retry_after)
                return await self.search(query)
            elif e.response.status_code == 401:
                raise PermissionError("Invalid API key. Check your credentials.")
            elif e.response.status_code == 400:
                raise ValueError(f"Bad request: {e.response.text}")
            else:
                raise RuntimeError(f"API error {e.response.status_code}: {e.response.text}")
        except httpx.TimeoutException:
            self._metrics["errors"] += 1
            raise TimeoutError("AI Mode request timed out. Consider reducing query complexity.")
    async def stream_search(
        self, query: MultimodalInput
    ) -> AsyncGenerator[str, None]:
        """
        Stream search results for real-time display.
        Useful for long-running queries where users expect
        progressive updates.
        """
        async with self._rate_limiter:
            payload = {
                "query": query.text,
                "images": query.images,
                "max_tokens": self.config.max_tokens,
                "temperature": self.config.temperature,
                "stream": True,
            }
            async with self._client.stream(
                "POST",
                "/search",
                json=payload,
            ) as response:
                response.raise_for_status()
                async for chunk in response.aiter_text():
                    if chunk.strip():
                        yield chunk

    def get_metrics(self) -> dict:
        """Return current client metrics for monitoring."""
        return {
            **self._metrics,
            "cache_size": len(self._cache) if self._cache else 0,
            "rate_limiter_available": self._rate_limiter._value,
        }
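The client's in-memory cache is a plain dict, so expired entries stay in memory until the same key is requested again. As a rough sketch of how that could be bounded, the hypothetical TTLCache below combines the same time-to-live check with least-recently-used eviction. The class name, the max_entries parameter, and the injectable clock are assumptions made for illustration; they are not part of the client above.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class TTLCache:
    """Small TTL + LRU cache (illustrative sketch, not thread-safe)."""

    def __init__(
        self,
        ttl_seconds: float,
        max_entries: int = 1024,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._ttl = ttl_seconds
        self._max = max_entries
        self._clock = clock  # Injectable so tests can control time
        self._items: "OrderedDict[str, tuple[float, Any]]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._items[key]  # Expired: drop it instead of keeping it around
            return None
        self._items.move_to_end(key)  # Mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._items[key] = (self._clock(), value)
        self._items.move_to_end(key)
        while len(self._items) > self._max:
            self._items.popitem(last=False)  # Evict the least recently used entry

    def __len__(self) -> int:
        return len(self._items)
```

A cache like this could stand in for the dict in AIModeClient until the Redis-backed cache is in place.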
Building the Query Orchestrator for Complex Queries
The real power of AI Mode lies in handling complex, multi-part queries. Let's build a query orchestrator that decomposes complex queries, routes them appropriately, and synthesizes results.
# query_orchestrator.py
import asyncio
import re
from dataclasses import dataclass
from typing import Optional
from pydantic import BaseModel, Field
from ai_mode_client import AIModeClient, MultimodalInput
class DecomposedQuery(BaseModel):
    """A single sub-query from a complex query decomposition."""
    id: str
    text: str
    dependencies: list[str] = Field(default_factory=list)
    priority: int = 0  # Higher = process first


class SynthesizedResponse(BaseModel):
    """Final synthesized response from multiple sub-queries."""
    main_answer: str
    sub_answers: dict[str, str]
    sources: list[dict]
    processing_time_ms: int
class QueryDecomposer:
    """
    Decomposes complex multi-part queries into manageable sub-queries.
    Handles:
    - Comparison queries (e.g., "Compare X vs Y")
    - Conditional queries (e.g., "If condition A, then B")
    - Multi-faceted queries (e.g., "What is X, how does Y work, and why Z?")
    """
    # Word boundaries stop "if" from matching inside words such as
    # "difference"; non-capturing groups keep the keywords themselves
    # out of the result of split().
    COMPARISON_PATTERN = re.compile(
        r"\b(?:compare|difference|vs|versus|better|worse)\b",
        re.IGNORECASE
    )
    CONDITIONAL_PATTERN = re.compile(
        r"\b(?:if|when|assuming|given that|provided)\b",
        re.IGNORECASE
    )
    MULTI_QUESTION_PATTERN = re.compile(r"\?")

    def decompose(self, query: str) -> list[DecomposedQuery]:
        """
        Decompose a complex query into sub-queries.
        Returns ordered list of sub-queries with dependency tracking.
        """
        sub_queries = []
        # Handle comparison queries
        if self.COMPARISON_PATTERN.search(query):
            sub_queries.extend(self._decompose_comparison(query))
        # Handle conditional queries
        if self.CONDITIONAL_PATTERN.search(query):
            sub_queries.extend(self._decompose_conditional(query))
        # Handle multi-question queries
        question_count = len(self.MULTI_QUESTION_PATTERN.findall(query))
        if question_count > 1:
            sub_queries.extend(self._decompose_multi_question(query))
        # If no decomposition needed, return original query
        if not sub_queries:
            sub_queries.append(
                DecomposedQuery(
                    id="main",
                    text=query,
                    priority=0,
                )
            )
        return sub_queries
    def _decompose_comparison(self, query: str) -> list[DecomposedQuery]:
        """Decompose comparison queries into individual entity queries."""
        # Extract entities being compared
        # This is a simplified implementation; production would use NLP
        entities = re.findall(
            r"(?:compare|vs|versus)\s+(\w+(?:\s+\w+)?)",
            query,
            re.IGNORECASE
        )
        sub_queries = []
        for i, entity in enumerate(entities):
            sub_queries.append(
                DecomposedQuery(
                    id=f"entity_{i}",
                    text=f"Describe {entity} in detail",
                    dependencies=[],
                    priority=1,
                )
            )
        # Add synthesis query
        sub_queries.append(
            DecomposedQuery(
                id="synthesis",
                text=query,
                dependencies=[q.id for q in sub_queries],
                priority=0,
            )
        )
        return sub_queries

    def _decompose_conditional(self, query: str) -> list[DecomposedQuery]:
        """Decompose conditional queries into condition and result parts."""
        parts = self.CONDITIONAL_PATTERN.split(query)
        sub_queries = []
        for i, part in enumerate(parts):
            if part.strip():
                sub_queries.append(
                    DecomposedQuery(
                        id=f"conditional_{i}",
                        text=part.strip(),
                        dependencies=[],
                        priority=len(parts) - i,  # Earlier parts have higher priority
                    )
                )
        return sub_queries

    def _decompose_multi_question(self, query: str) -> list[DecomposedQuery]:
        """Split multi-question queries into individual questions."""
        questions = re.split(r"\?+\s*", query)
        questions = [q.strip() + "?" for q in questions if q.strip()]
        return [
            DecomposedQuery(
                id=f"question_{i}",
                text=q,
                dependencies=[],
                priority=len(questions) - i,  # Earlier questions have higher priority
            )
            for i, q in enumerate(questions)
        ]
class QueryOrchestrator:
    """
    Orchestrates complex queries by decomposing, routing, and synthesizing.
    Uses dependency graph to determine execution order.
    """

    def __init__(
        self,
        client: "AIModeClient",
        decomposer: Optional[QueryDecomposer] = None,
    ):
        self.client = client
        self.decomposer = decomposer or QueryDecomposer()

    async def execute_complex_query(
        self, query_text: str
    ) -> SynthesizedResponse:
        """
        Execute a complex multi-part query.
        Steps:
        1. Decompose query into sub-queries
        2. Build dependency graph
        3. Execute sub-queries in order
        4. Synthesize final response
        """
        start_time = asyncio.get_running_loop().time()
        # Step 1: Decompose
        sub_queries = self.decomposer.decompose(query_text)
        # Step 2: Build dependency graph
        dependency_graph = self._build_dependency_graph(sub_queries)
        # Step 3: Execute in dependency order
        results = {}
        for batch in self._topological_sort(dependency_graph):
            # Execute batch in parallel
            tasks = []
            for query_id in batch:
                sub_query = next(q for q in sub_queries if q.id == query_id)
                tasks.append(self._execute_sub_query(sub_query, results))
            batch_results = await asyncio.gather(*tasks)
            for query_id, result in zip(batch, batch_results):
                results[query_id] = result
        # Step 4: Synthesize
        main_answer = results.get("main", results.get("synthesis", ""))
        if not main_answer:
            # Combine all results, looking each one up by sub-query id so
            # answers stay paired with the right question
            main_answer = "\n\n".join(
                f"**{q.text}**\n{results[q.id]}"
                for q in sub_queries
                if q.id in results
            )
        processing_time = int((asyncio.get_running_loop().time() - start_time) * 1000)
        return SynthesizedResponse(
            main_answer=main_answer,
            sub_answers=results,
            sources=[],  # Would aggregate from all sub-queries
            processing_time_ms=processing_time,
        )
    def _build_dependency_graph(
        self, sub_queries: list[DecomposedQuery]
    ) -> dict[str, set[str]]:
        """Build adjacency list for dependency graph."""
        graph = {q.id: set(q.dependencies) for q in sub_queries}
        return graph

    def _topological_sort(
        self, graph: dict[str, set[str]]
    ) -> list[list[str]]:
        """
        Topological sort with level grouping.
        Returns batches of queries that can be executed in parallel.
        """
        # Copy graph to avoid mutation
        graph = {k: set(v) for k, v in graph.items()}
        batches = []
        while graph:
            # Find nodes with no dependencies
            ready = [node for node, deps in graph.items() if not deps]
            if not ready:
                raise ValueError("Circular dependency detected in query decomposition")
            batches.append(ready)
            # Remove ready nodes from graph
            for node in ready:
                del graph[node]
            for deps in graph.values():
                deps.difference_update(ready)
        return batches

    async def _execute_sub_query(
        self, sub_query: DecomposedQuery, previous_results: dict[str, str]
    ) -> str:
        """Execute a single sub-query, potentially using previous results."""
        # Inject previous results into query context
        context = ""
        for dep_id in sub_query.dependencies:
            if dep_id in previous_results:
                context += f"\nPrevious result for {dep_id}: {previous_results[dep_id][:500]}..."
        enhanced_query = sub_query.text
        if context:
            enhanced_query = f"{context}\n\nBased on the above, {sub_query.text}"
        multimodal_input = MultimodalInput(text=enhanced_query)
        response = await self.client.search(multimodal_input)
        return response.content
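To see what the level-grouped topological sort produces, the standalone sketch below applies the same approach to the dependency graph a comparison query would generate: two entity lookups with no dependencies and one synthesis step that depends on both. The function name batch_by_dependencies is hypothetical, and sorting each batch is an addition here purely to make the output deterministic.

```python
def batch_by_dependencies(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into batches; each batch depends only on earlier batches."""
    remaining = {node: set(deps) for node, deps in graph.items()}
    batches: list[list[str]] = []
    while remaining:
        # Nodes whose dependencies have all been satisfied
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("Circular dependency detected")
        batches.append(ready)
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return batches


comparison_graph = {
    "entity_0": set(),
    "entity_1": set(),
    "synthesis": {"entity_0", "entity_1"},
}
batches = batch_by_dependencies(comparison_graph)
# batches == [["entity_0", "entity_1"], ["synthesis"]]
```

The two entity queries land in the first batch and can run concurrently; the synthesis query waits for both.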
Handling Edge Cases and Production Considerations
Production deployments of AI Mode applications face several challenges. Let's address the most critical ones.
Rate Limiting and Backpressure
Google's AI Mode is experimental and likely has aggressive rate limits. Implement a token bucket algorithm for precise rate control:
# rate_limiter.py
import asyncio
import time
from collections import deque
class TokenBucketRateLimiter:
    """
    Token bucket algorithm for precise rate limiting.
    Allows burst traffic up to capacity, then throttles to steady rate.
    """

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens per second (steady state)
            capacity: Maximum burst size
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens, waiting if necessary.
        Returns wait time in seconds.
        """
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0
            # Calculate wait time
            deficit = tokens - self.tokens
            wait_time = deficit / self.rate
            # Wait and then consume
            await asyncio.sleep(wait_time)
            self._refill()
            self.tokens -= tokens
            return wait_time

    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
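The arithmetic behind the bucket is easier to check in isolation. The two pure functions below restate the refill and wait-time formulas used by the limiter; their names are hypothetical, and the numbers assume the RATE_LIMIT_RPM=60 setting from the .env file (one token per second) with an illustrative burst capacity of 5.

```python
def refill(tokens: float, capacity: float, rate: float, elapsed: float) -> float:
    """Tokens available after `elapsed` seconds, capped at capacity."""
    return min(capacity, tokens + elapsed * rate)


def wait_time(tokens: float, needed: float, rate: float) -> float:
    """Seconds to wait until `needed` tokens are available (0 if already there)."""
    return max(0.0, (needed - tokens) / rate)


rate_per_second = 60 / 60  # RATE_LIMIT_RPM=60 -> 1 token per second
capacity = 5               # Assumed burst size for this example

# Bucket drained by a burst, then 2.5 seconds pass.
available = refill(tokens=0.0, capacity=capacity, rate=rate_per_second, elapsed=2.5)
single_request_wait = wait_time(available, needed=1, rate=rate_per_second)
batch_of_four_wait = wait_time(available, needed=4, rate=rate_per_second)
```

With 2.5 tokens available, a single request proceeds immediately, while a batch needing 4 tokens waits 1.5 seconds for the remaining deficit to refill.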
Error Recovery and Circuit Breaking
Implement a circuit breaker pattern to prevent cascading failures:
# circuit_breaker.py
import asyncio
import time
from enum import Enum
class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    """
    Circuit breaker for AI Mode API calls.
    Prevents cascading failures by failing fast when
    the downstream service is unhealthy.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_requests: int = 3,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_requests = half_open_max_requests
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_requests = 0
        self._lock = asyncio.Lock()

    async def call(self, coro):
        """
        Execute a coroutine with circuit breaker protection.
        Raises CircuitBreakerOpenError if circuit is open.
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_requests = 0
                else:
                    coro.close()  # Avoid a "coroutine was never awaited" warning
                    raise CircuitBreakerOpenError("Circuit breaker is open")
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_requests >= self.half_open_max_requests:
                    coro.close()  # Rejected before running, so close it explicitly
                    raise CircuitBreakerOpenError("Half-open limit reached")
                self.half_open_requests += 1
        try:
            result = await coro
            # Success - reset circuit
            async with self._lock:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception:
            async with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.monotonic()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise


class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker prevents a request."""
    pass
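Not every failure deserves a retry or should count toward opening the circuit: a 400 or 401 will fail the same way on every attempt, while a 429 or 503 is often transient. The sketch below shows one way to encode that distinction together with a capped exponential delay. The helper names, the set of retryable status codes, and the base and cap values are assumptions for illustration, not behavior documented for AI Mode.

```python
# Status codes treated as transient in this sketch (an assumption, adjust as needed)
RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


def is_retryable(status_code: int) -> bool:
    """Return True for failures that may succeed if tried again later."""
    return status_code in RETRYABLE_STATUS


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 10.0) -> float:
    """Delay before retry number `attempt` (1-based): base, 2*base, 4*base, ... up to cap."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap, base * 2 ** (attempt - 1))


delays = [backoff_delay(n) for n in range(1, 5)]
# delays == [2.0, 4.0, 8.0, 10.0]
```

A caller could consult is_retryable before retrying or before recording a failure with the breaker, so that client-side mistakes such as a bad API key do not trip the circuit for everyone.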
Multimodal Input Processing
Handle image and voice inputs efficiently:
# multimodal_processor.py
import base64
from io import BytesIO
from pathlib import Path
from typing import Union
from PIL import Image
class MultimodalProcessor:
    """
    Process and optimize multimodal inputs for AI Mode.
    Handles:
    - Image resizing and compression
    - Voice data conversion
    - Input validation
    """
    MAX_IMAGE_SIZE_MB = 4
    MAX_IMAGE_DIMENSION = 2048
    SUPPORTED_IMAGE_FORMATS = {"JPEG", "PNG", "WEBP"}

    @staticmethod
    def process_image(image: Union[str, Path, bytes]) -> str:
        """
        Process an image for AI Mode submission.
        Accepts a file path or raw image bytes (for example, the content
        of an uploaded file). Resizes if too large, converts to base64.
        """
        if isinstance(image, bytes):
            source = BytesIO(image)
        else:
            source = Path(image)
            if not source.exists():
                raise FileNotFoundError(f"Image not found: {source}")
        with Image.open(source) as img:
            # Convert to RGB if necessary
            if img.mode != "RGB":
                img = img.convert("RGB")
            # Resize if too large
            if max(img.size) > MultimodalProcessor.MAX_IMAGE_DIMENSION:
                img.thumbnail(
                    (MultimodalProcessor.MAX_IMAGE_DIMENSION,
                     MultimodalProcessor.MAX_IMAGE_DIMENSION),
                    Image.Resampling.LANCZOS
                )
            # Compress to meet size limit
            buffer = BytesIO()
            quality = 85
            while True:
                buffer.seek(0)
                buffer.truncate()
                img.save(buffer, format="JPEG", quality=quality, optimize=True)
                if buffer.tell() <= MultimodalProcessor.MAX_IMAGE_SIZE_MB * 1024 * 1024:
                    break
                quality -= 5
                if quality < 20:
                    break
            buffer.seek(0)
            return base64.b64encode(buffer.read()).decode("utf-8")
    @staticmethod
    def validate_multimodal_input(
        text: str,
        images: list[str] | None = None,
        voice_data: str | None = None,
    ) -> dict:
        """
        Validate and prepare multimodal input.
        Returns dict with warnings about potential issues.
        """
        warnings = []
        if not text.strip():
            warnings.append("Empty text query")
        if images:
            if len(images) > 5:
                warnings.append("More than 5 images may be truncated")
            for i, img in enumerate(images):
                try:
                    decoded = base64.b64decode(img, validate=True)
                    if len(decoded) > 10 * 1024 * 1024:  # 10MB
                        warnings.append(f"Image {i} exceeds 10MB")
                except Exception:
                    warnings.append(f"Image {i} has invalid base64 encoding")
        if voice_data:
            try:
                base64.b64decode(voice_data, validate=True)
            except Exception:
                warnings.append("Voice data has invalid base64 encoding")
        return {"warnings": warnings, "is_valid": len(warnings) == 0}
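The size check above decodes every image just to measure it. For well-formed base64 without line breaks, the decoded size follows directly from the string length, which makes a cheap pre-check possible. The helper below is a hypothetical addition sketched under that assumption; it does not validate the alphabet, so it complements the decode-based validation rather than replacing it.

```python
import base64


def decoded_size(b64: str) -> int:
    """Byte length of the decoded payload, computed from the encoded length.

    Assumes standard padded base64 with no whitespace: every 4 characters
    carry 3 bytes, minus one byte per trailing '=' padding character.
    """
    if len(b64) % 4 != 0:
        raise ValueError("Length of padded base64 must be a multiple of 4")
    padding = len(b64) - len(b64.rstrip("="))
    if padding > 2:
        raise ValueError("Base64 strings have at most two padding characters")
    return len(b64) // 4 * 3 - padding


encoded = base64.b64encode(b"hello").decode("ascii")
size = decoded_size(encoded)  # 5, the length of b"hello"
```

Rejecting oversized payloads on length alone avoids allocating a second copy of a large upload before it has passed any checks.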
Production Deployment and Monitoring
Here's a complete FastAPI application that ties everything together:
# main.py
import asyncio
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ai_mode_client import AIModeClient, AIClientConfig, MultimodalInput
from query_orchestrator import QueryOrchestrator
from rate_limiter import TokenBucketRateLimiter
from circuit_breaker import CircuitBreaker, CircuitBreakerOpenError
from multimodal_processor import MultimodalProcessor

# Load variables from .env before reading them
load_dotenv()

# Configuration
config = AIClientConfig(
    api_key=os.environ["GOOGLE_API_KEY"],  # Fail fast if the key is missing
    endpoint=os.getenv("AI_MODE_ENDPOINT", "https://experimental.google.com/ai-mode/v1"),
    max_tokens=int(os.getenv("MAX_TOKENS", "4096")),
    temperature=float(os.getenv("TEMPERATURE", "0.7")),
)

# Global instances
client = AIModeClient(config)
orchestrator = QueryOrchestrator(client)
rate_limiter = TokenBucketRateLimiter(rate=10, capacity=20)  # 10 req/s, burst 20
circuit_breaker = CircuitBreaker()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle."""
    # Startup
    yield
    # Shutdown
    await client._client.aclose()


app = FastAPI(
    title="AI Mode Search Engine",
    version="1.0.0",
    lifespan=lifespan,
)
class SearchRequest(BaseModel):
    query: str
    stream: bool = False


@app.post("/search")
async def search(request: SearchRequest):
    """
    Execute a search query against Google AI Mode.
    Supports both simple and complex multi-part queries.
    """
    # Rate limiting
    wait_time = await rate_limiter.acquire()
    if wait_time > 0:
        # Log rate limiting event
        pass
    try:
        # Circuit breaker protection
        response = await circuit_breaker.call(
            orchestrator.execute_complex_query(request.query)
        )
        return {
            "query": request.query,
            "response": response.main_answer,
            "sub_answers": response.sub_answers,
            "processing_time_ms": response.processing_time_ms,
        }
    except CircuitBreakerOpenError:
        raise HTTPException(
            status_code=503,
            detail="Service temporarily unavailable. Please try again later.",
        )
    except TimeoutError:
        raise HTTPException(
            status_code=504,
            detail="Query timed out. Consider simplifying your question.",
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except PermissionError as e:
        raise HTTPException(status_code=401, detail=str(e))
@app.post("/search/multimodal")
async def search_multimodal(
    query: str = Form(...),
    images: list[UploadFile] = File(None, max_length=5),
):
    """
    Search with multimodal input (text + images).
    Accepts up to 5 images alongside text query.
    """
    processed_images = []
    if images:
        for img in images:
            content = await img.read()
            # Process and optimize image
            processed = MultimodalProcessor.process_image(content)
            processed_images.append(processed)
    multimodal_input = MultimodalInput(
        text=query,
        images=processed_images,
    )
    # Validate
    validation = MultimodalProcessor.validate_multimodal_input(
        query, processed_images
    )
    try:
        response = await circuit_breaker.call(
            client.search(multimodal_input)
        )
        return {
            "query": query,
            "response": response.content,
            "sources": response.sources,
            "warnings": validation["warnings"],
            "processing_time_ms": response.processing_time_ms,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def get_metrics():
    """Return client metrics for monitoring."""
    return {
        "client_metrics": client.get_metrics(),
        "rate_limiter": {
            "available_tokens": rate_limiter.tokens,
            "capacity": rate_limiter.capacity,
        },
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
    )
What's Next
Google's AI Mode represents a paradigm shift in how users interact with search engines. By moving from keyword-based queries to natural language conversations with multimodal support, it enables more nuanced and productive information retrieval.
To extend this tutorial:
- Implement caching with Redis: Replace the in-memory cache with Redis for distributed deployments
- Add query analytics: Track which types of complex queries perform best
- Build a feedback loop: Allow users to rate responses and use that data to improve query decomposition
- Explore multimodal RAG: Combine AI Mode with vector databases for domain-specific knowledge retrieval
The code in this tutorial is production-ready but should be adapted to your specific use case. Monitor your API usage carefully, as Google's AI Mode is still experimental and subject to change. As of May 2026, the feature continues to evolve, and staying updated with Google's developer documentation is essential for maintaining compatibility.
Remember that the true power of AI Mode lies not in replacing traditional search, but in augmenting it for complex, multi-faceted queries that previously required significant manual effort to answer.