Back to Tutorials
tutorialstutorialairag

How to Build a Semantic Search Engine with Qdrant and OpenAI Embeddings

Practical tutorial: Build a semantic search engine with Qdrant and text-embedding-3

BlogIA AcademyJune 19, 202615 min read2 911 words

How to Build a Semantic Search Engine with Qdrant and OpenAI Embeddings

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Semantic search has transformed how we retrieve information from unstructured text. Unlike traditional keyword-based search, semantic search understands the meaning behind queries, enabling more relevant results even when exact terms don't match. In this tutorial, you'll build a production-ready semantic search engine using Qdrant as the vector database [3] and OpenAI's text-embedding-3 model for generating embeddings.

This system can index thousands of documents and return semantically similar results in milliseconds. We'll cover architecture decisions, implementation details, and edge cases you'll encounter in real-world deployments.

Understanding the Architecture: Why Qdrant and text-embedding-3

Before writing code, let's understand why this combination works well for production semantic search.

Qdrant is a vector similarity search engine written in Rust. According to available documentation, it offers built-in filtering, payload storag [2]e, and horizontal scaling. Unlike FAISS which is primarily an indexing library, Qdrant operates as a full database service with persistence, CRUD operations, and client-server architecture. This makes it suitable for applications where you need to add, update, or delete vectors without rebuilding the entire index.

OpenAI's text-embedding-3 model, released in early 2024, represents a significant improvement over its predecessor text-embedding-ada-002. The model produces 1536-dimensional vectors by default (with the text-embedding-3-small variant) and supports a configurable output dimension parameter. This allows you to trade off between storage efficiency and retrieval accuracy.

The architecture follows a standard pattern:

  1. Ingestion pipeline: Documents are chunked, embedded, and stored in Qdrant with metadata
  2. Query pipeline: User queries are embedded using the same model, then searched against the vector index
  3. Reranking (optional): Initial results can be refined using cross-encoders or additional filters

Prerequisites and Environment Setup

You'll need Python 3.10+ and a running Qdrant instance. For local development, Docker is the simplest approach.

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install qdrant-client==1.9.1 openai==1.30.0 tiktoken==0.7.0 pydantic==2.7.0

For Qdrant, you have two options:

Option 1: Docker (recommended for development)

docker run -p 6333:6333 -p 6334:6334 \
    -v $(pwd)/qdrant_storage:/qdrant/storage:z \
    qdrant/qdrant:v1.9.1

Option 2: Qdrant Cloud (for production) Sign up at cloud.qdrant.io and create a cluster. You'll receive a URL and API key.

Set your OpenAI API key as an environment variable:

export OPENAI_API_KEY="sk-your-key-here"

Core Implementation: Building the Semantic Search Pipeline

Step 1: Document Chunking Strategy

Semantic search requires breaking documents into manageable chunks. The chunk size directly impacts retrieval quality. Too large, and you lose semantic focus; too small, and you lose context.

For this tutorial, we'll implement a recursive character text splitter with overlap. This preserves sentence boundaries while maintaining context between chunks.

import tiktoken
from typing import List, Dict, Any

def chunk_document(
    text: str, 
    chunk_size: int = 512, 
    chunk_overlap: int = 128,
    model_name: str = "text-embedding-3-small"
) -> List[Dict[str, Any]]:
    """
    Split a document into overlapping chunks using token-aware splitting.

    Args:
        text: The full document text
        chunk_size: Maximum tokens per chunk
        chunk_overlap: Number of overlapping tokens between chunks
        model_name: Used to select the correct tokenizer

    Returns:
        List of chunk dictionaries with text and metadata
    """
    encoder = tiktoken.encoding_for_model(model_name)
    tokens = encoder.encode(text)

    chunks = []
    start = 0

    while start < len(tokens):
        # Get chunk tokens
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]

        # Decode back to text
        chunk_text = encoder.decode(chunk_tokens)

        # Calculate start/end character positions (approximate)
        # This helps with source document navigation
        char_start = len(encoder.decode(tokens[:start]))
        char_end = len(encoder.decode(tokens[:end]))

        chunks.append({
            "text": chunk_text,
            "start_char": char_start,
            "end_char": char_end,
            "token_count": len(chunk_tokens)
        })

        # Move start position, accounting for overlap
        start += chunk_size - chunk_overlap

    return chunks

