
How to Build a Multimodal App with Gemini 2.0 Vision API

Practical tutorial: Build a multimodal app with Gemini 2.0 Vision API

BlogIA Academy | May 23, 2026 | 16 min read | 3,014 words


The landscape of AI applications is shifting from text-only interactions to multimodal experiences that can understand images, video, and text simultaneously. Google's Gemini 2.0 Vision API represents a significant leap in this direction, offering native multimodal understanding that goes far beyond simple image captioning or OCR. In this tutorial, we'll build a production-grade document intelligence system that can extract, analyze, and query information from complex documents containing charts, tables, handwritten notes, and mixed media.

What makes this approach production-ready is our focus on error handling, rate limiting, cost optimization, and structured output parsing. We'll implement a complete pipeline that processes PDFs and images, extracts structured data using Gemini 2.0's vision capabilities, stores results in a vector database [1] for semantic search, and exposes everything through a FastAPI endpoint.

Understanding Gemini 2.0 Vision Architecture and Production Considerations

Before diving into code, it's crucial to understand how Gemini 2.0 processes multimodal inputs and what that means for production systems. According to Google's official documentation, Gemini 2.0 accepts images, video, and text as direct inputs through a unified API endpoint. Unlike earlier approaches that required separate encoders for each modality, Gemini processes these inputs natively, allowing for cross-modal reasoning.

The key architectural decision you'll face is whether to use the synchronous generate_content method or the streaming variant. For document processing pipelines where you need complete results before proceeding, synchronous calls are appropriate. For real-time applications like live video analysis, streaming provides lower latency. We'll focus on the synchronous approach since it's more common in batch document processing.

Rate limiting is your primary production concern. As of May 2026, the Gemini API enforces rate limits based on your pricing tier. The free tier allows 60 requests per minute at the time of writing, while paid tiers offer higher limits, so check your project's current quota before sizing batch jobs. We'll handle these constraints with exponential backoff on retries and by sending page requests sequentially in small batches.
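To make the two ideas concrete, here is a minimal standard-library sketch of a capped exponential backoff schedule and a sliding-window request limiter. The names `backoff_delays` and `SlidingWindowLimiter` are illustrative and not part of any Gemini SDK, and the limits you pass in are whatever your own quota allows.

```python
import time
from collections import deque
from typing import Callable, Deque, List


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Delay before each retry: base * 2**n seconds, capped at `cap`."""
    return [min(cap, base * (2 ** n)) for n in range(attempts)]


class SlidingWindowLimiter:
    """Allow at most `max_calls` calls in any `period`-second window."""

    def __init__(self, max_calls: int, period: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_calls = max_calls
        self.period = period
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._calls: Deque[float] = deque()

    def wait_time(self) -> float:
        """Seconds to wait before the next call is allowed (0.0 if allowed now)."""
        now = self.clock()
        # Drop timestamps that have left the window
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            return 0.0
        return self.period - (now - self._calls[0])

    def record(self) -> None:
        """Register that a call was just made."""
        self._calls.append(self.clock())
```

A caller would sleep for `wait_time()` seconds, make the request, then call `record()`. On a rate-limit error it would additionally sleep for the next value from `backoff_delays` before retrying.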

Prerequisites and Environment Setup

You'll need Python 3.10 or later, a Google Cloud project with the Vertex AI API enabled, and billing configured. Let's set up our environment:

# Create a virtual environment
python -m venv gemini_vision_env
source gemini_vision_env/bin/activate  # On Windows: gemini_vision_env\Scripts\activate

# Install core dependencies
pip install google-cloud-aiplatform==1.47.0
pip install google-generativeai==0.8.3
pip install fastapi==0.111.0
pip install uvicorn==0.29.0
pip install pypdf2==3.0.1
pip install pillow==10.3.0
pip install chromadb==0.5.0
pip install python-multipart==0.0.9
pip install pydantic==2.7.1
pip install tenacity==8.3.0
pip install loguru==0.7.2

For authentication, you have two options. The recommended approach for production is using a service account:

# Set up authentication
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
export GOOGLE_API_KEY="your-api-key"  # Alternative for direct API access (read by google-generativeai)

If you're using the google-generativeai library directly (not through Vertex AI), you'll need an API key from Google AI Studio. For enterprise deployments, Vertex AI provides better security, monitoring, and compliance features.
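As a rough illustration of choosing between the two authentication modes at startup, the sketch below inspects environment variables only. The helper names and the precedence (API key first, then service account) are assumptions made for this example, not behavior defined by either Google library.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the first non-empty API key found, or None if no key is set."""
    # Both variable names are checked because projects export the key under either one
    for name in ("GOOGLE_API_KEY", "GEMINI_API_KEY"):
        value = env.get(name, "").strip()
        if value:
            return value
    return None


def describe_auth(env: Mapping[str, str] = os.environ) -> str:
    """Report which authentication mode the environment is set up for."""
    if resolve_api_key(env):
        return "api_key"
    if env.get("GOOGLE_APPLICATION_CREDENTIALS"):
        return "service_account"
    return "unconfigured"
```

Failing fast on `"unconfigured"` at startup tends to be easier to debug than an authentication error on the first request.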

Building the Multimodal Document Intelligence Pipeline

Our pipeline will consist of four stages: document preprocessing, multimodal analysis with Gemini 2.0, structured data extraction, and semantic indexing. Let's implement each component with production-grade error handling.

Document Preprocessing and Image Preparation

The first challenge is handling diverse document formats. We'll create a preprocessing module that pulls embedded page images out of PDFs, normalizes image sizes, and handles edge cases like corrupted files or password-protected documents.

# preprocessor.py
import io
from pathlib import Path
from typing import List, Optional, Union
from PIL import Image, ImageEnhance, UnidentifiedImageError
import PyPDF2
from loguru import logger

class DocumentPreprocessor:
    """Handles document conversion and image preparation for Gemini Vision API."""

    MAX_IMAGE_SIZE = (2048, 2048)  # Gemini 2.0 recommended max dimensions
    SUPPORTED_EXTENSIONS = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.bmp'}

    def __init__(self, enhance_contrast: bool = True, dpi: int = 200):
        self.enhance_contrast = enhance_contrast
        self.dpi = dpi

    def process_document(self, file_path: Union[str, Path]) -> List[Image.Image]:
        """
        Convert document to list of PIL Images suitable for Gemini API.

        Args:
            file_path: Path to PDF or image file

        Returns:
            List of processed PIL Image objects

        Raises:
            ValueError: If file type is unsupported or file is corrupted
            FileNotFoundError: If file doesn't exist
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"Document not found: {path}")

        if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {path.suffix}. "
                           f"Supported: {self.SUPPORTED_EXTENSIONS}")

        try:
            if path.suffix.lower() == '.pdf':
                return self._process_pdf(path)
            else:
                return [self._process_single_image(path)]
        except (PyPDF2.errors.PdfReadError, UnidentifiedImageError) as e:
            logger.error(f"Failed to process {path}: {e}")
            raise ValueError(f"Corrupted or invalid file: {path}") from e

    def _process_pdf(self, path: Path) -> List[Image.Image]:
        """Convert PDF pages to processed images."""
        images = []

        with open(path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)

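            if pdf_reader.is_encrypted:
                logger.warning(f"PDF is encrypted: {path}. Attempting empty password...")
                try:
                    # decrypt() returns a falsy value when the password is wrong
                    decrypted = pdf_reader.decrypt('')
                except Exception as e:
                    raise ValueError(f"Cannot process encrypted PDF without password: {path}") from e
                if not decrypted:
                    raise ValueError(f"Cannot process encrypted PDF without password: {path}")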

            for page_num, page in enumerate(pdf_reader.pages):
                logger.debug(f"Processing page {page_num + 1}/{len(pdf_reader.pages)}")

                # PyPDF2 cannot rasterize pages; it can only pull out images that
                # are embedded in a page (for example, scanned pages).
                # Note: For production, consider using pdf2image for better quality
                for image_file in page.images:
                    try:
                        # image_file.data holds the encoded bytes (JPEG, PNG, ...),
                        # so let Pillow detect the format instead of assuming raw RGB
                        img = Image.open(io.BytesIO(image_file.data)).convert('RGB')
                        processed = self._optimize_image(img)
                        images.append(processed)
                    except Exception as e:
                        logger.warning(f"Failed to extract image from page {page_num + 1}: {e}")

        if not images:
            logger.warning(f"No images extracted from PDF: {path}. "
                          f"Consider using pdf2image library for rasterization.")

        return images

    def _process_single_image(self, path: Path) -> Image.Image:
        """Process a single image file."""
        with Image.open(path) as img:
            img = img.convert('RGB')  # Ensure consistent color mode
            return self._optimize_image(img)

    def _optimize_image(self, image: Image.Image) -> Image.Image:
        """
        Optimize image for Gemini Vision API processing.

        - Resize if too large
        - Enhance contrast for better text recognition
        - Normalize color profile
        """
        # Resize while maintaining aspect ratio
        if image.size[0] > self.MAX_IMAGE_SIZE[0] or image.size[1] > self.MAX_IMAGE_SIZE[1]:
            image.thumbnail(self.MAX_IMAGE_SIZE, Image.Resampling.LANCZOS)
            logger.debug(f"Resized image to {image.size}")

        # Enhance contrast for better OCR and chart reading
        if self.enhance_contrast:
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.5)

        return image

    def image_to_bytes(self, image: Image.Image, format: str = 'PNG') -> bytes:
        """Convert PIL Image to bytes for API transmission."""
        buffer = io.BytesIO()
        image.save(buffer, format=format, optimize=True)
        return buffer.getvalue()

This preprocessor handles several edge cases that commonly break production pipelines:

  • Encrypted PDFs (attempts empty password)
  • Corrupted image files (catches UnidentifiedImageError)
  • Mixed-format documents (PDFs with embedded images)
  • Memory management (uses thumbnailing for large images)
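The resizing step can be illustrated in isolation. This is a small sketch of the aspect-ratio arithmetic behind fitting an image inside a bounding box; `fit_within` is a hypothetical helper, and Pillow's own `thumbnail` rounding may differ by a pixel.

```python
from typing import Tuple


def fit_within(size: Tuple[int, int], max_size: Tuple[int, int]) -> Tuple[int, int]:
    """Scale (width, height) down to fit inside max_size, keeping the aspect ratio."""
    width, height = size
    max_w, max_h = max_size
    # The tighter of the two constraints decides the scale; 1.0 prevents upscaling
    scale = min(max_w / width, max_h / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))
```

For a 4096x2048 scan and the 2048x2048 limit used above, the width is the binding constraint, so both sides are halved.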

Core Gemini 2.0 Vision Integration

Now let's implement the core integration with Gemini 2.0's Vision API. We'll use the google-generativeai library with structured output parsing and comprehensive error handling.

# gemini_vision.py
import base64
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
from PIL import Image
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from pydantic import BaseModel, Field
from loguru import logger

# Define structured output schemas
class DocumentMetadata(BaseModel):
    """Extracted document metadata."""
    title: Optional[str] = Field(None, description="Document title or filename")
    document_type: Optional[str] = Field(None, description="Type of document: invoice, report, letter, etc.")
    date: Optional[str] = Field(None, description="Document date if present")
    language: Optional[str] = Field("en", description="Detected language")
    page_count: Optional[int] = Field(None, description="Number of pages")

class ExtractedData(BaseModel):
    """Structured data extracted from document."""
    metadata: DocumentMetadata
    key_findings: List[str] = Field(default_factory=list, description="Key points extracted")
    tables: List[Dict[str, Any]] = Field(default_factory=list, description="Extracted table data")
    numerical_values: List[Dict[str, Any]] = Field(default_factory=list, description="Numbers and statistics")
    handwritten_notes: List[str] = Field(default_factory=list, description="Handwritten text if detected")
    charts_summary: Optional[str] = Field(None, description="Summary of any charts or graphs")

class GeminiVisionClient:
    """Production client for Gemini 2.0 Vision API with structured output."""

    def __init__(self, api_key: Optional[str] = None, model_name: str = "gemini-2.0-flash-exp"):
        """
        Initialize Gemini Vision client.

        Args:
            api_key: Gemini API key. If None, uses GOOGLE_API_KEY env var.
            model_name: Model identifier. Default uses experimental flash for speed.
        """
        if api_key:
            genai.configure(api_key=api_key)

        self.model = genai.GenerativeModel(model_name)

        # Configure safety settings for document processing
        self.safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }

        # Generation config for structured output
        self.generation_config = {
            "temperature": 0.2,  # Low temperature for consistent extraction
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": 8192,
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        retry=retry_if_exception_type((Exception,)),
        before_sleep=lambda retry_state: logger.warning(
            f"Retry {retry_state.attempt_number} after {retry_state.outcome.exception()}"
        )
    )
    def analyze_document(self, image: Image.Image, context: Optional[str] = None) -> ExtractedData:
        """
        Analyze a document image using Gemini 2.0 Vision.

        Args:
            image: PIL Image object of the document page
            context: Optional context about the document type

        Returns:
            ExtractedData with structured information

        Raises:
            Exception: After all retries exhausted
        """
        prompt = self._build_analysis_prompt(context)

        try:
            response = self.model.generate_content(
                [prompt, image],
                generation_config=self.generation_config,
                safety_settings=self.safety_settings,
            )

            # Parse the response
            if not response.candidates:
                logger.error("No candidates returned from Gemini API")
                raise ValueError("Empty response from Gemini API")

            # Extract text from response
            response_text = response.text

            # Try to parse as JSON first (if model returns structured output)
            try:
                # Clean potential markdown code blocks
                if "```json" in response_text:
                    response_text = response_text.split("```json")[1].split("```")[0].strip()
                elif "```" in response_text:
                    response_text = response_text.split("```")[1].split("```")[0].strip()

                data_dict = json.loads(response_text)
                return ExtractedData(**data_dict)
            except (json.JSONDecodeError, Exception) as e:
                logger.warning(f"Failed to parse structured JSON, falling back to text: {e}")
                # Fallback: wrap raw text in structured format
                return ExtractedData(
                    metadata=DocumentMetadata(),
                    key_findings=[response_text]
                )

        except Exception as e:
            logger.error(f"Gemini API call failed: {e}")
            raise

    def _build_analysis_prompt(self, context: Optional[str] = None) -> str:
        """Build the analysis prompt for document understanding."""
        base_prompt = """You are a document analysis expert. Analyze this document image and extract all relevant information.

IMPORTANT: Respond ONLY with a valid JSON object. No markdown, no explanations.

Extract the following structured information:
{
    "metadata": {
        "title": "Document title or filename if visible",
        "document_type": "Type (invoice, report, letter, form, etc.)",
        "date": "Document date if visible (YYYY-MM-DD format)",
        "language": "Detected language code",
        "page_count": null
    },
    "key_findings": [
        "List the 3-5 most important findings or data points"
    ],
    "tables": [
        {
            "headers": ["column1", "column2"],
            "rows": [["value1", "value2"]]
        }
    ],
    "numerical_values": [
        {
            "value": "extracted number",
            "context": "what this number represents",
            "unit": "unit if applicable"
        }
    ],
    "handwritten_notes": [
        "Any handwritten text found in the document"
    ],
    "charts_summary": "Description of any charts, graphs, or visualizations"
}

Be thorough. Extract every visible number, date, and key piece of information."""

        if context:
            base_prompt += f"\n\nAdditional context: {context}"

        return base_prompt

    def analyze_document_batch(self, images: List[Image.Image], 
                              batch_size: int = 5) -> List[ExtractedData]:
        """
        Process multiple document pages one at a time, grouped into logged batches.

        Args:
            images: List of PIL Images
            batch_size: Number of pages per logged batch (requests are sent sequentially)

        Returns:
            List of ExtractedData objects
        """
        results = []

        for i in range(0, len(images), batch_size):
            batch = images[i:i + batch_size]
            logger.info(f"Processing batch {i//batch_size + 1}/{(len(images)-1)//batch_size + 1}")

            for image in batch:
                try:
                    result = self.analyze_document(image)
                    results.append(result)
                except Exception as e:
                    logger.error(f"Failed to process image in batch: {e}")
                    results.append(ExtractedData(
                        metadata=DocumentMetadata(),
                        key_findings=[f"Error: {str(e)}"]
                    ))

        return results

Key production considerations in this implementation:

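  1. Retry Logic: Using tenacity with exponential backoff handles transient API failures gracefully. With wait_exponential(multiplier=1, min=4, max=60), every wait is at least 4 seconds and grows exponentially up to a 60-second cap, and stop_after_attempt(3) allows at most two waits before giving up. Note that the decorator as written retries on every exception type, including non-transient ones.

  2. Structured Output: We ask the model to return JSON by explicitly requesting it in the prompt. The model can still wrap the JSON in a markdown code block, which is why the parser strips code fences before calling json.loads. This enables programmatic consumption of results without manual parsing.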

  3. Safety Settings: For document processing, we disable content filtering since we're analyzing business documents, not generating content. This prevents false positives on technical content.

  4. Fallback Parsing: If JSON parsing fails, we gracefully fall back to wrapping raw text in our structured format rather than crashing.
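The fence-stripping and fallback behavior described above can be sketched as a standalone helper. This is one possible, slightly more tolerant variant and not the tutorial's exact parser; `extract_json_object` is a name chosen for this example.

```python
import json
import re
from typing import Any, Dict, Optional

FENCE = "`" * 3  # a markdown code fence, built this way to keep the listing readable
_FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object found in a model reply, or None."""
    # Prefer the contents of fenced blocks, then fall back to the raw reply
    candidates = [match.group(1) for match in _FENCED_BLOCK.finditer(text)]
    candidates.append(text)
    for candidate in candidates:
        start, end = candidate.find("{"), candidate.rfind("}")
        if start == -1 or end <= start:
            continue
        try:
            parsed = json.loads(candidate[start:end + 1])
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None
```

Returning `None` instead of raising lets the caller decide whether to wrap the raw text, as the client above does, or to retry the request.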

FastAPI Endpoint with Vector Search

Now let's build the API layer that exposes our pipeline and adds semantic search capabilities using ChromaDB.

# api.py
import json
import os
import uuid
from typing import List
from datetime import datetime

import chromadb
import google.generativeai as genai
from fastapi import FastAPI, UploadFile, File, HTTPException
from pydantic import BaseModel
from loguru import logger

from preprocessor import DocumentPreprocessor
from gemini_vision import GeminiVisionClient, ExtractedData, DocumentMetadata

# Embedding model used for indexing and search. This model name is an
# assumption; substitute any embedding model your API key can access.
EMBEDDING_MODEL = "models/text-embedding-004"

# Initialize FastAPI
app = FastAPI(
    title="Gemini 2.0 Document Intelligence API",
    version="1.0.0",
    description="Production-grade document analysis with multimodal AI"
)

# Initialize components
preprocessor = DocumentPreprocessor()
vision_client = GeminiVisionClient()

# Initialize ChromaDB for vector storage [2]
# chromadb 0.4+ replaced Settings(chroma_db_impl=...) with PersistentClient
chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Create or get collection
collection = chroma_client.get_or_create_collection("document_embeddings")

# Pydantic models for API
class AnalysisResponse(BaseModel):
    document_id: str
    filename: str
    analysis: ExtractedData
    processed_at: datetime
    pages_analyzed: int

class SearchQuery(BaseModel):
    query: str
    n_results: int = 5

class SearchResult(BaseModel):
    document_id: str
    filename: str
    relevance_score: float
    extracted_data: ExtractedData

@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_document(file: UploadFile = File(...)):
    """
    Upload and analyze a document using Gemini 2.0 Vision.

    Accepts PDF, PNG, JPG, TIFF, BMP files up to 20MB.
    Returns structured analysis with extracted data.
    """
    # Validate file size (20MB limit)
    contents = await file.read()
    if len(contents) > 20 * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large. Maximum 20MB.")

    # Generate unique document ID
    document_id = str(uuid.uuid4())

    # Keep only the base name so a crafted filename cannot escape /tmp
    filename = os.path.basename(file.filename or "upload")
    temp_path = f"/tmp/{document_id}_{filename}"

    try:
        # Save uploaded file temporarily
        with open(temp_path, "wb") as f:
            f.write(contents)

        # Process document
        logger.info(f"Processing document: {filename}")
        images = preprocessor.process_document(temp_path)

        if not images:
            raise HTTPException(
                status_code=400,
                detail="No processable images found in document"
            )

        # Analyze with Gemini Vision
        logger.info(f"Analyzing {len(images)} pages with Gemini 2.0 Vision")
        results = vision_client.analyze_document_batch(images)

        # Combine results from multiple pages
        combined_analysis = _combine_results(results, filename)

        # Store in vector database for semantic search
        _index_document(document_id, filename, combined_analysis)

        return AnalysisResponse(
            document_id=document_id,
            filename=filename,
            analysis=combined_analysis,
            processed_at=datetime.utcnow(),
            pages_analyzed=len(images)
        )

    except HTTPException:
        # Pass deliberate HTTP errors through instead of turning them into 500s
        raise
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.exception(f"Analysis failed: {e}")
        raise HTTPException(status_code=500, detail="Internal analysis error")
    finally:
        # Clean up temp file on success and on failure
        if os.path.exists(temp_path):
            os.remove(temp_path)

@app.post("/search", response_model=List[SearchResult])
async def search_documents(query: SearchQuery):
    """
    Semantic search across analyzed documents.

    Uses ChromaDB vector similarity to find relevant documents
    based on natural language queries.
    """
    try:
        # Generate embedding for query using the Gemini embeddings endpoint
        query_embedding = genai.embed_content(
            model=EMBEDDING_MODEL,
            content=query.query,
            task_type="retrieval_query"
        )

        # Search vector database
        results = collection.query(
            query_embeddings=[query_embedding["embedding"]],
            n_results=query.n_results
        )

        # Format results
        search_results = []
        for i in range(len(results['ids'][0])):
            search_results.append(SearchResult(
                document_id=results['ids'][0][i],
                filename=results['metadatas'][0][i].get('filename', 'Unknown'),
                # Rough distance-to-similarity conversion; can be negative for large distances
                relevance_score=1 - results['distances'][0][i],
                extracted_data=ExtractedData(**json.loads(
                    results['documents'][0][i]
                ))
            ))

        return search_results

    except Exception as e:
        logger.exception(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail="Search failed")

def _combine_results(results: List[ExtractedData], filename: str) -> ExtractedData:
    """Combine analysis results from multiple pages."""
    if not results:
        return ExtractedData(metadata=DocumentMetadata(title=filename))

    combined = results[0]
    combined.metadata.title = filename
    combined.metadata.page_count = len(results)

    # Merge findings from all pages
    for result in results[1:]:
        combined.key_findings.extend(result.key_findings)
        combined.tables.extend(result.tables)
        combined.numerical_values.extend(result.numerical_values)
        combined.handwritten_notes.extend(result.handwritten_notes)

    return combined

def _index_document(document_id: str, filename: str, analysis: ExtractedData):
    """Store document analysis in vector database."""
    # Convert analysis to text for embedding (model_dump is the pydantic v2 API)
    analysis_text = json.dumps(analysis.model_dump())

    # Generate embedding
    embedding = genai.embed_content(
        model=EMBEDDING_MODEL,
        content=analysis_text,
        task_type="retrieval_document"
    )

    # Store in ChromaDB
    collection.add(
        embeddings=[embedding["embedding"]],
        documents=[analysis_text],
        metadatas=[{
            "filename": filename,
            "document_id": document_id,
            "processed_at": datetime.utcnow().isoformat()
        }],
        ids=[document_id]
    )
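One detail worth illustrating is the relevance score. As far as I know, Chroma collections default to a squared L2 distance, which is unbounded, so subtracting it from 1 can produce negative scores. The sketch below shows one alternative mapping into the range (0, 1]; `distances_to_scores` is a hypothetical helper, and the formula is a common convention, not something Chroma prescribes.

```python
from typing import List


def distances_to_scores(distances: List[float]) -> List[float]:
    """Map non-negative distances to scores in (0, 1]; a smaller distance scores higher."""
    # 1 / (1 + d) is monotonic, so the ranking is unchanged; only the scale differs
    return [1.0 / (1.0 + max(0.0, d)) for d in distances]
```

If you would rather keep a `1 - distance` style score, creating the collection with a cosine distance (for example via the `hnsw:space` metadata setting, assuming your Chroma version supports it) keeps distances in a bounded range.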

Running the Production Pipeline

To run the complete application:

# main.py
import uvicorn
from api import app

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,  # Disable in production
        workers=4,     # Adjust based on CPU cores
        log_level="info"
    )

Start the server:

python main.py

Test the endpoint:

# Analyze a document
curl -X POST http://localhost:8000/analyze \
  -F "file=@/path/to/invoice.pdf"

# Search analyzed documents
curl -X POST http://localhost:8000/search \
  -H "Content-Type: application/json" \
  -d '{"query": "What invoices mention project Alpha?", "n_results": 5}'
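The same search call can be made from Python. This is a minimal standard-library sketch that mirrors the curl command above; `build_search_request` and `run_search` are illustrative names, and the base URL assumes the server is running locally on port 8000.

```python
import json
import urllib.request
from typing import Any


def build_search_request(base_url: str, query: str, n_results: int = 5) -> urllib.request.Request:
    """Build the POST request for the /search endpoint without sending it."""
    payload = json.dumps({"query": query, "n_results": n_results}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/search",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_search(base_url: str, query: str, n_results: int = 5) -> Any:
    """Send the request and decode the JSON response (requires a running server)."""
    request = build_search_request(base_url, query, n_results)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())
```

Calling `run_search("http://localhost:8000", "What invoices mention project Alpha?")` should return the same list of results as the curl example.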

Edge Cases and Production Hardening

Memory Management

When processing large PDFs (100+ pages), memory usage can spike. Implement streaming processing:

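from typing import Callable, Iterator, List

import PyPDF2
from loguru import logger
from PIL import Image

def process_large_document(
    file_path: str,
    convert_page_to_image: Callable[[PyPDF2.PageObject], Image.Image],
    max_pages: int = 50,
) -> Iterator[List[Image.Image]]:
    """Process large documents in chunks to manage memory.

    convert_page_to_image is a caller-supplied rasterizer (for example, a
    wrapper around pdf2image); PyPDF2 cannot render pages itself.
    """
    images = []
    with open(file_path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        total_pages = len(pdf_reader.pages)

        if total_pages > max_pages:
            logger.warning(f"Document has {total_pages} pages, processing first {max_pages}")

        for page_num in range(min(total_pages, max_pages)):
            # Convert one page at a time so only the current batch stays in memory
            page = pdf_reader.pages[page_num]
            img = convert_page_to_image(page)
            images.append(img)

            # Hand over batches of 10 and start a fresh list
            if len(images) >= 10:
                yield images
                images = []

        if images:
            yield images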
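Stripped of the PDF handling, the batching pattern looks like this. It is a generic sketch using only the standard library; `batched` is a local helper written for this example.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items, holding only one batch at a time."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

Because the input is consumed lazily, a generator of page images can be fed through `batched` and each batch analyzed and discarded before the next one is built.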

API Cost Optimization

Gemini API pricing is token-based, and each image is converted into input tokens, so large or numerous images typically cost more than a text prompt. Optimize by:

  1. Resizing images to minimum required resolution
  2. Caching results for duplicate documents using content hashing
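  3. Batching requests within rate limits

import hashlib
from typing import Callable, Dict

def get_document_hash(contents: bytes) -> str:
    """Generate content hash for deduplication."""
    return hashlib.sha256(contents).hexdigest()

# In-memory cache for illustration; use Redis or a database in production
cache: Dict[str, dict] = {}

def analyze_with_cache(contents: bytes, analyze: Callable[[bytes], dict]) -> dict:
    """Return the cached result for identical contents, else analyze and store it."""
    # Check cache before processing
    doc_hash = get_document_hash(contents)
    cached_result = cache.get(doc_hash)
    if cached_result:
        return cached_result
    result = analyze(contents)
    cache[doc_hash] = result
    return result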

What's Next

This production pipeline demonstrates how to build a multimodal document intelligence system using Gemini 2.0 Vision API. The architecture handles real-world challenges like rate limiting, structured output parsing, and semantic search integration.

To extend this system:

  1. Add support for video analysis using Gemini's video understanding capabilities
  2. Implement real-time processing with WebSocket endpoints for streaming results
  3. Add user authentication and multi-tenant document isolation
  4. Integrate with cloud storage (GCS, S3) for document persistence
  5. Add monitoring with Prometheus metrics and structured logging

The complete code is available on GitHub. For more tutorials on building AI-powered applications, check out our guides on production AI pipelines and multimodal model deployment.

Remember that while Gemini 2.0 Vision is powerful, it's not infallible. Always validate extracted data against business rules, especially for financial or legal documents. The combination of structured prompting, vector search, and human-in-the-loop validation creates the most reliable production system.
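As an example of what validating against business rules might look like, the sketch below checks a parsed extraction (a plain dict in the JSON shape requested by the prompt) for a few mechanical problems. The specific rules are assumptions chosen for illustration; real rules depend on your documents.

```python
from datetime import date
from typing import Any, Dict, List


def validate_extraction(data: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the record passed these checks."""
    problems: List[str] = []

    # Rule 1: a document date, if present, must be a valid ISO date
    doc_date = (data.get("metadata") or {}).get("date")
    if doc_date:
        try:
            date.fromisoformat(str(doc_date))
        except ValueError:
            problems.append(f"date is not YYYY-MM-DD: {doc_date!r}")

    # Rule 2: every extracted numerical value must parse as a number
    for i, item in enumerate(data.get("numerical_values", [])):
        raw = str(item.get("value", "")).replace(",", "").strip()
        try:
            float(raw)
        except ValueError:
            problems.append(f"numerical_values[{i}] is not a number: {item.get('value')!r}")

    # Rule 3: every table row must have as many cells as there are headers
    for i, table in enumerate(data.get("tables", [])):
        width = len(table.get("headers", []))
        if any(len(row) != width for row in table.get("rows", [])):
            problems.append(f"tables[{i}] has rows that do not match its headers")

    return problems
```

Records with a non-empty problem list are good candidates for a human review queue.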


References

1. Wikipedia - Vector database. Wikipedia. [Source]
2. Wikipedia - Rag. Wikipedia. [Source]
3. Wikipedia - Gemini. Wikipedia. [Source]
4. GitHub - milvus-io/milvus. Github. [Source]
5. GitHub - Shubhamsaboo/awesome-llm-apps. Github. [Source]
6. GitHub - google-gemini/gemini-cli. Github. [Source]
7. GitHub - chroma-core/chroma. Github. [Source]
8. Google Gemini Pricing. Pricing. [Source]