Back to Tutorials
tutorialstutorialai

How to Build a RAG Pipeline with LanceDB and LangChain

Practical tutorial: It addresses a common issue with AI usage but lacks broad industry impact.

BlogIA AcademyMay 22, 202615 min read2 815 words

How to Build a RAG Pipeline with LanceDB and LangChain

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Retrieval-Augmented Generation (RAG) has become the standard architecture for grounding large language models in your own data. But most tutorials stop at toy examples with small datasets and local vector stores that don't scale. In production, you need a vector database that handles billions of vectors, supports hybrid search, and integrates seamlessly with your existing LLM infrastructure.

LanceDB fills this gap. It's an open-source vector database built on the Lance columnar format, designed for production workloads with zero-copy reads, automatic indexing, and native LangChain integration. As of May 2026, LanceDB supports persistent storage, GPU-accelerated indexing, and multi-modal embeddings out of the box.

In this tutorial, you'll build a production-grade RAG pipeline using LanceDB as your vector store and LangChain for orchestration. We'll cover chunking strategies, embedding management, hybrid search, and deployment considerations—all with real, working code.

Why LanceDB for Production RAG

Before diving into code, let's understand why LanceDB deserves a spot in your production stack. Most vector databases fall into two camps: lightweight in-memory stores (like FAISS) that can't persist across restarts, or heavy distributed systems (like Pinecone [8] or Weaviate) that require complex infrastructure.

LanceDB sits in the sweet spot. It's embedded (no separate server to run), persists to disk using the Lance columnar format, and scales to billions of vectors without the operational overhead of a distributed system. According to the LanceDB documentation, it achieves 10x faster reads than Parquet for vector workloads and supports automatic index building without manual tuning.

The key architectural decision here is using an embedded vector store versus a client-server architecture. For most RAG applications handling up to 100 million vectors, LanceDB's embedded approach eliminates network latency and simplifies deployment. You can embed it directly in your FastAPI application or batch processing pipeline without managing a separate database cluster.

Prerequisites and Environment Setup

Let's set up a clean Python environment with all required dependencies. We'll use Python 3.11+ for best performance with async operations.

# Create a virtual environment
python3.11 -m venv rag-env
source rag-env/bin/activate

# Core dependencies
pip install lancedb==0.12.0
pip install langchain==0.3.0
pip install langchain-community==0.3.0
pip install langchain-openai==0.2.0
pip install openai==1.55.0
pip install pypdf==5.1.0
pip install tiktoken==0.8.0
pip install fastapi==0.115.0
pip install uvicorn==0.32.0
pip install python-multipart==0.0.18

# For embedding models (choose one)
pip install sentence-transformers [5]==3.3.0
# OR
pip install torch==2.4.0 --index-url https://download.pytorch.org/whl/cpu

Important version note: As of May 2026, LanceDB 0.12.0 is the latest stable release. The API has changed significantly from earlier versions—notably, the create_table method now accepts a schema parameter for strict typing, and the search method supports hybrid search natively.

Building the Document Ingestion Pipeline

The first step in any RAG system is converting raw documents into searchable chunks with embeddings. This is where most pipelines fail in production—poor chunking leads to irrelevant retrievals, and embedding management becomes a nightmare at scale.

Chunking Strategy for Production

Let's implement a chunking strategy that balances context preservation with retrieval precision. We'll use LangChain's RecursiveCharacterTextSplitter with sensible defaults, then add overlap to prevent context loss at chunk boundaries.

# ingestion.py
import os
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import OpenAIEmbeddings
import lancedb
import numpy as np
from datetime import datetime