Edge case consideration: When chunking scientific papers (like those referenced in our sources), pay attention to equations and special characters. The tiktoken tokenizer handles Unicode correctly, but you may want to strip LaTeX or markdown formatting before embedding to avoid noise.

Step 2: Embedding Generation with text-embedding-3

OpenAI's embedding API has rate limits and costs. For production, you should implement retry logic and batch processing.

import time
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class EmbeddingGenerator:
    def __init__(
        self, 
        api_key: str,
        model: str = "text-embedding-3-small",
        dimensions: int = 1536,
        batch_size: int = 100
    ):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.dimensions = dimensions
        self.batch_size = batch_size

        # text-embedding-3-small supports dimensions parameter
        # This allows you to reduce vector size for storage efficiency
        if model == "text-embedding-3-small":
            self.max_dimensions = 1536
        elif model == "text-embedding-3-large":
            self.max_dimensions = 3072
        else:
            self.max_dimensions = dimensions

        if dimensions > self.max_dimensions:
            raise ValueError(
                f"Model {model} supports max {self.max_dimensions} dimensions, "
                f"requested {dimensions}"
            )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a batch of texts with retry logic.

        Args:
            texts: List of text strings to embed

        Returns:
            List of embedding vectors
        """
        response = self.client.embeddings.create(
            model=self.model,
            input=texts,
            dimensions=self.dimensions
        )

        # Sort by index to maintain order
        sorted_embeddings = sorted(
            response.data, 
            key=lambda x: x.index
        )

        return [item.embedding for item in sorted_embeddings]

    def embed_documents(
        self, 
        chunks: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Generate embeddings for all document chunks.
        Handles rate limiting by processing in batches.
        """
        texts = [chunk["text"] for chunk in chunks]
        all_embeddings = []

        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]

            try:
                embeddings = self.embed_batch(batch)
                all_embeddings.extend(embeddings)

                # Rate limiting: OpenAI recommends 3000 RPM for tier 1
                # Adjust based on your tier
                time.sleep(0.5)

            except Exception as e:
                print(f"Batch {i//self.batch_size} failed: {e}")
                # Re-raise after logging; retry decorator handles retries
                raise

        # Attach embeddings to chunks
        for chunk, embedding in zip(chunks, all_embeddings):
            chunk["embedding"] = embedding

        return chunks

Production note: The dimensions parameter in text-embedding-3 is a powerful feature. By reducing dimensions from 1536 to 256, you can achieve 6x storage reduction while maintaining ~95% of retrieval accuracy for most use cases (based on OpenAI's published benchmarks). This is particularly valuable when working with large document collections.

Step 3: Qdrant Collection Setup and Indexing

Now we'll set up the Qdrant collection with appropriate configuration for cosine similarity search.

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams

class SemanticSearchIndex:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6333,
        api_key: str = None,
        collection_name: str = "documents",
        vector_size: int = 1536,
        use_grpc: bool = False  # gRPC is faster but requires additional setup
    ):
        self.client = QdrantClient(
            host=host,
            port=port,
            api_key=api_key,
            prefer_grpc=use_grpc
        )
        self.collection_name = collection_name
        self.vector_size = vector_size

        # Ensure collection exists with proper configuration
        self._ensure_collection()

    def _ensure_collection(self):
        """Create collection if it doesn't exist, with proper indexing."""
        collections = self.client.get_collections().collections
        exists = any(
            c.name == self.collection_name 
            for c in collections
        )

        if not exists:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                ),
                # Optimize for HNSW index
                hnsw_config=models.HNSWConfig(
                    m=16,  # Number of edges per node
                    ef_construct=100,  # Size of dynamic candidate list
                    full_scan_threshold=10000  # Threshold for full scan
                ),
                # Enable on-disk storage for large datasets
                on_disk_payload=True,
                # Optimize for fast writes
                optimizers_config=models.OptimizersConfigDiff(
                    default_segment_number=2,
                    memmap_threshold_kb=20000
                )
            )
            print(f"Created collection: {self.collection_name}")
        else:
            print(f"Collection {self.collection_name} already exists")

    def index_documents(
        self,
        chunks: List[Dict[str, Any]],
        batch_size: int = 100
    ) -> int:
        """
        Index document chunks into Qdrant.

        Args:
            chunks: List of chunks with 'embedding', 'text', and metadata
            batch_size: Number of points to upload per batch

        Returns:
            Number of points indexed
        """
        points = []

        for idx, chunk in enumerate(chunks):
            # Generate a unique ID (use hash of text for idempotency)
            point_id = hash(chunk["text"]) & 0x7FFFFFFFFFFFFFFF

            # Prepare payload with metadata
            payload = {
                "text": chunk["text"],
                "start_char": chunk.get("start_char", 0),
                "end_char": chunk.get("end_char", 0),
                "token_count": chunk.get("token_count", 0),
                "source": chunk.get("source", "unknown"),
                "chunk_index": idx
            }

            points.append(models.PointStruct(
                id=point_id,
                vector=chunk["embedding"],
                payload=payload
            ))

        # Upload in batches
        total_indexed = 0
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]

            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True  # Wait for indexing to complete
            )
            total_indexed += len(batch)
            print(f"Indexed {total_indexed}/{len(points)} points")

        return total_indexed

