How to Build a Multimodal App with Gemini 2.0 Vision API
Building applications that understand both images and text has moved from experimental to production-ready in 2026. Google's Gemini 2.0 Vision API represents a significant leap in multimodal AI, offering native understanding of images, video frames, and text in a single model. In this tutorial, we'll build a production-grade document analysis system that extracts, classifies, and queries information from complex visual documents—think invoices, medical reports, or technical diagrams—using Gemini 2.0 Vision, FastAPI, and vector storage.
Real-World Use Case and Architecture
Consider a logistics company processing thousands of shipping invoices daily. Each invoice contains structured data (prices, dates, addresses) and unstructured elements (signatures, stamps, handwritten notes). Traditional OCR pipelines fail on handwritten text, struggle with varied layouts, and cannot understand context. A multimodal approach using Gemini 2.0 Vision solves this by treating the entire document as a visual reasoning problem.
Our architecture follows a three-tier pattern:
- Ingestion Layer: Accepts images (PNG, JPEG, or WebP; PDFs are converted to images first) via a FastAPI endpoint, validates format and size, and queues them for processing.
- Vision Processing Layer: Calls Gemini 2.0 Vision API with structured prompts to extract fields, classify document type, and generate embeddings for semantic search.
- Storage and Query Layer: Stores extracted data in PostgreSQL with pgvector for hybrid search (structured + semantic), enabling natural language queries like "show me all invoices over $5000 from last month."
This pattern handles edge cases gracefully: low-resolution scans trigger a retry with preprocessing, ambiguous fields are flagged for human review, and rate limits are managed via exponential backoff.
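The routing rules described above can be sketched as a small decision function. This is a minimal illustration rather than part of the pipeline code: the name route_extraction, both thresholds, and the list of critical fields are assumptions chosen for the example.

```python
from typing import Any, Dict, Tuple

# Hypothetical thresholds; calibrate them against a labeled dataset.
RETRY_BELOW = 0.5
REVIEW_BELOW = 0.8
CRITICAL_FIELDS = ("vendor_name", "invoice_date", "total_amount")


def route_extraction(extracted: Dict[str, Any], already_retried: bool = False) -> Tuple[str, str]:
    """Return (action, reason), where action is 'retry', 'review', or 'accept'."""
    confidence = extracted.get("confidence") or 0.0
    # Very low confidence: try once more with enhanced preprocessing.
    if confidence < RETRY_BELOW and not already_retried:
        return "retry", "low confidence, retry with enhanced preprocessing"
    # Missing critical fields always go to a human.
    missing = [name for name in CRITICAL_FIELDS if extracted.get(name) is None]
    if missing:
        return "review", "missing fields: " + ", ".join(missing)
    if confidence < REVIEW_BELOW:
        return "review", "confidence below review threshold"
    return "accept", "all checks passed"
```

Keeping this logic in one pure function makes the thresholds easy to test and to adjust once you have real accuracy data.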
Prerequisites and Environment Setup
Before writing code, ensure your environment meets these requirements:
- Python 3.11+ (3.12 recommended for performance improvements)
- A Google Cloud project with the Vertex AI API enabled (this tutorial accesses Gemini 2.0 through Vertex AI, not the legacy PaLM API)
- PostgreSQL 15+ with pgvector extension installed
- At least 4GB RAM for local development (8GB recommended)
Install dependencies:
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Core dependencies
pip install google-cloud-aiplatform==1.67.0
pip install fastapi==0.115.0
pip install uvicorn[standard]==0.30.0
pip install pillow==10.4.0
pip install psycopg2-binary==2.9.9
pip install pgvector==0.3.0
pip install pydantic==2.9.0
pip install python-multipart==0.0.12
# For PDF handling
pip install pdf2image==1.17.0
# poppler is a system package, not a pip package (Debian/Ubuntu shown):
sudo apt-get install poppler-utils
# For embeddings (optional, if using local models)
pip install sentence-transformers==3.1.0
Set up authentication for Vertex AI:
# Authenticate with Google Cloud
gcloud auth application-default login
# Set your project ID
export GOOGLE_CLOUD_PROJECT="your-project-id"
export GEMINI_LOCATION="us-central1" # Or your preferred region
Verify the Gemini 2.0 Vision API is accessible:
from google.cloud import aiplatform
aiplatform.init(project="your-project-id", location="us-central1")
# Test a simple call
from vertexai.preview.generative_models import GenerativeModel, Part
model = GenerativeModel("gemini-2.0-flash-001")
response = model.generate_content([Part.from_text("Hello, Gemini 2.0!")])
print(response.text)
If you encounter authentication errors, ensure your service account has the aiplatform.user role. According to Google Cloud documentation, the Gemini 2.0 models are available in us-central1, europe-west4, and asia-southeast1 regions as of early 2026.
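Since the setup exports GOOGLE_CLOUD_PROJECT and GEMINI_LOCATION, the initialization call can read them instead of hardcoding values. The helper below is a small sketch; the function name load_vertex_config and the fallback to us-central1 are assumptions for illustration.

```python
import os
from typing import Dict, Mapping


def load_vertex_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read project and region from the environment variables exported earlier."""
    project = env.get("GOOGLE_CLOUD_PROJECT")
    if not project:
        raise RuntimeError("GOOGLE_CLOUD_PROJECT is not set")
    # Assumed default: fall back to us-central1 when no region is exported.
    location = env.get("GEMINI_LOCATION", "us-central1")
    return {"project": project, "location": location}


# Usage (requires credentials):
#   from google.cloud import aiplatform
#   aiplatform.init(**load_vertex_config())
```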
Core Implementation: Document Analysis Pipeline
Step 1: Image Preprocessing and Validation
Production systems must handle diverse input formats. We'll build a robust preprocessing module that normalizes images for the Gemini API, which accepts images up to 20MB and supports JPEG, PNG, GIF, BMP, and WebP formats.
# preprocessor.py
from PIL import Image, ImageEnhance, ImageFilter
import io
import logging
from typing import Tuple, Optional

logger = logging.getLogger(__name__)


class ImagePreprocessor:
    """Handles image validation, normalization, and enhancement for Gemini 2.0 Vision."""

    MAX_IMAGE_SIZE_MB = 20
    SUPPORTED_FORMATS = {"JPEG", "PNG", "WEBP", "BMP", "GIF"}
    TARGET_DPI = 300  # For document scanning quality

    @staticmethod
    def validate_and_prepare(image_bytes: bytes, filename: str) -> Tuple[bytes, str]:
        """
        Validate image format and size, convert if needed.
        Returns (processed_bytes, mime_type).
        Raises ValueError for invalid inputs.
        """
        # Check file size
        size_mb = len(image_bytes) / (1024 * 1024)
        if size_mb > ImagePreprocessor.MAX_IMAGE_SIZE_MB:
            raise ValueError(
                f"Image size {size_mb:.2f}MB exceeds limit of {ImagePreprocessor.MAX_IMAGE_SIZE_MB}MB"
            )
        try:
            img = Image.open(io.BytesIO(image_bytes))
            img_format = img.format.upper() if img.format else "PNG"
            if img_format not in ImagePreprocessor.SUPPORTED_FORMATS:
                logger.warning(f"Unsupported format {img_format}, converting to PNG")
                img_format = "PNG"
            # Convert RGBA to RGB for Gemini compatibility
            if img.mode == "RGBA":
                background = Image.new("RGB", img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[3])
                img = background
            elif img.mode != "RGB":
                img = img.convert("RGB")
            # Enhance contrast for poor-quality scans
            enhancer = ImageEnhance.Contrast(img)
            img = enhancer.enhance(1.2)  # 20% contrast boost
            # Sharpen slightly for text readability
            img = img.filter(ImageFilter.SHARPEN)
            # Resize if too large (Gemini has pixel limits, though undocumented)
            max_dimension = 4096
            if max(img.size) > max_dimension:
                original_size = img.size  # Capture before resizing so the log is accurate
                ratio = max_dimension / max(original_size)
                new_size = (int(original_size[0] * ratio), int(original_size[1] * ratio))
                img = img.resize(new_size, Image.Resampling.LANCZOS)
                logger.info(f"Resized image from {original_size} to {new_size}")
            # Convert back to bytes
            buffer = io.BytesIO()
            img.save(buffer, format=img_format, quality=95)
            processed_bytes = buffer.getvalue()
            mime_type = f"image/{img_format.lower()}"
            return processed_bytes, mime_type
        except Exception as e:
            logger.error(f"Image processing failed: {e}")
            raise ValueError(f"Invalid image file: {e}") from e
Edge case handling: The preprocessor handles RGBA-to-RGB conversion (common in scanned PDFs), contrast enhancement for faded documents, and automatic resizing for very large images. The 4096-pixel limit is a practical safeguard rather than a documented constraint: Gemini 2.0's actual pixel limit is not publicly documented, but keeping dimensions under 4K keeps payloads small and helps avoid timeout errors in production.
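To make the resize step concrete, the aspect-preserving calculation can be isolated into a pure function. This is a sketch with an assumed name (fit_within); it uses integer arithmetic so the longest side lands exactly on the limit instead of depending on float rounding.

```python
from typing import Tuple


def fit_within(width: int, height: int, max_dimension: int = 4096) -> Tuple[int, int]:
    """Scale (width, height) down so the longest side equals max_dimension."""
    longest = max(width, height)
    if longest <= max_dimension:
        return width, height  # Already small enough, leave untouched
    # Integer math: multiply first, then floor-divide, to avoid float drift.
    new_width = max(1, width * max_dimension // longest)
    new_height = max(1, height * max_dimension // longest)
    return new_width, new_height


print(fit_within(6000, 4000))  # (4096, 2730)
```

A 6000x4000 scan therefore shrinks to 4096x2730, while a 1200x800 image passes through unchanged.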
Step 2: Structured Extraction with Gemini 2.0 Vision
Now we build the core extraction logic. Gemini 2.0 Vision accepts multimodal prompts combining text instructions with image parts. We'll use a structured prompt that returns JSON, enabling programmatic consumption.
# extractor.py
import json
import logging
from typing import Dict, Any, Optional
from vertexai.preview.generative_models import GenerativeModel, Part, GenerationConfig
from vertexai.preview.generative_models import HarmCategory, SafetySetting

logger = logging.getLogger(__name__)


class DocumentExtractor:
    """Extracts structured data from document images using Gemini 2.0 Vision."""

    def __init__(self, model_name: str = "gemini-2.0-flash-001"):
        self.model_name = model_name  # Kept so stored metadata reflects the model actually used
        self.model = GenerativeModel(model_name)
        self.safety_settings = [
            SafetySetting(
                category=HarmCategory.HARM_CATEGORY_HARASSMENT,
                threshold=SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH
            ),
            SafetySetting(
                category=HarmCategory.HARM_CATEGORY_HATE_SPEECH,
                threshold=SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH
            ),
            SafetySetting(
                category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
                threshold=SafetySetting.HarmBlockThreshold.BLOCK_ONLY_HIGH
            ),
        ]

    def extract_invoice_data(
        self,
        image_bytes: bytes,
        mime_type: str,
        document_type: str = "invoice"
    ) -> Dict[str, Any]:
        """
        Extract structured fields from a document image.
        Returns a dictionary with extracted fields and confidence scores.
        """
        prompt = f"""You are a document analysis system. Extract the following fields from this {document_type} image.
Return ONLY valid JSON with these exact keys:
- document_type: string (e.g., "invoice", "receipt", "purchase_order")
- vendor_name: string or null
- invoice_date: string in YYYY-MM-DD format or null
- total_amount: number or null
- currency: string (3-letter code) or null
- line_items: array of objects with "description", "quantity", "unit_price", "total" or empty array
- confidence: number between 0 and 1 indicating your confidence in the extraction
If a field is not visible or unclear, set it to null. Do not guess.
"""
        try:
            response = self.model.generate_content(
                [
                    Part.from_text(prompt),
                    Part.from_data(mime_type=mime_type, data=image_bytes)
                ],
                generation_config=GenerationConfig(
                    temperature=0.1,  # Low temperature for repeatable extraction
                    max_output_tokens=2048,
                    top_p=0.95,
                ),
                safety_settings=self.safety_settings
            )
            # Parse JSON from response
            raw_text = response.text.strip()
            # Handle cases where model wraps JSON in markdown code blocks
            if "```json" in raw_text:
                raw_text = raw_text.split("```json")[1].split("```")[0].strip()
            elif "```" in raw_text:
                raw_text = raw_text.split("```")[1].split("```")[0].strip()
            extracted = json.loads(raw_text)
            # Validate required fields
            required_keys = {"document_type", "vendor_name", "invoice_date",
                             "total_amount", "currency", "line_items", "confidence"}
            missing_keys = required_keys - set(extracted.keys())
            if missing_keys:
                logger.warning(f"Missing keys in extraction: {missing_keys}")
                for key in missing_keys:
                    extracted[key] = None if key != "line_items" else []
            # Add metadata
            extracted["_raw_response"] = response.text
            extracted["_model"] = self.model_name
            return extracted
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini response as JSON: {e}")
            logger.debug(f"Raw response: {response.text}")
            return {
                "error": f"JSON parsing failed: {str(e)}",
                "raw_response": response.text,
                "confidence": 0.0
            }
        except Exception as e:
            logger.error(f"Gemini API call failed: {e}")
            raise
Key design decisions:
- Temperature 0.1: For structured extraction, we want repeatable results. A low temperature makes output more consistent across runs (though not strictly deterministic), while higher temperatures increase the chance of hallucinated field values.
- JSON validation: The model sometimes wraps JSON in markdown code blocks. We strip these before parsing.
- Confidence scoring: The model self-reports confidence, which we use downstream for human review routing. In production, you'd calibrate this against a labeled dataset.
- Safety settings: We use BLOCK_ONLY_HIGH to avoid false positives on benign business documents while still filtering harmful content.
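The fence-stripping step from the JSON validation point can also be written as a standalone helper that is easier to unit test. The version below is one possible sketch, not the extractor's actual code: the name parse_model_json is an assumption, and the fallback that grabs the outermost braces is an added heuristic for responses that surround the JSON with prose.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # Markdown code fence marker
_FENCED_BLOCK = re.compile(
    FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE
)


def parse_model_json(raw_text: str) -> Dict[str, Any]:
    """Parse a JSON object from model output that may be fenced or wrapped in prose."""
    text = raw_text.strip()
    match = _FENCED_BLOCK.search(text)
    if match:
        # Prefer the contents of the first fenced block.
        text = match.group(1).strip()
    else:
        # Heuristic fallback: take everything between the outermost braces.
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]
    result = json.loads(text)  # Raises json.JSONDecodeError (a ValueError) on bad input
    if not isinstance(result, dict):
        raise ValueError("expected a JSON object at the top level")
    return result
```

Because json.JSONDecodeError subclasses ValueError, callers can catch ValueError to cover both malformed JSON and a non-object result.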
Step 3: Vector Storage and Semantic Search
Extracted data is only useful if queryable. We'll store both structured fields (in PostgreSQL columns) and semantic embeddings (via pgvector) for natural language queries.
# storage.py
import json
import psycopg2
from pgvector.psycopg2 import register_vector
import numpy as np
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class DocumentStore:
    """Hybrid storage for extracted documents using PostgreSQL + pgvector."""

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)
        # The extension must exist before register_vector can look up the vector type
        with self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        self.conn.commit()
        register_vector(self.conn)
        self._create_tables()

    def _create_tables(self):
        """Initialize schema with vector support."""
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id SERIAL PRIMARY KEY,
                    filename TEXT NOT NULL,
                    document_type TEXT,
                    vendor_name TEXT,
                    invoice_date DATE,
                    total_amount NUMERIC(12,2),
                    currency TEXT,
                    line_items JSONB DEFAULT '[]',
                    confidence FLOAT,
                    embedding vector(768), -- Gemini embedding dimension
                    raw_response TEXT,
                    created_at TIMESTAMP DEFAULT NOW(),
                    processed_at TIMESTAMP
                );
                CREATE INDEX IF NOT EXISTS idx_documents_type
                    ON documents(document_type);
                CREATE INDEX IF NOT EXISTS idx_documents_date
                    ON documents(invoice_date);
                CREATE INDEX IF NOT EXISTS idx_documents_embedding
                    ON documents USING ivfflat (embedding vector_cosine_ops)
                    WITH (lists = 100);
            """)
        self.conn.commit()

    def store_document(
        self,
        filename: str,
        extracted: Dict[str, Any],
        embedding: Optional[np.ndarray] = None
    ) -> int:
        """Insert extracted document and return its ID."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO documents
                    (filename, document_type, vendor_name, invoice_date,
                     total_amount, currency, line_items, confidence,
                     embedding, raw_response, processed_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                RETURNING id;
            """, (
                filename,
                extracted.get("document_type"),
                extracted.get("vendor_name"),
                self._parse_date(extracted.get("invoice_date")),
                extracted.get("total_amount"),
                extracted.get("currency"),
                json.dumps(extracted.get("line_items", [])),
                extracted.get("confidence"),
                embedding.tolist() if embedding is not None else None,
                extracted.get("_raw_response")
            ))
            doc_id = cur.fetchone()[0]
        self.conn.commit()
        logger.info(f"Stored document {doc_id} from {filename}")
        return doc_id

    def semantic_search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 10,
        doc_type_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Find documents by cosine similarity to query embedding."""
        with self.conn.cursor() as cur:
            if doc_type_filter:
                cur.execute("""
                    SELECT id, filename, document_type, vendor_name,
                           total_amount, confidence,
                           1 - (embedding <=> %s::vector) AS similarity
                    FROM documents
                    WHERE document_type = %s
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s;
                """, (query_embedding.tolist(), doc_type_filter, query_embedding.tolist(), top_k))
            else:
                cur.execute("""
                    SELECT id, filename, document_type, vendor_name,
                           total_amount, confidence,
                           1 - (embedding <=> %s::vector) AS similarity
                    FROM documents
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s;
                """, (query_embedding.tolist(), query_embedding.tolist(), top_k))
            results = []
            for row in cur.fetchall():
                results.append({
                    "id": row[0],
                    "filename": row[1],
                    "document_type": row[2],
                    "vendor_name": row[3],
                    # Compare against None so a legitimate total of 0 is preserved
                    "total_amount": float(row[4]) if row[4] is not None else None,
                    "confidence": row[5],
                    "similarity": float(row[6])
                })
            return results

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
        """Safely parse date strings from extraction."""
        if not date_str:
            return None
        try:
            return datetime.strptime(date_str, "%Y-%m-%d")
        except ValueError:
            logger.warning(f"Could not parse date: {date_str}")
            return None
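Why pgvector over Pinecone/Weaviate? For document processing pipelines, keeping embeddings in the same database as structured data removes a network hop to a separate service and lets you write both in a single transaction. pgvector's documentation suggests starting with lists = rows / 1000 for tables up to 1M rows, so the lists = 100 setting above is sized for roughly 100K vectors; raise it as the table grows, and build the index after loading data, since IVFFlat computes its clusters from the rows present at index creation. For larger scales, consider the HNSW index type, available since pgvector 0.5.0.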
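As a rough sizing aid, the starting points given in pgvector's README (lists = rows / 1000 up to 1M rows, sqrt(rows) beyond that, and probes = sqrt(lists)) can be written as two helpers. The function names are assumptions, and the results are starting values to benchmark, not guarantees of recall.

```python
import math


def suggest_ivfflat_lists(row_count: int) -> int:
    """Starting value for the IVFFlat 'lists' parameter, per pgvector's README heuristic."""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return math.isqrt(row_count)


def suggest_probes(lists: int) -> int:
    """Starting value for ivfflat.probes at query time: sqrt(lists)."""
    return max(1, math.isqrt(lists))


for rows in (100_000, 1_000_000, 4_000_000):
    lists = suggest_ivfflat_lists(rows)
    print(rows, lists, suggest_probes(lists))
```

Higher probes values trade query speed for recall, so measure both on your own data before settling on a setting.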
Step 4: FastAPI Application with Rate Limiting
Finally, we wire everything into a FastAPI application with error handling and a background task hook. Retry logic for rate limits is covered under Edge Cases and Production Considerations.
# app.py
import asyncio
import logging
from typing import Optional

from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile
from pydantic import BaseModel, Field
import numpy as np

from preprocessor import ImagePreprocessor
from extractor import DocumentExtractor
from storage import DocumentStore

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Initialize components
preprocessor = ImagePreprocessor()
extractor = DocumentExtractor()
store = DocumentStore(
    connection_string="postgresql://user:password@localhost:5432/documents"
)

app = FastAPI(
    title="Multimodal Document Analysis API",
    version="2.0.0",
    description="Extract and query document data using Gemini 2.0 Vision"
)


class SearchQuery(BaseModel):
    query: str = Field(..., min_length=3, max_length=500)
    top_k: int = Field(default=10, ge=1, le=100)
    doc_type_filter: Optional[str] = None


class DocumentResponse(BaseModel):
    id: int
    filename: str
    document_type: Optional[str]
    vendor_name: Optional[str]
    total_amount: Optional[float]
    confidence: Optional[float]
    message: str


@app.post("/analyze", response_model=DocumentResponse)
async def analyze_document(
    background_tasks: BackgroundTasks,  # Injected by FastAPI
    file: UploadFile = File(...)
):
    """
    Upload a document image for analysis.
    Accepts JPEG, PNG, WebP, BMP, GIF up to 20MB.
    """
    # Validate file type
    allowed_types = {"image/jpeg", "image/png", "image/webp", "image/bmp", "image/gif"}
    if file.content_type not in allowed_types:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {file.content_type}. Allowed: {allowed_types}"
        )
    try:
        # Read file
        image_bytes = await file.read()
        # Preprocess
        processed_bytes, mime_type = preprocessor.validate_and_prepare(
            image_bytes, file.filename
        )
        # Extract structured data
        extracted = extractor.extract_invoice_data(processed_bytes, mime_type)
        if "error" in extracted:
            raise HTTPException(status_code=422, detail=extracted["error"])
        # Build a text representation of the extracted data for embedding
        line_items = extracted.get("line_items") or []
        text_for_embedding = f"""
        Document type: {extracted.get('document_type', 'unknown')}
        Vendor: {extracted.get('vendor_name', 'unknown')}
        Date: {extracted.get('invoice_date', 'unknown')}
        Total: {extracted.get('total_amount', 'unknown')} {extracted.get('currency', '')}
        Items: {', '.join([item.get('description') or '' for item in line_items])}
        """
        # Note: Gemini 2.0 Flash is a generative model and does not return embedding vectors.
        # For production, pass text_for_embedding to a dedicated embedding model
        # such as text-embedding-004. The random vector below is only a placeholder.
        embedding = np.random.randn(768).astype(np.float32)  # Placeholder
        # Store in database
        doc_id = store.store_document(
            filename=file.filename,
            extracted=extracted,
            embedding=embedding
        )
        # Optionally schedule background cleanup or re-processing
        background_tasks.add_task(log_processing_complete, doc_id, file.filename)
        return DocumentResponse(
            id=doc_id,
            filename=file.filename,
            document_type=extracted.get("document_type"),
            vendor_name=extracted.get("vendor_name"),
            total_amount=extracted.get("total_amount"),
            confidence=extracted.get("confidence"),
            message="Document analyzed successfully"
        )
    except HTTPException:
        # Re-raise as-is so the 422 above is not converted into a 500 below
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.exception("Document analysis failed")
        raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}")


@app.post("/search")
async def search_documents(query: SearchQuery):
    """
    Semantic search over analyzed documents.
    Uses text embedding of the query to find similar documents.
    """
    try:
        # Generate embedding for the query
        # Again, use a dedicated embedding model in production
        query_embedding = np.random.randn(768).astype(np.float32)  # Placeholder
        results = store.semantic_search(
            query_embedding=query_embedding,
            top_k=query.top_k,
            doc_type_filter=query.doc_type_filter
        )
        return {"results": results, "query": query.query}
    except Exception as e:
        logger.exception("Search failed")
        raise HTTPException(status_code=500, detail=str(e))


async def log_processing_complete(doc_id: int, filename: str):
    """Background task to log completion (simulated)."""
    await asyncio.sleep(1)
    logger.info(f"Processing complete for document {doc_id} ({filename})")


@app.get("/health")
async def health_check():
    """Simple health check endpoint."""
    return {"status": "healthy", "model": "gemini-2.0-flash-001"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
Critical note on embeddings: The code above uses placeholder embeddings because Gemini 2.0 Flash is a generative model and does not return embedding vectors. For production, you have two options:
- Use Google's text-embedding-004 model (available via Vertex AI) for generating 768-dimensional embeddings.
- Use a local model like sentence-transformers/all-MiniLM-L6-v2 for offline embedding generation. Note that this model outputs 384-dimensional vectors, so the embedding column would need to be declared as vector(384).
The placeholder np.random.randn(768) will produce garbage search results. Replace it with actual embedding API calls before deploying.
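A replacement for the placeholder might look like the sketch below. It assumes the vertexai SDK's TextEmbeddingModel class and valid credentials; the function names embed_with_vertex and embed_document are hypothetical. The dimension check is there because a vector of the wrong length would be rejected by the vector(768) column at insert time.

```python
from typing import Callable, List

EMBEDDING_DIM = 768  # Must match the vector(768) column in the documents table


def embed_with_vertex(text: str) -> List[float]:
    """Embed text with text-embedding-004 (needs Vertex AI credentials)."""
    # Imported lazily so this module loads even without the SDK installed.
    from vertexai.language_models import TextEmbeddingModel

    model = TextEmbeddingModel.from_pretrained("text-embedding-004")
    return list(model.get_embeddings([text])[0].values)


def embed_document(
    text: str,
    embed_fn: Callable[[str], List[float]] = embed_with_vertex,
) -> List[float]:
    """Embed text and verify the vector length before it reaches the database."""
    vector = embed_fn(text)
    if len(vector) != EMBEDDING_DIM:
        raise ValueError(
            f"Embedding has {len(vector)} dimensions, expected {EMBEDDING_DIM}"
        )
    return vector
```

Passing embed_fn as a parameter lets you swap in a local model or a test double without touching the calling code; in app.py the result would be wrapped with np.asarray(..., dtype=np.float32) before calling store_document.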
Edge Cases and Production Considerations
API Rate Limits and Retry Logic
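Gemini 2.0 Vision API has rate limits that vary by tier. According to Google Cloud documentation, the free tier allows 60 requests per minute, while paid tiers scale to thousands; quotas change over time, so confirm the current values for your project in the Cloud console. Implement exponential backoff (the example uses the tenacity package, installed with pip install tenacity):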
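from google.api_core.exceptions import ResourceExhausted, ServiceUnavailable
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    # Retry only quota (429) and transient availability (503) errors
    retry=retry_if_exception_type((ResourceExhausted, ServiceUnavailable)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    reraise=True
)
def call_gemini_with_retry(model, prompt_parts):
    return model.generate_content(prompt_parts)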
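If you would rather not add a dependency, the same idea can be sketched with the standard library. This is an illustrative alternative, not a reimplementation of tenacity: the name retry_with_backoff and its parameters are assumptions, and it adds random jitter so that many clients hitting a limit at once do not all retry at the same instant.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on the given exceptions with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay cap doubles each attempt: base, 2*base, 4*base, ... up to max_delay
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))  # "Full jitter": pick a random delay below the cap
    raise RuntimeError("unreachable")  # Loop always returns or raises
```

Usage would look like retry_with_backoff(lambda: model.generate_content(parts), retry_on=(ResourceExhausted,)), with the exception types chosen to match the errors you consider transient.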
Handling Low-Quality Images
If Gemini returns low confidence (<0.5), trigger reprocessing with enhanced preprocessing:
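import io
from PIL import Image, ImageOps

if (extracted.get("confidence") or 0) < 0.5:
    # Apply global histogram equalization to the already-processed image
    img = ImageOps.equalize(Image.open(io.BytesIO(processed_bytes)).convert("RGB"))
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    # Retry extraction on the enhanced image, not the original bytes
    extracted = extractor.extract_invoice_data(buffer.getvalue(), "image/png")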
Memory Management
Processing large batches of images can exhaust memory. Use streaming and chunking:
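import asyncio
from typing import List
from fastapi import UploadFile

async def process_batch(files: List[UploadFile], batch_size: int = 10):
    """Async generator that yields one list of results per batch."""
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        # process_single is your per-file coroutine (for example, the body of /analyze)
        tasks = [process_single(file) for file in batch]
        # return_exceptions=True keeps one failure from discarding the whole batch
        results = await asyncio.gather(*tasks, return_exceptions=True)
        yield results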
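Fixed-size batches wait for the slowest item in each batch before starting the next. An alternative sketch, using a semaphore to cap how many items are in flight at once, is shown below; the name gather_limited and the default limit of 5 are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_limited(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    max_concurrency: int = 5,
) -> List[object]:
    """Run worker over items with at most max_concurrency running at once.

    Results keep input order; failures are returned as exception objects.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: T) -> R:
        async with semaphore:  # Blocks while max_concurrency workers are active
            return await worker(item)

    return await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )
```

This bounds both memory (only a few images are decoded at a time) and the request rate against the API, which complements the retry logic above.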
Testing the Application
Start the server and test with a sample invoice image:
uvicorn app:app --reload --host 0.0.0.0 --port 8000
In another terminal:
# Test health endpoint
curl http://localhost:8000/health
# Analyze a document (replace with actual image file)
curl -X POST http://localhost:8000/analyze \
-F "file=@sample_invoice.jpg" \
-H "Content-Type: multipart/form-data"
# Search documents
curl -X POST http://localhost:8000/search \
-H "Content-Type: application/json" \
-d '{"query": "invoices over $5000 from Acme Corp", "top_k": 5}'
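Note that the search request above mixes a structured constraint (over $5000, a specific vendor) with free text, and pure vector similarity will not enforce the amount. One way to combine both is to add SQL filters alongside the vector ordering. The builder below is a sketch against the documents table from storage.py; the function name and its parameters are assumptions, and turning the natural language query into these parameters is left to the caller.

```python
from datetime import date
from typing import Any, List, Optional, Sequence, Tuple


def build_hybrid_query(
    query_embedding: Sequence[float],
    total_over: Optional[float] = None,
    vendor: Optional[str] = None,
    date_from: Optional[date] = None,
    date_to: Optional[date] = None,
    top_k: int = 10,
) -> Tuple[str, List[Any]]:
    """Return (sql, params) for psycopg2: structured filters plus cosine ordering."""
    conditions: List[str] = []
    filter_params: List[Any] = []
    if total_over is not None:
        conditions.append("total_amount > %s")
        filter_params.append(total_over)
    if vendor:
        conditions.append("vendor_name ILIKE %s")
        filter_params.append(f"%{vendor}%")  # Case-insensitive substring match
    if date_from is not None:
        conditions.append("invoice_date >= %s")
        filter_params.append(date_from)
    if date_to is not None:
        conditions.append("invoice_date < %s")
        filter_params.append(date_to)
    where = "WHERE " + " AND ".join(conditions) + " " if conditions else ""
    sql = (
        "SELECT id, filename, vendor_name, total_amount, "
        "1 - (embedding <=> %s::vector) AS similarity "
        "FROM documents " + where +
        "ORDER BY embedding <=> %s::vector LIMIT %s"
    )
    vector = list(query_embedding)
    # Parameter order follows placeholder order: SELECT, WHERE, ORDER BY, LIMIT
    return sql, [vector, *filter_params, vector, top_k]
```

The returned pair can be passed directly to cur.execute(sql, params). All values travel as bound parameters, so user input never ends up interpolated into the SQL string.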
What's Next
This tutorial provides a production-ready foundation for multimodal document analysis with Gemini 2.0 Vision. To extend this system:
- Add human-in-the-loop validation: Route low-confidence extractions to a review queue using tools like Label Studio or custom dashboards.
- Implement batch processing: Use Google Cloud Tasks or Celery for asynchronous document processing at scale.
- Fine-tune prompts: Experiment with few-shot examples in the prompt to improve extraction accuracy for your specific document types.
- Monitor costs: Gemini 2.0 Vision pricing is based on input token count (text + image tokens). Track usage with Cloud Monitoring to avoid surprises.
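For the cost monitoring point, a lightweight starting place is to accumulate the token counts that the SDK reports on each response. The sketch below assumes the response object exposes usage_metadata with prompt_token_count and candidates_token_count, as Vertex AI responses do; the TokenUsage class itself is hypothetical, and it deliberately contains no prices, which you would take from your own billing data.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TokenUsage:
    """Running totals of token counts across model calls."""
    requests: int = 0
    prompt_tokens: int = 0
    output_tokens: int = 0

    def record(self, response: Any) -> None:
        """Add one response's usage; tolerates responses without metadata."""
        self.requests += 1
        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return
        self.prompt_tokens += getattr(usage, "prompt_token_count", 0) or 0
        self.output_tokens += getattr(usage, "candidates_token_count", 0) or 0

    @property
    def total_tokens(self) -> int:
        return self.prompt_tokens + self.output_tokens
```

Calling tracker.record(response) right after generate_content in the extractor gives per-process totals that can be logged or exported as a custom metric.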
For further reading, explore Google's Gemini documentation and the pgvector GitHub repository for advanced indexing strategies. The combination of multimodal vision models and vector search opens possibilities beyond document analysis—consider applications in visual Q&A, content moderation, or automated data entry from legacy systems.