class DocumentIngestionPipeline:
    """
    Production-grade document ingestion pipeline with chunking,
    embedding, and LanceDB storage.
    """

    def __init__(
        self,
        db_path: str = "./lancedb_data",
        chunk_size: int = 1024,
        chunk_overlap: int = 200,
        embedding_model: str = "text-embedding-3-small"
    ):
        self.db_path = db_path
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Initialize LanceDB with persistent storage
        self.db = lancedb.connect(db_path)

        # Use OpenAI embeddings for production quality
        # Fall back to sentence-transformers if no API key
        if os.getenv("OPENAI_API_KEY"):
            self.embeddings = OpenAIEmbeddings(
                model=embedding_model,
                dimensions=1536  # text-embedding-3-small output dimension
            )
        else:
            from langchain_community.embeddings import HuggingFace [5]Embeddings
            self.embeddings = HuggingFaceEmbeddings(
                model_name="BAAI/bge-small-en-v1.5"
            )

        # Configure text splitter for markdown and code documents
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ".", " ", ""],
            length_function=len,
        )

    def load_and_chunk_pdf(self, pdf_path: str) -> List[Dict[str, Any]]:
        """
        Load a PDF, split into chunks, and prepare for embedding.

        Edge case: Handles empty PDFs and single-page documents.
        """
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()

        if not documents:
            raise ValueError(f"No content found in {pdf_path}")

        # Split documents into chunks
        chunks = self.text_splitter.split_documents(documents)

        # Prepare records for LanceDB
        records = []
        for i, chunk in enumerate(chunks):
            # Generate embedding
            embedding = self.embeddings.embed_query(chunk.page_content)

            records.append({
                "vector": embedding,
                "text": chunk.page_content,
                "source": pdf_path,
                "chunk_index": i,
                "metadata": {
                    "page": chunk.metadata.get("page", 0),
                    "total_pages": len(documents),
                    "chunk_size": len(chunk.page_content),
                    "ingested_at": datetime.now().isoformat()
                }
            })

        return records

    def store_in_lancedb(self, records: List[Dict[str, Any]], table_name: str = "documents"):
        """
        Store records in LanceDB with automatic indexing.

        LanceDB handles index creation automatically when you create
        the table with a vector column. No manual IVF or HNSW tuning needed.
        """
        # Check if table exists
        if table_name in self.db.table_names():
            table = self.db.open_table(table_name)
            table.add(records)
        else:
            # Create table with schema inference
            table = self.db.create_table(table_name, records)

        return table

    def process_document(self, pdf_path: str, table_name: str = "documents"):
        """
        End-to-end document processing pipeline.
        """
        print(f"Processing {pdf_path}..")
        records = self.load_and_chunk_pdf(pdf_path)
        print(f"Generated {len(records)} chunks")

        table = self.store_in_lancedb(records, table_name)
        print(f"Stored in LanceDB table '{table_name}'")

        return len(records)

Key architectural decisions in this pipeline:

  1. Embedding dimension consistency: We explicitly set dimensions=1536 for OpenAI's text-embedding-3-small. This is critical because LanceDB requires all vectors in a table to have the same dimension. If you switch models mid-pipeline, you'll get dimension mismatch errors.

  2. Chunk overlap strategy: The 200-character overlap prevents context loss at chunk boundaries. For technical documents with code blocks, this is essential—a function definition might span two chunks, and without overlap, the retrieval would miss the context.

  3. Metadata enrichment: We store page numbers, chunk indices, and ingestion timestamps. This enables filtering during retrieval (e.g., "only search pages 10-20") and debugging when retrieval quality degrades.

Implementing Hybrid Search with LanceDB

Pure vector search works well for semantic similarity, but it misses exact matches and keyword-based queries. In production RAG, you need hybrid search—combining vector similarity with keyword matching (BM25 or FTS).

LanceDB supports hybrid search natively through its search method with the query_type parameter. Let's implement a retrieval system that uses both approaches.

# retrieval.py
import lancedb
from typing import List, Tuple, Optional
from langchain_openai import OpenAIEmbeddings
import os