Architecture decision: We use wait=True during upsert to ensure data durability. In high-throughput production systems, you might set this to False and handle eventual consistency. The trade-off is write latency vs. read-after-write consistency.

Step 4: Search Implementation with Filtering

The search function needs to handle multiple edge cases: empty queries, insufficient results, and payload filtering.

from typing import List, Optional, Tuple

class SemanticSearchEngine:
    def __init__(
        self,
        index: SemanticSearchIndex,
        embedding_generator: EmbeddingGenerator,
        top_k: int = 10,
        score_threshold: float = 0.5
    ):
        self.index = index
        self.embedding_generator = embedding_generator
        self.top_k = top_k
        self.score_threshold = score_threshold

    def search(
        self,
        query: str,
        filter_conditions: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Perform semantic search with optional metadata filtering.

        Args:
            query: Natural language search query
            filter_conditions: Optional Qdrant filter conditions
            top_k: Number of results to return (overrides default)

        Returns:
            List of search results with scores and payloads
        """
        if not query or not query.strip():
            return []

        # Generate embedding for query
        query_embedding = self.embedding_generator.embed_batch([query])[0]

        # Build search parameters
        search_params = models.SearchParams(
            hnsw_ef=128,  # Higher values = more accurate but slower
            exact=False   # Use approximate search for speed
        )

        # Prepare filter if provided
        query_filter = None
        if filter_conditions:
            must_conditions = []

            for key, value in filter_conditions.items():
                if isinstance(value, list):
                    # Array contains filter
                    must_conditions.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchAny(any=value)
                        )
                    )
                elif isinstance(value, dict) and "gte" in value:
                    # Range filter
                    must_conditions.append(
                        models.FieldCondition(
                            key=key,
                            range=models.Range(
                                gte=value.get("gte"),
                                lte=value.get("lte")
                            )
                        )
                    )
                else:
                    # Exact match
                    must_conditions.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchValue(value=value)
                        )
                    )

            if must_conditions:
                query_filter = models.Filter(
                    must=must_conditions
                )

        # Execute search
        results = self.index.client.search(
            collection_name=self.index.collection_name,
            query_vector=query_embedding,
            query_filter=query_filter,
            limit=top_k or self.top_k,
            search_params=search_params,
            with_payload=True,
            with_vectors=False  # Don't return vectors in results
        )

        # Process and filter results
        processed_results = []
        for result in results:
            if result.score < self.score_threshold:
                continue

            processed_results.append({
                "id": result.id,
                "score": result.score,
                "text": result.payload.get("text", ""),
                "source": result.payload.get("source", ""),
                "chunk_index": result.payload.get("chunk_index", 0),
                "metadata": {
                    k: v for k, v in result.payload.items()
                    if k not in ["text", "source", "chunk_index"]
                }
            })

        return processed_results

    def hybrid_search(
        self,
        query: str,
        keyword_weight: float = 0.3,
        semantic_weight: float = 0.7,
        filter_conditions: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Combine semantic search with keyword matching for better results.

        This is useful for domain-specific terms that embeddings might miss.
        """
        # Get semantic results
        semantic_results = self.search(
            query, 
            filter_conditions=filter_conditions,
            top_k=self.top_k * 2  # Get more results for fusion
        )

        # Get keyword results using Qdrant's full-text search
        keyword_results = self.index.client.scroll(
            collection_name=self.index.collection_name,
            scroll_filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="text",
                        match=models.MatchText(text=query)
                    )
                ]
            ),
            limit=self.top_k * 2,
            with_payload=True
        )[0]

        # Reciprocal Rank Fusion (RRF)
        # Combine scores from both methods
        combined_scores = {}

        for rank, result in enumerate(semantic_results):
            combined_scores[result["id"]] = {
                "semantic_rank": rank + 1,
                "keyword_rank": None,
                "score": result["score"],
                "text": result["text"],
                "source": result["source"],
                "metadata": result["metadata"]
            }

        for rank, result in enumerate(keyword_results):
            if result.id in combined_scores:
                combined_scores[result.id]["keyword_rank"] = rank + 1
            else:
                combined_scores[result.id] = {
                    "semantic_rank": None,
                    "keyword_rank": rank + 1,
                    "score": 0.0,
                    "text": result.payload.get("text", ""),
                    "source": result.payload.get("source", ""),
                    "metadata": {}
                }

        # Calculate RRF scores
        for doc_id, scores in combined_scores.items():
            rrf_score = 0.0

            if scores["semantic_rank"] is not None:
                rrf_score += semantic_weight / (60 + scores["semantic_rank"])

            if scores["keyword_rank"] is not None:
                rrf_score += keyword_weight / (60 + scores["keyword_rank"])

            scores["rrf_score"] = rrf_score

        # Sort by RRF score and return top_k
        sorted_results = sorted(
            combined_scores.values(),
            key=lambda x: x["rrf_score"],
            reverse=True
        )

        return sorted_results[:self.top_k]

