
How to Build AI Data Agents with LangChain and LanceDB

Practical tutorial: build a data agent that combines LangChain orchestration with LanceDB, an open-source vector database.

BlogIA Academy | May 29, 2026 | 17 min read | 3,265 words


Building production-ready AI data agents that can query, transform, and analyze structured data remains one of the most challenging problems in the AI ecosystem. While large language models (LLMs) have made remarkable progress in natural language understanding, connecting them to real data sources reliably and with good performance requires careful architectural decisions. In this tutorial, we'll build a complete data agent system using LangChain for orchestration and LanceDB for vector storage, creating an agent that can answer complex questions across multiple data sources while keeping the retrieval step fast.

Why LanceDB Changes the Game for Data Agents

Traditional approaches to building data agents rely on either SQL databases with text-to-SQL pipelines or vector databases with semantic search. Both have significant limitations. Text-to-SQL systems struggle with ambiguous queries and complex joins, while pure vector search loses the structured query capabilities that make relational data valuable. LanceDB, an open-source vector database built on the Lance columnar format, bridges this gap by supporting both vector similarity search and SQL-like filtering on the same data.

According to the LanceDB documentation, the database achieves query latencies under 10 milliseconds for datasets with millions of vectors when using GPU acceleration, and it supports hybrid search combining vector similarity with metadata filtering. This makes it particularly suitable for data agents that need to answer questions like "Show me all products in the electronics category with customer satisfaction above 4.5 stars" - queries that require both semantic understanding and structured filtering.
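To make the idea of hybrid search concrete, here is a minimal, library-free sketch: it applies a structured filter first and then ranks the surviving rows by cosine similarity. The three-dimensional vectors, the sample rows, and the `hybrid_search_sketch` helper are invented for illustration only. LanceDB performs the same two steps over indexed columnar data rather than Python lists.

```python
import math
from typing import Callable, Dict, List


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def hybrid_search_sketch(
    rows: List[Dict],
    query_vector: List[float],
    predicate: Callable[[Dict], bool],
    k: int = 3,
) -> List[Dict]:
    # Step 1: structured filter (the "SQL-like" half of the query)
    candidates = [row for row in rows if predicate(row)]
    # Step 2: rank what is left by vector similarity (the "semantic" half)
    candidates.sort(
        key=lambda row: cosine_similarity(row["vector"], query_vector),
        reverse=True,
    )
    return candidates[:k]


# Toy data: the vectors are hand-written stand-ins for real embeddings
rows = [
    {"name": "Headphones", "category": "Electronics", "rating": 4.6, "vector": [0.9, 0.1, 0.0]},
    {"name": "Security Camera", "category": "Electronics", "rating": 4.0, "vector": [0.7, 0.3, 0.1]},
    {"name": "Water Bottle", "category": "Home & Kitchen", "rating": 4.7, "vector": [0.0, 0.2, 0.9]},
]

hits = hybrid_search_sketch(
    rows,
    query_vector=[1.0, 0.0, 0.0],
    predicate=lambda r: r["category"] == "Electronics" and r["rating"] > 4.5,
)
print([hit["name"] for hit in hits])
```

Only the headphones pass both conditions: the camera fails the rating threshold and the water bottle fails the category check, regardless of how similar their vectors are.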

The architecture we'll build uses LangChain's agent framework to orchestrate multiple tools: a LanceDB vector store for semantic search, a SQLite database for structured queries, and a Python REPL tool for data transformation. The agent will decide which tools to use based on the user's question, creating a flexible system that can handle diverse data analysis tasks.

Setting Up the LanceDB Vector Store

Before we can build our agent, we need to set up the infrastructure. We'll create a vector store containing product data with both embeddings and metadata, enabling the hybrid search capabilities that make LanceDB powerful.

Prerequisites and Environment Setup

First, let's set up our Python environment with all necessary dependencies:

# Create a virtual environment
python -m venv data-agent-env
source data-agent-env/bin/activate  # On Windows: data-agent-env\Scripts\activate

# Install core dependencies
pip install langchain==0.3.1
pip install langchain-community==0.3.1
pip install lancedb==0.12.0
pip install openai==1.51.0
pip install pandas==2.2.2
pip install numpy==1.26.4
# sqlite3 ships with the Python standard library; no pip install is needed
pip install pydantic==2.9.0

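We'll use OpenAI's embedding model for generating vector representations, but the same approach works with any embedding provider supported by LangChain. The OpenAI classes read the API key from the OPENAI_API_KEY environment variable, so set it before running the examples. LangChain's community package includes a LanceDB vector store wrapper, so the table can be used through the standard vector store interface.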
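Because the embedding provider is swappable, it can be handy to wire up the pipeline without network access first. The sketch below is a hypothetical offline stand-in, `HashEmbeddings`, that exposes the two methods LangChain calls on an embedding object (`embed_documents` and `embed_query`). Its vectors carry no semantic meaning, so treat it as a plumbing test aid only; for a true drop-in you would likely subclass `langchain_core.embeddings.Embeddings`.

```python
import hashlib
import math
from typing import List


class HashEmbeddings:
    """Hypothetical offline embedder: hashes tokens into a fixed-size vector."""

    def __init__(self, dimensions: int = 16):
        self.dimensions = dimensions

    def _embed(self, text: str) -> List[float]:
        vector = [0.0] * self.dimensions
        for token in text.lower().split():
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:4], "big") % self.dimensions
            vector[bucket] += 1.0
        # Normalize to unit length so cosine and dot-product scores agree
        norm = math.sqrt(sum(v * v for v in vector))
        return [v / norm for v in vector] if norm else vector

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [self._embed(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        return self._embed(text)


embedder = HashEmbeddings(dimensions=16)
doc_vectors = embedder.embed_documents(["wireless headphones", "yoga mat"])
query_vector = embedder.embed_query("wireless headphones")
print(len(doc_vectors), len(query_vector))
```

The same text always maps to the same vector, which makes tests around table creation and filtering repeatable before you switch to a real model.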

Creating the Vector Store with Hybrid Search Support

Let's build a comprehensive vector store that supports both semantic search and metadata filtering:

import lancedb
import pandas as pd
import numpy as np
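# These classes live in the community and core packages; the old
# langchain.embeddings / langchain.vectorstores / langchain.schema paths are deprecated
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import LanceDB
from langchain_core.documents import Document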
from typing import List, Dict, Any, Optional
import json

class ProductVectorStore:
    """
    Production-ready vector store for product data with hybrid search capabilities.
    Supports both vector similarity search and SQL-like metadata filtering.
    """

    def __init__(self, db_path: str = "./lancedb_data"):
        self.db = lancedb.connect(db_path)
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            dimensions=1536  # Default for text-embedding-3-small
        )
        self.table_name = "products"

    def create_sample_data(self) -> List[Document]:
        """
        Create sample product data with rich metadata for demonstration.
        In production, this would connect to your actual data source.
        """
        products = [
            {
                "product_id": "P001",
                "name": "Wireless Bluetooth Headphones",
                "category": "Electronics",
                "price": 89.99,
                "rating": 4.5,
                "reviews_count": 1234,
                "in_stock": True,
                "description": "High-quality wireless headphones with noise cancellation, 30-hour battery life, and comfortable over-ear design."
            },
            {
                "product_id": "P002",
                "name": "Organic Cotton T-Shirt",
                "category": "Clothing",
                "price": 29.99,
                "rating": 4.2,
                "reviews_count": 567,
                "in_stock": True,
                "description": "Soft, sustainable organic cotton t-shirt available in multiple colors. Machine washable and ethically sourced."
            },
            {
                "product_id": "P003",
                "name": "Smart Home Security Camera",
                "category": "Electronics",
                "price": 149.99,
                "rating": 4.0,
                "reviews_count": 890,
                "in_stock": False,
                "description": "1080p HD security camera with night vision, motion detection, and smartphone alerts. Weatherproof for outdoor use."
            },
            {
                "product_id": "P004",
                "name": "Stainless Steel Water Bottle",
                "category": "Home & Kitchen",
                "price": 24.99,
                "rating": 4.7,
                "reviews_count": 2345,
                "in_stock": True,
                "description": "Double-walled vacuum insulated water bottle. Keeps drinks cold for 24 hours or hot for 12 hours. BPA-free."
            },
            {
                "product_id": "P005",
                "name": "Yoga Mat Premium",
                "category": "Sports & Outdoors",
                "price": 49.99,
                "rating": 4.3,
                "reviews_count": 1567,
                "in_stock": True,
                "description": "Extra thick, non-slip yoga mat with alignment lines. Eco-friendly TPE material with carrying strap."
            }
        ]

        documents = []
        for product in products:
            # Create a rich text representation for embedding.
            # Joining explicit lines keeps source indentation out of the embedded text.
            text_content = "\n".join([
                f"Product: {product['name']}",
                f"Category: {product['category']}",
                f"Price: ${product['price']}",
                f"Description: {product['description']}",
            ])

            doc = Document(
                page_content=text_content.strip(),
                metadata={
                    "product_id": product["product_id"],
                    "name": product["name"],
                    "category": product["category"],
                    "price": product["price"],
                    "rating": product["rating"],
                    "reviews_count": product["reviews_count"],
                    "in_stock": product["in_stock"]
                }
            )
            documents.append(doc)

        return documents

    def initialize_store(self, documents: List[Document]) -> LanceDB:
        """
        Initialize LanceDB vector store with documents and embeddings.
        Handles the case where the table already exists.
        """
        try:
            # Check if table exists
            if self.table_name in self.db.table_names():
                self.db.drop_table(self.table_name)
                print(f"Dropped existing table: {self.table_name}")

            # Create vector store with LanceDB
            vector_store = LanceDB.from_documents(
                documents=documents,
                embedding=self.embeddings,
                connection=self.db,
                table_name=self.table_name,
                mode="overwrite"  # Replace existing data
            )

            print(f"Created vector store with {len(documents)} documents")
            return vector_store

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def hybrid_search(
        self, 
        query: str, 
        metadata_filter: Optional[Dict[str, Any]] = None,
        k: int = 5
    ) -> List[Document]:
        """
        Perform hybrid search combining vector similarity with metadata filtering.

        Args:
            query: Natural language query string
            metadata_filter: Dictionary of metadata fields to filter on
            k: Number of results to return

        Returns:
            List of matching documents
        """
        vector_store = LanceDB(
            connection=self.db,
            table_name=self.table_name,
            embedding=self.embeddings
        )

        # Build a SQL-style filter expression for LanceDB.
        # LangChain's LanceDB wrapper stores document metadata in a struct column
        # named "metadata", so fields are addressed as metadata.<field>.
        operators = {"gte": ">=", "lte": "<=", "gt": ">", "lt": "<"}
        filter_expr = None
        if metadata_filter:
            conditions = []
            for key, value in metadata_filter.items():
                column = f"metadata.{key}"
                if isinstance(value, bool):
                    # Check bool before int: bool is a subclass of int in Python
                    conditions.append(f"{column} = {str(value).lower()}")
                elif isinstance(value, (int, float)):
                    conditions.append(f"{column} = {value}")
                elif isinstance(value, str):
                    escaped = value.replace("'", "''")  # Escape embedded quotes
                    conditions.append(f"{column} = '{escaped}'")
                elif isinstance(value, dict):
                    # Support range queries like {"price": {"gte": 50, "lte": 100}}
                    for op, val in value.items():
                        if op in operators:
                            conditions.append(f"{column} {operators[op]} {val}")

            if conditions:
                filter_expr = " AND ".join(conditions)

        # Perform hybrid search. prefilter=True asks the wrapper to apply the
        # filter before the vector ranking rather than after it.
        results = vector_store.similarity_search(
            query=query,
            k=k,
            filter=filter_expr,
            prefilter=True
        )

        return results

# Initialize and test the vector store
if __name__ == "__main__":
    store = ProductVectorStore()
    documents = store.create_sample_data()
    vector_store = store.initialize_store(documents)

    # Test hybrid search
    results = store.hybrid_search(
        query="affordable electronics for home security",
        metadata_filter={"category": "Electronics", "price": {"lte": 200}},
        k=3
    )

    print("\nHybrid Search Results:")
    for doc in results:
        print(f"- {doc.metadata['name']} (${doc.metadata['price']})")
        print(f"  Rating: {doc.metadata['rating']} stars")
        print(f"  In Stock: {doc.metadata['in_stock']}")
        print()

This implementation demonstrates several production considerations:

  1. Error handling: An existing table is dropped before it is re-created, and initialization errors are reported and re-raised rather than silently ignored
  2. Flexible filtering: The hybrid_search method supports various filter types including equality, range queries, and boolean conditions
  3. Rich metadata: Each document carries structured metadata that enables precise filtering
  4. Clean separation: The vector store logic is encapsulated in a reusable class
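To see what the filter translation described in point 2 produces, here is a standalone sketch of a filter builder with its output. The `build_filter` helper and the optional `prefix` argument are illustrative names, not part of LanceDB or LangChain; the prefix is there because, depending on how your table is laid out, metadata fields may sit in a nested column.

```python
from typing import Any, Dict

_OPERATORS = {"gte": ">=", "lte": "<=", "gt": ">", "lt": "<"}


def _literal(value: Any) -> str:
    """Render a Python value as a SQL literal."""
    if isinstance(value, bool):  # bool must be tested before int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    escaped = str(value).replace("'", "''")  # double any embedded quotes
    return f"'{escaped}'"


def build_filter(metadata_filter: Dict[str, Any], prefix: str = "") -> str:
    """Turn {"price": {"lte": 200}, "category": "X"} into a WHERE-style string."""
    conditions = []
    for key, value in metadata_filter.items():
        column = f"{prefix}{key}"
        if isinstance(value, dict):
            for op, operand in value.items():
                if op not in _OPERATORS:
                    raise ValueError(f"unsupported operator: {op}")
                conditions.append(f"{column} {_OPERATORS[op]} {_literal(operand)}")
        else:
            conditions.append(f"{column} = {_literal(value)}")
    return " AND ".join(conditions)


expression = build_filter(
    {"category": "Electronics", "price": {"gte": 50, "lte": 200}, "in_stock": True}
)
print(expression)
# category = 'Electronics' AND price >= 50 AND price <= 200 AND in_stock = true
```

Escaping quotes matters here because the filter values ultimately come from an LLM or a user, and an unescaped apostrophe would otherwise break the expression.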

Building the Data Agent with LangChain

Now we'll create the data agent that orchestrates multiple tools to answer complex questions. The agent uses LangChain's ReAct (Reasoning + Acting) framework to decide which tools to use and in what order.
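Before wiring up the real agent, it may help to see the ReAct loop in miniature. The sketch below is not LangChain's implementation: `react_loop` is a hypothetical helper and the "LLM" is a scripted function, so it runs without an API key. It shows the core cycle: the model emits a Thought and an Action, the loop runs the named tool, and the Observation is appended to a scratchpad that is fed back into the next prompt.

```python
import re
from typing import Callable, Dict

ACTION_RE = re.compile(r"Action:\s*(.+?)\s*\nAction Input:\s*(.+)", re.DOTALL)


def react_loop(
    llm: Callable[[str], str],
    tools: Dict[str, Callable[[str], str]],
    question: str,
    max_iterations: int = 5,
) -> str:
    scratchpad = ""
    for _ in range(max_iterations):
        reply = llm(f"Question: {question}\n{scratchpad}")
        if "Final Answer:" in reply:
            return reply.split("Final Answer:", 1)[1].strip()
        match = ACTION_RE.search(reply)
        if match is None:
            # Feed the parsing problem back so the model can correct itself
            scratchpad += f"{reply}\nObservation: could not parse an action\n"
            continue
        name, tool_input = match.group(1).strip(), match.group(2).strip()
        tool = tools.get(name)
        observation = tool(tool_input) if tool else f"unknown tool {name!r}"
        scratchpad += f"{reply}\nObservation: {observation}\n"
    return "Stopped: iteration limit reached."


# Scripted stand-in for a model: first asks for a tool, then answers
script = iter([
    "Thought: I need an aggregate.\nAction: SQLQuery\nAction Input: SELECT COUNT(*) FROM products",
    "Thought: I now know the final answer\nFinal Answer: There are 5 products.",
])


def scripted_llm(prompt: str) -> str:
    return next(script)


tools = {"SQLQuery": lambda sql: "5"}
answer = react_loop(scripted_llm, tools, "How many products are there?")
print(answer)
```

The iteration cap plays the same role as `max_iterations` on LangChain's `AgentExecutor`: it stops a model that never reaches a final answer.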

Core Agent Implementation

import sqlite3
from typing import List

import numpy as np
import pandas as pd
from langchain.agents import AgentExecutor, Tool, create_react_agent
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOpenAI

class DataAnalysisAgent:
    """
    Production-grade data agent that combines vector search, SQL queries,
    and Python code execution for comprehensive data analysis.
    """

    def __init__(self, vector_store: ProductVectorStore, sqlite_path: str = "./products.db"):
        self.vector_store = vector_store
        self.sqlite_path = sqlite_path
        # GPT-4 is served through the chat completions API, so use a chat model class
        self.llm = ChatOpenAI(
            model="gpt-4",
            temperature=0.1,  # Low temperature for more repeatable tool choices
            max_tokens=2000
        )
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=False  # The prompt is a plain string template
        )

        # Initialize tools
        self.tools = self._create_tools()

        # Create agent
        self.agent = self._create_agent()

    def _create_tools(self) -> List[Tool]:
        """Create the tools available to the agent."""

        # Tool 1: Vector Search with Metadata Filtering
        def vector_search_tool(query: str) -> str:
            """
            Search products using semantic understanding.
            Input format: "query | filter_key=value, filter_key2=value2"
            Example: "wireless headphones | category=Electronics, price_lte=200"
            """
            try:
                # Parse query and filters
                parts = query.split("|")
                search_query = parts[0].strip()

                metadata_filter = None
                if len(parts) > 1:
                    filter_parts = parts[1].strip().split(",")
                    metadata_filter = {}
                    for fp in filter_parts:
                        if "=" in fp:
                            key, value = fp.split("=", 1)
                            key = key.strip()
                            value = value.strip()

                            # Handle numeric and boolean values
                            try:
                                value = float(value) if "." in value else int(value)
                            except ValueError:
                                if value.lower() in ["true", "false"]:
                                    value = value.lower() == "true"

                            # Handle range queries: match the suffix only, and merge
                            # bounds so price_gte and price_lte can be combined
                            if key.endswith("_lte"):
                                metadata_filter.setdefault(key[:-4], {})["lte"] = value
                            elif key.endswith("_gte"):
                                metadata_filter.setdefault(key[:-4], {})["gte"] = value
                            else:
                                metadata_filter[key] = value

                results = self.vector_store.hybrid_search(
                    query=search_query,
                    metadata_filter=metadata_filter,
                    k=5
                )

                if not results:
                    return "No products found matching your criteria."

                output = "Found products:\n"
                for doc in results:
                    meta = doc.metadata
                    output += f"- {meta['name']} (${meta['price']:.2f}) | "
                    output += f"Rating: {meta['rating']}/5 | "
                    output += f"Category: {meta['category']} | "
                    output += f"In Stock: {'Yes' if meta['in_stock'] else 'No'}\n"

                return output

            except Exception as e:
                return f"Error performing vector search: {str(e)}"

        # Tool 2: SQL Query Execution
        def sql_query_tool(query: str) -> str:
            """
            Execute SQL queries against the product database.
            Input: Valid SQL SELECT query
            Example: "SELECT category, AVG(price) as avg_price FROM products GROUP BY category"
            """
            try:
                conn = sqlite3.connect(self.sqlite_path)
                df = pd.read_sql_query(query, conn)
                conn.close()

                if df.empty:
                    return "Query returned no results."

                # Format results as readable text
                output = f"Query results ({len(df)} rows):\n"
                output += df.to_string(index=False)
                return output

            except Exception as e:
                return f"Error executing SQL query: {str(e)}"

        # Tool 3: Python Code Execution for Data Analysis
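        def python_analysis_tool(code: str) -> str:
            """
            Execute Python code for data analysis and transformation.
            Available variables: products_df (pandas DataFrame with all product data)
            Input: Valid Python code that produces output via print()
            """
            import contextlib
            import io

            try:
                # A small allow-list of builtins. This limits accidents; it is not a
                # security sandbox, so run untrusted code in an isolated process.
                safe_builtins = {
                    "print": print, "len": len, "range": range, "sum": sum,
                    "min": min, "max": max, "sorted": sorted, "round": round,
                    "abs": abs, "list": list, "dict": dict, "str": str,
                    "int": int, "float": float, "enumerate": enumerate, "zip": zip
                }
                # A single namespace lets comprehensions and lambdas see these names
                namespace = {
                    "__builtins__": safe_builtins,
                    "products_df": self._get_products_dataframe(),
                    "pd": pd,
                    "np": np
                }

                # Execute the code and capture everything it prints
                buffer = io.StringIO()
                with contextlib.redirect_stdout(buffer):
                    exec(code, namespace)

                output = buffer.getvalue().strip()
                return output or "Code executed successfully (no output)."

            except Exception as e:
                return f"Error executing Python code: {str(e)}"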

        return [
            Tool(
                name="VectorSearch",
                func=vector_search_tool,
                description="Search products using natural language with optional metadata filters. "
                          "Use for semantic search questions like 'find comfortable headphones' "
                          "or 'show me affordable electronics'. Supports filters with | syntax."
            ),
            Tool(
                name="SQLQuery",
                func=sql_query_tool,
                description="Execute SQL queries against the product database. "
                          "Use for aggregation, statistics, and structured queries. "
                          "Tables: products(product_id, name, category, price, rating, reviews_count, in_stock)"
            ),
            Tool(
                name="PythonAnalysis",
                func=python_analysis_tool,
                description="Execute Python code for custom data analysis. "
                          "Use for complex calculations, visualizations, or transformations. "
                          "Available: products_df (pandas DataFrame)"
            )
        ]

    def _get_products_dataframe(self) -> pd.DataFrame:
        """Retrieve all products from SQLite as a DataFrame."""
        conn = sqlite3.connect(self.sqlite_path)
        df = pd.read_sql_query("SELECT * FROM products", conn)
        conn.close()
        return df

    def _create_agent(self) -> AgentExecutor:
        """Create the ReAct agent with tools and memory."""

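        # create_react_agent requires {tools}, {tool_names} and {agent_scratchpad},
        # and its output parser expects the Thought/Action/Final Answer format below.
        prompt = PromptTemplate.from_template("""
        You are a helpful data analysis assistant. You have access to the following tools:

        {tools}

        When you need to search for products semantically, use VectorSearch.
        When you need to compute statistics or aggregate data, use SQLQuery.
        When you need to perform custom analysis, use PythonAnalysis.

        Use the following format:

        Question: the input question you must answer
        Thought: you should always think about what to do
        Action: the action to take, should be one of [{tool_names}]
        Action Input: the input to the action
        Observation: the result of the action
        ... (this Thought/Action/Action Input/Observation can repeat N times)
        Thought: I now know the final answer
        Final Answer: the final answer to the original input question

        Chat History:
        {chat_history}

        Question: {input}
        Thought:{agent_scratchpad}
        """)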

        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )

        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=5,  # Prevent infinite loops
            handle_parsing_errors=True
        )

    def query(self, question: str) -> str:
        """
        Process a user question and return the answer.
        Handles edge cases like empty questions and API errors.
        """
        if not question or not question.strip():
            return "Please provide a valid question."

        try:
            response = self.agent.invoke({"input": question})
            return response["output"]
        except Exception as e:
            return f"I encountered an error processing your question: {str(e)}"
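The VectorSearch tool accepts a single string in the form "query | key=value, key2=value2". Since the LLM writes that string, the parsing step is worth testing on its own. Here is a standalone sketch of such a parser; `parse_tool_input` and its helpers are hypothetical names, and the `_gt` and `_lt` suffixes are an extension beyond the two suffixes the tool docstring mentions.

```python
from typing import Any, Dict, Optional, Tuple

RANGE_SUFFIXES = ("_gte", "_lte", "_gt", "_lt")


def _coerce(value: str) -> Any:
    """Convert 'true'/'false' and numeric strings; leave everything else as text."""
    lowered = value.lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def parse_tool_input(raw: str) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Split 'query | k=v, k2=v2' into the query text and a filter dict."""
    query, _, filter_text = raw.partition("|")
    filters: Dict[str, Any] = {}
    for part in filter_text.split(","):
        if "=" not in part:
            continue
        key, value = (piece.strip() for piece in part.split("=", 1))
        parsed = _coerce(value)
        for suffix in RANGE_SUFFIXES:
            if key.endswith(suffix):
                # price_gte=50 and price_lte=200 merge into one range entry
                filters.setdefault(key[: -len(suffix)], {})[suffix[1:]] = parsed
                break
        else:
            filters[key] = parsed
    return query.strip(), (filters or None)


parsed_query, parsed_filters = parse_tool_input(
    "wireless headphones | category=Electronics, price_gte=50, price_lte=200, in_stock=true"
)
print(parsed_query)
print(parsed_filters)
```

Keeping the parser separate from the tool function means malformed inputs can be covered by ordinary unit tests instead of being discovered through agent runs.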

Setting Up the SQLite Database

We need to populate our SQLite database with the same product data for SQL queries:

def setup_sqlite_database(db_path: str = "./products.db"):
    """Initialize SQLite database with product data."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Create products table
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            product_id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            category TEXT NOT NULL,
            price REAL NOT NULL,
            rating REAL NOT NULL,
            reviews_count INTEGER NOT NULL,
            in_stock INTEGER NOT NULL
        )
    """)

    # Insert sample data
    products = [
        ("P001", "Wireless Bluetooth Headphones", "Electronics", 89.99, 4.5, 1234, 1),
        ("P002", "Organic Cotton T-Shirt", "Clothing", 29.99, 4.2, 567, 1),
        ("P003", "Smart Home Security Camera", "Electronics", 149.99, 4.0, 890, 0),
        ("P004", "Stainless Steel Water Bottle", "Home & Kitchen", 24.99, 4.7, 2345, 1),
        ("P005", "Yoga Mat Premium", "Sports & Outdoors", 49.99, 4.3, 1567, 1)
    ]

    cursor.executemany(
        "INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?, ?, ?, ?)",
        products
    )

    conn.commit()
    conn.close()
    print(f"SQLite database created at {db_path}")

# Initialize both stores
if __name__ == "__main__":
    # Setup SQLite
    setup_sqlite_database()

    # Setup LanceDB
    store = ProductVectorStore()
    documents = store.create_sample_data()
    vector_store = store.initialize_store(documents)

    # Create and test the agent
    agent = DataAnalysisAgent(vector_store=store)

    # Test queries
    test_questions = [
        "Find me affordable electronics under $100",
        "What is the average price of products in each category?",
        "Which products have the highest customer ratings?",
        "Show me products that are out of stock"
    ]

    for question in test_questions:
        print(f"\n{'='*60}")
        print(f"Question: {question}")
        print(f"{'='*60}")
        answer = agent.query(question)
        print(f"Answer: {answer}")
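The SQLQuery tool expects SELECT statements, but nothing stops the model from emitting an UPDATE or DELETE. One low-effort safeguard is to open the database read-only through SQLite's URI syntax, so writes fail at the connection level. The sketch below demonstrates this on a throwaway file; `open_read_only` is a hypothetical helper name.

```python
import os
import sqlite3
import tempfile
from pathlib import Path


def open_read_only(db_path: str) -> sqlite3.Connection:
    """Open an existing SQLite file so that any write attempt fails."""
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    return sqlite3.connect(uri, uri=True)


# Build a small demo database in a temporary directory
demo_path = os.path.join(tempfile.mkdtemp(), "demo.db")
setup_conn = sqlite3.connect(demo_path)
setup_conn.execute("CREATE TABLE products (name TEXT, price REAL)")
setup_conn.execute("INSERT INTO products VALUES ('Yoga Mat Premium', 49.99)")
setup_conn.commit()
setup_conn.close()

conn = open_read_only(demo_path)
rows = conn.execute("SELECT name, price FROM products").fetchall()
print(rows)

try:
    conn.execute("DELETE FROM products")
    write_blocked = False
except sqlite3.OperationalError as exc:
    write_blocked = True
    print(f"Write rejected: {exc}")
finally:
    conn.close()
```

In the agent, the same idea would mean replacing the plain `sqlite3.connect(self.sqlite_path)` call inside the SQL tool with a read-only connection, while the setup script keeps its normal read-write connection.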

Production Considerations and Edge Cases

Building a production data agent requires handling numerous edge cases that can break naive implementations. Here are critical considerations we've addressed:

Rate Limiting and API Costs

When using OpenAI's API, you'll encounter rate limits and costs. According to OpenAI's pricing page, GPT-4 costs $0.03 per 1K input tokens and $0.06 per 1K output tokens as of 2024. For production deployments, start by throttling calls on the client side:

import time
from functools import wraps
from typing import Callable

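def rate_limiter(max_calls: int = 60, time_window: int = 60):
    """Rate limiter decorator: allow at most max_calls per time_window seconds."""
    calls = []  # Timestamps of recent calls (not thread-safe; add a lock if needed)

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Remove calls that have left the window
            calls[:] = [c for c in calls if now - c < time_window]

            if len(calls) >= max_calls:
                # Wait until the oldest call leaves the window, then refresh state
                time.sleep(time_window - (now - calls[0]))
                now = time.monotonic()
                calls[:] = [c for c in calls if now - c < time_window]

            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator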
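The per-token prices quoted at the start of this section translate into a per-question cost once you account for the fact that a ReAct agent makes several LLM calls per question. The following back-of-the-envelope sketch uses those prices; the token counts, the three calls per question, and the monthly volume are assumptions chosen purely for illustration, so substitute your own measurements.

```python
# Prices per 1K tokens as quoted in this section (GPT-4, 2024)
INPUT_PRICE_PER_1K = 0.03
OUTPUT_PRICE_PER_1K = 0.06


def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost in USD of a single LLM call."""
    return (
        (input_tokens / 1000) * INPUT_PRICE_PER_1K
        + (output_tokens / 1000) * OUTPUT_PRICE_PER_1K
    )


# Assumed workload (hypothetical numbers):
#   1,500 prompt tokens and 500 completion tokens per LLM call,
#   3 LLM calls per user question, 10,000 questions per month.
cost_per_call = estimate_cost(input_tokens=1500, output_tokens=500)
cost_per_question = cost_per_call * 3
monthly_cost = cost_per_question * 10_000

print(f"Per call:     ${cost_per_call:.3f}")
print(f"Per question: ${cost_per_question:.3f}")
print(f"Per month:    ${monthly_cost:,.2f}")
```

Under these assumptions each call costs 0.045 + 0.030 = 0.075 USD, so a three-step question lands around 0.225 USD. That is why `max_iterations` and prompt length deserve as much attention as the request rate.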

Memory Management for Large Datasets

LanceDB handles large datasets efficiently, but you should still be mindful of memory usage. The Lance columnar format is designed for disk-based access, which means LanceDB can work with datasets larger than available RAM. However, when loading data into pandas DataFrames for analysis, you may encounter memory issues:

def process_large_dataset_in_chunks(vector_store, query: str, chunk_size: int = 1000, max_results: int = 10000):
    """Yield search results in chunks so callers can process them incrementally."""
    # similarity_search takes k but no pagination offset, so fetch the top
    # max_results once and hand them out chunk by chunk.
    results = vector_store.similarity_search(query=query, k=max_results)

    for start in range(0, len(results), chunk_size):
        # Each chunk can be processed and discarded before the next is requested
        yield results[start:start + chunk_size]
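The same concern applies to the SQLite side, where pulling a whole table into a DataFrame is the usual source of memory pressure. A minimal sketch of streaming rows in fixed-size chunks with the standard library follows; `iter_rows_in_chunks` is a hypothetical helper, and the 2,500-row in-memory table is synthetic demo data.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_rows_in_chunks(
    conn: sqlite3.Connection, query: str, chunk_size: int = 1000
) -> Iterator[List[Tuple]]:
    """Yield query results chunk by chunk instead of materializing them all."""
    cursor = conn.execute(query)
    while True:
        chunk = cursor.fetchmany(chunk_size)
        if not chunk:
            break
        yield chunk


# Synthetic demo table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (product_id INTEGER, price REAL)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?)",
    [(i, float(i)) for i in range(2500)],
)

# Aggregate incrementally: only one chunk is held in memory at a time
running_total = 0.0
chunk_sizes = []
for chunk in iter_rows_in_chunks(conn, "SELECT product_id, price FROM products"):
    chunk_sizes.append(len(chunk))
    running_total += sum(price for _, price in chunk)

print(chunk_sizes, running_total)
conn.close()
```

If you prefer to stay in pandas, `pd.read_sql_query(query, conn, chunksize=1000)` returns an iterator of DataFrames that can be consumed the same way.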

Error Recovery and Retry Logic

Network failures and transient errors are inevitable in production. Implement retry logic with exponential backoff:

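from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

class ResilientAgent:
    """Agent wrapper with retry logic for production reliability."""

    def __init__(self, agent: DataAnalysisAgent):
        self.agent = agent

    @retry(
        retry=retry_if_exception_type((ConnectionError, TimeoutError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    def _invoke(self, question: str) -> str:
        # Call the AgentExecutor directly: DataAnalysisAgent.query() converts
        # every exception into a string, which would hide failures from tenacity
        return self.agent.agent.invoke({"input": question})["output"]

    def query_with_retry(self, question: str) -> str:
        """Query the agent with automatic retry on transient failures."""
        try:
            return self._invoke(question)
        except (ConnectionError, TimeoutError):
            # Still failing after three attempts: let the caller decide
            raise
        except Exception as e:
            # Don't retry on business logic errors
            return f"Error: {str(e)}"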
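If you would rather not depend on a retry library, or simply want to see what exponential backoff does, the mechanism fits in a few lines. This is a simplified sketch, not tenacity's implementation: `call_with_backoff` is a hypothetical helper, the jitter formula is one common choice among several, and the `sleep` parameter is injectable so the demo runs instantly.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter spreads out retries from many clients failing at once
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


# Demo: a function that fails twice, then succeeds
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


recorded_delays: List[float] = []
result = call_with_backoff(flaky, sleep=recorded_delays.append)
print(result, attempts["count"], len(recorded_delays))
```

Errors outside `retry_on` propagate immediately, which mirrors the rule above that business logic errors should not be retried.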

Handling Ambiguous Queries

Data agents frequently encounter ambiguous questions. Implement clarification mechanisms:

def handle_ambiguous_query(agent, question: str) -> str:
    """Detect and handle ambiguous queries by asking clarifying questions."""
    ambiguity_patterns = [
        "cheapest", "best", "top", "popular"
    ]

    # Match whole words so that "top" does not fire on "laptop"
    words = [w.strip(".,!?'\"") for w in question.lower().split()]
    matched = [p for p in ambiguity_patterns if p in words]

    if matched:
        # Ask for clarification about the term that triggered the match
        clarification = agent.query(
            f"The user asked: '{question}'. "
            f"Ask them to clarify what they mean by '{matched[0]}'. "
            f"Provide specific options."
        )
        return clarification

    return agent.query(question)

Performance Optimization and Benchmarking

For production deployments, you should benchmark your agent's performance. Here's a simple benchmarking framework:

import time
import statistics
from typing import List, Tuple

class AgentBenchmark:
    """Benchmark agent performance across multiple queries."""

    def __init__(self, agent: DataAnalysisAgent):
        self.agent = agent
        # Each entry is (query, mean seconds, standard deviation in seconds)
        self.results: List[Tuple[str, float, float]] = []

    def run_benchmark(self, test_queries: List[str], iterations: int = 3):
        """Run each query several times and record the mean and spread of its latency."""

        for query in test_queries:
            times = []
            for i in range(iterations):
                # perf_counter is monotonic and intended for measuring intervals
                start = time.perf_counter()
                response = self.agent.query(query)
                elapsed = time.perf_counter() - start
                times.append(elapsed)

            avg_time = statistics.mean(times)
            std_time = statistics.stdev(times) if len(times) > 1 else 0

            self.results.append((query, avg_time, std_time))
            print(f"Query: '{query[:50]}..'")
            print(f"  Average time: {avg_time:.2f}s ± {std_time:.2f}s")

    def get_summary(self) -> dict:
        """Get summary statistics."""
        all_times = [r[1] for r in self.results]
        return {
            "total_queries": len(self.results),
            "avg_time": statistics.mean(all_times),
            "max_time": max(all_times),
            "min_time": min(all_times),
            "std_time": statistics.stdev(all_times) if len(all_times) > 1 else 0
        }

# Run benchmark
if __name__ == "__main__":
    benchmark = AgentBenchmark(agent)
    test_queries = [
        "Find me affordable electronics",
        "What is the average rating of all products?",
        "Show me products with high ratings and low prices",
        "Which categories have the most products?"
    ]

    benchmark.run_benchmark(test_queries, iterations=3)
    summary = benchmark.get_summary()
    print(f"\nBenchmark Summary: {summary}")
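Averages can hide the slow outliers that users actually notice, so it is often worth reporting percentiles alongside the mean. The sketch below computes p50, p95, and p99 with the standard library; `latency_percentiles` is a hypothetical helper and the sample timings are made up to show a long tail.

```python
import statistics
from typing import Dict, List


def latency_percentiles(samples: List[float]) -> Dict[str, float]:
    """Return median and tail latencies from a list of timings in seconds."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    # n=100 yields 99 cut points; index i-1 holds the i-th percentile
    cuts = statistics.quantiles(samples, n=100, method="inclusive")
    return {
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
        "max": max(samples),
    }


# Made-up timings: mostly fast, with two slow multi-step agent runs
timings = [1.1, 1.3, 1.2, 1.0, 1.4, 1.2, 1.1, 1.3, 6.8, 1.2, 1.1, 9.5]
report = latency_percentiles(timings)
print(f"mean: {statistics.mean(timings):.2f}s")
for name, value in report.items():
    print(f"{name}: {value:.2f}s")
```

With only a handful of iterations per query the tail percentiles are rough estimates, so collect more samples before drawing conclusions from p95 or p99.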

What's Next

This tutorial has shown you how to build a production-ready data agent using LangChain and LanceDB. The architecture we've built handles hybrid search combining vector similarity with metadata filtering, SQL query execution for structured analysis, and Python code execution for custom transformations.

To extend this system for production use, consider:

  1. Adding authentication and authorization: Implement API keys and user-specific data access controls
  2. Implementing caching: Cache frequent queries using Redis or similar to reduce API costs
  3. Adding monitoring: Integrate with Prometheus or Datadog for production observability
  4. Expanding data sources: Connect to PostgreSQL, MongoDB, or cloud storage services
  5. Implementing feedback loops: Allow users to rate responses and use that data to improve the agent
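As a starting point for the caching item above, here is a minimal in-process sketch of a time-limited answer cache. The `CachedAnswers` class is hypothetical, and a plain dictionary stands in for Redis; the clock is injectable so expiry can be demonstrated without waiting. Note that caching only makes sense for questions whose answers do not depend on conversation history.

```python
import time
from typing import Callable, Dict, List, Tuple


class CachedAnswers:
    """Hypothetical cache keyed by the normalized question text."""

    def __init__(
        self,
        answer_fn: Callable[[str], str],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._answer_fn = answer_fn
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}
        self.hits = 0
        self.misses = 0

    def query(self, question: str) -> str:
        # Normalize case and whitespace so trivial variations share an entry
        key = " ".join(question.lower().split())
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            self.hits += 1
            return entry[1]
        self.misses += 1
        answer = self._answer_fn(question)
        self._entries[key] = (now, answer)
        return answer


# Demo with a fake clock and a stand-in for the expensive agent call
fake_now = [0.0]
backend_calls: List[str] = []


def expensive_answer(question: str) -> str:
    backend_calls.append(question)
    return f"answer to: {question}"


cache = CachedAnswers(expensive_answer, ttl_seconds=60.0, clock=lambda: fake_now[0])
first = cache.query("What is the average price?")
second = cache.query("  what is the AVERAGE price? ")  # Same key after normalization
fake_now[0] = 61.0  # Advance past the TTL
third = cache.query("What is the average price?")
print(cache.hits, cache.misses, len(backend_calls))
```

In the tutorial's setup, `answer_fn` would be the agent's `query` method, and the TTL should be shorter than the interval at which the underlying product data changes.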

The combination of LangChain's agent framework with LanceDB's hybrid search capabilities creates a powerful foundation for building AI data agents that can handle real-world complexity. As the AI ecosystem continues to evolve, tools like these will become increasingly essential for organizations looking to leverage their data through natural language interfaces.