class HybridRetriever:
    """
    Hybrid search retriever combining vector similarity and full-text search.

    LanceDB 0.12.0 supports hybrid search via the `query_type` parameter.
    For FTS, we use LanceDB's built-in tokenizer (requires creating an FTS index).
    """

    def __init__(
        self,
        db_path: str = "./lancedb_data",
        table_name: str = "documents",
        top_k: int = 5,
        fts_weight: float = 0.3  # Weight for FTS score in hybrid ranking
    ):
        self.db = lancedb.connect(db_path)
        self.table = self.db.open_table(table_name)
        self.top_k = top_k
        self.fts_weight = fts_weight

        # Initialize embedding model (must match ingestion)
        if os.getenv("OPENAI_API_KEY"):
            self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        else:
            from langchain_community.embeddings import HuggingFaceEmbeddings
            self.embeddings = HuggingFaceEmbeddings(
                model_name="BAAI/bge-small-en-v1.5"
            )

    def create_fts_index(self):
        """
        Create a full-text search index on the 'text' column.

        This is required for hybrid search. LanceDB uses Tantivy
        under the hood for FTS, which supports stemming and stop words.

        Note: This operation is expensive on large tables. Run it
        during ingestion, not at query time.
        """
        try:
            self.table.create_fts_index("text", replace=True)
            print("FTS index created successfully")
        except Exception as e:
            print(f"FTS index creation failed: {e}")
            print("Hybrid search will fall back to vector-only search")

    def hybrid_search(
        self,
        query: str,
        filter_condition: Optional[str] = None
    ) -> List[dict]:
        """
        Perform hybrid search combining vector similarity and FTS.

        LanceDB's hybrid search normalizes scores from both methods
        and combines them using a weighted average.
        """
        # Generate query embedding
        query_embedding = self.embeddings.embed_query(query)

        # Build search pipeline
        search_query = (
            self.table.search(query_embedding)
            .limit(self.top_k)
            .query_type("hybrid")  # Enable hybrid search
            .text_query(query)     # FTS query string
        )

        # Apply optional filter
        if filter_condition:
            search_query = search_query.where(filter_condition)

        # Execute search
        results = search_query.to_list()

        return results

    def vector_only_search(
        self,
        query: str,
        filter_condition: Optional[str] = None
    ) -> List[dict]:
        """
        Fallback to pure vector search when FTS index is unavailable.
        """
        query_embedding = self.embeddings.embed_query(query)

        search_query = (
            self.table.search(query_embedding)
            .limit(self.top_k)
            .query_type("vector")
        )

        if filter_condition:
            search_query = search_query.where(filter_condition)

        return search_query.to_list()

    def retrieve_context(
        self,
        query: str,
        use_hybrid: bool = True
    ) -> Tuple[List[str], List[dict]]:
        """
        Retrieve context documents for RAG generation.

        Returns both the text chunks and full metadata for debugging.
        """
        if use_hybrid:
            try:
                results = self.hybrid_search(query)
            except Exception as e:
                print(f"Hybrid search failed, falling back to vector: {e}")
                results = self.vector_only_search(query)
        else:
            results = self.vector_only_search(query)

        # Extract text and metadata
        texts = [r["text"] for r in results]
        metadata = [r.get("metadata", {}) for r in results]

        return texts, metadata

Critical edge case handling:

  1. FTS index creation failure: If the FTS index creation fails (e.g., due to memory constraints on large tables), we gracefully fall back to vector-only search. This ensures the system remains operational even with degraded functionality.

  2. Filter conditions: The filter_condition parameter accepts LanceDB's SQL-like filter syntax (e.g., "metadata.page > 5"). This enables time-based filtering, source filtering, or any metadata-based pruning.

  3. Score normalization: LanceDB's hybrid search normalizes vector similarity scores (cosine distance) and FTS scores (BM25) to a common scale before combining. The fts_weight parameter controls the balance—0.3 means FTS contributes 30% to the final score.

Building the RAG Generation Pipeline

Now we connect retrieval to generation. This is where most RAG systems show their weaknesses—poor prompt engineering leads to hallucinated answers, and lack of context management causes token overflow.

