How to Build a RAG Pipeline with LangChain and LanceDB 2026
Practical tutorial: It clarifies a fundamental aspect of AI technology that can influence industry understanding and discourse.
How to Build a RAG Pipeline with LangChain and LanceDB 2026
Table of Contents
- How to Build a RAG Pipeline with LangChain and LanceDB 2026
- Create a virtual environment
- Install core dependencies
- ingest.py
- vector_store.py
📺 Watch: Neural Networks Explained
Video by 3Blue1Brown
Retrieval-Augmented Generation (RAG) has become the de facto architecture for grounding large language models in private or domain-specific data. Instead of fine-tuning a model—which is expensive, brittle, and requires retraining—RAG retrieves relevant documents at inference time and injects them into the prompt context. This approach reduces hallucinations, enables real-time knowledge updates, and keeps sensitive data off the model provider's servers.
In this tutorial, you will build a production-ready RAG pipeline using LangChain for orchestration and LanceDB as the vector store. LanceDB is a columnar, disk-based vector database [1] built on the Lance columnar format. Unlike in-memory stores like FAISS or Chroma, LanceDB handles datasets of millions of vectors without exhausting RAM, making it suitable for enterprise-scale retrieval.
By the end of this tutorial, you will have a FastAPI service that ingests PDF documents, chunks them, embeds them into LanceDB, and answers questions using OpenAI's GPT-4o-mini (or any LangChain-compatible LLM). We will cover chunking strategies, embedding cache invalidation, concurrent ingestion, and query-time filtering.
Architecture Overview: Why LanceDB for Production RAG
A typical RAG pipeline has four stages: ingestion, embedding, retrieval, and generation. The choice of vector store determines scalability, query latency, and operational complexity.
LanceDB differs from alternatives in three critical ways:
-
Disk-based storage: LanceDB stores vectors and metadata in the Lance columnar format on disk. This means you can store billions of vectors on a single machine with a standard SSD. In contrast, FAISS (in-memory) requires RAM proportional to dataset size, and Pinecone [8]/Weaviate are managed services with egress costs.
-
Zero-copy reads: LanceDB memory-maps the data files, so queries read directly from disk cache without deserialization overhead. Benchmarks from the LanceDB team show query latency under 10ms for 10M vectors on an NVMe drive.
-
Native filtering: LanceDB supports SQL-like
whereclauses on metadata columns before vector search. This is crucial for production RAG where you need to filter by document source, date range, or access permissions. In-memory stores often require post-filtering, which degrades recall.
Our architecture will use LangChain's LanceDB wrapper, which abstracts the connection and provides a familiar vectorstore.similarity_search() interface. The ingestion pipeline will use langchain_community.document_loaders for PDFs and RecursiveCharacterTextSplitter for chunking.
Prerequisites and Environment Setup
You need Python 3.10+ and the following packages. We pin versions to avoid breaking changes.
# Create a virtual environment
python -m venv rag_env
source rag_env/bin/activate # On Windows: rag_env\Scripts\activate
# Install core dependencies
pip install langchain==0.3.14 langchain-community==0.3.14 langchain-openai==0.2.14
pip install lancedb==0.12.0 pypdf==5.1.0 fastapi==0.115.6 uvicorn==0.34.0
pip install python-multipart==0.0.18 # For file uploads in FastAPI
Set your OpenAI API key as an environment variable:
export OPENAI_API_KEY="sk-your-key-here"
If you prefer a local LLM, you can swap the ChatOpenAI call for Ollama [9] or llama-cpp-python. The pipeline remains identical.
Step 1: Document Ingestion and Chunking Strategy
The quality of a RAG system depends heavily on chunking. Too large chunks dilute relevance; too small chunks lose context. We use RecursiveCharacterTextSplitter with a chunk size of 1000 characters and overlap of 200 characters. This is a reasonable default for technical documents.
# ingest.py
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
from langchain_core.documents import Document
def load_and_chunk_pdf(file_path: str) -> List[Document]:
"""
Load a PDF and split into overlapping chunks.
Handles multi-page documents and preserves page numbers in metadata.
"""
loader = PyPDFLoader(file_path)
documents = loader.load() # Each page is a Document with metadata['source'] and metadata['page']
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ".", " ", ""],
length_function=len,
)
chunks = text_splitter.split_documents(documents)
# Add chunk index to metadata for traceability
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_index"] = i
chunk.metadata["total_chunks"] = len(chunks)
return chunks
Edge case: PDFs with scanned images (no extractable text) will return empty documents. You should check len(documents) and raise a clear error. In production, integrate an OCR service like Tesseract.
Step 2: Embedding and LanceDB Vector Store Setup
We use OpenAI's text-embedding-3-small model (1536 dimensions) for embeddings. LanceDB stores these embeddings along with the chunk text and metadata.
# vector_store.py
import lancedb
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import LanceDB
from langchain_core.documents import Document
from typing import List
import os
def create_vector_store(
table_name: str = "doc_chunks",
uri: str = "./lancedb_data",
reset: bool = False
) -> LanceDB:
"""
Initialize or connect to a LanceDB vector store.
Args:
table_name: Name of the LanceDB table.
uri: Directory path for LanceDB storage.
reset: If True, drop existing table and recreate.
Returns:
LanceDB vector store instance.
"""
db = lancedb.connect(uri)
if reset and table_name in db.table_names():
db.drop_table(table_name)
print(f"Dropped existing table '{table_name}'")
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key=os.getenv("OPENAI_API_KEY")
)
vector_store = LanceDB(
uri=uri,
table_name=table_name,
embedding=embeddings,
mode="overwrite" if reset else "append"
)
return vector_store
def add_documents_to_store(vector_store: LanceDB, documents: List[Document]):
"""
Add documents to the vector store with batching to handle API rate limits.
OpenAI's embedding API has a rate limit of 3,000 RPM for tier-1 accounts.
We batch in groups of 100 to stay safe.
"""
batch_size = 100
total = len(documents)
for i in range(0, total, batch_size):
batch = documents[i:i+batch_size]
vector_store.add_documents(batch)
print(f"Indexed {min(i+batch_size, total)}/{total} chunks")
Memory consideration: LanceDB writes embeddings to disk incrementally. The mode="append" parameter ensures we don't overwrite existing data when adding new documents. If you need to re-index, pass reset=True.
Step 3: Retrieval and Generation with LangChain
The retrieval chain uses a RetrievalQA pipeline. We add a MaxMarginalRelevance (MMR) search to diversify results and avoid redundant chunks.
# rag_chain.py
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import LanceDB
# Custom prompt template to enforce grounded answers
RAG_PROMPT = PromptTemplate(
template="""You are a helpful assistant. Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know. Do not make up information.
Always cite the source document and page number from the context.
Context:
{context}
Question: {question}
Answer (with citations):""",
input_variables=["context", "question"]
)
def build_rag_chain(vector_store: LanceDB):
"""
Build a RetrievalQA chain with MMR search and custom prompt.
"""
llm = ChatOpenAI(
model="gpt-4o-mini", # Fast and cost-effective
temperature=0.0, # Deterministic for factual QA
max_tokens=1024
)
retriever = vector_store.as_retriever(
search_type="mmr", # Max Marginal Relevance
search_kwargs={
"k": 5, # Retrieve top 5 chunks
"fetch_k": 20, # Fetch 20 candidates for MMR diversity
"lambda_mult": 0.5 # Balance relevance vs diversity
}
)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # Stuff all context into prompt
retriever=retriever,
chain_type_kwargs={"prompt": RAG_PROMPT},
return_source_documents=True # Include sources in response
)
return qa_chain
Why MMR? Standard similarity_search can return near-duplicate chunks from the same paragraph. MMR penalizes redundancy, giving you broader coverage of the document. The lambda_mult parameter controls the trade-off: 0.5 is a balanced default.
Step 4: FastAPI Service with Concurrent Ingestion
We wrap everything in a FastAPI application with two endpoints: /ingest for uploading PDFs and /query for asking questions.
# app.py
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
import tempfile
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from ingest import load_and_chunk_pdf
from vector_store import create_vector_store, add_documents_to_store
from rag_chain import build_rag_chain
app = FastAPI(title="RAG Pipeline with LangChain and LanceDB")
# Global state (in production, use a proper state manager)
vector_store = None
qa_chain = None
executor = ThreadPoolExecutor(max_workers=2) # For CPU-bound PDF parsing
@app.on_event("startup")
async def startup_event():
global vector_store, qa_chain
vector_store = create_vector_store(reset=False)
qa_chain = build_rag_chain(vector_store)
print("Vector store and QA chain initialized")
@app.post("/ingest")
async def ingest_pdf(file: UploadFile = File(..)):
"""
Upload a PDF, chunk it, and add to the vector store.
Runs PDF parsing in a thread pool to avoid blocking the event loop.
"""
if not file.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDF files are supported")
# Save uploaded file to temporary location
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
# Run CPU-bound PDF loading in thread pool
loop = asyncio.get_event_loop()
chunks = await loop.run_in_executor(executor, load_and_chunk_pdf, tmp_path)
if not chunks:
raise HTTPException(status_code=400, detail="No text could be extracted from PDF")
# Add to vector store (this is I/O bound, but LanceDB is thread-safe)
add_documents_to_store(vector_store, chunks)
return JSONResponse({
"status": "success",
"chunks_indexed": len(chunks),
"filename": file.filename
})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
os.unlink(tmp_path) # Clean up temp file
@app.post("/query")
async def query_rag(question: str, top_k: Optional[int] = 5):
"""
Ask a question against the indexed documents.
"""
if not qa_chain:
raise HTTPException(status_code=503, detail="System not initialized")
if not question.strip():
raise HTTPException(status_code=400, detail="Question cannot be empty")
try:
result = qa_chain.invoke({"query": question})
# Extract source documents for citation
sources = []
for doc in result.get("source_documents", []):
sources.append({
"content_preview": doc.page_content[:200],
"source": doc.metadata.get("source", "unknown"),
"page": doc.metadata.get("page", "unknown"),
"chunk_index": doc.metadata.get("chunk_index", "unknown")
})
return JSONResponse({
"answer": result["result"],
"sources": sources
})
except Exception as e:
raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Concurrency note: FastAPI's async endpoints handle many concurrent requests, but PDF parsing with PyPDFLoader is CPU-bound. We offload it to a ThreadPoolExecutor to prevent blocking the event loop. LanceDB operations are thread-safe for reads, but writes should be serialized or use a queue in production.
Step 5: Running the Service and Testing
Start the server:
uvicorn app:app --reload --port 8000
Ingest a PDF (using curl or any HTTP client):
curl -X POST http://localhost:8000/ingest \
-F "file=@/path/to/your/document.pdf"
Expected response:
{
"status": "success",
"chunks_indexed": 47,
"filename": "document.pdf"
}
Query the system:
curl -X POST "http://localhost:8000/query?question=What+is+the+main+conclusion+of+the+paper%3F"
Response includes the answer and source citations with page numbers.
Edge Cases and Production Considerations
1. Embedding Cache Invalidation
If you update a document, the old embeddings remain in LanceDB. You have two strategies:
- Delete by metadata: Use
vector_store.delete(filter={"source": "old_document.pdf"})before re-ingesting. - Versioned tables: Create a new table for each document version and use a routing layer.
LanceDB supports delete() with a filter expression. The current implementation does not handle updates—you must explicitly delete old entries.
2. API Rate Limits
OpenAI's embedding API has rate limits. Our batch size of 100 is conservative for tier-1 accounts (3,000 RPM). For larger datasets, implement exponential backoff:
import time
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def add_with_retry(vector_store, batch):
vector_store.add_documents(batch)
3. Memory Usage
LanceDB memory-maps the data files, so the OS manages caching. On a machine with 16GB RAM, you can index up to ~50M vectors (1536 dimensions) before hitting swap. Monitor lancedb's cache size with:
db = lancedb.connect(uri)
print(f"Table size: {db.open_table(table_name).count_rows()} rows")
4. Security: Prompt Injection
The RAG prompt instructs the LLM to refuse ungrounded answers. However, a malicious user could craft a question that overrides the system prompt. Mitigations:
- Use a separate LLM call to classify the question as "in-scope" vs "out-of-scope".
- Sanitize retrieved chunks to remove prompt-injection patterns (e.g., "ignore previous instructions").
What's Next
You now have a production-grade RAG pipeline that can ingest PDFs and answer questions with citations. The architecture is modular: swap OpenAIEmbeddings for HuggingFaceEmbeddings to run fully local, or replace ChatOpenAI with Ollama [7] for air-gapped deployments.
Next steps to extend this system:
- Hybrid search: Combine vector similarity with keyword (BM25) search using LanceDB's FTS (full-text search) index. This improves recall for exact matches like product codes or names.
- Multi-modal RAG: Use LanceDB to store image embeddings alongside text, enabling questions like "Show me the chart on page 5."
- Streaming responses: Modify the
/queryendpoint to useStreamingResponsefor real-time token-by-token output. - Authentication: Add API key validation using FastAPI dependencies.
For deeper dives, explore our guides on advanced chunking strategies and vector database comparison.
The complete code for this tutorial is available on GitHub. Remember to never hardcode API keys—use environment variables or a secrets manager in production.
References
Was this article helpful?
Let us know to improve our AI generation.
Related Articles
How to Build a Gmail AI Assistant with Google Gemini
Practical tutorial: It represents an incremental improvement in user interface and interaction with existing technology.
How to Build a Production ML API with FastAPI and Modal
Practical tutorial: Build a production ML API with FastAPI + Modal
How to Build a Voice Assistant with Whisper and Llama 3.3
Practical tutorial: Build a voice assistant with Whisper + Llama 3.3