How to Build a Multimodal App with Gemini 2.0 Vision API
Practical tutorial: Build a multimodal app with Gemini 2.0 Vision API
The landscape of AI applications is shifting from text-only interactions to multimodal experiences that can understand images, video, and text simultaneously. Google's Gemini 2.0 Vision API represents a significant leap in this direction, offering native multimodal understanding that goes far beyond simple image captioning or OCR. In this tutorial, we'll build a production-grade document intelligence system that can extract, analyze, and query information from complex documents containing charts, tables, handwritten notes, and mixed media.
What makes this approach production-ready is our focus on error handling, rate limiting, cost optimization, and structured output parsing. We'll implement a complete pipeline that processes PDFs and images, extracts structured data using Gemini 2.0's vision capabilities, stores results in a vector database [1] for semantic search, and exposes everything through a FastAPI endpoint.
Understanding Gemini 2.0 Vision Architecture and Production Considerations
Before diving into code, it's crucial to understand how Gemini 2.0 processes multimodal inputs and what that means for production systems. According to Google's official documentation, Gemini 2.0 accepts images, video, and text as direct inputs through a unified API endpoint. Unlike earlier approaches that required separate encoders for each modality, Gemini processes these inputs natively, allowing for cross-modal reasoning.
The key architectural decision you'll face is whether to use the synchronous generate_content method or the streaming variant. For document processing pipelines where you need complete results before proceeding, synchronous calls are appropriate. For real-time applications like live video analysis, streaming provides lower latency. We'll focus on the synchronous approach since it's more common in batch document processing.
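To make the difference concrete: in the google-generativeai client, passing stream=True to generate_content returns an iterable of partial responses, each exposing a .text attribute. The sketch below shows only the consuming side. The helper name collect_stream and its on_chunk callback are our own illustration, not part of the library.

```python
from typing import Callable, Iterable, Optional


def collect_stream(chunks: Iterable, on_chunk: Optional[Callable[[str], None]] = None) -> str:
    """Join the .text of each streamed chunk, optionally reporting each piece."""
    parts = []
    for chunk in chunks:
        piece = chunk.text
        if on_chunk is not None:
            on_chunk(piece)  # e.g. forward the partial text to a client
        parts.append(piece)
    return "".join(parts)


# With a real model (needs an API key, so it is left commented out):
# stream = model.generate_content([prompt, image], stream=True)
# full_text = collect_stream(stream, on_chunk=print)
```

In the synchronous case you would skip this helper and read response.text once the call returns.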
Rate limiting is your primary production concern. As of May 2026, the Gemini API enforces rate limits based on your pricing tier: the free tier allows 60 requests per minute, while paid tiers offer higher limits. Limits vary by model and change over time, so confirm the current quota for your project before relying on these figures. We'll use retries with exponential backoff to handle these constraints gracefully.
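Retries react to a limit after it has been hit. A client-side limiter is one way to avoid hitting it in the first place. The following is a minimal sketch of a sliding-window limiter, assuming a quota expressed as "N calls per period". The class name SlidingWindowLimiter is hypothetical, and the clock and sleep parameters exist only so the behavior can be tested without real waiting.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` calls in any `period`-second window."""

    def __init__(self, max_calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._calls: Deque[float] = deque()

    def acquire(self) -> float:
        """Block until a slot is free and return the seconds spent waiting."""
        waited = 0.0
        while True:
            now = self._clock()
            # Drop timestamps that have left the window
            while self._calls and now - self._calls[0] >= self.period:
                self._calls.popleft()
            if len(self._calls) < self.max_calls:
                self._calls.append(now)
                return waited
            # Sleep until the oldest call leaves the window, then re-check
            delay = self.period - (now - self._calls[0])
            self._sleep(delay)
            waited += delay
```

Calling limiter.acquire() before each API request keeps a single process under the quota. It does not coordinate across multiple workers, which would need a shared store.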
Prerequisites and Environment Setup
You'll need Python 3.10 or later and a Gemini API key from Google AI Studio. A Google Cloud project with the Vertex AI API enabled and billing configured is only required if you deploy through Vertex AI. Let's set up our environment:
# Create a virtual environment
python -m venv gemini_vision_env
source gemini_vision_env/bin/activate # On Windows: gemini_vision_env\Scripts\activate
# Install core dependencies
pip install google-cloud-aiplatform==1.47.0
pip install google-generativeai==0.8.3
pip install fastapi==0.111.0
pip install uvicorn==0.29.0
pip install pypdf2==3.0.1
pip install pillow==10.3.0
pip install chromadb==0.5.0
pip install python-multipart==0.0.9
pip install pydantic==2.7.1
pip install tenacity==8.3.0
pip install loguru==0.7.2
For authentication, you have two options. The recommended approach for production is using a service account:
# Set up authentication
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
export GEMINI_API_KEY="your-api-key" # Alternative for direct API access
If you're using the google-generativeai library directly (not through Vertex AI), you'll need an API key from Google AI Studio. For enterprise deployments, Vertex AI provides better security, monitoring, and compliance features.
Building the Multimodal Document Intelligence Pipeline
Our pipeline will consist of four stages: document preprocessing, multimodal analysis with Gemini 2.0, structured data extraction, and semantic indexing. Let's implement each component with production-grade error handling.
Document Preprocessing and Image Preparation
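The first challenge is handling diverse document formats. We'll create a preprocessing module that extracts the images embedded in PDFs, normalizes image sizes, and handles edge cases like corrupted files or password-protected documents. PyPDF2 does not render pages, so PDFs made of vector text need a rasterizer such as pdf2image before this step.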
# preprocessor.py
import io
from pathlib import Path
from typing import List, Union

import PyPDF2
from loguru import logger
from PIL import Image, ImageEnhance, UnidentifiedImageError


class DocumentPreprocessor:
    """Handles document conversion and image preparation for Gemini Vision API."""

    MAX_IMAGE_SIZE = (2048, 2048)  # Gemini 2.0 recommended max dimensions
    SUPPORTED_EXTENSIONS = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.bmp'}

    def __init__(self, enhance_contrast: bool = True, dpi: int = 200):
        self.enhance_contrast = enhance_contrast
        self.dpi = dpi  # Reserved for a page rasterizer such as pdf2image; unused below

    def process_document(self, file_path: Union[str, Path]) -> List[Image.Image]:
        """
        Convert document to list of PIL Images suitable for Gemini API.

        Args:
            file_path: Path to PDF or image file
        Returns:
            List of processed PIL Image objects
        Raises:
            ValueError: If file type is unsupported or file is corrupted
            FileNotFoundError: If file doesn't exist
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"Document not found: {path}")
        if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {path.suffix}. "
                             f"Supported: {self.SUPPORTED_EXTENSIONS}")
        try:
            if path.suffix.lower() == '.pdf':
                return self._process_pdf(path)
            return [self._process_single_image(path)]
        except (PyPDF2.errors.PdfReadError, UnidentifiedImageError) as e:
            logger.error(f"Failed to process {path}: {e}")
            raise ValueError(f"Corrupted or invalid file: {path}") from e

    def _process_pdf(self, path: Path) -> List[Image.Image]:
        """Extract and process the images embedded in each PDF page."""
        images = []
        with open(path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            if pdf_reader.is_encrypted:
                logger.warning(f"PDF is encrypted: {path}. Attempting empty password.")
                # decrypt() returns a falsy value when the password does not match
                if not pdf_reader.decrypt(''):
                    raise ValueError(f"Cannot process encrypted PDF without password: {path}")
            total_pages = len(pdf_reader.pages)
            for page_num, page in enumerate(pdf_reader.pages, start=1):
                logger.debug(f"Processing page {page_num}/{total_pages}")
                # PyPDF2 cannot rasterize pages; page.images only exposes
                # embedded image files. Use pdf2image for full-page rendering.
                try:
                    for embedded in page.images:
                        img = Image.open(io.BytesIO(embedded.data)).convert('RGB')
                        images.append(self._optimize_image(img))
                except Exception as e:
                    logger.warning(f"Failed to extract image from page {page_num}: {e}")
        if not images:
            logger.warning(f"No images extracted from PDF: {path}. "
                           f"Consider using pdf2image library for rasterization.")
        return images

    def _process_single_image(self, path: Path) -> Image.Image:
        """Process a single image file."""
        with Image.open(path) as img:
            img = img.convert('RGB')  # Ensure consistent color mode
            return self._optimize_image(img)

    def _optimize_image(self, image: Image.Image) -> Image.Image:
        """
        Optimize image for Gemini Vision API processing.
        - Resize if too large
        - Enhance contrast for better text recognition
        """
        # Resize in place while maintaining aspect ratio
        if image.size[0] > self.MAX_IMAGE_SIZE[0] or image.size[1] > self.MAX_IMAGE_SIZE[1]:
            image.thumbnail(self.MAX_IMAGE_SIZE, Image.Resampling.LANCZOS)
            logger.debug(f"Resized image to {image.size}")
        # Enhance contrast for better OCR and chart reading
        if self.enhance_contrast:
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.5)
        return image

    def image_to_bytes(self, image: Image.Image, format: str = 'PNG') -> bytes:
        """Convert PIL Image to bytes for API transmission."""
        buffer = io.BytesIO()
        image.save(buffer, format=format, optimize=True)
        return buffer.getvalue()
This preprocessor handles several edge cases that commonly break production pipelines:
- Encrypted PDFs (attempts empty password)
- Corrupted image files (catches UnidentifiedImageError)
- Mixed-format documents (PDFs with embedded images)
- Memory management (uses thumbnailing for large images)
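The thumbnailing step is aspect-preserving downscaling. As a rough sketch of the arithmetic, assuming the 2048 by 2048 bound used above, the hypothetical helper fit_within below computes the target size. Pillow's own thumbnail method may round differently by a pixel, so treat this as an illustration of the idea rather than a reimplementation.

```python
from typing import Tuple


def fit_within(size: Tuple[int, int], max_size: Tuple[int, int]) -> Tuple[int, int]:
    """Return dimensions scaled down (never up) to fit max_size, keeping aspect ratio."""
    width, height = size
    max_w, max_h = max_size
    # The tighter of the two axis ratios decides the scale; 1.0 prevents upscaling
    scale = min(max_w / width, max_h / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))
```

A 4096 by 2048 scan is halved on both axes, while an image already inside the bound is returned unchanged.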
Core Gemini 2.0 Vision Integration
Now let's implement the core integration with Gemini 2.0's Vision API. We'll use the google-generativeai library with structured output parsing and comprehensive error handling.
# gemini_vision.py
import json
import os
from typing import Any, Dict, List, Optional

import google.generativeai as genai
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from loguru import logger
from PIL import Image
from pydantic import BaseModel, Field, ValidationError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


# Define structured output schemas
class DocumentMetadata(BaseModel):
    """Extracted document metadata."""
    title: Optional[str] = Field(None, description="Document title or filename")
    document_type: Optional[str] = Field(None, description="Type of document: invoice, report, letter, etc.")
    date: Optional[str] = Field(None, description="Document date if present")
    language: Optional[str] = Field("en", description="Detected language")
    page_count: Optional[int] = Field(None, description="Number of pages")


class ExtractedData(BaseModel):
    """Structured data extracted from document."""
    metadata: DocumentMetadata
    key_findings: List[str] = Field(default_factory=list, description="Key points extracted")
    tables: List[Dict[str, Any]] = Field(default_factory=list, description="Extracted table data")
    numerical_values: List[Dict[str, Any]] = Field(default_factory=list, description="Numbers and statistics")
    handwritten_notes: List[str] = Field(default_factory=list, description="Handwritten text if detected")
    charts_summary: Optional[str] = Field(None, description="Summary of any charts or graphs")


class GeminiVisionClient:
    """Production client for Gemini 2.0 Vision API with structured output."""

    def __init__(self, api_key: Optional[str] = None, model_name: str = "gemini-2.0-flash-exp"):
        """
        Initialize Gemini Vision client.

        Args:
            api_key: Gemini API key. If None, falls back to the GEMINI_API_KEY
                or GOOGLE_API_KEY environment variable.
            model_name: Model identifier. Default uses experimental flash for speed.
        """
        api_key = api_key or os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
        if api_key:
            genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)
        # Configure safety settings for document processing
        self.safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }
        # Generation config for structured output
        self.generation_config = {
            "temperature": 0.2,  # Low temperature for consistent extraction
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": 8192,
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        retry=retry_if_exception_type(Exception),
        before_sleep=lambda retry_state: logger.warning(
            f"Retry {retry_state.attempt_number} after {retry_state.outcome.exception()}"
        ),
        reraise=True,  # Surface the original exception instead of tenacity's RetryError
    )
    def analyze_document(self, image: Image.Image, context: Optional[str] = None) -> ExtractedData:
        """
        Analyze a document image using Gemini 2.0 Vision.

        Args:
            image: PIL Image object of the document page
            context: Optional context about the document type
        Returns:
            ExtractedData with structured information
        Raises:
            Exception: After all retries exhausted
        """
        prompt = self._build_analysis_prompt(context)
        try:
            response = self.model.generate_content(
                [prompt, image],
                generation_config=self.generation_config,
                safety_settings=self.safety_settings,
            )
            # Parse the response
            if not response.candidates:
                logger.error("No candidates returned from Gemini API")
                raise ValueError("Empty response from Gemini API")
            # Extract text from response
            response_text = response.text
            # Try to parse as JSON first (if model returns structured output)
            try:
                # Clean potential markdown code blocks
                if "```json" in response_text:
                    response_text = response_text.split("```json")[1].split("```")[0].strip()
                elif "```" in response_text:
                    response_text = response_text.split("```")[1].split("```")[0].strip()
                data_dict = json.loads(response_text)
                # model_validate also rejects valid JSON that is not an object
                return ExtractedData.model_validate(data_dict)
            except (json.JSONDecodeError, ValidationError) as e:
                logger.warning(f"Failed to parse structured JSON, falling back to text: {e}")
                # Fallback: wrap raw text in structured format
                return ExtractedData(
                    metadata=DocumentMetadata(),
                    key_findings=[response_text]
                )
        except Exception as e:
            logger.error(f"Gemini API call failed: {e}")
            raise

    def _build_analysis_prompt(self, context: Optional[str] = None) -> str:
        """Build the analysis prompt for document understanding."""
        base_prompt = """You are a document analysis expert. Analyze this document image and extract all relevant information.
IMPORTANT: Respond ONLY with a valid JSON object. No markdown, no explanations.
Extract the following structured information:
{
  "metadata": {
    "title": "Document title or filename if visible",
    "document_type": "Type (invoice, report, letter, form, etc.)",
    "date": "Document date if visible (YYYY-MM-DD format)",
    "language": "Detected language code",
    "page_count": null
  },
  "key_findings": [
    "List the 3-5 most important findings or data points"
  ],
  "tables": [
    {
      "headers": ["column1", "column2"],
      "rows": [["value1", "value2"]]
    }
  ],
  "numerical_values": [
    {
      "value": "extracted number",
      "context": "what this number represents",
      "unit": "unit if applicable"
    }
  ],
  "handwritten_notes": [
    "Any handwritten text found in the document"
  ],
  "charts_summary": "Description of any charts, graphs, or visualizations"
}
Be thorough. Extract every visible number, date, and key piece of information."""
        if context:
            base_prompt += f"\n\nAdditional context: {context}"
        return base_prompt

    def analyze_document_batch(self, images: List[Image.Image],
                               batch_size: int = 5) -> List[ExtractedData]:
        """
        Process multiple document pages one request at a time.

        Args:
            images: List of PIL Images
            batch_size: Pages per logged batch (requests run sequentially)
        Returns:
            List of ExtractedData objects
        """
        results = []
        for i in range(0, len(images), batch_size):
            batch = images[i:i + batch_size]
            logger.info(f"Processing batch {i//batch_size + 1}/{(len(images)-1)//batch_size + 1}")
            for image in batch:
                try:
                    result = self.analyze_document(image)
                    results.append(result)
                except Exception as e:
                    logger.error(f"Failed to process image in batch: {e}")
                    results.append(ExtractedData(
                        metadata=DocumentMetadata(),
                        key_findings=[f"Error: {str(e)}"]
                    ))
        return results
Key production considerations in this implementation:
- Retry Logic: Using tenacity with exponential backoff handles transient API failures. The decorator makes up to three attempts, waits at least 4 seconds between them, and caps the backoff at 60 seconds.
- Structured Output: We request JSON explicitly in the prompt and validate the reply against the Pydantic schema. A prompt cannot guarantee valid JSON, so the parsing step still has to tolerate malformed replies.
- Safety Settings: For document processing, we set every harm category to BLOCK_NONE since we're analyzing business documents, not generating content. This reduces false-positive blocks on technical content.
- Fallback Parsing: If JSON parsing fails, we fall back to wrapping the raw text in our structured format rather than crashing.
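The fence-stripping and fallback logic can also be isolated into a small, testable function. This is a sketch using only the standard library. The name parse_model_json is hypothetical, and returning None on failure is a design choice for this example rather than what the client above does.

```python
import json
import re
from typing import Any, Dict, Optional

FENCE = "`" * 3  # Markdown code fence, built programmatically for readability
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_model_json(text: str) -> Optional[Dict[str, Any]]:
    """Return the JSON object in a model reply, or None if none can be parsed."""
    match = _FENCED.search(text)
    # Prefer the first fenced block; otherwise try the whole reply
    candidate = match.group(1) if match else text
    try:
        data = json.loads(candidate.strip())
    except json.JSONDecodeError:
        return None
    # Arrays, strings, and numbers are valid JSON but not the object we asked for
    return data if isinstance(data, dict) else None
```

Keeping this step separate from the API call makes it easy to unit-test against replies you have actually seen in production.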
FastAPI Endpoint with Vector Search
Now let's build the API layer that exposes our pipeline and adds semantic search capabilities using ChromaDB.
# api.py
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List

import chromadb
import google.generativeai as genai
from fastapi import FastAPI, File, HTTPException, UploadFile
from loguru import logger
from pydantic import BaseModel

from gemini_vision import DocumentMetadata, ExtractedData, GeminiVisionClient
from preprocessor import DocumentPreprocessor

# GenerativeModel has no embed_content method, so embeddings go through
# genai.embed_content. The embedding model name below is an assumption;
# substitute whichever embedding model your project has access to.
EMBEDDING_MODEL = "models/text-embedding-004"

# Initialize FastAPI
app = FastAPI(
    title="Gemini 2.0 Document Intelligence API",
    version="1.0.0",
    description="Production-grade document analysis with multimodal AI"
)

# Initialize components
preprocessor = DocumentPreprocessor()
vision_client = GeminiVisionClient()

# Initialize ChromaDB for vector storage [2]
chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Create or get collection; with cosine distance, 1 - distance is a similarity
collection = chroma_client.get_or_create_collection(
    "document_embeddings",
    metadata={"hnsw:space": "cosine"},
)


# Pydantic models for API
class AnalysisResponse(BaseModel):
    document_id: str
    filename: str
    analysis: ExtractedData
    processed_at: datetime
    pages_analyzed: int


class SearchQuery(BaseModel):
    query: str
    n_results: int = 5


class SearchResult(BaseModel):
    document_id: str
    filename: str
    relevance_score: float
    extracted_data: ExtractedData


def _combine_results(results: List[ExtractedData], filename: str) -> ExtractedData:
    """Combine analysis results from multiple pages."""
    if not results:
        return ExtractedData(metadata=DocumentMetadata(title=filename))
    combined = results[0]
    combined.metadata.title = filename
    combined.metadata.page_count = len(results)
    # Merge findings from all pages
    for result in results[1:]:
        combined.key_findings.extend(result.key_findings)
        combined.tables.extend(result.tables)
        combined.numerical_values.extend(result.numerical_values)
        combined.handwritten_notes.extend(result.handwritten_notes)
    return combined


def _index_document(document_id: str, filename: str, analysis: ExtractedData) -> None:
    """Store document analysis in vector database."""
    # Convert analysis to text for embedding
    analysis_text = analysis.model_dump_json()
    # Generate embedding
    embedding = genai.embed_content(
        model=EMBEDDING_MODEL,
        content=analysis_text,
        task_type="retrieval_document",
    )
    # Store in ChromaDB
    collection.add(
        embeddings=[embedding["embedding"]],
        documents=[analysis_text],
        metadatas=[{
            "filename": filename,
            "document_id": document_id,
            "processed_at": datetime.now(timezone.utc).isoformat()
        }],
        ids=[document_id]
    )


@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_document(file: UploadFile = File(...)):
    """
    Upload and analyze a document using Gemini 2.0 Vision.
    Accepts PDF, PNG, JPG, TIFF, BMP files up to 20MB.
    Returns structured analysis with extracted data.
    """
    # Validate file size (20MB limit)
    contents = await file.read()
    if len(contents) > 20 * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large. Maximum 20MB.")
    # Generate unique document ID
    document_id = str(uuid.uuid4())
    # Keep only the base name so a crafted filename cannot escape the temp dir
    filename = Path(file.filename or "upload").name
    temp_path = Path(tempfile.gettempdir()) / f"{document_id}_{filename}"
    try:
        # Save uploaded file temporarily
        temp_path.write_bytes(contents)
        # Process document
        logger.info(f"Processing document: {filename}")
        images = preprocessor.process_document(temp_path)
        if not images:
            raise HTTPException(
                status_code=400,
                detail="No processable images found in document"
            )
        # Analyze with Gemini Vision
        logger.info(f"Analyzing {len(images)} pages with Gemini 2.0 Vision")
        results = vision_client.analyze_document_batch(images)
        # Combine results from multiple pages
        combined_analysis = _combine_results(results, filename)
        # Store in vector database for semantic search
        _index_document(document_id, filename, combined_analysis)
        return AnalysisResponse(
            document_id=document_id,
            filename=filename,
            analysis=combined_analysis,
            processed_at=datetime.now(timezone.utc),
            pages_analyzed=len(images)
        )
    except HTTPException:
        # Let deliberate HTTP errors through instead of masking them as 500s
        raise
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        logger.exception("Analysis failed")
        raise HTTPException(status_code=500, detail="Internal analysis error")
    finally:
        # Clean up temp file on success and on failure
        temp_path.unlink(missing_ok=True)


@app.post("/search", response_model=List[SearchResult])
async def search_documents(query: SearchQuery):
    """
    Semantic search across analyzed documents.
    Uses ChromaDB vector similarity to find relevant documents
    based on natural language queries.
    """
    try:
        # Generate embedding for query using Gemini
        query_embedding = genai.embed_content(
            model=EMBEDDING_MODEL,
            content=query.query,
            task_type="retrieval_query",
        )
        # Search vector database
        results = collection.query(
            query_embeddings=[query_embedding["embedding"]],
            n_results=query.n_results
        )
        # Format results
        search_results = []
        for i in range(len(results['ids'][0])):
            search_results.append(SearchResult(
                document_id=results['ids'][0][i],
                filename=results['metadatas'][0][i].get('filename', 'Unknown'),
                relevance_score=1 - results['distances'][0][i],  # Cosine distance to similarity
                extracted_data=ExtractedData.model_validate_json(
                    results['documents'][0][i]
                )
            ))
        return search_results
    except Exception:
        logger.exception("Search failed")
        raise HTTPException(status_code=500, detail="Search failed")
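To see what the vector store does when it ranks documents, here is a dependency-free sketch of cosine-similarity ranking. The function names and the tiny two-dimensional vectors are illustrative only. Real embeddings have hundreds of dimensions, and ChromaDB uses an approximate index rather than this exhaustive scan.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank(query: Sequence[float], docs: Dict[str, Sequence[float]],
         n_results: int = 5) -> List[Tuple[str, float]]:
    """Return the n_results most similar (document_id, score) pairs, best first."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in docs.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:n_results]
```

A relevance score computed as 1 minus a distance only matches this similarity when the collection uses cosine distance, so check the distance metric your collection was created with.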
Running the Production Pipeline
To run the complete application:
# main.py
import uvicorn
from api import app

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable in production
        workers=4,  # Adjust based on CPU cores
        log_level="info"
    )
Start the server:
python main.py
Test the endpoint:
# Analyze a document
curl -X POST http://localhost:8000/analyze \
-F "file=@/path/to/invoice.pdf"
# Search analyzed documents
curl -X POST http://localhost:8000/search \
-H "Content-Type: application/json" \
-d '{"query": "What invoices mention project Alpha?", "n_results": 5}'
Edge Cases and Production Hardening
Memory Management
When processing large PDFs (100+ pages), memory usage can spike. Implement streaming processing:
from typing import Iterator, List

import PyPDF2
from loguru import logger
from PIL import Image


def process_large_document(file_path: str, max_pages: int = 50) -> Iterator[List[Image.Image]]:
    """Yield page images in batches of 10 so only one batch is held in memory."""
    images = []
    with open(file_path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        total_pages = len(pdf_reader.pages)
        if total_pages > max_pages:
            logger.warning(f"Document has {total_pages} pages, processing first {max_pages}")
        for page_num in range(min(total_pages, max_pages)):
            page = pdf_reader.pages[page_num]
            # convert_page_to_image is a placeholder you must supply, for example
            # a pdf2image-based renderer; PyPDF2 cannot rasterize pages itself
            img = convert_page_to_image(page)
            images.append(img)
            # Hand off each batch of 10 and start a fresh list
            if len(images) >= 10:
                yield images
                images = []
    if images:
        yield images
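The batching pattern in that function does not depend on PDFs at all. As a minimal, self-contained sketch, the hypothetical helper below applies the same idea to any iterable, which makes it easy to test in isolation.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of up to batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []  # Start a new list so the yielded one can be released
    if batch:
        yield batch  # Final partial batch
```

Because it is a generator, a caller that processes each batch before asking for the next never holds more than batch_size items at once.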
API Cost Optimization
Gemini API pricing is token-based, and every image in a request is counted as input tokens, so image-heavy requests cost more than short text prompts. Optimize by:
- Resizing images to minimum required resolution
- Caching results for duplicate documents using content hashing
- Batching requests within rate limits
import hashlib


def get_document_hash(contents: bytes) -> str:
    """Generate content hash for deduplication."""
    return hashlib.sha256(contents).hexdigest()


# Inside the /analyze handler, check the cache before processing.
# `cache` is a placeholder for any key-value store (a dict, Redis, etc.).
doc_hash = get_document_hash(contents)
cached_result = cache.get(doc_hash)
if cached_result:
    return cached_result
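For a version that runs on its own, the sketch below wraps the same hashing idea in a small in-memory cache. The ContentCache class and its get_or_compute method are hypothetical names for illustration. A production deployment with several workers would likely need a shared store instead of a per-process dict.

```python
import hashlib
from typing import Callable, Dict, Generic, TypeVar

R = TypeVar("R")


class ContentCache(Generic[R]):
    """In-memory result cache keyed by the SHA-256 of the uploaded bytes."""

    def __init__(self) -> None:
        self._store: Dict[str, R] = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, contents: bytes, compute: Callable[[bytes], R]) -> R:
        """Return the cached result for these bytes, computing it on first sight."""
        key = hashlib.sha256(contents).hexdigest()
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        result = compute(contents)  # The expensive call, e.g. the vision analysis
        self._store[key] = result
        return result
```

The hit and miss counters give a quick estimate of how many API calls deduplication is saving.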
What's Next
This production pipeline demonstrates how to build a multimodal document intelligence system using Gemini 2.0 Vision API. The architecture handles real-world challenges like rate limiting, structured output parsing, and semantic search integration.
To extend this system:
- Add support for video analysis using Gemini's video understanding capabilities
- Implement real-time processing with WebSocket endpoints for streaming results
- Add user authentication and multi-tenant document isolation
- Integrate with cloud storage (GCS, S3) for document persistence
- Add monitoring with Prometheus metrics and structured logging
The complete code is available on GitHub. For more tutorials on building AI-powered applications, check out our guides on production AI pipelines and multimodal model deployment.
Remember that while Gemini 2.0 Vision is powerful, it's not infallible. Always validate extracted data against business rules, especially for financial or legal documents. The combination of structured prompting, vector search, and human-in-the-loop validation creates the most reliable production system.
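As a small illustration of that validation step, the sketch below checks two things the extraction prompt asks for: a date in YYYY-MM-DD format and numeric values. The function name validate_extraction and the specific rules are assumptions for this example. Real business rules will depend on your documents.

```python
from datetime import datetime
from typing import Any, Dict, List


def validate_extraction(data: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems: List[str] = []
    date = (data.get("metadata") or {}).get("date")
    if date is not None:
        try:
            datetime.strptime(date, "%Y-%m-%d")
        except (TypeError, ValueError):
            problems.append(f"date not in YYYY-MM-DD format: {date!r}")
    for index, item in enumerate(data.get("numerical_values") or []):
        # Tolerate thousands separators such as "1,200.50"
        raw = str(item.get("value", "")).replace(",", "")
        try:
            float(raw)
        except ValueError:
            problems.append(f"numerical_values[{index}] is not a number: {item.get('value')!r}")
    return problems
```

Anything this returns can be routed to a human reviewer instead of being written straight into downstream systems.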