Back to Tutorials
tutorialstutorialai

How to Build a RAG Pipeline with LangChain and LanceDB

Practical tutorial: The story highlights a trend in the industry rather than a specific product launch or major technological breakthrough.

BlogIA AcademyMay 25, 202613 min read2 527 words

How to Build a RAG Pipeline with LangChain and LanceDB

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Retrieval-Augmented Generation (RAG) has become the de facto architecture for production AI systems that need to ground LLM responses in private or domain-specific data. The industry trend is clear: teams are moving away from monolithic fine-tuning toward modular retrieval pipelines that offer better data freshness, lower costs, and easier debugging.

In this tutorial, you'll build a production-grade RAG pipeline using LangChain for orchestration and LanceDB for vector storage. LanceDB's columnar storage format and zero-copy reads make it particularly well-suited for large-scale document retrieval workloads. By the end, you'll have a working system that can ingest PDFs, chunk them intelligently, embed them, and answer questions with citations.

Real-World Use Case and Architecture

Consider a legal document review system. A law firm needs to search through thousands of contracts, deposition transcripts, and case law PDFs. The system must return answers with exact source locations, handle multi-page documents, and scale to millions of chunks without breaking the bank on vector database [3] costs.

The architecture we'll build consists of four layers:

  1. Ingestion Pipeline: PDF parsing, text extraction, and intelligent chunking
  2. Embedding Service: Converting text chunks to vector representations
  3. Vector Store: LanceDB for efficient similarity search with metadata filtering
  4. Generation Layer: LangChain chains that combine retrieval with LLM completion

This separation of concerns allows each component to be independently scaled, tested, and replaced. For example, you could swap the embedding model without touching the vector store, or change the LLM provider without re-indexing your documents.

Prerequisites and Environment Setup

Before writing any code, set up your Python environment. We'll use Python 3.11+ and the following packages:

python -m venv rag_env
source rag_env/bin/activate  # On Windows: rag_env\Scripts\activate

pip install langchain==0.3.14 \
    langchain-community==0.3.14 \
    lancedb==0.12.0 \
    pypdf==5.1.0 \
    sentence-transformers [7]==3.3.1 \
    fastapi==0.115.6 \
    uvicorn==0.34.0 \
    pydantic==2.10.3

Why these specific versions? As of May 2026, these are the latest stable releases. LangChain 0.3.x introduced significant improvements to document transformer interfaces, and LanceDB 0.12.0 added native support for the sentence-transformers integration without requiring a separate embedding service.

Create a project structure:

rag-pipeline/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI application
│   ├── ingestion.py     # PDF processing and chunking
│   ├── vector_store.py  # LanceDB operations
│   ├── chain.py         # LangChain RAG chain
│   └── models.py        # Pydantic schemas
├── data/
│   └── documents/       # Place PDFs here
├── .env                 # API keys
└── requirements.txt

Building the Ingestion Pipeline with Smart Chunking

The quality of your RAG system depends almost entirely on how you chunk your documents. Naive fixed-size chunking loses semantic boundaries. We'll implement recursive character text splitting with overlap, which preserves paragraph and sentence boundaries while maintaining context between chunks.

# app/ingestion.py
import os
from typing import List
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

class DocumentIngestionPipeline:
    """
    Handles PDF ingestion with intelligent chunking.

    Key design decisions:
    - Uses RecursiveCharacterTextSplitter to respect natural text boundaries
    - Chunk size of 1000 characters with 200 character overlap
    - Preserves metadata (source file, page number) for citation tracking
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""],
            keep_separator=True,
        )

    def load_pdf(self, file_path: str) -> List[Document]:
        """
        Load a PDF file and extract text with page metadata.

        Edge case: Handles PDFs with no extractable text by raising
        a descriptive error rather than silently failing.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"PDF file not found: {file_path}")

        loader = PyPDFLoader(file_path)
        documents = loader.load()

        if not documents:
            raise ValueError(
                f"No text could be extracted from {file_path}. "
                "The PDF may be scanned or image-based."
            )

        return documents

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """
        Split documents into overlapping chunks while preserving metadata.

        Memory consideration: For very large documents (>1000 pages),
        consider processing in batches to avoid OOM errors.
        """
        chunked_docs = self.text_splitter.split_documents(documents)

        # Add chunk index metadata for traceability
        for i, doc in enumerate(chunked_docs):
            doc.metadata["chunk_index"] = i
            doc.metadata["total_chunks"] = len(chunked_docs)

        return chunked_docs

    def process_pdf(self, file_path: str) -> List[Document]:
        """Complete pipeline: load, chunk, and return documents."""
        docs = self.load_pdf(file_path)
        return self.chunk_documents(docs)

Why RecursiveCharacterTextSplitter? According to LangChain's documentation, this splitter attempts to split on natural boundaries (paragraphs, then sentences, then words) before falling back to character-level splitting. This produces chunks that are more semantically coherent than fixed-size splitting.

Edge case handling: The pipeline checks for empty documents (scanned PDFs) and raises a clear error. In production, you'd want to add OCR fallback using something like pytesseract for image-based PDFs.

Setting up the LanceDB Vector Store

LanceDB stores vectors in Apache Lance format, which uses columnar storage and supports zero-copy reads. This means you can query billions of vectors without loading them all into memory. We'll configure it with cosine similarity and metadata filtering.

# app/vector_store.py
import lancedb
import pyarrow as pa
from typing import List, Optional, Dict, Any
from langchain_core.documents import Document
from langchain_community.vectorstores import LanceDB
from langchain_community.embeddings import HuggingFace [7]Embeddings

class VectorStoreManager:
    """
    Manages LanceDB vector store operations.

    Architecture decisions:
    - Uses 'all-MiniLM-L6-v2' embedding model (384 dimensions)
    - Creates table with explicit schema for type safety
    - Enables IVF-PQ indexing for fast approximate search
    """

    def __init__(self, db_path: str = "./lancedb_data"):
        self.db_path = db_path
        self.db = lancedb.connect(db_path)

        # Using a lightweight but effective embedding model
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )

        self.table_name = "documents"
        self.vector_store = None

    def create_table(self, overwrite: bool = False):
        """
        Create or recreate the vector store table.

        Schema includes:
        - vector: Fixed-size list of 384 floats
        - text: The chunk content
        - metadata: JSON-compatible dict
        - id: Unique identifier for deduplication
        """
        if overwrite and self.table_name in self.db.table_names():
            self.db.drop_table(self.table_name)

        # Define schema explicitly for production reliability
        schema = pa.schema([
            pa.field("vector", pa.list_(pa.float32(), 384)),
            pa.field("text", pa.string()),
            pa.field("metadata", pa.string()),  # JSON string
            pa.field("id", pa.string()),
        ])

        self.db.create_table(
            self.table_name,
            schema=schema,
            mode="overwrite" if overwrite else "create",
        )

    def index_documents(self, documents: List[Document]):
        """
        Embed and store documents in LanceDB.

        Performance note: For >100k documents, batch the embeddings
        to avoid memory spikes. The HuggingFaceEmbeddings class
        handles batching internally with a default batch_size of 32.
        """
        if not documents:
            return

        # Create vector store from documents
        self.vector_store = LanceDB.from_documents(
            documents=documents,
            embedding=self.embeddings,
            connection=self.db,
            table_name=self.table_name,
        )

        # Create IVF-PQ index for fast approximate search
        # This is critical for production performance
        table = self.db.open_table(self.table_name)
        table.create_index(
            metric="cosine",
            num_partitions=256,  # For up to 1M vectors
            num_sub_vectors=96,  # PQ compression
        )

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter_expr: Optional[str] = None,
    ) -> List[Document]:
        """
        Perform similarity search with optional metadata filtering.

        Args:
            query: Natural language query
            k: Number of results to return
            filter_expr: LanceDB filter expression (e.g., "metadata.page > 5")

        Returns:
            List of Document objects with relevance scores in metadata
        """
        if not self.vector_store:
            raise RuntimeError("Vector store not initialized. Call index_documents first.")

        results = self.vector_store.similarity_search_with_score(
            query,
            k=k,
            filter=filter_expr,
        )

        documents = []
        for doc, score in results:
            doc.metadata["relevance_score"] = float(score)
            documents.append(doc)

        return documents

Why IVF-PQ indexing? LanceDB's IVF (Inverted File Index) with PQ (Product Quantization) compression reduces search latency from O(n) to O(log n) for approximate nearest neighbor search. The num_partitions=256 setting works well for up to 1 million vectors. For larger datasets, increase this to 1024 or more.

Memory management: LanceDB's zero-copy reads mean you can have a 10GB dataset while using only a few hundred MB of RAM. The index is memory-mapped, so only the accessed pages are loaded.

Building the RAG Chain with LangChain

Now we connect retrieval to generation. We'll use LangChain's expression language (LCEL) to create a composable chain that handles the full RAG flow: retrieve relevant chunks, format them as context, and generate an answer with citations.

# app/chain.py
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from typing import List
from langchain_core.documents import Document

class RAGChain:
    """
    Production RAG chain with citation support.

    Design decisions:
    - Uses LCEL for type-safe, composable chains
    - Includes source tracking for every generated statement
    - Handles empty retrieval gracefully
    """

    def __init__(self, vector_store_manager, model_name: str = "gpt [5]-4o-mini"):
        self.vector_store = vector_store_manager

        # Using GPT-4o-mini for cost-effective generation
        # As of May 2026, this model costs $0.15/1M input tokens
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=0.1,  # Low temperature for factual answers
            max_tokens=1024,
        )

        self._setup_prompt()
        self._setup_chain()

    def _setup_prompt(self):
        """Create a prompt that forces citation usage."""
        self.prompt = ChatPromptTemplate.from_messages([
            (
                "system",
                """You are a precise document analyst. Answer the question based ONLY on the provided context.

                Rules:
                1. If the context doesn't contain the answer, say "I cannot find this information in the provided documents."
                2. Always cite the source document and page number for each claim.
                3. Use direct quotes when possible.
                4. Be concise but complete.

                Context:
                {context}
                """
            ),
            ("human", "{question}"),
        ])

    def _format_docs(self, documents: List[Document]) -> str:
        """
        Format retrieved documents into a structured context string.

        Includes source metadata for citation tracking.
        """
        formatted = []
        for i, doc in enumerate(documents, 1):
            source = doc.metadata.get("source", "Unknown")
            page = doc.metadata.get("page", "N/A")
            formatted.append(
                f"[Document {i}] Source: {source}, Page: {page}\n"
                f"Content: {doc.page_content}\n"
            )
        return "\n---\n".join(formatted)

    def _setup_chain(self):
        """
        Build the RAG chain using LCEL.

        Flow:
        1. Retrieve relevant documents
        2. Format them as context
        3. Pass context + question to LLM
        4. Parse the response
        """
        self.chain = (
            RunnableParallel(
                {
                    "context": (
                        lambda x: self.vector_store.similarity_search(
                            x["question"], k=4
                        )
                    ) | self._format_docs,
                    "question": RunnablePassthrough(),
                }
            )
            | self.prompt
            | self.llm
            | StrOutputParser()
        )

    def invoke(self, question: str) -> dict:
        """
        Run the RAG chain and return answer with sources.

        Returns:
            dict with 'answer' and 'sources' keys
        """
        # Get raw documents for source tracking
        retrieved_docs = self.vector_store.similarity_search(question, k=4)

        # Generate answer
        answer = self.chain.invoke({"question": question})

        # Extract source information
        sources = []
        for doc in retrieved_docs:
            sources.append({
                "content": doc.page_content[:200] + "..",
                "source": doc.metadata.get("source", "Unknown"),
                "page": doc.metadata.get("page", "N/A"),
                "relevance_score": doc.metadata.get("relevance_score", 0.0),
            })

        return {
            "answer": answer,
            "sources": sources,
        }

Why LCEL over legacy chains? LangChain's expression language provides compile-time type checking, better error messages, and easier debugging. The RunnableParallel construct allows parallel execution of independent steps, which is critical for latency-sensitive applications.

Temperature setting: We use temperature=0.1 because RAG systems should prioritize factual accuracy over creativity. Higher temperatures can cause the model to hallucinate or embellish beyond the provided context.

Creating the FastAPI Application

Finally, we wrap everything in a FastAPI application with proper error handling and request validation.

# app/main.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import os
import shutil
from typing import Optional

from app.ingestion import DocumentIngestionPipeline
from app.vector_store import VectorStoreManager
from app.chain import RAGChain

app = FastAPI(
    title="RAG Document Q&A System",
    version="1.0.0",
    description="Production-grade RAG pipeline using LangChain and LanceDB",
)

# Initialize components
ingestion_pipeline = DocumentIngestionPipeline()
vector_store = VectorStoreManager()
rag_chain = RAGChain(vector_store_manager=vector_store)

class QueryRequest(BaseModel):
    question: str = Field(.., min_length=1, max_length=2000)
    top_k: int = Field(default=4, ge=1, le=20)

class QueryResponse(BaseModel):
    answer: str
    sources: list

@app.on_event("startup")
async def startup_event():
    """Initialize vector store on application start."""
    vector_store.create_table(overwrite=False)

@app.post("/ingest", status_code=201)
async def ingest_document(file: UploadFile = File(..)):
    """
    Upload and index a PDF document.

    Accepts PDF files up to 50MB.
    Returns the number of chunks created.
    """
    if not file.filename.endswith(".pdf"):
        raise HTTPException(
            status_code=400,
            detail="Only PDF files are supported",
        )

    # Save uploaded file temporarily
    temp_path = f"/tmp/{file.filename}"
    try:
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Process and index
        documents = ingestion_pipeline.process_pdf(temp_path)
        vector_store.index_documents(documents)

        return {
            "message": f"Successfully indexed {len(documents)} chunks",
            "filename": file.filename,
            "chunks": len(documents),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)

@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    """
    Query the indexed documents using RAG.

    Returns an answer with source citations.
    """
    try:
        result = rag_chain.invoke(request.question)
        return QueryResponse(
            answer=result["answer"],
            sources=result["sources"],
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "timestamp": "2026-05-25"}

API design considerations: The /ingest endpoint accepts file uploads and processes them synchronously. For production, you'd want to make this asynchronous using Celery or a task queue, especially for large documents. The /query endpoint includes input validation via Pydantic, ensuring questions are between 1 and 2000 characters.

Running the System

Start the application:

uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

Test the ingestion:

curl -X POST -F "file=@data/documents/contract.pdf" http://localhost:8000/ingest

Test the query:

curl -X POST -H "Content-Type: application/json" \
    -d '{"question": "What are the termination clauses in this contract?"}' \
    http://localhost:8000/query

Edge Cases and Production Considerations

Empty retrieval: When no relevant documents are found, the chain is designed to say "Test this by asking questions completely unrelated to your indexed documents.

Large document handling: For PDFs over 100 pages, the ingestion pipeline may take several seconds. Consider adding progress tracking via WebSocket or implementing background task processing.

Vector store persistence: LanceDB stores data on disk at the path you specify. For production, use a persistent volume or cloud storage (S3, GCS) to avoid data loss on container restarts.

API rate limiting: The current implementation has no rate limiting. In production, add middleware like slowapi to prevent abuse.

Embedding model selection: The all-MiniLM-L6-v2 model produces 384-dimensional vectors. For higher accuracy (at the cost of speed and storage), consider all-mpnet-base-v2 (768 dimensions) or OpenAI's text-embedding-3-small (1536 dimensions).

What's Next

This tutorial covered the core components of a production RAG pipeline, but there's more to explore:

  • Hybrid search: Combine vector similarity with keyword search (BM25) for better retrieval accuracy
  • Query decomposition: Break complex questions into sub-questions for multi-hop reasoning
  • Evaluation: Set up RAGAS or TruLens to measure faithfulness, answer relevance, and context precision
  • Caching: Implement response caching for frequently asked questions to reduce LLM costs

The industry trend toward modular, retrieval-based architectures is accelerating. By mastering these building blocks, you'll be well-equipped to build systems that are both powerful and maintainable. The complete code for this tutorial is available in our GitHub repository.

Last updated: May 25, 2026


References

1. Wikipedia - Transformers. Wikipedia. [Source]
2. Wikipedia - GPT. Wikipedia. [Source]
3. Wikipedia - Vector database. Wikipedia. [Source]
4. GitHub - huggingface/transformers. Github. [Source]
5. GitHub - Significant-Gravitas/AutoGPT. Github. [Source]
6. GitHub - milvus-io/milvus. Github. [Source]
7. GitHub - huggingface/transformers. Github. [Source]
tutorialai
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles