How to Build AI Data Agents with LangChain and LanceDB
Practical tutorial: build a data agent with LangChain and LanceDB, an open-source vector database.
Building production-ready AI data agents that can query, transform, and analyze structured data is still hard. Large language models (LLMs) handle natural language well, but connecting them to real data sources reliably and quickly takes careful architectural decisions. In this tutorial, we'll build a complete data agent system using LangChain for orchestration and LanceDB for vector storage. The resulting agent answers questions that span multiple data sources. Retrieval itself is fast, although end-to-end latency is usually dominated by the LLM calls.
Why LanceDB Changes the Game for Data Agents
Traditional approaches to building data agents rely on either SQL databases with text-to-SQL pipelines or vector databases with semantic search. Both have significant limitations. Text-to-SQL systems struggle with ambiguous queries and complex joins, while pure vector search loses the structured query capabilities that make relational data valuable. LanceDB, an open-source vector database built on the Lance columnar format, bridges this gap by supporting both vector similarity search and SQL-like filtering on the same data.
According to the LanceDB documentation, the database achieves query latencies under 10 milliseconds for datasets with millions of vectors when using GPU acceleration, and it supports hybrid search combining vector similarity with metadata filtering. This makes it well suited to data agents that must answer questions like "Show me all products in the electronics category with customer satisfaction above 4.5 stars", which require both semantic understanding and structured filtering.
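To make the idea of hybrid search concrete, here is a minimal pure-Python sketch of the two steps involved: filter rows on metadata first, then rank the survivors by vector similarity. It does not use LanceDB at all; the function names, the toy two-dimensional vectors, and the sample ratings are all made up for illustration.

```python
import math
from typing import Callable, Dict, List


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def toy_hybrid_search(
    rows: List[Dict],
    query_vector: List[float],
    predicate: Callable[[Dict], bool],
    k: int = 3,
) -> List[str]:
    """Apply the structured filter first, then rank what is left by similarity."""
    candidates = [row for row in rows if predicate(row)]
    ranked = sorted(
        candidates,
        key=lambda row: cosine_similarity(row["vector"], query_vector),
        reverse=True,
    )
    return [row["name"] for row in ranked[:k]]


# Hypothetical rows: real embeddings have hundreds of dimensions, not two.
ROWS = [
    {"name": "Headphones", "category": "Electronics", "rating": 4.6, "vector": [0.9, 0.1]},
    {"name": "Camera", "category": "Electronics", "rating": 4.0, "vector": [0.8, 0.3]},
    {"name": "Water Bottle", "category": "Home & Kitchen", "rating": 4.7, "vector": [0.1, 0.9]},
]

top = toy_hybrid_search(
    ROWS,
    query_vector=[1.0, 0.0],
    predicate=lambda r: r["category"] == "Electronics" and r["rating"] > 4.5,
)
print(top)
```

A real vector database does the same two things with an index instead of a full scan, which is where the performance difference comes from.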
The architecture we'll build uses LangChain's agent framework to orchestrate multiple tools: a LanceDB vector store for semantic search, a SQLite database for structured queries, and a Python REPL tool for data transformation. The agent will decide which tools to use based on the user's question, creating a flexible system that can handle diverse data analysis tasks.
Setting Up the LanceDB Vector Store
Before we can build our agent, we need to set up the infrastructure. We'll create a vector store containing product data with both embeddings and metadata, enabling the hybrid search capabilities that make LanceDB powerful.
Prerequisites and Environment Setup
First, let's set up our Python environment with all necessary dependencies:
# Create a virtual environment
python -m venv data-agent-env
source data-agent-env/bin/activate # On Windows: data-agent-env\Scripts\activate
# Install core dependencies
pip install langchain==0.3.1
pip install langchain-community==0.3.1
pip install lancedb==0.12.0
pip install openai==1.51.0
pip install pandas==2.2.2
pip install numpy==1.26.4
# sqlite3 ships with Python's standard library; no pip install is needed
pip install tenacity # Used by the retry example later in this tutorial
pip install pydantic==2.9.0
We'll use OpenAI's embedding model for generating vector representations, but the same approach works with any embedding provider supported by LangChain. The LanceDB Python SDK provides seamless integration with LangChain's vector store interface.
Creating the Vector Store with Hybrid Search Support
Let's build a comprehensive vector store that supports both semantic search and metadata filtering:
import lancedb
import pandas as pd
import numpy as np
# Import from langchain_community / langchain_core: the older langchain.embeddings,
# langchain.vectorstores, and langchain.schema paths are deprecated
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import LanceDB
from langchain_core.documents import Document
from typing import List, Dict, Any, Optional
import json


class ProductVectorStore:
    """
    Production-ready vector store for product data with hybrid search capabilities.
    Supports both vector similarity search and SQL-like metadata filtering.
    """

    def __init__(self, db_path: str = "./lancedb_data"):
        self.db = lancedb.connect(db_path)
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            dimensions=1536  # Default for text-embedding-3-small
        )
        self.table_name = "products"
    def create_sample_data(self) -> List[Document]:
        """
        Create sample product data with rich metadata for demonstration.
        In production, this would connect to your actual data source.
        """
        products = [
            {
                "product_id": "P001",
                "name": "Wireless Bluetooth Headphones",
                "category": "Electronics",
                "price": 89.99,
                "rating": 4.5,
                "reviews_count": 1234,
                "in_stock": True,
                "description": "High-quality wireless headphones with noise cancellation, 30-hour battery life, and comfortable over-ear design."
            },
            {
                "product_id": "P002",
                "name": "Organic Cotton T-Shirt",
                "category": "Clothing",
                "price": 29.99,
                "rating": 4.2,
                "reviews_count": 567,
                "in_stock": True,
                "description": "Soft, sustainable organic cotton t-shirt available in multiple colors. Machine washable and ethically sourced."
            },
            {
                "product_id": "P003",
                "name": "Smart Home Security Camera",
                "category": "Electronics",
                "price": 149.99,
                "rating": 4.0,
                "reviews_count": 890,
                "in_stock": False,
                "description": "1080p HD security camera with night vision, motion detection, and smartphone alerts. Weatherproof for outdoor use."
            },
            {
                "product_id": "P004",
                "name": "Stainless Steel Water Bottle",
                "category": "Home & Kitchen",
                "price": 24.99,
                "rating": 4.7,
                "reviews_count": 2345,
                "in_stock": True,
                "description": "Double-walled vacuum insulated water bottle. Keeps drinks cold for 24 hours or hot for 12 hours. BPA-free."
            },
            {
                "product_id": "P005",
                "name": "Yoga Mat Premium",
                "category": "Sports & Outdoors",
                "price": 49.99,
                "rating": 4.3,
                "reviews_count": 1567,
                "in_stock": True,
                "description": "Extra thick, non-slip yoga mat with alignment lines. Eco-friendly TPE material with carrying strap."
            }
        ]
        documents = []
        for product in products:
            # Create a rich text representation for embedding
            text_content = (
                f"Product: {product['name']}\n"
                f"Category: {product['category']}\n"
                f"Price: ${product['price']}\n"
                f"Description: {product['description']}"
            )
            doc = Document(
                page_content=text_content,
                metadata={
                    "product_id": product["product_id"],
                    "name": product["name"],
                    "category": product["category"],
                    "price": product["price"],
                    "rating": product["rating"],
                    "reviews_count": product["reviews_count"],
                    "in_stock": product["in_stock"]
                }
            )
            documents.append(doc)
        return documents
    def initialize_store(self, documents: List[Document]) -> LanceDB:
        """
        Initialize LanceDB vector store with documents and embeddings.
        Handles the case where the table already exists.
        """
        try:
            # Check if table exists
            if self.table_name in self.db.table_names():
                self.db.drop_table(self.table_name)
                print(f"Dropped existing table: {self.table_name}")
            # Create vector store with LanceDB
            vector_store = LanceDB.from_documents(
                documents=documents,
                embedding=self.embeddings,
                connection=self.db,
                table_name=self.table_name,
                mode="overwrite"  # Replace existing data
            )
            print(f"Created vector store with {len(documents)} documents")
            return vector_store
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise
    def hybrid_search(
        self,
        query: str,
        metadata_filter: Optional[Dict[str, Any]] = None,
        k: int = 5
    ) -> List[Document]:
        """
        Perform hybrid search combining vector similarity with metadata filtering.

        Args:
            query: Natural language query string
            metadata_filter: Dictionary of metadata fields to filter on
            k: Number of results to return

        Returns:
            List of matching documents
        """
        vector_store = LanceDB(
            connection=self.db,
            table_name=self.table_name,
            embedding=self.embeddings
        )
        # Build filter expression for LanceDB.
        # NOTE: depending on the langchain-community version, metadata may be
        # stored in a nested `metadata` column, in which case keys need a
        # "metadata." prefix (for example "metadata.category").
        filter_expr = None
        if metadata_filter:
            operators = {"gte": ">=", "lte": "<=", "gt": ">", "lt": "<"}
            conditions = []
            for key, value in metadata_filter.items():
                if isinstance(value, bool):
                    # Check bool before int: bool is a subclass of int in Python
                    conditions.append(f"{key} = {str(value).lower()}")
                elif isinstance(value, str):
                    # Double single quotes so a value cannot break out of the literal
                    escaped = value.replace("'", "''")
                    conditions.append(f"{key} = '{escaped}'")
                elif isinstance(value, (int, float)):
                    conditions.append(f"{key} = {value}")
                elif isinstance(value, dict):
                    # Support range queries like {"price": {"gte": 50, "lte": 100}}
                    for op, val in value.items():
                        if op in operators:
                            conditions.append(f"{key} {operators[op]} {val}")
            if conditions:
                filter_expr = " AND ".join(conditions)
        # Perform hybrid search
        results = vector_store.similarity_search(
            query=query,
            k=k,
            filter=filter_expr
        )
        return results
# Initialize and test the vector store
if __name__ == "__main__":
    store = ProductVectorStore()
    documents = store.create_sample_data()
    vector_store = store.initialize_store(documents)
    # Test hybrid search
    results = store.hybrid_search(
        query="affordable electronics for home security",
        metadata_filter={"category": "Electronics", "price": {"lte": 200}},
        k=3
    )
    print("\nHybrid Search Results:")
    for doc in results:
        print(f"- {doc.metadata['name']} (${doc.metadata['price']})")
        print(f"  Rating: {doc.metadata['rating']} stars")
        print(f"  In Stock: {doc.metadata['in_stock']}")
        print()
This implementation demonstrates several production considerations:
- Error handling: initialize_store drops any existing table before recreating it, and logs and re-raises any failure
- Flexible filtering: The hybrid_search method supports various filter types including equality, range queries, and boolean conditions
- Rich metadata: Each document carries structured metadata that enables precise filtering
- Clean separation: The vector store logic is encapsulated in a reusable class
Building the Data Agent with LangChain
Now we'll create the data agent that orchestrates multiple tools to answer complex questions. The agent uses LangChain's ReAct (Reasoning + Acting) framework to decide which tools to use and in what order.
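Before wiring up LangChain, it can help to see what a single ReAct step amounts to. The sketch below is a simplified illustration, not LangChain's actual parser: it assumes the model replies with either an "Action:" / "Action Input:" pair or a "Final Answer:" line, and the names parse_step and run_step are hypothetical.

```python
import re
from typing import Callable, Dict, Tuple

# Matches "Action: <tool>" followed on the next line by "Action Input: <input>"
ACTION_PATTERN = re.compile(
    r"Action:\s*(?P<tool>.+?)\s*\nAction Input:\s*(?P<arg>.*)", re.DOTALL
)


def parse_step(llm_output: str) -> Tuple[str, str]:
    """Return ("final", answer) or (tool_name, tool_input)."""
    if "Final Answer:" in llm_output:
        return "final", llm_output.split("Final Answer:", 1)[1].strip()
    match = ACTION_PATTERN.search(llm_output)
    if match is None:
        raise ValueError("Output did not follow the ReAct format")
    return match.group("tool").strip(), match.group("arg").strip()


def run_step(llm_output: str, tools: Dict[str, Callable[[str], str]]) -> str:
    """Dispatch one model reply: call the named tool or return the final answer."""
    kind, payload = parse_step(llm_output)
    if kind == "final":
        return payload
    if kind not in tools:
        # In a real loop this text becomes the Observation fed back to the model
        return f"Unknown tool: {kind}"
    return tools[kind](payload)


# A stand-in tool so the example runs without any API calls
demo_tools = {"Echo": lambda text: text.upper()}
print(run_step("Thought: try the tool\nAction: Echo\nAction Input: hello", demo_tools))
print(run_step("Thought: I know the answer\nFinal Answer: 42", demo_tools))
```

An agent executor repeats this step, appending each tool result to the prompt as an observation, until the model produces a final answer or an iteration limit is reached.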
Core Agent Implementation
from langchain.agents import Tool, AgentExecutor, create_react_agent
from langchain_core.prompts import PromptTemplate
# gpt-4 is a chat model, so use the chat wrapper rather than the
# completion-style OpenAI class
from langchain_community.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from typing import List
import sqlite3
import numpy as np
import pandas as pd


class DataAnalysisAgent:
    """
    Production-grade data agent that combines vector search, SQL queries,
    and Python code execution for comprehensive data analysis.
    """

    def __init__(self, vector_store: ProductVectorStore, sqlite_path: str = "./products.db"):
        self.vector_store = vector_store
        self.sqlite_path = sqlite_path
        self.llm = ChatOpenAI(
            model="gpt-4",
            temperature=0.1,  # Low temperature for more consistent behavior
            max_tokens=2000
        )
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            # The agent prompt is a plain string template, so keep history as text
            return_messages=False
        )
        # Initialize tools
        self.tools = self._create_tools()
        # Create agent
        self.agent = self._create_agent()
    def _create_tools(self) -> List[Tool]:
        """Create the tools available to the agent."""

        # Tool 1: Vector Search with Metadata Filtering
        def vector_search_tool(query: str) -> str:
            """
            Search products using semantic understanding.
            Input format: "query | filter_key=value, filter_key2=value2"
            Example: "wireless headphones | category=Electronics, price_lte=200"
            """
            try:
                # Parse query and filters
                parts = query.split("|")
                search_query = parts[0].strip()
                metadata_filter = None
                if len(parts) > 1:
                    filter_parts = parts[1].strip().split(",")
                    metadata_filter = {}
                    for fp in filter_parts:
                        if "=" in fp:
                            key, value = fp.split("=", 1)
                            key = key.strip()
                            value = value.strip()
                            # Handle numeric values
                            if value.replace(".", "").isdigit():
                                value = float(value) if "." in value else int(value)
                            elif value.lower() in ["true", "false"]:
                                value = value.lower() == "true"
                            # Handle range queries; setdefault keeps both bounds
                            # when price_gte and price_lte are given together
                            if key.endswith("_lte"):
                                actual_key = key[:-len("_lte")]
                                metadata_filter.setdefault(actual_key, {})["lte"] = value
                            elif key.endswith("_gte"):
                                actual_key = key[:-len("_gte")]
                                metadata_filter.setdefault(actual_key, {})["gte"] = value
                            else:
                                metadata_filter[key] = value
                results = self.vector_store.hybrid_search(
                    query=search_query,
                    metadata_filter=metadata_filter,
                    k=5
                )
                if not results:
                    return "No products found matching your criteria."
                output = "Found products:\n"
                for doc in results:
                    meta = doc.metadata
                    output += f"- {meta['name']} (${meta['price']:.2f}) | "
                    output += f"Rating: {meta['rating']}/5 | "
                    output += f"Category: {meta['category']} | "
                    output += f"In Stock: {'Yes' if meta['in_stock'] else 'No'}\n"
                return output
            except Exception as e:
                return f"Error performing vector search: {str(e)}"
        # Tool 2: SQL Query Execution
        def sql_query_tool(query: str) -> str:
            """
            Execute SQL queries against the product database.
            Input: Valid SQL SELECT query
            Example: "SELECT category, AVG(price) as avg_price FROM products GROUP BY category"
            """
            try:
                conn = sqlite3.connect(self.sqlite_path)
                df = pd.read_sql_query(query, conn)
                conn.close()
                if df.empty:
                    return "Query returned no results."
                # Format results as readable text
                output = f"Query results ({len(df)} rows):\n"
                output += df.to_string(index=False)
                return output
            except Exception as e:
                return f"Error executing SQL query: {str(e)}"
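        # Tool 3: Python Code Execution for Data Analysis
        def python_analysis_tool(code: str) -> str:
            """
            Execute Python code for data analysis and transformation.
            Available variables: products_df (pandas DataFrame with all product data)
            Input: Valid Python code that produces output via print()
            """
            import contextlib
            import io
            try:
                # Limited namespace. This is NOT a security sandbox: run
                # untrusted code only in an isolated process or container.
                safe_builtins = {
                    "print": print, "len": len, "range": range, "sum": sum,
                    "min": min, "max": max, "round": round, "sorted": sorted
                }
                namespace = {
                    "__builtins__": safe_builtins,
                    "products_df": self._get_products_dataframe(),
                    "pd": pd,
                    "np": np
                }
                # Capture everything the code prints so the agent can read it
                buffer = io.StringIO()
                with contextlib.redirect_stdout(buffer):
                    exec(code, namespace)
                output = buffer.getvalue().strip()
                return output or "Code executed successfully (no output)."
            except Exception as e:
                return f"Error executing Python code: {str(e)}"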
        return [
            Tool(
                name="VectorSearch",
                func=vector_search_tool,
                description="Search products using natural language with optional metadata filters. "
                            "Use for semantic search questions like 'find comfortable headphones' "
                            "or 'show me affordable electronics'. Supports filters with | syntax."
            ),
            Tool(
                name="SQLQuery",
                func=sql_query_tool,
                description="Execute SQL queries against the product database. "
                            "Use for aggregation, statistics, and structured queries. "
                            "Tables: products(product_id, name, category, price, rating, reviews_count, in_stock)"
            ),
            Tool(
                name="PythonAnalysis",
                func=python_analysis_tool,
                description="Execute Python code for custom data analysis. "
                            "Use for complex calculations, visualizations, or transformations. "
                            "Available: products_df (pandas DataFrame)"
            )
        ]
    def _get_products_dataframe(self) -> pd.DataFrame:
        """Retrieve all products from SQLite as a DataFrame."""
        conn = sqlite3.connect(self.sqlite_path)
        df = pd.read_sql_query("SELECT * FROM products", conn)
        conn.close()
        return df
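    def _create_agent(self) -> AgentExecutor:
        """Create the ReAct agent with tools and memory."""
        # create_react_agent requires {tools}, {tool_names}, and {agent_scratchpad}
        # in the prompt, and its output parser expects the Thought/Action format below
        prompt = PromptTemplate.from_template("""
You are a helpful data analysis assistant. You have access to the following tools:

{tools}

When you need to search for products semantically, use VectorSearch.
When you need to compute statistics or aggregate data, use SQLQuery.
When you need to perform custom analysis, use PythonAnalysis.

Use the following format:

Question: the input question you must answer
Thought: think step by step about which tool to use
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Chat History:
{chat_history}

Question: {input}
Thought:{agent_scratchpad}
""")
        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=5,  # Prevent infinite loops
            handle_parsing_errors=True
        )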
    def query(self, question: str) -> str:
        """
        Process a user question and return the answer.
        Handles edge cases like empty questions and API errors.
        """
        if not question or not question.strip():
            return "Please provide a valid question."
        try:
            response = self.agent.invoke({"input": question})
            return response["output"]
        except Exception as e:
            return f"I encountered an error processing your question: {str(e)}"
Setting Up the SQLite Database
We need to populate our SQLite database with the same product data for SQL queries:
def setup_sqlite_database(db_path: str = "./products.db"):
    """Initialize SQLite database with product data."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Create products table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            product_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            category TEXT NOT NULL,
            price REAL NOT NULL,
            rating REAL NOT NULL,
            reviews_count INTEGER NOT NULL,
            in_stock INTEGER NOT NULL
        )
    """)
    # Insert sample data
    products = [
        ("P001", "Wireless Bluetooth Headphones", "Electronics", 89.99, 4.5, 1234, 1),
        ("P002", "Organic Cotton T-Shirt", "Clothing", 29.99, 4.2, 567, 1),
        ("P003", "Smart Home Security Camera", "Electronics", 149.99, 4.0, 890, 0),
        ("P004", "Stainless Steel Water Bottle", "Home & Kitchen", 24.99, 4.7, 2345, 1),
        ("P005", "Yoga Mat Premium", "Sports & Outdoors", 49.99, 4.3, 1567, 1)
    ]
    cursor.executemany(
        "INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?, ?, ?, ?)",
        products
    )
    conn.commit()
    conn.close()
    print(f"SQLite database created at {db_path}")
# Initialize both stores
if __name__ == "__main__":
    # Setup SQLite
    setup_sqlite_database()
    # Setup LanceDB
    store = ProductVectorStore()
    documents = store.create_sample_data()
    vector_store = store.initialize_store(documents)
    # Create and test the agent
    agent = DataAnalysisAgent(vector_store=store)
    # Test queries
    test_questions = [
        "Find me affordable electronics under $100",
        "What is the average price of products in each category?",
        "Which products have the highest customer ratings?",
        "Show me products that are out of stock"
    ]
    for question in test_questions:
        print(f"\n{'='*60}")
        print(f"Question: {question}")
        print(f"{'='*60}")
        answer = agent.query(question)
        print(f"Answer: {answer}")
Production Considerations and Edge Cases
Building a production data agent requires handling numerous edge cases that can break naive implementations. Here are the most important ones to plan for:
Rate Limiting and API Costs
When using OpenAI's API, you'll encounter rate limits and costs. According to OpenAI's pricing page, GPT-4 costs $0.03 per 1K input tokens and $0.06 per 1K output tokens as of 2024. For production deployments, throttle outgoing calls with a rate limiter such as this decorator:
import time
from functools import wraps
from typing import Callable


def rate_limiter(max_calls: int = 60, time_window: int = 60):
    """Rate limiter decorator for API calls (sliding window, single-threaded)."""
    calls = []

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Remove calls that have aged out of the window
            calls[:] = [c for c in calls if now - c < time_window]
            if len(calls) >= max_calls:
                # Wait until the oldest call leaves the window
                wait_time = time_window - (now - calls[0])
                time.sleep(max(wait_time, 0))
                # Refresh the clock and the window after sleeping
                now = time.time()
                calls[:] = [c for c in calls if now - c < time_window]
            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
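To put the pricing in perspective, the arithmetic can be sketched as a small helper. The default prices are the GPT-4 figures quoted above; the function name and the token counts in the example (five agent steps of 1,500 input and 200 output tokens each) are made-up assumptions, so substitute measurements from your own runs.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_1k: float = 0.03,   # GPT-4 input price quoted above
    output_price_per_1k: float = 0.06,  # GPT-4 output price quoted above
) -> float:
    """Estimate the API cost of a request from its token counts."""
    input_cost = input_tokens / 1000 * input_price_per_1k
    output_cost = output_tokens / 1000 * output_price_per_1k
    return input_cost + output_cost


# Hypothetical question that takes five ReAct steps to answer
steps = 5
per_question = estimate_cost_usd(input_tokens=steps * 1500, output_tokens=steps * 200)
print(f"Estimated cost per question: ${per_question:.3f}")
print(f"Estimated cost per 1,000 questions: ${per_question * 1000:.2f}")
```

Because every ReAct step resends the growing scratchpad, input tokens tend to dominate, which is one reason to keep tool outputs short.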
Memory Management for Large Datasets
LanceDB handles large datasets efficiently, but you should still be mindful of memory usage. The Lance columnar format uses memory mapping, which means it can work with datasets larger than available RAM. However, when loading data into pandas DataFrames for analysis, you may encounter memory issues:
def process_large_dataset_in_chunks(vector_store, query: str, chunk_size: int = 1000):
    """Yield search results one chunk at a time to bound memory use."""
    offset = 0
    while True:
        # NOTE: `offset` is assumed to be supported by the store's search
        # method; check your LangChain/LanceDB versions before relying on it
        chunk = vector_store.similarity_search(
            query=query,
            k=chunk_size,
            offset=offset
        )
        if not chunk:
            break
        # Hand the chunk to the caller; nothing is accumulated here,
        # so earlier chunks can be garbage-collected
        yield chunk
        offset += chunk_size
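The same chunking idea applies to any iterable of rows, independent of the vector store. The sketch below uses only the standard library; batched and total_price are hypothetical helper names, and the rows are toy data.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def total_price(rows: Iterable[Dict], batch_size: int = 2) -> float:
    """Aggregate batch by batch so only one batch is held in memory at a time."""
    total = 0.0
    for batch in batched(rows, batch_size):
        total += sum(row["price"] for row in batch)
    return total


print(list(batched(range(5), 2)))
print(total_price([{"price": 1.5}, {"price": 2.5}, {"price": 1.0}]))
```

For data that already lives in SQLite, pandas offers a similar pattern through the chunksize argument of read_sql_query, which returns an iterator of DataFrames instead of one large frame.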
Error Recovery and Retry Logic
Network failures and transient errors are inevitable in production. Implement retry logic with exponential backoff:
from tenacity import retry, stop_after_attempt, wait_exponential  # pip install tenacity


class ResilientAgent:
    """Agent wrapper with retry logic for production reliability."""

    def __init__(self, agent):
        # `agent` is any object exposing query(question) -> str. Retries only
        # trigger if that method lets ConnectionError/TimeoutError propagate
        # instead of converting them to an error string.
        self.agent = agent

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    def query_with_retry(self, question: str) -> str:
        """Query the agent with automatic retry on failure."""
        try:
            return self.agent.query(question)
        except (ConnectionError, TimeoutError) as e:
            print(f"Transient error, retrying: {e}")
            raise
        except Exception as e:
            # Don't retry on business logic errors
            return f"Error: {str(e)}"
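If you would rather not add a dependency, exponential backoff is short enough to write by hand. The sketch below is one possible stdlib-only version with random jitter; call_with_backoff and its parameters are hypothetical names, and the sleep function is injectable so the behavior can be tested without waiting.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the original error
            # The delay ceiling doubles each attempt: base, 2*base, 4*base, ...
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random jitter spreads out retries from many concurrent clients
            sleep(random.uniform(0, ceiling))
    raise RuntimeError("unreachable")
```

A call such as call_with_backoff(lambda: client.ask(question)) would then retry only connection and timeout errors, where client.ask stands in for whatever function performs the network call.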
Handling Ambiguous Queries
Data agents frequently encounter ambiguous questions. Implement clarification mechanisms:
import re


def handle_ambiguous_query(agent, question: str) -> str:
    """Detect and handle ambiguous queries by asking clarifying questions."""
    ambiguity_patterns = ["cheapest", "best", "top", "popular"]
    lowered = question.lower()
    # Match whole words only, so "top" does not fire on "laptop"
    matched = next(
        (p for p in ambiguity_patterns if re.search(rf"\b{p}\b", lowered)),
        None
    )
    if matched is not None:
        # Ask for clarification about the term that was actually matched
        clarification = agent.query(
            f"The user asked: '{question}'. "
            f"Ask them to clarify what they mean by '{matched}'. "
            f"Provide specific options."
        )
        return clarification
    return agent.query(question)
Performance Optimization and Benchmarking
For production deployments, you should benchmark your agent's performance. Here's a simple benchmarking framework:
import time
import statistics
from typing import List, Tuple


class AgentBenchmark:
    """Benchmark agent performance across multiple queries."""

    def __init__(self, agent: DataAnalysisAgent):
        self.agent = agent
        # Each entry is (query, mean seconds, standard deviation in seconds)
        self.results: List[Tuple[str, float, float]] = []

    def run_benchmark(self, test_queries: List[str], iterations: int = 3):
        """Run each query several times and record the mean and spread."""
        for query in test_queries:
            times = []
            for _ in range(iterations):
                # perf_counter is a monotonic clock intended for timing
                start = time.perf_counter()
                self.agent.query(query)
                times.append(time.perf_counter() - start)
            avg_time = statistics.mean(times)
            std_time = statistics.stdev(times) if len(times) > 1 else 0.0
            self.results.append((query, avg_time, std_time))
            print(f"Query: '{query[:50]}..'")
            print(f"  Average time: {avg_time:.2f}s ± {std_time:.2f}s")

    def get_summary(self) -> dict:
        """Get summary statistics (call run_benchmark first)."""
        all_times = [r[1] for r in self.results]
        return {
            "total_queries": len(self.results),
            "avg_time": statistics.mean(all_times),
            "max_time": max(all_times),
            "min_time": min(all_times),
            "std_time": statistics.stdev(all_times) if len(all_times) > 1 else 0.0
        }


# Run benchmark (assumes `agent` was created as in the previous section)
if __name__ == "__main__":
    benchmark = AgentBenchmark(agent)
    test_queries = [
        "Find me affordable electronics",
        "What is the average rating of all products?",
        "Show me products with high ratings and low prices",
        "Which categories have the most products?"
    ]
    benchmark.run_benchmark(test_queries, iterations=3)
    summary = benchmark.get_summary()
    print(f"\nBenchmark Summary: {summary}")
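Averages hide slow outliers, and agent latency tends to have a long tail because some questions need several tool calls. A common complement is to report percentiles. The sketch below uses statistics.quantiles from the standard library (Python 3.8+); latency_percentiles is a hypothetical helper and the sample values are synthetic.

```python
import statistics
from typing import Dict, List


def latency_percentiles(samples: List[float]) -> Dict[str, float]:
    """Summarize latency samples (in seconds) as p50, p95, and max."""
    if len(samples) < 2:
        raise ValueError("Need at least two samples")
    # n=100 returns 99 cut points; index i - 1 holds the i-th percentile
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "max": max(samples)}


# Synthetic samples: 0.0, 1.0, ..., 100.0 seconds
demo = latency_percentiles([float(i) for i in range(101)])
print(demo)
```

Collecting the raw per-iteration timings, rather than only the per-query means, is what makes a summary like this possible.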
What's Next
This tutorial has shown you how to build a production-ready data agent using LangChain and LanceDB. The architecture we've built handles hybrid search combining vector similarity with metadata filtering, SQL query execution for structured analysis, and Python code execution for custom transformations.
To extend this system for production use, consider:
- Adding authentication and authorization: Implement API keys and user-specific data access controls
- Implementing caching: Cache frequent queries using Redis or similar to reduce API costs
- Adding monitoring: Integrate with Prometheus or Datadog for production observability
- Expanding data sources: Connect to PostgreSQL, MongoDB, or cloud storage services
- Implementing feedback loops: Allow users to rate responses and use that data to improve the agent
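As a starting point for the caching item, here is a minimal in-process sketch with a time-to-live, using a plain dictionary as a stand-in for Redis. The class name TTLCache, the normalization rule, and the injectable clock are all assumptions made for illustration; a shared cache would be needed once the agent runs in more than one process.

```python
import time
from typing import Callable, Dict, Tuple


class TTLCache:
    """Cache answers per normalized question for a limited time."""

    def __init__(self, ttl_seconds: float = 300.0, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    @staticmethod
    def _normalize(question: str) -> str:
        # Treat questions that differ only in case or spacing as the same key
        return " ".join(question.lower().split())

    def get_or_compute(self, question: str, compute: Callable[[str], str]) -> str:
        key = self._normalize(question)
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # Fresh entry: skip the expensive call
        answer = compute(question)
        self._entries[key] = (now, answer)
        return answer
```

With the agent from this tutorial, usage would look like cache.get_or_compute(question, agent.query). Keep the TTL short for data that changes often, such as stock levels.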
The combination of LangChain's agent framework with LanceDB's hybrid search capabilities creates a powerful foundation for building AI data agents that can handle real-world complexity. As the AI ecosystem continues to evolve, tools like these will become increasingly essential for organizations looking to leverage their data through natural language interfaces.