Edge case handling: The hybrid_search method addresses a critical limitation of pure semantic search: domain-specific terminology. For example, searching for "B^0_s→μ^+μ^- decay" (a physics term from our reference papers) might not match semantically if the embedding model hasn't seen similar patterns. The keyword component catches exact matches while semantic search handles paraphrases.

Production Considerations and Optimization

Memory Management for Large Collections

When indexing millions of documents, memory becomes a concern. Qdrant supports memory-mapped storage, but you should still monitor RAM usage.

# Configure Qdrant client for memory-constrained environments
client = QdrantClient(
    host="localhost",
    port=6333,
    prefer_grpc=True,  # gRPC uses less memory than REST
    grpc_port=6334,
    timeout=30
)

# Use smaller batch sizes for indexing
indexer.index_documents(chunks, batch_size=50)  # Default is 100

Handling API Rate Limits

OpenAI's embedding API has rate limits based on your tier. According to OpenAI's documentation, Tier 1 users get 3,000 requests per minute (RPM) and 1,000,000 tokens per minute (TPM). Implement exponential backoff and batch processing to stay within limits.

import asyncio
from aiolimiter import AsyncLimiter

class AsyncEmbeddingGenerator:
    def __init__(self, rpm_limit: int = 3000, tpm_limit: int = 1000000):
        self.rate_limiter = AsyncLimiter(rpm_limit, 60)  # RPM
        self.token_limiter = AsyncLimiter(tpm_limit, 60)  # TPM

    async def embed_with_rate_limit(self, texts: List[str]) -> List[List[float]]:
        async with self.rate_limiter:
            # Estimate tokens (rough: 1 token ≈ 4 characters)
            estimated_tokens = sum(len(t) // 4 for t in texts)
            async with self.token_limiter:
                # Actual API call here
                pass

Error Recovery and Idempotency

Network failures during indexing can leave your collection in an inconsistent state. Use idempotent point IDs (based on content hash) so that retrying the same document doesn't create duplicates.

import hashlib

def generate_point_id(text: str) -> int:
    """Generate a deterministic point ID from text content."""
    hash_bytes = hashlib.sha256(text.encode()).digest()
    # Convert to 64-bit integer (Qdrant supports up to 2^63)
    return int.from_bytes(hash_bytes[:8], 'big') & 0x7FFFFFFFFFFFFFFF

Testing the Complete Pipeline

Let's put everything together with a real example using scientific abstracts (similar to our reference papers).

def main():
    # Initialize components
    embedding_gen = EmbeddingGenerator(
        api_key=os.environ["OPENAI_API_KEY"],
        model="text-embedding-3-small",
        dimensions=512  # Reduced for efficiency
    )

    index = SemanticSearchIndex(
        host="localhost",
        port=6333,
        collection_name="scientific_papers",
        vector_size=512
    )

    engine = SemanticSearchEngine(
        index=index,
        embedding_generator=embedding_gen,
        top_k=5,
        score_threshold=0.6
    )

    # Sample documents (in practice, load from files)
    documents = [
        {
            "text": "Observation of the rare B^0_s→μ^+μ^- decay from the combined analysis of CMS and LHCb data. This decay is a sensitive probe of new physics beyond the Standard Model.",
            "source": "arxiv:1411.4413"
        },
        {
            "text": "Expected Performance of the ATLAS Experiment - Detector, Trigger and Physics. This paper describes the expected performance of the ATLAS detector at the Large Hadron Collider.",
            "source": "arxiv:0901.0512"
        },
        {
            "text": "Deep Search for Joint Sources of Gravitational Waves and High-Energy Neutrinos with IceCube. We present results of a search for joint sources of gravitational waves and high-energy neutrinos.",
            "source": "arxiv:2105.13160"
        }
    ]

    # Index documents
    all_chunks = []
    for doc in documents:
        chunks = chunk_document(doc["text"], chunk_size=256, chunk_overlap=64)
        for chunk in chunks:
            chunk["source"] = doc["source"]
        all_chunks.extend(chunks)

    # Generate embeddings and index
    indexed_chunks = embedding_gen.embed_documents(all_chunks)
    num_indexed = index.index_documents(indexed_chunks)
    print(f"Indexed {num_indexed} chunks")

    # Perform search
    results = engine.search("rare particle decays beyond standard model")

    print("\nSearch Results:")
    for result in results:
        print(f"Score: {result['score']:.4f}")
        print(f"Text: {result['text'][:100]}..")
        print(f"Source: {result['source']}")
        print("---")

    # Test hybrid search
    hybrid_results = engine.hybrid_search("B^0_s→μ^+μ^-")
    print("\nHybrid Search Results:")
    for result in hybrid_results:
        print(f"RRF Score: {result['rrf_score']:.4f}")
        print(f"Text: {result['text'][:100]}..")
        print("---")

if __name__ == "__main__":
    main()

Performance Benchmarks and Scaling

Based on production deployments documented in Qdrant's case studies, a single node with 16GB RAM can handle:

  • Indexing: ~10,000 vectors/second (1536 dimensions)
  • Search: ~1,000 queries/second (p99 latency < 50ms)
  • Storage: ~1 million vectors with payloads in ~4GB

For larger collections, Qdrant supports sharding across multiple nodes. The HNSW index parameters (m and ef_construct) directly impact the trade-off between memory usage and search speed. Higher m values (up to 64) improve recall but increase memory consumption linearly.

What's Next

This semantic search engine provides a solid foundation, but production systems often require additional capabilities:

  1. Multi-modal search: Extend to search across images, audio, or video by using appropriate embedding models
  2. Real-time indexing: Implement streaming ingestion with Kafka or RabbitMQ for live document updates
  3. A/B testing framework: Compare different embedding models or chunking strategies using metrics like NDCG or MRR
  4. Feedback loop: Incorporate user click data to fine-tune ranking weights

For further reading, explore Qdrant's filtering documentation for complex metadata queries, or OpenAI's embedding guide for advanced usage of the dimensions parameter.

The combination of Qdrant's efficient vector search and OpenAI's high-quality embeddings creates a semantic search system that can scale from personal knowledge bases to enterprise document retrieval platforms. Start with the code above, monitor your retrieval metrics, and iterate on chunking strategies based on your specific domain requirements.


References

1. Wikipedia - Embedding. Wikipedia. [Source]
2. Wikipedia - Rag. Wikipedia. [Source]
3. Wikipedia - Vector database. Wikipedia. [Source]
4. arXiv - Observation of the rare $B^0_s\toμ^+μ^-$ decay from the comb. Arxiv. [Source]
5. arXiv - Expected Performance of the ATLAS Experiment - Detector, Tri. Arxiv. [Source]
6. GitHub - fighting41love/funNLP. Github. [Source]
7. GitHub - Shubhamsaboo/awesome-llm-apps. Github. [Source]
8. GitHub - qdrant/qdrant. Github. [Source]
9. GitHub - milvus-io/milvus. Github. [Source]
tutorialairag
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles