How to Build a RAG Pipeline with LanceDB and LangChain
Retrieval-Augmented Generation (RAG) has become the standard architecture for grounding large language models in your own data. But most tutorials stop at toy examples with small datasets and local vector stores that don't scale. In production, you need a vector database that handles billions of vectors, supports hybrid search, and integrates seamlessly with your existing LLM infrastructure.
LanceDB fills this gap. It's an open-source vector database built on the Lance columnar format, designed for production workloads with zero-copy reads, automatic indexing, and native LangChain integration. As of May 2026, LanceDB supports persistent storage, GPU-accelerated indexing, and multi-modal embeddings out of the box.
In this tutorial, you'll build a production-grade RAG pipeline using LanceDB as your vector store and LangChain for orchestration. We'll cover chunking strategies, embedding management, hybrid search, and deployment considerations—all with real, working code.
Why LanceDB for Production RAG
Before diving into code, let's understand why LanceDB deserves a spot in your production stack. Most vector stores fall into two camps: lightweight in-process libraries (like FAISS) that leave persistence, metadata storage, and filtering to you, or client-server systems (like Pinecone or Weaviate) that add a separate service to provision and operate.
LanceDB sits in the sweet spot. It's embedded (no separate server to run), persists to disk using the Lance columnar format, and scales to billions of vectors without the operational overhead of a distributed system. According to the LanceDB documentation, it achieves 10x faster reads than Parquet for vector workloads and supports automatic index building without manual tuning.
The key architectural decision here is using an embedded vector store versus a client-server architecture. For most RAG applications handling up to 100 million vectors, LanceDB's embedded approach eliminates network latency and simplifies deployment. You can embed it directly in your FastAPI application or batch processing pipeline without managing a separate database cluster.
Prerequisites and Environment Setup
Let's set up a clean Python environment with all required dependencies. We'll use Python 3.11+ for best performance with async operations.
# Create a virtual environment
python3.11 -m venv rag-env
source rag-env/bin/activate
# Core dependencies
pip install lancedb==0.12.0
pip install langchain==0.3.0
pip install langchain-community==0.3.0
pip install langchain-openai==0.2.0
pip install openai==1.55.0
pip install pypdf==5.1.0
pip install tiktoken==0.8.0
pip install fastapi==0.115.0
pip install uvicorn==0.32.0
pip install python-multipart==0.0.18
# For embedding models (choose one)
pip install sentence-transformers==3.3.0
# OR
pip install torch==2.4.0 --index-url https://download.pytorch.org/whl/cpu
Important version note: At the time of writing (May 2026), this tutorial targets LanceDB 0.12.0. The API has changed across releases, notably in how create_table accepts a schema parameter for strict typing and how search exposes hybrid queries, so confirm which version you have installed before copying the examples.
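Because the examples depend on pinned versions, it can help to confirm what is actually installed before debugging API differences. The following is a minimal sketch using only the standard library; the helper name installed_versions is hypothetical, and the package list simply mirrors the install commands above.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package name: installed version, or None if it is missing}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Package names taken from the pip commands in this tutorial.
pinned = ["lancedb", "langchain", "langchain-community", "langchain-openai", "tiktoken"]
for name, ver in installed_versions(pinned).items():
    print(f"{name}: {ver or 'NOT INSTALLED'}")
```

Running this inside the activated virtual environment shows at a glance whether a mismatch with the pinned versions could explain an unexpected error.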
Building the Document Ingestion Pipeline
The first step in any RAG system is converting raw documents into searchable chunks with embeddings. This is where most pipelines fail in production—poor chunking leads to irrelevant retrievals, and embedding management becomes a nightmare at scale.
Chunking Strategy for Production
Let's implement a chunking strategy that balances context preservation with retrieval precision. We'll use LangChain's RecursiveCharacterTextSplitter with sensible defaults, then add overlap to prevent context loss at chunk boundaries.
# ingestion.py
import os
from datetime import datetime, timezone
from typing import Any, Dict, List

import lancedb
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import OpenAIEmbeddings


class DocumentIngestionPipeline:
    """
    Production-grade document ingestion pipeline with chunking,
    embedding, and LanceDB storage.
    """

    def __init__(
        self,
        db_path: str = "./lancedb_data",
        chunk_size: int = 1024,
        chunk_overlap: int = 200,
        embedding_model: str = "text-embedding-3-small",
    ):
        self.db_path = db_path
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Initialize LanceDB with persistent storage
        self.db = lancedb.connect(db_path)

        # Use OpenAI embeddings for production quality
        # Fall back to sentence-transformers if no API key
        if os.getenv("OPENAI_API_KEY"):
            self.embeddings = OpenAIEmbeddings(
                model=embedding_model,
                dimensions=1536,  # text-embedding-3-small output dimension
            )
        else:
            from langchain_community.embeddings import HuggingFaceEmbeddings

            # bge-small-en-v1.5 produces 384-dimensional vectors, so do not
            # mix it with OpenAI vectors in the same table.
            self.embeddings = HuggingFaceEmbeddings(
                model_name="BAAI/bge-small-en-v1.5"
            )

        # Configure text splitter for markdown and code documents
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ".", " ", ""],
            length_function=len,  # sizes are measured in characters
        )

    def load_and_chunk_pdf(self, pdf_path: str) -> List[Dict[str, Any]]:
        """
        Load a PDF, split into chunks, and prepare for embedding.

        Edge case: raises ValueError for PDFs with no extractable text
        (empty files or scanned pages without a text layer).
        """
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()
        if not documents:
            raise ValueError(f"No content found in {pdf_path}")

        # Split documents into chunks
        chunks = self.text_splitter.split_documents(documents)
        if not chunks:
            raise ValueError(f"No extractable text found in {pdf_path}")

        # Embed all chunks in one batched call instead of one request per chunk
        vectors = self.embeddings.embed_documents(
            [chunk.page_content for chunk in chunks]
        )
        ingested_at = datetime.now(timezone.utc).isoformat()

        # Prepare records for LanceDB
        records = []
        for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
            records.append({
                "vector": vector,
                "text": chunk.page_content,
                "source": pdf_path,
                "chunk_index": i,
                "metadata": {
                    "page": chunk.metadata.get("page", 0),
                    "total_pages": len(documents),
                    "chunk_size": len(chunk.page_content),
                    "ingested_at": ingested_at,
                },
            })
        return records

    def store_in_lancedb(self, records: List[Dict[str, Any]], table_name: str = "documents"):
        """
        Store records in LanceDB, creating the table on first use.
        """
        # Check if table exists
        if table_name in self.db.table_names():
            table = self.db.open_table(table_name)
            table.add(records)
        else:
            # Create table with schema inference
            table = self.db.create_table(table_name, records)
        return table

    def process_document(self, pdf_path: str, table_name: str = "documents") -> int:
        """
        End-to-end document processing pipeline.
        """
        print(f"Processing {pdf_path}...")
        records = self.load_and_chunk_pdf(pdf_path)
        print(f"Generated {len(records)} chunks")
        self.store_in_lancedb(records, table_name)
        print(f"Stored in LanceDB table '{table_name}'")
        return len(records)
Key architectural decisions in this pipeline:
- Embedding dimension consistency: We explicitly set dimensions=1536 for OpenAI's text-embedding-3-small. This is critical because LanceDB requires all vectors in a table to have the same dimension. If you switch models mid-pipeline, you'll get dimension mismatch errors. Note that the HuggingFace fallback (BAAI/bge-small-en-v1.5) produces 384-dimensional vectors, so a table created with one model cannot accept vectors from the other.
- Chunk overlap strategy: The 200-character overlap reduces context loss at chunk boundaries. For technical documents with code blocks, this is essential: a function definition might span two chunks, and without overlap, retrieval would miss part of the context.
- Metadata enrichment: We store page numbers, chunk indices, and ingestion timestamps. This enables filtering during retrieval (e.g., "only search pages 10-20") and debugging when retrieval quality degrades.
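To make the overlap mechanics concrete, here is a minimal sketch of a fixed-size sliding window over characters. It is a simplification, not how RecursiveCharacterTextSplitter works internally (that splitter tries separators first); the function name chunk_with_overlap is hypothetical and only illustrates how consecutive chunks share their boundary text.

```python
def chunk_with_overlap(text, chunk_size, chunk_overlap):
    """Split text into character windows that share `chunk_overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


sample = "def add(a, b):\n    return a + b\n\nprint(add(1, 2))"
for piece in chunk_with_overlap(sample, chunk_size=24, chunk_overlap=8):
    print(repr(piece))
```

With chunk_size=1024 and chunk_overlap=200, each window advances by 824 characters, so any span of up to 200 characters that straddles a boundary appears intact in at least one chunk.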
Implementing Hybrid Search with LanceDB
Pure vector search works well for semantic similarity, but it misses exact matches and keyword-based queries. In production RAG, you need hybrid search—combining vector similarity with keyword matching (BM25 or FTS).
LanceDB supports hybrid search natively through its search method with the query_type parameter. Let's implement a retrieval system that uses both approaches.
# retrieval.py
import os
from typing import List, Optional, Tuple

import lancedb
from lancedb.rerankers import LinearCombinationReranker
from langchain_openai import OpenAIEmbeddings


class HybridRetriever:
    """
    Hybrid search retriever combining vector similarity and full-text search.

    Hybrid search is requested with query_type="hybrid" and requires an
    FTS index on the text column (see create_fts_index).
    """

    def __init__(
        self,
        db_path: str = "./lancedb_data",
        table_name: str = "documents",
        top_k: int = 5,
        fts_weight: float = 0.3,  # Weight for FTS score in hybrid ranking
    ):
        self.db = lancedb.connect(db_path)
        self.table_name = table_name
        self._table = None  # opened lazily, see the `table` property
        self.top_k = top_k
        self.fts_weight = fts_weight

        # LinearCombinationReranker's `weight` is the vector share of the
        # final score, so the FTS share is its complement.
        self.reranker = LinearCombinationReranker(weight=1.0 - fts_weight)

        # Initialize embedding model (must match ingestion)
        if os.getenv("OPENAI_API_KEY"):
            self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        else:
            from langchain_community.embeddings import HuggingFaceEmbeddings

            self.embeddings = HuggingFaceEmbeddings(
                model_name="BAAI/bge-small-en-v1.5"
            )

    @property
    def table(self):
        """Open the table on first use so the retriever can be built before ingestion."""
        if self._table is None:
            self._table = self.db.open_table(self.table_name)
        return self._table

    def create_fts_index(self):
        """
        Create a full-text search index on the 'text' column.

        This is required for hybrid search. LanceDB uses Tantivy
        under the hood for FTS, which supports stemming and stop words.

        Note: This operation is expensive on large tables. Run it
        during ingestion, not at query time.
        """
        try:
            self.table.create_fts_index("text", replace=True)
            print("FTS index created successfully")
        except Exception as e:
            print(f"FTS index creation failed: {e}")
            print("Hybrid search will fall back to vector-only search")

    def hybrid_search(
        self,
        query: str,
        filter_condition: Optional[str] = None,
    ) -> List[dict]:
        """
        Perform hybrid search combining vector similarity and FTS.

        Scores from both methods are combined by the reranker using a
        weighted average controlled by fts_weight.
        """
        # Generate query embedding
        query_embedding = self.embeddings.embed_query(query)

        # Build search pipeline. The explicit .vector()/.text() builder may
        # require a LanceDB release newer than the one pinned in this
        # tutorial; verify it against your installed version.
        search_query = (
            self.table.search(query_type="hybrid")
            .vector(query_embedding)  # vector side of the query
            .text(query)              # FTS query string
            .rerank(reranker=self.reranker)
            .limit(self.top_k)
        )

        # Apply optional filter
        if filter_condition:
            search_query = search_query.where(filter_condition)

        # Execute search
        return search_query.to_list()

    def vector_only_search(
        self,
        query: str,
        filter_condition: Optional[str] = None,
    ) -> List[dict]:
        """
        Fallback to pure vector search when FTS index is unavailable.
        """
        query_embedding = self.embeddings.embed_query(query)
        search_query = self.table.search(
            query_embedding, query_type="vector"
        ).limit(self.top_k)
        if filter_condition:
            search_query = search_query.where(filter_condition)
        return search_query.to_list()

    def retrieve_context(
        self,
        query: str,
        use_hybrid: bool = True,
    ) -> Tuple[List[str], List[dict]]:
        """
        Retrieve context documents for RAG generation.

        Returns both the text chunks and full metadata for debugging.
        """
        if use_hybrid:
            try:
                results = self.hybrid_search(query)
            except Exception as e:
                print(f"Hybrid search failed, falling back to vector: {e}")
                results = self.vector_only_search(query)
        else:
            results = self.vector_only_search(query)

        # Extract text and metadata. "source" is stored as a top-level
        # column, so copy it into the metadata dict for downstream use.
        texts = [r["text"] for r in results]
        metadata = [
            {**(r.get("metadata") or {}), "source": r.get("source", "unknown")}
            for r in results
        ]
        return texts, metadata
Critical edge case handling:
- FTS index creation failure: If FTS index creation fails (e.g., due to memory constraints on large tables), create_fts_index logs the error, and retrieve_context falls back to vector-only search when the hybrid query raises. The system stays operational with degraded functionality.
- Filter conditions: The filter_condition parameter accepts LanceDB's SQL-like filter syntax (e.g., "metadata.page > 5"). This enables time-based filtering, source filtering, or any metadata-based pruning.
- Score normalization: Vector distances and BM25 full-text scores live on different scales, so hybrid search has to bring them onto a common scale before combining them. The fts_weight parameter is meant to control the balance (0.3 means FTS contributes 30% of the final score). It only takes effect if it is passed to a weighted reranker, so check that the retriever actually wires it in rather than just storing it.
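The weighting idea can be sketched in plain Python. This is an illustration of min-max normalization followed by a weighted average, not LanceDB's internal implementation; the function names min_max and fuse and the sample scores are hypothetical.

```python
def min_max(scores):
    """Rescale a {doc_id: score} map to [0, 1]; a constant map becomes all 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def fuse(vector_distances, fts_scores, fts_weight=0.3):
    """Weighted fusion of vector and full-text results, best match first."""
    # Distances are "lower is better", so negate them before normalizing.
    vec = min_max({doc: -d for doc, d in vector_distances.items()})
    fts = min_max(fts_scores)
    fused = {}
    for doc in set(vec) | set(fts):
        # A document missing from one result list contributes 0 for that side.
        fused[doc] = (1 - fts_weight) * vec.get(doc, 0.0) + fts_weight * fts.get(doc, 0.0)
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)


distances = {"a": 0.1, "b": 0.5, "c": 0.9}   # made-up cosine distances
bm25 = {"b": 4.0, "c": 2.0}                  # made-up BM25 scores
print(fuse(distances, bm25, fts_weight=0.3))
print(fuse(distances, bm25, fts_weight=0.5))
```

Raising fts_weight from 0.3 to 0.5 moves document "b" (a strong keyword match with a middling vector score) ahead of "a", which is the trade-off the weight is meant to expose.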
Building the RAG Generation Pipeline
Now we connect retrieval to generation. This is where most RAG systems show their weaknesses—poor prompt engineering leads to hallucinated answers, and lack of context management causes token overflow.
# rag_pipeline.py
from typing import Any, Dict, List

import tiktoken
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

from retrieval import HybridRetriever


class RAGPipeline:
    """
    Production RAG pipeline with context management and token budgeting.
    """

    def __init__(
        self,
        retriever: HybridRetriever,
        model_name: str = "gpt-4o-mini",
        max_context_tokens: int = 4000,
        temperature: float = 0.1,
    ):
        self.retriever = retriever
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=temperature,
            max_tokens=1024,
        )
        self.max_context_tokens = max_context_tokens

        # Use the tokenizer that matches the chat model; fall back to
        # o200k_base if tiktoken does not recognize the model name.
        try:
            self.tokenizer = tiktoken.encoding_for_model(model_name)
        except KeyError:
            self.tokenizer = tiktoken.get_encoding("o200k_base")

        # Define the RAG prompt template. Use (role, template) tuples:
        # message objects such as SystemMessage are treated as literal
        # text, so {context} and {question} would never be filled in.
        self.prompt_template = ChatPromptTemplate.from_messages([
            (
                "system",
                "You are a technical documentation assistant. Answer the user's question "
                "based solely on the provided context. If the context doesn't contain "
                "enough information, say 'I cannot find sufficient information in the "
                "provided documents.' Do not make up information.\n\n"
                "Context:\n{context}",
            ),
            ("human", "{question}"),
        ])

    def _truncate_context(
        self,
        texts: List[str],
        metadata: List[Dict[str, Any]],
    ) -> str:
        """
        Truncate context to fit within token budget.

        This prevents token overflow errors and ensures the LLM
        focuses on the most relevant information.
        """
        context_parts = []
        total_tokens = 0
        for text, meta in zip(texts, metadata):
            # Count tokens for this chunk
            chunk_tokens = len(self.tokenizer.encode(text))
            # Add overhead for formatting and metadata
            overhead = 50  # Tokens for formatting
            if total_tokens + chunk_tokens + overhead > self.max_context_tokens:
                break
            # Format with metadata for traceability
            source_info = f"[Source: {meta.get('source', 'unknown')}, Page: {meta.get('page', 'N/A')}]"
            context_parts.append(f"{source_info}\n{text}")
            total_tokens += chunk_tokens + overhead
        return "\n\n---\n\n".join(context_parts)

    def query(self, question: str, use_hybrid: bool = True) -> Dict[str, Any]:
        """
        Execute a RAG query end-to-end.

        Returns the answer, retrieved context, and metadata for debugging.
        """
        # Step 1: Retrieve relevant context
        texts, metadata = self.retriever.retrieve_context(
            question,
            use_hybrid=use_hybrid,
        )
        if not texts:
            return {
                "answer": "No relevant documents found.",
                "context": [],
                "metadata": [],
                "tokens_used": 0,  # keep the response shape consistent
            }

        # Step 2: Truncate context to fit token budget
        context = self._truncate_context(texts, metadata)

        # Step 3: Generate answer
        messages = self.prompt_template.format_messages(
            context=context,
            question=question,
        )
        response = self.llm.invoke(messages)
        return {
            "answer": response.content,
            "context": texts,
            "metadata": metadata,
            "tokens_used": len(self.tokenizer.encode(context)),
        }
Token management strategy:
The _truncate_context method implements a greedy token budget allocation. It processes chunks in order of relevance (as returned by the retriever) and stops when the budget is exhausted. This ensures the most relevant information always fits in the context window.
For GPT-4o-mini, the context window is 128K tokens, but we limit to 4K for the context portion. This leaves room for the system prompt, user question, and the model's response. In production, you'd tune this based on your specific use case and model.
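The budgeting arithmetic can be written down explicitly. This is a small sketch under stated assumptions: the function name context_budget, the safety margin, and the example token counts for the system prompt and question are illustrative, not measured values.

```python
def context_budget(window, system_tokens, question_tokens, max_response_tokens, safety_margin=256):
    """Tokens left for retrieved context after reserving everything else."""
    reserved = system_tokens + question_tokens + max_response_tokens + safety_margin
    budget = window - reserved
    if budget <= 0:
        raise ValueError("Reserved tokens exceed the model's context window")
    return budget


# 128K window, 1024 response tokens (the max_tokens used above), and
# assumed sizes of 100 and 50 tokens for the system prompt and question.
available = context_budget(128_000, system_tokens=100, question_tokens=50, max_response_tokens=1024)
effective = min(4000, available)  # the pipeline's configured cap still applies
print(available, effective)
```

The hard limit leaves far more room than the 4K cap uses, so the cap here is a quality and cost choice rather than a technical necessity; with a smaller model window the same function would show when the cap itself no longer fits.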
Deploying as a FastAPI Service
Let's wrap everything in a FastAPI application with proper error handling and async support.
# app.py
import os
import tempfile
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from ingestion import DocumentIngestionPipeline
from rag_pipeline import RAGPipeline
from retrieval import HybridRetriever

MAX_UPLOAD_BYTES = 50 * 1024 * 1024  # 50MB upload limit

app = FastAPI(title="RAG API with LanceDB")

# CORS for frontend integration (restrict allow_origins in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components. The retriever needs the "documents" table
# to exist before it can search.
ingestion_pipeline = DocumentIngestionPipeline()
retriever = HybridRetriever()
rag_pipeline = RAGPipeline(retriever=retriever)


class QueryRequest(BaseModel):
    question: str
    use_hybrid: bool = True
    top_k: Optional[int] = 5


class QueryResponse(BaseModel):
    answer: str
    sources: list
    tokens_used: int


@app.on_event("startup")
async def startup_event():
    """Initialize FTS index on startup for better search performance."""
    try:
        retriever.create_fts_index()
    except Exception as e:
        print(f"FTS index creation skipped: {e}")


@app.post("/ingest", status_code=201)
async def ingest_document(file: UploadFile = File(...)):
    """
    Ingest a PDF document into the vector store.

    Accepts PDF files up to 50MB. Larger documents should be
    ingested offline in batches.
    """
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(400, "Only PDF files are supported")

    content = await file.read()
    if len(content) > MAX_UPLOAD_BYTES:
        raise HTTPException(413, "File exceeds the 50MB upload limit")

    # Save uploaded file to temporary location
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(content)
        tmp_path = tmp.name

    try:
        num_chunks = ingestion_pipeline.process_document(tmp_path)
        # Rebuild the FTS index so hybrid search can see the new chunks
        retriever.create_fts_index()
        return {
            "message": "Document ingested successfully",
            "chunks": num_chunks,
            "filename": file.filename,
        }
    except Exception as e:
        raise HTTPException(500, f"Ingestion failed: {e}")
    finally:
        # Clean up temporary file
        os.unlink(tmp_path)


@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    """
    Query the RAG pipeline with a question.

    Returns the generated answer along with source documents.
    """
    # Update top_k if provided. This mutates shared state, so concurrent
    # requests with different top_k values can affect each other.
    if request.top_k:
        retriever.top_k = request.top_k

    result = rag_pipeline.query(
        question=request.question,
        use_hybrid=request.use_hybrid,
    )
    return QueryResponse(
        answer=result["answer"],
        sources=[
            {"text": text, "metadata": meta}
            for text, meta in zip(result["context"], result["metadata"])
        ],
        tokens_used=result.get("tokens_used", 0),
    )


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Production Considerations and Edge Cases
Memory Management
LanceDB uses memory-mapped files for vector storage, which means it can handle datasets larger than available RAM. However, during ingestion, embeddings are held in memory before being written to disk. For large batch ingestions (millions of documents), implement batching:
def batch_ingest(self, table, records: List[Dict], batch_size: int = 1000):
    """Batch ingestion to manage memory usage.

    `table` is an open LanceDB table, e.g. self.db.open_table("documents").
    """
    total_batches = (len(records) - 1) // batch_size + 1
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        table.add(batch)
        print(f"Ingested batch {i // batch_size + 1}/{total_batches}")
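Slicing a list still requires every record to be in memory first. When records can be produced lazily (for example, by embedding one document at a time), a generator-based variant keeps only one batch resident. This is a sketch: batched and ingest_stream are hypothetical helper names, and add_batch stands in for whatever writes a batch, such as a table's add method.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of up to `batch_size` items without materializing the input."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def ingest_stream(record_iter, add_batch, batch_size=1000):
    """Feed batches to `add_batch` and return the number of records written."""
    total = 0
    for batch in batched(record_iter, batch_size):
        add_batch(batch)
        total += len(batch)
    return total


# Demo with a stand-in writer that just records batch sizes.
sizes = []
count = ingest_stream(({"chunk_index": i} for i in range(2500)),
                      lambda b: sizes.append(len(b)))
print(count, sizes)
```

Because the input is a generator, peak memory is bounded by batch_size records rather than by the size of the whole corpus.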
API Rate Limiting
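OpenAI's API enforces rate limits (requests and tokens per minute) that vary by usage tier and model, and the lowest tiers are restrictive enough that a burst of queries can hit them quickly. Check the limits page for your account rather than relying on a fixed number, and implement retry logic with exponential backoff: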
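# Requires: pip install tenacity
from tenacity import retry, stop_after_attempt, wait_exponential

# Defined as a method on RAGPipeline; retries up to 3 times,
# waiting between 4 and 10 seconds between attempts.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def safe_llm_call(self, messages):
    return self.llm.invoke(messages)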
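To see what exponential backoff with clamping does without installing anything, here is a hand-rolled sketch using only the standard library. It is not tenacity's implementation; backoff_delays and call_with_retry are hypothetical names, and the defaults simply mirror the multiplier, min, and max values used above.

```python
import time


def backoff_delays(attempts, multiplier=1.0, min_wait=4.0, max_wait=10.0):
    """Waits between attempts: multiplier * 2**n, clamped to [min_wait, max_wait]."""
    return [
        max(min_wait, min(max_wait, multiplier * 2 ** n))
        for n in range(attempts - 1)  # no wait after the final attempt
    ]


def call_with_retry(func, attempts=3, sleep=time.sleep, retry_on=(Exception,)):
    """Call func(), retrying on the given exceptions with clamped exponential waits."""
    delays = backoff_delays(attempts)
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(delays[attempt])


print(backoff_delays(6))
```

Passing sleep as a parameter keeps the helper testable: a test can substitute a function that records the waits instead of actually sleeping.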
Vector Dimension Mismatch
The most common production error is embedding dimension mismatch. If you change embedding models, you must either re-ingest all documents or create a new table. LanceDB enforces dimension consistency at the table level, so this error manifests as a clear exception rather than silent corruption.
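A cheap guard is to validate vector lengths before writing, so the failure names the offending record instead of surfacing from the storage layer. This is a minimal sketch; check_dimensions is a hypothetical helper, and 1536 and 384 are the output sizes of text-embedding-3-small (as configured above) and BAAI/bge-small-en-v1.5 respectively.

```python
def check_dimensions(records, expected_dim, vector_key="vector"):
    """Raise ValueError naming the first record whose vector length is wrong."""
    for i, record in enumerate(records):
        actual = len(record[vector_key])
        if actual != expected_dim:
            raise ValueError(
                f"record {i}: vector has {actual} dimensions, table expects {expected_dim}"
            )
    return len(records)


openai_style = [{"vector": [0.0] * 1536, "text": "chunk from the OpenAI path"}]
fallback_style = [{"vector": [0.0] * 384, "text": "chunk from the fallback path"}]

print(check_dimensions(openai_style, expected_dim=1536))
try:
    check_dimensions(fallback_style, expected_dim=1536)
except ValueError as err:
    print(err)
```

Calling this just before table.add makes a silent switch between the OpenAI path and the HuggingFace fallback fail fast with a readable message.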
What's Next
You now have a production-ready RAG pipeline using LanceDB and LangChain. The architecture handles document ingestion, hybrid search, context management, and deployment as a REST API.
To extend this system:
- Multi-modal embeddings: LanceDB supports multi-modal vectors (text + image). Extend the ingestion pipeline to process images and store combined embeddings.
- Streaming responses: Modify the query endpoint to stream LLM responses using Server-Sent Events (SSE) for better user experience.
- A/B testing framework: Implement a retrieval evaluation pipeline using LanceDB's built-in metrics to compare chunking strategies and embedding models.
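For the evaluation idea in particular, two standard retrieval metrics are easy to compute by hand once you have a small labeled set of questions and the chunk ids that should answer them. The sketch below is library-independent; the function names, chunk ids, and the two "strategies" being compared are made up for illustration.

```python
def recall_at_k(retrieved, relevant, k):
    """Fraction of relevant ids that appear among the top-k retrieved ids."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)


def reciprocal_rank(retrieved, relevant):
    """1 / rank of the first relevant id, or 0.0 if none was retrieved."""
    relevant = set(relevant)
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


def evaluate(runs, k=5):
    """Average recall@k and MRR over a list of (retrieved, relevant) pairs."""
    if not runs:
        return {"recall": 0.0, "mrr": 0.0}
    recall = sum(recall_at_k(r, rel, k) for r, rel in runs) / len(runs)
    mrr = sum(reciprocal_rank(r, rel) for r, rel in runs) / len(runs)
    return {"recall": recall, "mrr": mrr}


# Same two questions answered by two hypothetical chunking strategies.
strategy_a = [(["c3", "c1", "c9"], ["c1", "c2"]), (["c7", "c8"], ["c7"])]
strategy_b = [(["c1", "c2", "c9"], ["c1", "c2"]), (["c8", "c7"], ["c7"])]
print("A:", evaluate(strategy_a, k=3))
print("B:", evaluate(strategy_b, k=3))
```

Running the same question set against each configuration and comparing these averages gives a repeatable basis for choosing between chunk sizes, overlaps, or embedding models.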
The complete code is available on GitHub. For more on advanced RAG patterns, check out our guide on building multi-hop retrieval systems and optimizing vector search performance.
Remember: RAG is only as good as your retrieval. Invest time in chunking strategy, embedding selection, and hybrid search tuning—the LLM generation is the easy part.