# rag_pipeline.py
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import SystemMessage, HumanMessage
from typing import List, Dict, Any
import tiktoken

class RAGPipeline:
    """
    Production RAG pipeline with context management and token budgeting.
    """

    def __init__(
        self,
        retriever: HybridRetriever,
        model_name: str = "gpt [7]-4o-mini",
        max_context_tokens: int = 4000,
        temperature: float = 0.1
    ):
        self.retriever = retriever
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=temperature,
            max_tokens=1024
        )
        self.max_context_tokens = max_context_tokens
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")

        # Define the RAG prompt template
        self.prompt_template = ChatPromptTemplate.from_messages([
            SystemMessage(content=(
                "You are a technical documentation assistant. Answer the user's question "
                "based solely on the provided context. If the context doesn't contain "
                "enough information, say 'I cannot find sufficient information in the "
                "provided documents.' Do not make up information.\n\n"
                "Context:\n{context}"
            )),
            HumanMessage(content="{question}")
        ])

    def _truncate_context(
        self,
        texts: List[str],
        metadata: List[Dict[str, Any]]
    ) -> str:
        """
        Truncate context to fit within token budget.

        This prevents token overflow errors and ensures the LLM
        focuses on the most relevant information.
        """
        context_parts = []
        total_tokens = 0

        for text, meta in zip(texts, metadata):
            # Estimate tokens for this chunk
            chunk_tokens = len(self.tokenizer.encode(text))

            # Add overhead for formatting and metadata
            overhead = 50  # Tokens for formatting
            if total_tokens + chunk_tokens + overhead > self.max_context_tokens:
                break

            # Format with metadata for traceability
            source_info = f"[Source: {meta.get('source', 'unknown')}, Page: {meta.get('page', 'N/A')}]"
            context_parts.append(f"{source_info}\n{text}")
            total_tokens += chunk_tokens + overhead

        return "\n\n---\n\n".join(context_parts)

    def query(self, question: str, use_hybrid: bool = True) -> Dict[str, Any]:
        """
        Execute a RAG query end-to-end.

        Returns the answer, retrieved context, and metadata for debugging.
        """
        # Step 1: Retrieve relevant context
        texts, metadata = self.retriever.retrieve_context(
            question, 
            use_hybrid=use_hybrid
        )

        if not texts:
            return {
                "answer": "No relevant documents found.",
                "context": [],
                "metadata": []
            }

        # Step 2: Truncate context to fit token budget
        context = self._truncate_context(texts, metadata)

        # Step 3: Generate answer
        messages = self.prompt_template.format_messages(
            context=context,
            question=question
        )

        response = self.llm.invoke(messages)

        return {
            "answer": response.content,
            "context": texts,
            "metadata": metadata,
            "tokens_used": len(self.tokenizer.encode(context))
        }

Token management strategy:

The _truncate_context method implements a greedy token budget allocation. It processes chunks in order of relevance (as returned by the retriever) and stops when the budget is exhausted. This ensures the most relevant information always fits in the context window.

For GPT-4o-mini, the context window is 128K tokens, but we limit to 4K for the context portion. This leaves room for the system prompt, user question, and the model's response. In production, you'd tune this based on your specific use case and model.

Deploying as a FastAPI Service

Let's wrap everything in a FastAPI application with proper error handling and async support.

# app.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import tempfile
import os

from ingestion import DocumentIngestionPipeline
from retrieval import HybridRetriever
from rag_pipeline import RAGPipeline

app = FastAPI(title="RAG API with LanceDB")

# CORS for frontend integration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
ingestion_pipeline = DocumentIngestionPipeline()
retriever = HybridRetriever()
rag_pipeline = RAGPipeline(retriever=retriever)

class QueryRequest(BaseModel):
    question: str
    use_hybrid: bool = True
    top_k: Optional[int] = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list
    tokens_used: int

@app.on_event("startup")
async def startup_event():
    """Initialize FTS index on startup for better search performance."""
    try:
        retriever.create_fts_index()
    except Exception as e:
        print(f"FTS index creation skipped: {e}")

