How to Build a RAG Pipeline with LanceDB and LangChain
Retrieval-Augmented Generation (RAG) has become the de facto architecture for grounding large language models in private or domain-specific data. While many vector databases exist, LanceDB offers a unique columnar storage format that excels at handling large-scale, multi-modal data with zero-copy reads and native GPU acceleration. In this tutorial, we'll build a production-ready RAG pipeline using LanceDB as our vector store and LangChain for orchestration, covering everything from environment setup to handling edge cases like concurrent writes and memory management.
Understanding the Production RAG Architecture
Before diving into code, let's examine why LanceDB is particularly well-suited for production RAG workloads. Traditional vector databases like Pinecone or Weaviate [8] are excellent but often require separate infrastructure and incur per-query costs. LanceDB operates as an embedded database, meaning it runs within your application process, eliminating network latency for vector searches. According to the LanceDB documentation, it supports up to 100x faster random access compared to Parquet files, making it ideal for real-time retrieval scenarios.
The architecture we'll build consists of three main components:
- Document Ingestion Pipeline: Chunks documents, generates embeddings [1], and stores them in LanceDB
- Retrieval Service: Performs similarity search with metadata filtering
- Generation Layer: Combines retrieved context with LLM prompts
This design handles production concerns like:
- Concurrent access: LanceDB supports multiple readers with a single writer
- Memory efficiency: Columnar storage means we only load needed columns
- Versioning: LanceDB tables are versioned, enabling rollbacks
Prerequisites and Environment Setup
We'll need Python 3.10+ and several libraries. Create a virtual environment and install dependencies:
python -m venv rag_env
source rag_env/bin/activate # On Windows: rag_env\Scripts\activate
pip install lancedb langchain langchain-community sentence-transformers fastapi uvicorn pypdf python-multipart pandas tiktoken openai
Key library versions as of June 2026:
- lancedb>=0.12.0: Core vector database
- langchain>=0.3.0: Orchestration framework
- sentence-transformers>=3.0.0: Embedding models
- fastapi>=0.115.0: API server
We'll use the all-MiniLM-L6-v2 embedding model because it offers a good balance of speed and quality (384-dimensional vectors) and runs locally once downloaded. For production, you might swap to intfloat/e5-mistral-7b-instruct [9] for better retrieval quality, but a 7B-parameter model requires GPU memory.
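To make the model trade-off concrete, here is a rough back-of-the-envelope sketch of the raw storage cost of uncompressed float32 vectors. The helper name `raw_vector_bytes` is ours, and the 4096-dimension figure for the larger model is an assumption you should confirm against its model card; the estimate ignores text, metadata, and index overhead.

```python
def raw_vector_bytes(num_vectors: int, dim: int = 384, bytes_per_value: int = 4) -> int:
    """Bytes needed to store uncompressed float32 vectors (vectors only)."""
    return num_vectors * dim * bytes_per_value


# all-MiniLM-L6-v2 produces 384-dimensional vectors
small_per_vector = raw_vector_bytes(1)
small_per_million = raw_vector_bytes(1_000_000)

# Assumption: the larger model emits 4096-dimensional vectors
large_per_million = raw_vector_bytes(1_000_000, dim=4096)

print(f"384-dim:  {small_per_vector} bytes/vector, {small_per_million / 1e9:.2f} GB per million chunks")
print(f"4096-dim: {large_per_million / 1e9:.2f} GB per million chunks")
```

Under these assumptions, switching models changes storage and scan cost by roughly an order of magnitude, which is worth weighing alongside retrieval quality.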
Building the Document Ingestion Pipeline
The ingestion pipeline is the foundation of any RAG system. We'll create a modular class that handles document loading, chunking, embedding, and storage. Let's start with the core implementation:
import json
import logging
import uuid
from typing import Any, Dict, Optional

import lancedb
import pyarrow as pa
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from sentence_transformers import SentenceTransformer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LanceRAGIngestor:
    def __init__(self, db_path: str = "./lancedb_data",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 chunk_size: int = 512,
                 chunk_overlap: int = 64):
        """
        Initialize the RAG ingestor with LanceDB backend.

        Args:
            db_path: Path to LanceDB database directory
            embedding_model: SentenceTransformer model name
            chunk_size: Maximum number of characters per chunk
            chunk_overlap: Number of characters shared by consecutive chunks
        """
        self.db = lancedb.connect(db_path)
        self.embedder = SentenceTransformer(embedding_model)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", ".", " ", ""]
        )
        self.dimension = self.embedder.get_sentence_embedding_dimension()
        logger.info(f"Initialized ingestor with embedding dimension: {self.dimension}")

    def create_table(self, table_name: str = "documents",
                     overwrite: bool = False) -> None:
        """
        Create or get a LanceDB table with proper schema.

        The schema includes:
        - vector: Fixed-size list for embeddings
        - text: Original chunk text
        - metadata: JSON-encoded metadata
        - id: Unique identifier for each chunk
        - source: Path of the originating file
        - chunk_index: Position of the chunk within the document
        """
        if table_name in self.db.table_names() and not overwrite:
            logger.info(f"Table '{table_name}' already exists, reusing")
            return
        schema = pa.schema([
            pa.field("vector", pa.list_(pa.float32(), self.dimension)),
            pa.field("text", pa.string()),
            pa.field("metadata", pa.string()),  # JSON string for flexibility
            pa.field("id", pa.string()),
            pa.field("source", pa.string()),
            pa.field("chunk_index", pa.int32())
        ])
        self.db.create_table(table_name, schema=schema,
                             mode="overwrite" if overwrite else "create")
        logger.info(f"Created table '{table_name}' with schema")

    def ingest_document(self, file_path: str,
                        table_name: str = "documents",
                        metadata: Optional[Dict[str, Any]] = None) -> int:
        """
        Ingest a single document into LanceDB.

        Handles PDF and plain-text files and returns the number of chunks ingested.
        """
        # Load document based on extension
        if file_path.endswith(".pdf"):
            loader = PyPDFLoader(file_path)
            documents = loader.load()
        else:
            # For plain text files
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()
            documents = [Document(page_content=text, metadata={"source": file_path})]

        # Split into chunks
        chunks = self.text_splitter.split_documents(documents)
        logger.info(f"Split document into {len(chunks)} chunks")
        if not chunks:
            # Nothing to embed (e.g. an empty file or a scanned PDF without text)
            return 0

        # Embed all chunks in one batch
        texts = [chunk.page_content for chunk in chunks]
        embeddings = self.embedder.encode(texts, show_progress_bar=True,
                                          normalize_embeddings=True)

        # Build records for LanceDB
        records = []
        base_metadata = metadata or {}
        for idx, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            record = {
                "vector": embedding.tolist(),
                "text": chunk.page_content,
                # json.dumps (not str) so the retriever can json.loads it back
                "metadata": json.dumps({**base_metadata, **chunk.metadata}, default=str),
                "id": str(uuid.uuid4()),
                "source": file_path,
                "chunk_index": idx
            }
            records.append(record)

        # Batch insert into LanceDB
        table = self.db.open_table(table_name)
        table.add(records)
        logger.info(f"Ingested {len(records)} chunks into '{table_name}'")
        return len(records)
Key design decisions:
- Normalized embeddings: We normalize embeddings to unit vectors, which enables cosine similarity via dot product. LanceDB defaults to L2 distance, but we can configure it for cosine similarity during search.
- Metadata as JSON string: Storing metadata as a JSON string provides flexibility for different document types without schema changes. The trade-off is that we cannot filter on nested fields directly in LanceDB queries; we'd need to parse them in post-processing.
- Batch insertion: LanceDB's add() method accepts a list of dictionaries, which is more efficient than inserting one record at a time. For very large batches (100k+ records), consider using add() with a PyArrow table directly.
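The post-processing step mentioned for nested metadata might look like the following sketch. It assumes each row is a dict whose `metadata` field holds valid JSON; the helper names `nested_get` and `filter_by_metadata` and the sample `owner.team` field are hypothetical and not part of LanceDB or LangChain.

```python
import json
from typing import Any, Dict, List

_MISSING = object()  # sentinel so a stored None can still be matched


def nested_get(data: Any, key_path: List[str]) -> Any:
    """Walk a key path through nested dicts; return _MISSING if any step fails."""
    value = data
    for key in key_path:
        if not isinstance(value, dict) or key not in value:
            return _MISSING
        value = value[key]
    return value


def filter_by_metadata(rows: List[Dict[str, Any]], key_path: List[str],
                       expected: Any) -> List[Dict[str, Any]]:
    """Keep rows whose JSON metadata holds `expected` at the nested key path."""
    kept = []
    for row in rows:
        try:
            metadata = json.loads(row["metadata"])
        except (json.JSONDecodeError, KeyError, TypeError):
            continue  # skip rows with missing or malformed metadata
        if nested_get(metadata, key_path) == expected:
            kept.append(row)
    return kept


rows = [
    {"text": "a", "metadata": json.dumps({"owner": {"team": "platform"}})},
    {"text": "b", "metadata": json.dumps({"owner": {"team": "search"}})},
    {"text": "c", "metadata": "not json"},
]
matches = filter_by_metadata(rows, ["owner", "team"], "search")
print([row["text"] for row in matches])
```

Because this filter runs after retrieval, it can shrink the result set below top_k; fetching extra candidates before filtering is one way to compensate.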
Edge case: Duplicate detection. In production, you'll encounter duplicate documents. The current implementation generates a new UUID for each chunk, so re-ingesting the same file inserts its chunks again. To handle this, add a deduplication step:
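    # Add this method to LanceRAGIngestor
    def check_duplicate(self, table_name: str, source: str) -> bool:
        """Check if a document source already exists in the table."""
        table = self.db.open_table(table_name)
        # Double any single quotes so the value stays inside the SQL string literal
        escaped = source.replace("'", "''")
        # LanceDB supports SQL-like filtering
        result = table.search().where(f"source = '{escaped}'").limit(1).to_pandas()
        return len(result) > 0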
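A complementary approach is to derive chunk IDs from the content itself, so that re-ingesting an unchanged file yields the same IDs and duplicates become easy to spot. The sketch below uses only the standard library; `chunk_id` and `sql_quote` are hypothetical helpers, and the quote-doubling rule assumes the filter parser follows standard SQL string-literal escaping.

```python
import hashlib


def chunk_id(source: str, chunk_index: int, text: str) -> str:
    """Deterministic ID: identical source, position, and text give the same hash."""
    payload = f"{source}\x1f{chunk_index}\x1f{text}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def sql_quote(value: str) -> str:
    """Wrap a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


first = chunk_id("report.pdf", 0, "Quarterly revenue grew.")
again = chunk_id("report.pdf", 0, "Quarterly revenue grew.")
edited = chunk_id("report.pdf", 0, "Quarterly revenue fell.")

where_clause = "source = " + sql_quote("o'brien.pdf")
print(first == again, first == edited)
print(where_clause)
```

With deterministic IDs, a lookup on the id column before insertion can skip chunks that are already stored, rather than skipping a whole file by name.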
Implementing the Retrieval Service
Now let's build the retrieval component that handles similarity search with metadata filtering. This is where LanceDB's columnar storage shines: filters on scalar columns such as source or chunk_index can be evaluated without reading the vector column.
import json
from typing import List, Optional

import lancedb
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

app = FastAPI(title="LanceDB RAG Service")


class SearchRequest(BaseModel):
    query: str
    table_name: str = "documents"
    top_k: int = 5
    # SQL-like filter string; validate it before exposing this API to untrusted callers
    metadata_filter: Optional[str] = None


class SearchResult(BaseModel):
    text: str
    score: float
    metadata: dict
    source: str


class LanceRAGRetriever:
    def __init__(self, db_path: str = "./lancedb_data",
                 embedding_model: str = "all-MiniLM-L6-v2"):
        self.db = lancedb.connect(db_path)
        self.embedder = SentenceTransformer(embedding_model)

    def search(self, query: str, table_name: str = "documents",
               top_k: int = 5, metadata_filter: Optional[str] = None) -> List[SearchResult]:
        """
        Perform similarity search with optional metadata filtering.

        Args:
            query: Natural language query
            table_name: LanceDB table to search
            top_k: Number of results to return
            metadata_filter: SQL-like filter (e.g., "source LIKE '%report%'")

        Returns:
            List of SearchResult objects sorted by relevance
        """
        # Generate query embedding
        query_embedding = self.embedder.encode(query, normalize_embeddings=True)

        # Open table and build the search query.
        # LanceDB uses L2 distance by default, so request cosine distance explicitly.
        table = self.db.open_table(table_name)
        search_builder = table.search(query_embedding.tolist()).metric("cosine")

        # Apply metadata filter if provided
        if metadata_filter:
            search_builder = search_builder.where(metadata_filter)

        results = search_builder.limit(top_k).to_pandas()

        # Convert to SearchResult objects
        search_results = []
        for _, row in results.iterrows():
            # Parse metadata from JSON string
            try:
                metadata = json.loads(row["metadata"])
            except (json.JSONDecodeError, KeyError, TypeError):
                metadata = {}

            # LanceDB returns a distance; with the cosine metric,
            # similarity = 1 - distance
            score = 1.0 - float(row["_distance"]) if "_distance" in row else 0.0
            search_results.append(SearchResult(
                text=row["text"],
                score=max(0.0, min(1.0, score)),  # Clamp to [0, 1]
                metadata=metadata,
                source=row.get("source", "unknown")
            ))
        return search_results


# Initialize retriever
retriever = LanceRAGRetriever()


@app.post("/search", response_model=List[SearchResult])
def search_endpoint(request: SearchRequest):
    """REST endpoint for vector search.

    Declared with plain `def` so FastAPI runs the blocking embedding and
    search calls in its thread pool instead of on the event loop.
    """
    try:
        results = retriever.search(
            query=request.query,
            table_name=request.table_name,
            top_k=request.top_k,
            metadata_filter=request.metadata_filter
        )
        return results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "database": "lancedb"}
Critical implementation details:
- Distance to similarity conversion: LanceDB returns L2 distances by default. For unit-normalized embeddings, cosine similarity equals 1 - (d^2 / 2), where d is the Euclidean distance, so 1 - distance is only a rough proxy under the L2 metric. For production, request the cosine metric in the search builder; 1 - distance is then exactly the cosine similarity.
- Metadata filtering syntax: LanceDB supports SQL-like WHERE clauses. For example:
  - source = 'document.pdf'
  - chunk_index > 10
  - metadata LIKE '%"department": "engineering"%' (string matching on the serialized JSON, so the pattern must mirror the exact serialization)
- Memory management: LanceDB uses memory-mapped files, so large tables don't consume RAM until accessed. However, without an index each search scans the vector column for the entire dataset. For tables with millions of vectors, consider using IVF-PQ indexing (available in LanceDB 0.12+).
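The relationship between L2 distance and cosine similarity for unit vectors can be checked numerically. This is a minimal standard-library sketch; the two example vectors are arbitrary. Some engines report the squared L2 distance rather than the distance itself, so confirm which one your LanceDB version returns before applying the formula.

```python
import math
from typing import List


def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def normalize(v: List[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


def l2_distance(a: List[float], b: List[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_from_l2(distance: float) -> float:
    """For unit vectors, ||a - b||^2 = 2 - 2*cos, so cos = 1 - d^2 / 2."""
    return 1.0 - distance ** 2 / 2.0


a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])

exact = dot(a, b)                            # cosine similarity of unit vectors
recovered = cosine_from_l2(l2_distance(a, b))
naive = 1.0 - l2_distance(a, b)              # the shortcut, for comparison

print(f"exact={exact:.4f} recovered={recovered:.4f} naive={naive:.4f}")
```

The recovered value matches the exact cosine similarity, while the shortcut lands far from it for this pair, which is why the cosine metric is the safer choice when scores are shown to users or compared against thresholds.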
Building the Generation Layer with Context Management
The final piece connects retrieval to an LLM for answer generation. We'll use LangChain's prompt templates and handle context window limits gracefully.
from typing import List, Optional

import tiktoken
from langchain.prompts import ChatPromptTemplate
# Or any other provider. Newer LangChain releases ship this class in langchain_openai.
from langchain_community.chat_models import ChatOpenAI


class RAGGenerator:
    def __init__(self, retriever: LanceRAGRetriever,
                 model_name: str = "gpt-4o-mini",
                 max_context_tokens: int = 4000):
        """
        Initialize RAG generator with context management.

        Args:
            retriever: LanceRAGRetriever instance
            model_name: LLM model identifier
            max_context_tokens: Maximum tokens for retrieved context
        """
        self.retriever = retriever
        self.llm = ChatOpenAI(model=model_name, temperature=0.1)
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")  # Approximate
        self.max_context_tokens = max_context_tokens

        # Define prompt template. Use (role, template) tuples: message objects
        # such as SystemMessage are treated as literal text, so their
        # {placeholders} would never be filled in.
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are a helpful assistant that answers questions based on provided context. "
                "If the context doesn't contain relevant information, say so clearly. "
                "Cite specific parts of the context when possible."
            )),
            ("human", (
                "Context:\n{context}\n\n"
                "Question: {question}\n\n"
                "Answer:"
            ))
        ])

    def _truncate_context(self, texts: List[str], scores: List[float]) -> str:
        """
        Truncate retrieved context to fit within token limits.
        Prioritizes higher-scoring chunks when truncating.
        """
        # Sort explicitly so the best chunks claim the token budget first
        ranked = sorted(zip(texts, scores), key=lambda pair: pair[1], reverse=True)
        combined_text = ""
        total_tokens = 0
        for text, score in ranked:
            tokens = self.tokenizer.encode(text)
            if total_tokens + len(tokens) > self.max_context_tokens:
                # Truncate this chunk to fit remaining budget
                remaining = self.max_context_tokens - total_tokens
                if remaining > 50:  # Only include if meaningful
                    truncated = self.tokenizer.decode(tokens[:remaining])
                    combined_text += f"\n[Score: {score:.3f}] {truncated}...\n"
                break
            combined_text += f"\n[Score: {score:.3f}] {text}\n"
            total_tokens += len(tokens)
        return combined_text

    def generate(self, question: str, table_name: str = "documents",
                 top_k: int = 5, metadata_filter: Optional[str] = None) -> str:
        """
        Generate answer using RAG pipeline.

        Args:
            question: User's question
            table_name: LanceDB table to search
            top_k: Number of chunks to retrieve
            metadata_filter: Optional filter string

        Returns:
            Generated answer string
        """
        # Retrieve relevant chunks
        results = self.retriever.search(
            query=question,
            table_name=table_name,
            top_k=top_k,
            metadata_filter=metadata_filter
        )
        if not results:
            return "No relevant documents found."

        # Prepare context with token management
        texts = [r.text for r in results]
        scores = [r.score for r in results]
        context = self._truncate_context(texts, scores)

        # Generate response
        messages = self.prompt.format_messages(
            context=context,
            question=question
        )
        response = self.llm.invoke(messages)
        return response.content


# Example usage
if __name__ == "__main__":
    # Initialize components
    ingestor = LanceRAGIngestor()
    ingestor.create_table(overwrite=True)

    # Ingest a sample document
    ingestor.ingest_document("sample_report.pdf", metadata={"department": "engineering"})

    # Query
    retriever = LanceRAGRetriever()
    generator = RAGGenerator(retriever)
    answer = generator.generate(
        "What were the key findings in the Q2 report?",
        metadata_filter="source = 'sample_report.pdf'"
    )
    print(f"Answer: {answer}")
Edge cases handled:
- Context window overflow: The _truncate_context method keeps the retrieved context within max_context_tokens. It fills the budget with the highest-scoring chunks first and truncates the last chunk if needed.
- Empty results: Returns a clear message instead of crashing.
- Score-based chunk ordering: Each chunk is prefixed with its relevance score, allowing the LLM to weigh evidence appropriately.
- Metadata filtering: Enables document-level access control (e.g., only search documents from a specific department).
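The token-budget logic can be exercised without an LLM or tokenizer. The sketch below is a simplified stand-in, not the tutorial's method: it counts whitespace-separated words instead of model tokens, and `pack_context` and `min_tail` are hypothetical names. As in the tutorial, the budget covers chunk text only, not the score prefix.

```python
from typing import List, Tuple


def pack_context(chunks: List[Tuple[str, float]], max_tokens: int,
                 min_tail: int = 3) -> str:
    """Fill a token budget with the highest-scoring chunks first.

    Assumption: one whitespace-separated word counts as one token.
    """
    ordered = sorted(chunks, key=lambda chunk: chunk[1], reverse=True)
    parts = []
    used = 0
    for text, score in ordered:
        tokens = text.split()
        remaining = max_tokens - used
        if len(tokens) <= remaining:
            parts.append(f"[Score: {score:.3f}] {text}")
            used += len(tokens)
            continue
        # The chunk does not fit: keep a truncated tail only if it is meaningful
        if remaining >= min_tail:
            parts.append(f"[Score: {score:.3f}] {' '.join(tokens[:remaining])}...")
        break
    return "\n".join(parts)


chunks = [
    ("low score chunk with many extra words here", 0.2),
    ("high score chunk", 0.9),
]
context = pack_context(chunks, max_tokens=6)
print(context)
```

Sorting inside the function, rather than relying on the caller's order, keeps the prioritization correct even if results are re-ranked or merged from several sources before packing.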
Production Deployment Considerations
For a production deployment, consider these additional factors:
Concurrent access: LanceDB supports multiple readers but only one writer at a time. If you need concurrent writes, implement a write queue or use LanceDB's built-in locking (available in v0.14+). For read-heavy workloads, create read replicas by copying the database directory.
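One way to implement the write queue is to funnel every write through a single background thread. This is a minimal standard-library sketch: `SingleWriter` is a hypothetical class, and `write_fn` stands in for whatever performs the actual write (for example a bound `table.add`). A failed batch is logged and dropped here; a real service would likely retry or surface the error.

```python
import logging
import queue
import threading
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


class SingleWriter:
    """Serializes writes from many threads through one worker thread."""

    def __init__(self, write_fn: Callable[[List[dict]], None]):
        self._write_fn = write_fn
        self._queue: "queue.Queue[Optional[List[dict]]]" = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, records: List[dict]) -> None:
        """Enqueue a batch; returns immediately."""
        self._queue.put(records)

    def _run(self) -> None:
        while True:
            batch = self._queue.get()
            if batch is None:  # shutdown signal
                self._queue.task_done()
                break
            try:
                self._write_fn(batch)
            except Exception:
                logger.exception("Write failed; batch dropped")
            finally:
                self._queue.task_done()

    def close(self) -> None:
        """Flush pending batches and stop the worker."""
        self._queue.put(None)
        self._thread.join()


stored: List[dict] = []
writer = SingleWriter(stored.extend)  # stand-in for a real table write

producers = [
    threading.Thread(target=writer.submit, args=([{"id": i}],))
    for i in range(4)
]
for producer in producers:
    producer.start()
for producer in producers:
    producer.join()
writer.close()
print(sorted(record["id"] for record in stored))
```

Because only the worker thread ever calls `write_fn`, request handlers can submit batches concurrently without violating the single-writer constraint.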
Indexing for speed: For tables with >100k vectors, create an IVF-PQ index:
table.create_index(metric="cosine", num_partitions=256, num_sub_vectors=32)
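This avoids scanning every vector: each query is compared only against the partitions closest to it, so latency grows far more slowly than the O(n) brute-force scan, at the cost of some recall.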
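The effect of those index parameters can be estimated with simple arithmetic. The sketch below assumes 384-dimensional float32 vectors, 8-bit product-quantization codes, evenly sized partitions, and 20 probed partitions per query; the helper `ivf_pq_estimate` and the `nprobes` value are illustrative assumptions, so check the defaults of your LanceDB version.

```python
from typing import Dict


def ivf_pq_estimate(num_vectors: int, dim: int = 384, num_partitions: int = 256,
                    num_sub_vectors: int = 32, nprobes: int = 20,
                    bits_per_code: int = 8) -> Dict[str, int]:
    """Rough size and scan estimates for an IVF-PQ index (even partitions assumed)."""
    if dim % num_sub_vectors != 0:
        raise ValueError("dim must be divisible by num_sub_vectors")
    return {
        # Each sub-vector covers this many of the original dimensions
        "dims_per_sub_vector": dim // num_sub_vectors,
        # One code per sub-vector replaces the raw floats
        "code_bytes_per_vector": num_sub_vectors * bits_per_code // 8,
        "raw_bytes_per_vector": dim * 4,
        # Only the probed partitions are scanned per query
        "vectors_scanned_per_query": num_vectors * nprobes // num_partitions,
    }


estimate = ivf_pq_estimate(1_000_000)
for name, value in estimate.items():
    print(f"{name}: {value}")
```

Under these assumptions a query touches under a tenth of a million-vector table and compares compact codes instead of full vectors; raising nprobes trades some of that speed back for recall.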
Monitoring: Track these metrics:
- Embedding generation latency
- Search latency (p50, p95, p99)
- Context truncation rate (how often we hit token limits)
- Cache hit ratio (if using query caching)
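As a starting point for the latency and truncation metrics, the following sketch computes percentiles with the standard library. The function names are hypothetical, the sample data is synthetic, and a production system would more likely export histograms to a metrics backend than compute percentiles in process.

```python
import statistics
from typing import Dict, List


def latency_percentiles(samples_ms: List[float]) -> Dict[str, float]:
    """Return p50/p95/p99 from raw latency samples (needs at least two samples)."""
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    # quantiles(n=100) returns 99 cut points; index i holds percentile i + 1
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def truncation_rate(truncated_flags: List[bool]) -> float:
    """Fraction of requests whose context hit the token limit."""
    if not truncated_flags:
        return 0.0
    return sum(truncated_flags) / len(truncated_flags)


# Synthetic samples: 1 ms, 2 ms, ..., 100 ms
samples = [float(ms) for ms in range(1, 101)]
stats = latency_percentiles(samples)
rate = truncation_rate([True, False, False, True])

print(stats)
print(f"truncation rate: {rate:.0%}")
```

A rising truncation rate is a hint to lower top_k, shrink chunk_size, or raise the context budget.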
Scaling: LanceDB's columnar format means you can store billions of vectors in a single table. For multi-terabyte datasets, shard across multiple LanceDB instances and use a router to distribute queries.
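For vector search, a router usually cannot send a query to just one shard, because the nearest neighbors may live anywhere. A common pattern is to query every shard for its local top-k and merge the results. The sketch below illustrates that merge with stub shards; `fan_out_search` and the `ShardSearch` callable type are hypothetical, and a real router would issue the shard queries in parallel and handle timeouts.

```python
import heapq
from typing import Callable, Iterable, List, Tuple

Hit = Tuple[float, str]                       # (distance, text); smaller is better
ShardSearch = Callable[[str, int], List[Hit]]


def fan_out_search(shards: Iterable[ShardSearch], query: str, top_k: int) -> List[Hit]:
    """Ask each shard for its local top_k, then keep the global top_k."""
    all_hits: List[Hit] = []
    for search in shards:
        all_hits.extend(search(query, top_k))
    return heapq.nsmallest(top_k, all_hits)


# Stub shards returning fixed results in place of real LanceDB instances
def shard_a(query: str, k: int) -> List[Hit]:
    return [(0.10, "a1"), (0.40, "a2")][:k]


def shard_b(query: str, k: int) -> List[Hit]:
    return [(0.20, "b1"), (0.30, "b2")][:k]


merged = fan_out_search([shard_a, shard_b], "quarterly revenue", top_k=3)
print(merged)
```

Requesting top_k from every shard guarantees the merged list contains the true global top_k, at the cost of transferring more candidates than are finally returned.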
What's Next
This tutorial covered building a production-ready RAG pipeline with LanceDB and LangChain. To extend this system:
- Add hybrid search: Combine vector search with keyword search using LanceDB's full-text search capabilities (available in v0.16+)
- Implement caching: Cache frequent queries using Redis or LanceDB's built-in cache
- Add feedback loop: Collect user feedback on answers to fine-tune retrieval parameters
- Explore multi-modal RAG: LanceDB supports storing images and audio alongside text vectors
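For the hybrid search extension, one widely used way to combine a vector ranking with a keyword ranking is reciprocal rank fusion (RRF). The sketch below is a generic implementation of that technique, not a description of how LanceDB merges results internally; the chunk IDs are made up and k=60 is the conventional constant.

```python
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked ID lists: each appearance contributes 1 / (k + rank)."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for stable output
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


vector_hits = ["c1", "c2", "c3"]    # ranked by embedding similarity
keyword_hits = ["c3", "c1", "c4"]   # ranked by keyword match

fused = reciprocal_rank_fusion([vector_hits, keyword_hits])
print(fused)
```

RRF works on ranks rather than raw scores, so it needs no calibration between cosine similarities and keyword relevance scores, which live on different scales.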
The complete code is available on GitHub. For more on RAG architectures, see our guide on advanced retrieval strategies and vector database comparison.
Remember: The key to production RAG is not just retrieval accuracy, but system reliability under load. LanceDB's embedded architecture eliminates network hops, making it ideal for latency-sensitive applications. Start with the code above, monitor your metrics, and iterate based on real usage patterns.
References