
How to Build a RAG Pipeline with LanceDB and LangChain


BlogIA Academy | June 5, 2026 | 13 min read | 2,422 words


Retrieval-Augmented Generation (RAG) has become the de facto architecture for grounding large language models in private or domain-specific data. While many vector databases exist, LanceDB offers a unique columnar storage format that excels at handling large-scale, multi-modal data with zero-copy reads and native GPU acceleration. In this tutorial, we'll build a production-ready RAG pipeline using LanceDB as our vector store and LangChain for orchestration, covering everything from environment setup to handling edge cases like concurrent writes and memory management.

Understanding the Production RAG Architecture

Before diving into code, let's examine why LanceDB is particularly well-suited for production RAG workloads. Traditional vector databases like Pinecone or Weaviate [8] are excellent but often require separate infrastructure and incur per-query costs. LanceDB operates as an embedded database, meaning it runs within your application process, eliminating network latency for vector searches. According to the LanceDB documentation, it supports up to 100x faster random access compared to Parquet files, making it ideal for real-time retrieval scenarios.

The architecture we'll build consists of three main components:

  1. Document Ingestion Pipeline: Chunks documents, generates embeddings [1], and stores them in LanceDB
  2. Retrieval Service: Performs similarity search with metadata filtering
  3. Generation Layer: Combines retrieved context with LLM prompts

This design handles production concerns like:

  • Concurrent access: LanceDB supports multiple readers with a single writer
  • Memory efficiency: Columnar storage means we only load needed columns
  • Versioning: LanceDB tables are versioned, enabling rollbacks

Prerequisites and Environment Setup

We'll need Python 3.10+ and several libraries. Create a virtual environment and install dependencies:

python -m venv rag_env
source rag_env/bin/activate  # On Windows: rag_env\Scripts\activate

pip install lancedb langchain langchain-community sentence-transformers fastapi uvicorn pypdf python-multipart pandas openai tiktoken

Key library versions as of June 2026:

  • lancedb>=0.12.0 - Core vector database
  • langchain>=0.3.0 - Orchestration framework
  • sentence-transformers>=3.0.0 - Embedding models
  • fastapi>=0.115.0 - API server

We'll use the all-MiniLM-L6-v2 embedding model because it offers a good balance of speed and quality (384-dimensional vectors) and is widely available offline. For production, you might swap to intfloat/e5-mistral-7b-instruct [9] for better retrieval quality, but that requires GPU memory.

Building the Document Ingestion Pipeline

The ingestion pipeline is the foundation of any RAG system. We'll create a modular class that handles document loading, chunking, embedding, and storage. Let's start with the core implementation:

import lancedb
import pyarrow as pa
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from sentence_transformers import SentenceTransformer
import uuid
from typing import List, Dict, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LanceRAGIngestor:
    def __init__(self, db_path: str = "./lancedb_data", 
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_size: int = 512,
                 chunk_overlap: int = 64):
        """
        Initialize the RAG ingestor with LanceDB backend.

        Args:
            db_path: Path to LanceDB database directory
            embedding_model: SentenceTransformer model name
            chunk_size: Number of characters per chunk
            chunk_overlap: Overlap between consecutive chunks
        """
        self.db = lancedb.connect(db_path)
        self.embedder = SentenceTransformer(embedding_model)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", " ", ""]
        )
        self.dimension = self.embedder.get_sentence_embedding_dimension()
        logger.info(f"Initialized ingestor with embedding dimension: {self.dimension}")

    def create_table(self, table_name: str = "documents", 
                     overwrite: bool = False) -> None:
        """
        Create or get a LanceDB table with proper schema.

        The schema includes:
        - vector: Fixed-size list for embeddings
        - text: Original chunk text
        - metadata: JSON-compatible metadata
        - id: Unique identifier for deduplication
        """
        if table_name in self.db.table_names() and not overwrite:
            logger.info(f"Table '{table_name}' already exists, reusing")
            return

        schema = pa.schema([
            pa.field("vector", pa.list_(pa.float32(), self.dimension)),
            pa.field("text", pa.string()),
            pa.field("metadata", pa.string()),  # JSON string for flexibility
            pa.field("id", pa.string()),
            pa.field("source", pa.string()),
            pa.field("chunk_index", pa.int32())
        ])

        self.db.create_table(table_name, schema=schema, mode="overwrite" if overwrite else "create")
        logger.info(f"Created table '{table_name}' with schema")

    def ingest_document(self, file_path: str, 
                        table_name: str = "documents",
                        metadata: Dict[str, Any] = None) -> int:
        """
        Ingest a single document into LanceDB.

        Handles PDF files and returns number of chunks ingested.
        """
        # Load document based on extension
        if file_path.endswith(".pdf"):
            loader = PyPDFLoader(file_path)
            documents = loader.load()
        else:
            # For plain text files
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()
            from langchain.schema import Document
            documents = [Document(page_content=text, metadata={"source": file_path})]

        # Split into chunks
        chunks = self.text_splitter.split_documents(documents)
        logger.info(f"Split document into {len(chunks)} chunks")

        # Prepare batch data
        texts = [chunk.page_content for chunk in chunks]
        embeddings = self.embedder.encode(texts, show_progress_bar=True, 
                                          normalize_embeddings=True)

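        # Build records for LanceDB
        import json  # local import; the retriever reads metadata back with json.loads()

        records = []
        base_metadata = metadata or {}
        for idx, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            record = {
                "vector": embedding.tolist(),
                "text": chunk.page_content,
                # json.dumps (not str) yields valid JSON; default=str covers
                # values that JSON cannot encode natively
                "metadata": json.dumps({**base_metadata, **chunk.metadata}, default=str),
                "id": str(uuid.uuid4()),
                "source": file_path,
                "chunk_index": idx
            }
            records.append(record)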

        # Batch insert into LanceDB
        table = self.db.open_table(table_name)
        table.add(records)
        logger.info(f"Ingested {len(records)} chunks into '{table_name}'")

        return len(records)

Key design decisions:

  1. Normalized embeddings: We normalize embeddings to unit vectors, which enables cosine similarity via dot product. LanceDB defaults to L2 distance, but we can configure it for cosine similarity during search.

  2. Metadata as JSON string: Storing metadata as a JSON string provides flexibility for different document types without schema changes. The trade-off is that we cannot filter on nested fields directly in LanceDB queries—we'd need to parse them in post-processing.

  3. Batch insertion: LanceDB's add() method accepts a list of dictionaries, which is more efficient than inserting one record at a time. For very large batches (100k+ records), consider using add() with a PyArrow table directly.
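The metadata trade-off in point 2 can be sketched with the standard library alone. This is a minimal illustration, not part of the pipeline above: serialize_metadata and filter_by_nested_field are hypothetical helper names, and the rows are plain dictionaries standing in for search results.

```python
import json

_MISSING = object()  # sentinel for "path not present in this metadata"


def serialize_metadata(metadata: dict) -> str:
    """Serialize metadata to a JSON string suitable for a string column."""
    # sort_keys gives a stable representation; default=str covers values
    # (such as dates) that json cannot encode natively.
    return json.dumps(metadata, sort_keys=True, default=str)


def filter_by_nested_field(rows: list[dict], path: list[str], expected) -> list[dict]:
    """Keep rows whose parsed metadata holds `expected` at the nested `path`."""
    kept = []
    for row in rows:
        value = json.loads(row["metadata"])
        for key in path:
            if not isinstance(value, dict) or key not in value:
                value = _MISSING
                break
            value = value[key]
        if value is not _MISSING and value == expected:
            kept.append(row)
    return kept


# Plain dictionaries standing in for rows returned by a search
rows = [
    {"text": "chunk A", "metadata": serialize_metadata({"owner": {"team": "search"}})},
    {"text": "chunk B", "metadata": serialize_metadata({"owner": {"team": "billing"}})},
]
matches = filter_by_nested_field(rows, ["owner", "team"], "search")
print([row["text"] for row in matches])  # prints ['chunk A']
```

Post-filtering like this runs after retrieval, so it can return fewer than top_k results; fields you filter on often are better promoted to their own columns, as source and chunk_index are here.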

Edge case: Duplicate detection

In production, you'll encounter duplicate documents. The current implementation generates a new UUID for each chunk, so re-ingesting a file inserts its chunks again. To handle this, add a deduplication step:

def check_duplicate(self, table_name: str, source: str) -> bool:
    """Check if a document source already exists in the table."""
    table = self.db.open_table(table_name)
    # Double any single quotes so a path such as "o'brien.pdf" cannot break the filter
    safe_source = source.replace("'", "''")
    # LanceDB supports SQL-like filtering
    result = table.search().where(f"source = '{safe_source}'").limit(1).to_pandas()
    return len(result) > 0
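A complementary approach, sketched here as an assumption rather than something the pipeline above does, is to derive chunk IDs from content instead of random UUIDs, so the same chunk always maps to the same ID. The helper name chunk_id is hypothetical; how you then skip or replace rows with an existing ID depends on your table setup and is not shown.

```python
import hashlib


def chunk_id(source: str, chunk_index: int, text: str) -> str:
    """Derive a stable ID from the chunk's origin and content."""
    digest = hashlib.sha256()
    for part in (source, str(chunk_index), text):
        encoded = part.encode("utf-8")
        # Length-prefix each part so ("ab", "c") and ("a", "bc") hash differently
        digest.update(len(encoded).to_bytes(8, "big"))
        digest.update(encoded)
    return digest.hexdigest()


first = chunk_id("report.pdf", 0, "Revenue grew in Q2.")
again = chunk_id("report.pdf", 0, "Revenue grew in Q2.")
edited = chunk_id("report.pdf", 0, "Revenue fell in Q2.")
print(first == again, first == edited)  # prints True False
```

Unlike the source-level check, this also detects when a file was re-ingested with changed content, because edited chunks produce new IDs.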

Implementing the Retrieval Service

Now let's build the retrieval component that handles similarity search with metadata filtering. This is where LanceDB's columnar storage shines—we can filter on metadata columns without loading entire vectors into memory.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import json
from typing import Optional, List

app = FastAPI(title="LanceDB RAG Service")

class SearchRequest(BaseModel):
    query: str
    table_name: str = "documents"
    top_k: int = 5
    metadata_filter: Optional[str] = None  # SQL-like filter string

class SearchResult(BaseModel):
    text: str
    score: float
    metadata: dict
    source: str

class LanceRAGRetriever:
    def __init__(self, db_path: str = "./lancedb_data",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        self.db = lancedb.connect(db_path)
        self.embedder = SentenceTransformer(embedding_model)

    def search(self, query: str, table_name: str = "documents",
               top_k: int = 5, metadata_filter: Optional[str] = None) -> List[SearchResult]:
        """
        Perform similarity search with optional metadata filtering.

        Args:
            query: Natural language query
            table_name: LanceDB table to search
            top_k: Number of results to return
            metadata_filter: SQL-like filter (e.g., "source LIKE '%report%'")

        Returns:
            List of SearchResult objects sorted by relevance
        """
        # Generate query embedding
        query_embedding = self.embedder.encode(query, normalize_embeddings=True)

        # Open table and configure search
        table = self.db.open_table(table_name)

        # Build search query
        search_builder = table.search(query_embedding.tolist())

        # Apply metadata filter if provided
        if metadata_filter:
            search_builder = search_builder.where(metadata_filter)

        # Execute search. No metric is configured here, so LanceDB uses its
        # default L2 distance. Configure the cosine metric on the search
        # builder to get cosine distances instead.
        results = search_builder.limit(top_k).to_pandas()

        # Convert to SearchResult objects
        search_results = []
        for _, row in results.iterrows():
            # Parse metadata from JSON string
            try:
                metadata = json.loads(row["metadata"])
            except (json.JSONDecodeError, KeyError):
                metadata = {}

            # LanceDB returns a distance; convert it to a rough similarity score.
            # 1 - distance is exact only for cosine distance; with the default
            # L2 metric it is a ranking-preserving proxy.
            score = 1.0 - row["_distance"] if "_distance" in row else 0.0

            search_results.append(SearchResult(
                text=row["text"],
                score=max(0.0, min(1.0, score)),  # Clamp to [0, 1]
                metadata=metadata,
                source=row.get("source", "unknown")
            ))

        return search_results

# Initialize retriever
retriever = LanceRAGRetriever()

@app.post("/search", response_model=List[SearchResult])
async def search_endpoint(request: SearchRequest):
    """REST endpoint for vector search."""
    try:
        results = retriever.search(
            query=request.query,
            table_name=request.table_name,
            top_k=request.top_k,
            metadata_filter=request.metadata_filter
        )
        return results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "database": "lancedb"}

Critical implementation details:

  1. Distance to similarity conversion: LanceDB returns L2 distances by default. For unit-normalized embeddings, cosine similarity equals 1 - (d^2 / 2), where d is the Euclidean distance between the two vectors. The code uses the simpler 1 - distance, which preserves the ranking but is not a true cosine similarity, and the clamp to [0, 1] can flatten distant results to 0. For production, use metric="cosine" in the search builder so that 1 - distance is the actual cosine similarity.

  2. Metadata filtering syntax: LanceDB supports SQL-like WHERE clauses. For example:

    • source = 'document.pdf'
    • chunk_index > 10
    • metadata LIKE '%engineering%' (substring matching on the serialized metadata)
  3. Memory management: LanceDB uses memory-mapped files, so large tables don't consume RAM until accessed. However, a search without an index scans the vector column for the entire dataset. For tables with millions of vectors, consider using IVF-PQ indexing (available in LanceDB 0.12+).
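The relation between L2 distance and cosine similarity from point 1 can be checked numerically. The sketch below uses only the standard library and two made-up vectors; it does not call LanceDB, so whether a given engine reports the Euclidean distance or its square is something to confirm against its documentation.

```python
import math


def normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def squared_l2(a: list[float], b: list[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])

cosine = dot(a, b)                 # true cosine similarity of unit vectors
d2 = squared_l2(a, b)              # squared Euclidean distance
from_l2 = 1.0 - d2 / 2.0           # identity: cos = 1 - d^2 / 2
naive = 1.0 - math.sqrt(d2)        # the "1 - distance" shortcut

print(f"cosine={cosine:.4f} from_l2={from_l2:.4f} naive={naive:.4f}")
```

The first two values agree, while the shortcut gives a noticeably lower number. Rankings are unaffected, but any fixed score threshold tuned on one scale will not transfer to the other.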

Building the Generation Layer with Context Management

The final piece connects retrieval to an LLM for answer generation. We'll use LangChain's prompt templates and handle context window limits gracefully.

from langchain.prompts import ChatPromptTemplate
from langchain.schema import SystemMessage, HumanMessage
from langchain_community.chat_models import ChatOpenAI  # Or any other provider
import tiktoken

class RAGGenerator:
    def __init__(self, retriever: LanceRAGRetriever, 
                 model_name: str = "gpt-4o-mini",
                 max_context_tokens: int = 4000):
        """
        Initialize RAG generator with context management.

        Args:
            retriever: LanceRAGRetriever instance
            model_name: LLM model identifier
            max_context_tokens: Maximum tokens for retrieved context
        """
        self.retriever = retriever
        self.llm = ChatOpenAI(model=model_name, temperature=0.1)
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")  # Approximate
        self.max_context_tokens = max_context_tokens

        # Define prompt template. Use (role, template) tuples so that
        # {context} and {question} are treated as template variables;
        # SystemMessage/HumanMessage instances would be passed through verbatim.
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are a helpful assistant that answers questions based on provided context. "
                "If the context doesn't contain relevant information, say so clearly. "
                "Cite specific parts of the context when possible."
            )),
            ("human", (
                "Context:\n{context}\n\n"
                "Question: {question}\n\n"
                "Answer:"
            )),
        ])

    def _truncate_context(self, texts: List[str], scores: List[float]) -> str:
        """
        Truncate retrieved context to fit within token limits.

        Prioritizes higher-scoring chunks when truncating.
        """
        # Chunks arrive ordered by relevance from the search, so no re-sort is needed
        combined_text = ""
        total_tokens = 0

        for text, score in zip(texts, scores):
            text_tokens = len(self.tokenizer.encode(text))

            if total_tokens + text_tokens > self.max_context_tokens:
                # Truncate this chunk to fit remaining budget
                remaining = self.max_context_tokens - total_tokens
                if remaining > 50:  # Only include if meaningful
                    truncated = self.tokenizer.decode(
                        self.tokenizer.encode(text)[:remaining]
                    )
                    combined_text += f"\n[Score: {score:.3f}] {truncated}...\n"
                break

            combined_text += f"\n[Score: {score:.3f}] {text}\n"
            total_tokens += text_tokens

        return combined_text

    def generate(self, question: str, table_name: str = "documents",
                 top_k: int = 5, metadata_filter: Optional[str] = None) -> str:
        """
        Generate answer using RAG pipeline.

        Args:
            question: User's question
            table_name: LanceDB table to search
            top_k: Number of chunks to retrieve
            metadata_filter: Optional filter string

        Returns:
            Generated answer string
        """
        # Retrieve relevant chunks
        results = self.retriever.search(
            query=question,
            table_name=table_name,
            top_k=top_k,
            metadata_filter=metadata_filter
        )

        if not results:
            return "No relevant documents found."

        # Prepare context with token management
        texts = [r.text for r in results]
        scores = [r.score for r in results]
        context = self._truncate_context(texts, scores)

        # Generate response
        messages = self.prompt.format_messages(
            context=context,
            question=question
        )

        response = self.llm.invoke(messages)
        return response.content

# Example usage
if __name__ == "__main__":
    # Initialize components
    ingestor = LanceRAGIngestor()
    ingestor.create_table(overwrite=True)

    # Ingest a sample document
    ingestor.ingest_document("sample_report.pdf", metadata={"department": "engineering"})

    # Query
    retriever = LanceRAGRetriever()
    generator = RAGGenerator(retriever)

    answer = generator.generate(
        "What were the key findings in the Q2 report?",
        metadata_filter="source = 'sample_report.pdf'"
    )
    print(f"Answer: {answer}")

Edge cases handled:

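  1. Context window overflow: The _truncate_context method caps the retrieved context at max_context_tokens. It keeps higher-scoring chunks first and truncates the last chunk if needed. The budget covers chunk text only, so leave headroom for the system prompt, the question, and the score prefixes.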

  2. Empty results: Returns a clear message instead of crashing.

  3. Score-based chunk ordering: Each chunk is prefixed with its relevance score, allowing the LLM to weigh evidence appropriately.

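  4. Metadata filtering: Restricts retrieval to a subset of documents (e.g., only search documents from a specific department). Because the /search endpoint accepts the filter string from the client, build the filter server-side if you rely on it for access control.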
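When a filter is used to restrict which documents a caller may search, it helps to build the filter string in code rather than accept it verbatim. The following is a minimal sketch under two assumptions: that the filter engine follows standard SQL quoting (a single quote is escaped by doubling it) and that it accepts an IN list. Both helper names are hypothetical.

```python
def sql_string_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_source_filter(allowed_sources: list[str]) -> str:
    """Build a filter that restricts results to an allow-list of sources."""
    if not allowed_sources:
        # An empty allow-list should never silently become "no filter"
        raise ValueError("at least one allowed source is required")
    literals = ", ".join(sql_string_literal(s) for s in allowed_sources)
    return f"source IN ({literals})"


print(build_source_filter(["q2_report.pdf", "o'brien_notes.txt"]))
# prints: source IN ('q2_report.pdf', 'o''brien_notes.txt')
```

The resulting string can be passed as metadata_filter, with the allow-list looked up from the caller's identity on the server.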

Production Deployment Considerations

For a production deployment, consider these additional factors:

Concurrent access: LanceDB supports multiple readers but only one writer at a time. If you need concurrent writes, implement a write queue or use LanceDB's built-in locking (available in v0.14+). For read-heavy workloads, create read replicas by copying the database directory.
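A write queue of the kind mentioned above can be sketched with the standard library. This is one possible shape, not a LanceDB feature: SingleWriterQueue is a hypothetical class, and write_batch stands in for whatever performs the actual insert (for example, a function that calls table.add).

```python
import queue
import threading
from typing import Callable, List


class SingleWriterQueue:
    """Funnel writes from many threads through one background writer thread."""

    _STOP = object()  # sentinel that tells the writer thread to exit

    def __init__(self, write_batch: Callable[[List[dict]], None]):
        self._write_batch = write_batch
        self._queue: "queue.Queue" = queue.Queue()
        self.errors: List[Exception] = []
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, records: List[dict]) -> None:
        """Enqueue a batch; returns immediately without touching the table."""
        self._queue.put(records)

    def close(self) -> None:
        """Flush everything queued so far, then stop the writer thread."""
        self._queue.put(self._STOP)
        self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            try:
                self._write_batch(item)  # only this thread ever writes
            except Exception as exc:  # keep the writer alive after a bad batch
                self.errors.append(exc)


# Usage: a list's extend method stands in for the real insert call
stored: List[dict] = []
writer = SingleWriterQueue(stored.extend)
producers = [
    threading.Thread(target=writer.submit, args=([{"id": i}],)) for i in range(4)
]
for t in producers:
    t.start()
for t in producers:
    t.join()
writer.close()
print(len(stored))  # prints 4
```

This serializes writes within a single process only. Several processes or API workers writing to the same database directory would still need coordination outside the process.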

Indexing for speed: For tables with >100k vectors, create an IVF-PQ index:

table.create_index(metric="cosine", num_partitions=256, num_sub_vectors=32)

This avoids a full O(n) scan: a query is compared against the partition centroids and then only against the vectors in the probed partitions, at the cost of some recall.
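To make the index parameters concrete, here is the arithmetic for the settings above with this tutorial's 384-dimensional embeddings. Three values are assumptions for illustration: 8 bits per product-quantization code, a table of one million vectors, and 20 probed partitions per query. Partitions are also assumed to be evenly sized, which real data only approximates.

```python
dimension = 384          # all-MiniLM-L6-v2 embedding size
num_partitions = 256     # from the create_index call above
num_sub_vectors = 32     # from the create_index call above
bits_per_code = 8        # assumption: one byte per sub-vector code

# The dimension must split evenly into sub-vectors
assert dimension % num_sub_vectors == 0
dims_per_sub_vector = dimension // num_sub_vectors

raw_bytes = dimension * 4                         # float32 storage per vector
pq_bytes = num_sub_vectors * bits_per_code // 8   # compressed code per vector
compression = raw_bytes / pq_bytes

total_vectors = 1_000_000   # hypothetical table size
nprobes = 20                # hypothetical number of partitions probed per query
avg_partition_size = total_vectors / num_partitions
vectors_scanned = nprobes * avg_partition_size

print(f"{dims_per_sub_vector} dims per sub-vector")
print(f"{raw_bytes} B raw vs {pq_bytes} B compressed ({compression:.0f}x)")
print(f"~{vectors_scanned:.0f} of {total_vectors} vectors scanned per query")
```

Probing more partitions raises recall and latency together, so the probe count is the main knob to tune against your own recall measurements.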

Monitoring: Track these metrics:

  • Embedding generation latency
  • Search latency (p50, p95, p99)
  • Context truncation rate (how often we hit token limits)
  • Cache hit ratio (if using query caching)
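Two of these metrics can be computed with the standard library, as in the sketch below. The function names are hypothetical, and in a real service the samples would come from your request logs or metrics system instead of an in-memory list.

```python
import statistics


def latency_percentiles(samples_ms: list[float]) -> dict[str, float]:
    """Return p50, p95 and p99 from recorded latencies in milliseconds."""
    if len(samples_ms) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; index k-1 is the k-th percentile
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def truncation_rate(truncated_requests: int, total_requests: int) -> float:
    """Fraction of requests whose context hit the token limit."""
    return truncated_requests / total_requests if total_requests else 0.0


# Made-up latency samples for illustration
samples = [12.0, 15.0, 14.0, 13.0, 90.0, 16.0, 14.5, 13.5, 15.5, 250.0]
print(latency_percentiles(samples))
print(f"truncation rate: {truncation_rate(7, 200):.1%}")
```

Tail percentiles are the ones to watch: a handful of slow searches barely moves the median but shows up clearly in p95 and p99.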

Scaling: LanceDB's columnar format means you can store billions of vectors in a single table. For multi-terabyte datasets, shard across multiple LanceDB instances and use a router to distribute queries.
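For similarity search, a router over shards typically sends the query to every shard and merges the partial results, since the nearest neighbors may live anywhere. A minimal sketch of the merge step follows; the shard names and hits are made up, and merge_shard_results is a hypothetical helper.

```python
import heapq
from typing import Dict, List, Tuple

Hit = Tuple[float, str]  # (distance, chunk_id); a smaller distance is better


def merge_shard_results(per_shard: Dict[str, List[Hit]], top_k: int) -> List[Hit]:
    """Merge each shard's local top-k into one global top-k by distance."""
    all_hits = (hit for hits in per_shard.values() for hit in hits)
    return heapq.nsmallest(top_k, all_hits)


# Made-up results: each shard returned its own two best hits
per_shard = {
    "shard-0": [(0.12, "a"), (0.40, "b")],
    "shard-1": [(0.05, "c"), (0.33, "d")],
}
print(merge_shard_results(per_shard, top_k=3))
# prints [(0.05, 'c'), (0.12, 'a'), (0.33, 'd')]
```

The merge is only meaningful if every shard uses the same embedding model and distance metric, and each shard must return at least top_k hits for the global result to be exact.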

What's Next

This tutorial covered building a production-ready RAG pipeline with LanceDB and LangChain. To extend this system:

  1. Add hybrid search: Combine vector search with keyword search using LanceDB's full-text search capabilities (available in v0.16+)
  2. Implement caching: Cache frequent queries using Redis or LanceDB's built-in cache
  3. Add feedback loop: Collect user feedback on answers to fine-tune retrieval parameters
  4. Explore multi-modal RAG: LanceDB supports storing images and audio alongside text vectors
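For the hybrid search item, one common way to combine a vector ranking with a keyword ranking is reciprocal rank fusion (RRF). The sketch below is a library-independent illustration of that technique, not LanceDB's own hybrid search API; the constant k=60 is the value conventionally used with RRF, and the chunk IDs are made up.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked ID lists; each appearance adds 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for a stable order
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


vector_hits = ["c1", "c2", "c3"]    # made-up vector search ranking
keyword_hits = ["c3", "c1", "c4"]   # made-up keyword search ranking
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))
# prints ['c1', 'c3', 'c2', 'c4']
```

RRF uses only ranks, so it sidesteps the problem that vector distances and keyword scores live on different scales.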

The complete code is available on GitHub. For more on RAG architectures, see our guide on advanced retrieval strategies and vector database comparison.

Remember: The key to production RAG is not just retrieval accuracy, but system reliability under load. LanceDB's embedded architecture eliminates network hops, making it ideal for latency-sensitive applications. Start with the code above, monitor your metrics, and iterate based on real usage patterns.


References

1. Embedding. Wikipedia.
2. List of generation IV Pokémon. Wikipedia.
3. Mistral. Wikipedia.
4. fighting41love/funNLP. GitHub.
5. weaviate/weaviate. GitHub.
6. mistralai/mistral-inference. GitHub.
7. Significant-Gravitas/AutoGPT. GitHub.
8. Weaviate Pricing.
9. Mistral AI Pricing.