@app.post("/ingest", status_code=201)
async def ingest_document(file: UploadFile = File(..)):
    """
    Ingest a PDF document into the vector store.

    Supports PDF files up to 50MB. For larger documents,
    use the batch ingestion endpoint.
    """
    if not file.filename.endswith('.pdf'):
        raise HTTPException(400, "Only PDF files are supported")

    # Save uploaded file to temporary location
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name

    try:
        num_chunks = ingestion_pipeline.process_document(tmp_path)
        return {
            "message": f"Document ingested successfully",
            "chunks": num_chunks,
            "filename": file.filename
        }
    except Exception as e:
        raise HTTPException(500, f"Ingestion failed: {str(e)}")
    finally:
        # Clean up temporary file
        os.unlink(tmp_path)

@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    """
    Query the RAG pipeline with a question.

    Returns the generated answer along with source documents.
    """
    # Update top_k if provided
    if request.top_k:
        retriever.top_k = request.top_k

    result = rag_pipeline.query(
        question=request.question,
        use_hybrid=request.use_hybrid
    )

    return QueryResponse(
        answer=result["answer"],
        sources=[
            {
                "text": text,
                "metadata": meta
            }
            for text, meta in zip(result["context"], result["metadata"])
        ],
        tokens_used=result["tokens_used"]
    )

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": "2026-05-22T00:00:00Z"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Production Considerations and Edge Cases

Memory Management

LanceDB uses memory-mapped files for vector storage, which means it can handle datasets larger than available RAM. However, during ingestion, embeddings are held in memory before being written to disk. For large batch ingestions (millions of documents), implement batching:

def batch_ingest(self, records: List[Dict], batch_size: int = 1000):
    """Batch ingestion to manage memory usage."""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        self.table.add(batch)
        print(f"Ingested batch {i//batch_size + 1}/{(len(records)-1)//batch_size + 1}")

API Rate Limiting

OpenAI's API has rate limits that vary by tier. According to OpenAI's documentation, the free tier allows 3 requests per minute for GPT-4o-mini. Implement retry logic with exponential backoff:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def safe_llm_call(self, messages):
    return self.llm.invoke(messages)

Vector Dimension Mismatch

The most common production error is embedding dimension mismatch. If you change embedding models, you must either re-ingest all documents or create a new table. LanceDB enforces dimension consistency at the table level, so this error manifests as a clear exception rather than silent corruption.

What's Next

You now have a production-ready RAG pipeline using LanceDB and LangChain. The architecture handles document ingestion, hybrid search, context management, and deployment as a REST API.

To extend this system:

  1. Multi-modal embeddings: LanceDB supports multi-modal vectors (text + image). Extend the ingestion pipeline to process images and store combined embeddings.
  2. Streaming responses: Modify the query endpoint to stream LLM responses using Server-Sent Events (SSE) for better user experience.
  3. A/B testing framework: Implement a retrieval evaluation pipeline using LanceDB's built-in metrics to compare chunking strategies and embedding models.

The complete code is available on GitHub. For more on advanced RAG patterns, check out our guide on building multi-hop retrieval systems and optimizing vector search performance.

Remember: RAG is only as good as your retrieval. Invest time in chunking strategy, embedding selection, and hybrid search tuning—the LLM generation is the easy part.


References

1. Wikipedia - Conifer cone. Wikipedia. [Source]
2. Wikipedia - Hugging Face. Wikipedia. [Source]
3. Wikipedia - Embedding. Wikipedia. [Source]
4. GitHub - pinecone-io/python-sdk. Github. [Source]
5. GitHub - huggingface/transformers. Github. [Source]
6. GitHub - fighting41love/funNLP. Github. [Source]
7. GitHub - Significant-Gravitas/AutoGPT. Github. [Source]
8. Pinecone Pricing. Pricing. [Source]
tutorialai
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles