How to Build a Semantic Search Engine with Qdrant and OpenAI Embeddings
Practical tutorial: Build a semantic search engine with Qdrant and text-embedding-3
Semantic search has transformed how we retrieve information from unstructured text. Unlike traditional keyword-based search, semantic search understands the meaning behind queries, enabling more relevant results even when exact terms don't match. In this tutorial, you'll build a production-ready semantic search engine using Qdrant as the vector database [3] and OpenAI's text-embedding-3 model for generating embeddings.
This system can index thousands of documents and return semantically similar results in milliseconds. We'll cover architecture decisions, implementation details, and edge cases you'll encounter in real-world deployments.
Understanding the Architecture: Why Qdrant and text-embedding-3
Before writing code, let's understand why this combination works well for production semantic search.
Qdrant is a vector similarity search engine written in Rust. According to available documentation, it offers built-in filtering, payload storage [2], and horizontal scaling. Unlike FAISS, which is primarily an indexing library, Qdrant operates as a full database service with persistence, CRUD operations, and a client-server architecture. This makes it suitable for applications where you need to add, update, or delete vectors without rebuilding the entire index.
OpenAI's text-embedding-3 family, released in January 2024, succeeds text-embedding-ada-002 and comes in two variants: text-embedding-3-small, which produces 1536-dimensional vectors by default, and text-embedding-3-large, which produces 3072-dimensional vectors. Both accept a dimensions parameter that shortens the output vector, letting you trade retrieval accuracy for storage efficiency.
The architecture follows a standard pattern:
- Ingestion pipeline: Documents are chunked, embedded, and stored in Qdrant with metadata
- Query pipeline: User queries are embedded using the same model, then searched against the vector index
- Reranking (optional): Initial results can be refined using cross-encoders or additional filters
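The ingestion and query pipelines above can be sketched end to end without any external service. This is a toy illustration only: toy_embed is a hypothetical stand-in for the OpenAI embedding call (it just counts words over a fixed vocabulary), and a plain Python list stands in for the Qdrant collection. The point is the shape of the flow: the same embedding function is applied to documents and to queries, and results are ranked by cosine similarity.

```python
import math
from collections import Counter


def toy_embed(text, vocab):
    """Hypothetical stand-in for an embedding model: unit-length word counts."""
    counts = Counter(text.lower().split())
    vec = [float(counts[word]) for word in vocab]
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm else vec


def cosine(a, b):
    # Both vectors are unit length, so the dot product equals cosine similarity.
    return sum(x * y for x, y in zip(a, b))


def ingest(docs, vocab):
    # Ingestion pipeline: embed each document and store it with its text.
    return [{"text": doc, "vector": toy_embed(doc, vocab)} for doc in docs]


def query(index, question, vocab, top_k=2):
    # Query pipeline: embed the query with the same function, then rank.
    query_vector = toy_embed(question, vocab)
    scored = [(cosine(query_vector, item["vector"]), item["text"]) for item in index]
    return sorted(scored, reverse=True)[:top_k]


VOCAB = ["vector", "database", "search", "embedding", "keyword", "cat"]
DOCS = ["vector database search", "keyword search", "cat"]

index = ingest(DOCS, VOCAB)
top = query(index, "vector search", VOCAB, top_k=2)
for score, text in top:
    print(f"{score:.3f}  {text}")
```

In the real system, toy_embed is replaced by an API call and the list by a Qdrant collection, but the contract stays the same: mixing embedding models between ingestion and query breaks the similarity comparison.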
Prerequisites and Environment Setup
You'll need Python 3.10+ and a running Qdrant instance. For local development, Docker is the simplest approach.
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
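# tenacity and aiolimiter are used by the retry and rate-limiting code later on
pip install qdrant-client==1.9.1 openai==1.30.0 tiktoken==0.7.0 pydantic==2.7.0 tenacity aiolimiter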
For Qdrant, you have two options:
Option 1: Docker (recommended for development)
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage:z \
qdrant/qdrant:v1.9.1
Option 2: Qdrant Cloud (for production)
Sign up at cloud.qdrant.io and create a cluster. You'll receive a URL and API key.
Set your OpenAI API key as an environment variable:
export OPENAI_API_KEY="sk-your-key-here"
Core Implementation: Building the Semantic Search Pipeline
Step 1: Document Chunking Strategy
Semantic search requires breaking documents into manageable chunks. The chunk size directly impacts retrieval quality. Too large, and you lose semantic focus; too small, and you lose context.
For this tutorial, we'll implement a token-based sliding-window splitter with overlap. It cuts at fixed token counts rather than at sentence boundaries, so the overlap is what carries context from one chunk to the next.
import tiktoken
from typing import List, Dict, Any


def chunk_document(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 128,
    model_name: str = "text-embedding-3-small"
) -> List[Dict[str, Any]]:
    """
    Split a document into overlapping chunks using token-aware splitting.

    Args:
        text: The full document text
        chunk_size: Maximum tokens per chunk
        chunk_overlap: Number of overlapping tokens between chunks
        model_name: Used to select the correct tokenizer

    Returns:
        List of chunk dictionaries with text and metadata
    """
    # Without this check the window would never advance (infinite loop)
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    encoder = tiktoken.encoding_for_model(model_name)
    tokens = encoder.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        # Get chunk tokens
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        # Decode back to text
        chunk_text = encoder.decode(chunk_tokens)
        # Calculate start/end character positions (approximate)
        # This helps with source document navigation
        char_start = len(encoder.decode(tokens[:start]))
        char_end = len(encoder.decode(tokens[:end]))
        chunks.append({
            "text": chunk_text,
            "start_char": char_start,
            "end_char": char_end,
            "token_count": len(chunk_tokens)
        })
        # Stop once the last token is covered; otherwise the next window
        # would contain only text already present in this chunk
        if end == len(tokens):
            break
        # Move start position, accounting for overlap
        start += chunk_size - chunk_overlap
    return chunks
Edge case consideration: When chunking scientific papers (like those referenced in our sources), pay attention to equations and special characters. The tiktoken tokenizer handles Unicode correctly, but you may want to strip LaTeX or markdown formatting before embedding to avoid noise.
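A minimal sketch of such a cleanup pass, using only regular expressions from the standard library. The function name strip_markup and the specific patterns are assumptions for illustration, not part of the original pipeline, and the rules are heuristic: they drop inline and display math entirely, which also removes formulas you might want to keep for keyword matching.

```python
import re


def strip_markup(text: str) -> str:
    """Heuristically remove LaTeX and markdown noise before embedding (lossy)."""
    # Drop display math ($$...$$) and inline math ($...$)
    text = re.sub(r"\$\$.*?\$\$|\$[^$\n]*\$", " ", text, flags=re.DOTALL)
    # Unwrap simple LaTeX commands: \emph{rare} -> rare
    text = re.sub(r"\\[a-zA-Z]+\*?(\[[^\]]*\])?\{([^{}]*)\}", r"\2", text)
    # Remove markdown emphasis, backticks, and heading markers
    text = re.sub(r"`{1,3}|\*{1,2}|_{2}|^#{1,6}\s*", "", text, flags=re.MULTILINE)
    # Keep link labels, drop URLs: [label](url) -> label
    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
    # Collapse the whitespace left behind
    return re.sub(r"\s+", " ", text).strip()


cleaned = strip_markup("# Title\nThe **mass** is $m_B$ and \\emph{rare}.")
print(cleaned)
```

If exact formulas matter for retrieval, one option is to store the raw text in the payload and embed only the cleaned version, so keyword matching can still see the original notation.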
Step 2: Embedding Generation with text-embedding-3
OpenAI's embedding API has rate limits and costs. For production, you should implement retry logic and batch processing.
import time
from typing import List, Dict, Any

from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential


class EmbeddingGenerator:
    def __init__(
        self,
        api_key: str,
        model: str = "text-embedding-3-small",
        dimensions: int = 1536,
        batch_size: int = 100
    ):
        self.client = OpenAI(api_key=api_key)
        self.model = model
        self.dimensions = dimensions
        self.batch_size = batch_size
        # The text-embedding-3 models support the dimensions parameter,
        # which lets you reduce vector size for storage efficiency
        if model == "text-embedding-3-small":
            self.max_dimensions = 1536
        elif model == "text-embedding-3-large":
            self.max_dimensions = 3072
        else:
            self.max_dimensions = dimensions
        if dimensions > self.max_dimensions:
            raise ValueError(
                f"Model {model} supports max {self.max_dimensions} dimensions, "
                f"requested {dimensions}"
            )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a batch of texts with retry logic.

        Args:
            texts: List of text strings to embed

        Returns:
            List of embedding vectors
        """
        response = self.client.embeddings.create(
            model=self.model,
            input=texts,
            dimensions=self.dimensions
        )
        # Sort by index to maintain order
        sorted_embeddings = sorted(
            response.data,
            key=lambda x: x.index
        )
        return [item.embedding for item in sorted_embeddings]

    def embed_documents(
        self,
        chunks: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Generate embeddings for all document chunks.
        Handles rate limiting by processing in batches.
        """
        texts = [chunk["text"] for chunk in chunks]
        all_embeddings = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            try:
                embeddings = self.embed_batch(batch)
                all_embeddings.extend(embeddings)
                # Crude throttle: a fixed pause between batches.
                # Adjust (or remove) based on your account's rate limits
                time.sleep(0.5)
            except Exception as e:
                print(f"Batch {i//self.batch_size} failed: {e}")
                # embed_batch has already exhausted its retries at this point
                raise
        # Attach embeddings to chunks
        for chunk, embedding in zip(chunks, all_embeddings):
            chunk["embedding"] = embedding
        return chunks
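Production note: The dimensions parameter in text-embedding-3 is a powerful feature. Reducing dimensions from 1536 to 256 cuts vector storage by 6x. OpenAI reports that shortened embeddings retain most of their benchmark quality (for example, text-embedding-3-large shortened to 256 dimensions still outperforms a full 1536-dimensional text-embedding-ada-002 embedding on MTEB), but the accuracy you keep depends on your data, so measure retrieval quality on your own collection before committing to a size. This is particularly valuable when working with large document collections.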
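The storage arithmetic behind that 6x figure is easy to check, assuming vectors are stored as 4-byte float32 values and ignoring index and payload overhead. The sketch below also shows one way to shorten embeddings you have already stored at full size: keep the first dims values and re-normalize to unit length. The helper names storage_bytes and shorten are illustrative, not part of any library.

```python
import math


def storage_bytes(num_vectors: int, dims: int, bytes_per_value: int = 4) -> int:
    """Raw vector storage, assuming float32 values and no index overhead."""
    return num_vectors * dims * bytes_per_value


def shorten(vector, dims):
    """Keep the first `dims` values and L2-normalize the result."""
    head = vector[:dims]
    norm = math.sqrt(sum(v * v for v in head))
    return [v / norm for v in head] if norm else head


full = storage_bytes(1_000_000, 1536)
small = storage_bytes(1_000_000, 256)
ratio = full / small
print(f"1536 dims: {full / 1e9:.2f} GB, 256 dims: {small / 1e9:.2f} GB, ratio: {ratio:.0f}x")

shortened = shorten([3.0, 4.0, 12.0], 2)
print(shortened)
```

Whichever route you take, the collection's vector size in Qdrant must match the length of the vectors you upload, and queries must be embedded at the same size.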
Step 3: Qdrant Collection Setup and Indexing
Now we'll set up the Qdrant collection with appropriate configuration for cosine similarity search.
import hashlib
from typing import Any, Dict, List, Optional

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams


class SemanticSearchIndex:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6333,
        api_key: Optional[str] = None,
        collection_name: str = "documents",
        vector_size: int = 1536,
        use_grpc: bool = False  # gRPC is faster but requires additional setup
    ):
        self.client = QdrantClient(
            host=host,
            port=port,
            api_key=api_key,
            prefer_grpc=use_grpc
        )
        self.collection_name = collection_name
        self.vector_size = vector_size
        # Ensure collection exists with proper configuration
        self._ensure_collection()

    def _ensure_collection(self):
        """Create collection if it doesn't exist, with proper indexing."""
        collections = self.client.get_collections().collections
        exists = any(
            c.name == self.collection_name
            for c in collections
        )
        if not exists:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                ),
                # Tune the HNSW index (the client model is HnswConfigDiff)
                hnsw_config=models.HnswConfigDiff(
                    m=16,  # Number of edges per node
                    ef_construct=100,  # Size of dynamic candidate list
                    full_scan_threshold=10000  # Threshold for full scan
                ),
                # Keep payloads on disk for large datasets
                on_disk_payload=True,
                # Segment and memory-mapping settings
                optimizers_config=models.OptimizersConfigDiff(
                    default_segment_number=2,
                    memmap_threshold=20000  # In kilobytes
                )
            )
            print(f"Created collection: {self.collection_name}")
        else:
            print(f"Collection {self.collection_name} already exists")

    def index_documents(
        self,
        chunks: List[Dict[str, Any]],
        batch_size: int = 100
    ) -> int:
        """
        Index document chunks into Qdrant.

        Args:
            chunks: List of chunks with 'embedding', 'text', and metadata
            batch_size: Number of points to upload per batch

        Returns:
            Number of points indexed
        """
        points = []
        for idx, chunk in enumerate(chunks):
            # Deterministic ID from a content hash, for idempotent re-indexing.
            # Python's built-in hash() is salted per process for strings,
            # so it would produce different IDs on every run.
            digest = hashlib.sha256(chunk["text"].encode("utf-8")).digest()
            point_id = int.from_bytes(digest[:8], "big") & 0x7FFFFFFFFFFFFFFF
            # Prepare payload with metadata
            payload = {
                "text": chunk["text"],
                "start_char": chunk.get("start_char", 0),
                "end_char": chunk.get("end_char", 0),
                "token_count": chunk.get("token_count", 0),
                "source": chunk.get("source", "unknown"),
                "chunk_index": idx
            }
            points.append(models.PointStruct(
                id=point_id,
                vector=chunk["embedding"],
                payload=payload
            ))
        # Upload in batches
        total_indexed = 0
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch,
                wait=True  # Return only after the points have been applied
            )
            total_indexed += len(batch)
            print(f"Indexed {total_indexed}/{len(points)} points")
        return total_indexed
Architecture decision: We use wait=True during upsert so the call returns only after the points have been applied, which makes them visible to the next search. In high-throughput production systems, you might set this to False and accept eventual consistency. The trade-off is write latency vs. read-after-write consistency.
Step 4: Search Implementation with Filtering
The search function needs to handle multiple edge cases: empty queries, insufficient results, and payload filtering.
from typing import Any, Dict, List, Optional

from qdrant_client.http import models


class SemanticSearchEngine:
    def __init__(
        self,
        index: SemanticSearchIndex,
        embedding_generator: EmbeddingGenerator,
        top_k: int = 10,
        score_threshold: float = 0.5
    ):
        self.index = index
        self.embedding_generator = embedding_generator
        self.top_k = top_k
        self.score_threshold = score_threshold

    @staticmethod
    def _build_conditions(
        filter_conditions: Optional[Dict[str, Any]]
    ) -> List[models.FieldCondition]:
        """Translate a simple {field: value} dict into Qdrant field conditions."""
        conditions = []
        for key, value in (filter_conditions or {}).items():
            if isinstance(value, list):
                # Match any value in the list
                conditions.append(
                    models.FieldCondition(
                        key=key,
                        match=models.MatchAny(any=value)
                    )
                )
            elif isinstance(value, dict):
                # Range filter; accepts any of gt, gte, lt, lte
                conditions.append(
                    models.FieldCondition(
                        key=key,
                        range=models.Range(
                            gt=value.get("gt"),
                            gte=value.get("gte"),
                            lt=value.get("lt"),
                            lte=value.get("lte")
                        )
                    )
                )
            else:
                # Exact match
                conditions.append(
                    models.FieldCondition(
                        key=key,
                        match=models.MatchValue(value=value)
                    )
                )
        return conditions

    def search(
        self,
        query: str,
        filter_conditions: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Perform semantic search with optional metadata filtering.

        Args:
            query: Natural language search query
            filter_conditions: Optional filter dict, see _build_conditions
            top_k: Number of results to return (overrides default)

        Returns:
            List of search results with scores and payloads
        """
        if not query or not query.strip():
            return []
        # Generate embedding for query
        query_embedding = self.embedding_generator.embed_batch([query])[0]
        # Build search parameters
        search_params = models.SearchParams(
            hnsw_ef=128,  # Higher values = more accurate but slower
            exact=False  # Use approximate search for speed
        )
        # Prepare filter if provided
        must_conditions = self._build_conditions(filter_conditions)
        query_filter = models.Filter(must=must_conditions) if must_conditions else None
        # Execute search
        results = self.index.client.search(
            collection_name=self.index.collection_name,
            query_vector=query_embedding,
            query_filter=query_filter,
            limit=top_k or self.top_k,
            search_params=search_params,
            with_payload=True,
            with_vectors=False  # Don't return vectors in results
        )
        # Process and filter results
        processed_results = []
        for result in results:
            if result.score < self.score_threshold:
                continue
            processed_results.append({
                "id": result.id,
                "score": result.score,
                "text": result.payload.get("text", ""),
                "source": result.payload.get("source", ""),
                "chunk_index": result.payload.get("chunk_index", 0),
                "metadata": {
                    k: v for k, v in result.payload.items()
                    if k not in ["text", "source", "chunk_index"]
                }
            })
        return processed_results

    def hybrid_search(
        self,
        query: str,
        keyword_weight: float = 0.3,
        semantic_weight: float = 0.7,
        filter_conditions: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Combine semantic search with keyword matching for better results.
        This is useful for domain-specific terms that embeddings might miss.
        """
        # Get semantic results
        semantic_results = self.search(
            query,
            filter_conditions=filter_conditions,
            top_k=self.top_k * 2  # Get more results for fusion
        )
        # Get keyword results with a text match on the payload.
        # Note: scroll returns points in ID order, not by relevance, so the
        # keyword "rank" below only reflects that a point matched at all.
        keyword_conditions = [
            models.FieldCondition(
                key="text",
                match=models.MatchText(text=query)
            )
        ] + self._build_conditions(filter_conditions)
        keyword_results = self.index.client.scroll(
            collection_name=self.index.collection_name,
            scroll_filter=models.Filter(must=keyword_conditions),
            limit=self.top_k * 2,
            with_payload=True
        )[0]
        # Weighted Reciprocal Rank Fusion (RRF)
        # Combine ranks from both methods
        combined_scores = {}
        for rank, result in enumerate(semantic_results):
            combined_scores[result["id"]] = {
                "semantic_rank": rank + 1,
                "keyword_rank": None,
                "score": result["score"],
                "text": result["text"],
                "source": result["source"],
                "metadata": result["metadata"]
            }
        for rank, result in enumerate(keyword_results):
            if result.id in combined_scores:
                combined_scores[result.id]["keyword_rank"] = rank + 1
            else:
                combined_scores[result.id] = {
                    "semantic_rank": None,
                    "keyword_rank": rank + 1,
                    "score": 0.0,
                    "text": result.payload.get("text", ""),
                    "source": result.payload.get("source", ""),
                    "metadata": {}
                }
        # Calculate RRF scores (60 is the conventional RRF smoothing constant)
        for doc_id, scores in combined_scores.items():
            rrf_score = 0.0
            if scores["semantic_rank"] is not None:
                rrf_score += semantic_weight / (60 + scores["semantic_rank"])
            if scores["keyword_rank"] is not None:
                rrf_score += keyword_weight / (60 + scores["keyword_rank"])
            scores["rrf_score"] = rrf_score
        # Sort by RRF score and return top_k
        sorted_results = sorted(
            combined_scores.values(),
            key=lambda x: x["rrf_score"],
            reverse=True
        )
        return sorted_results[:self.top_k]
Edge case handling: The hybrid_search method addresses a critical limitation of pure semantic search: domain-specific terminology. For example, searching for "B^0_s→μ^+μ^- decay" (a physics term from our reference papers) might not match semantically if the embedding model hasn't seen similar patterns. The keyword component catches exact matches while semantic search handles paraphrases.
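To see how the fusion step behaves in isolation, here is a minimal standalone sketch of weighted Reciprocal Rank Fusion over plain lists of document IDs. The function name weighted_rrf and the sample rankings are made up for illustration; the weights and the constant 60 mirror the values used in hybrid_search.

```python
def weighted_rrf(rankings, weights, k=60):
    """Fuse ranked ID lists: each list adds weight / (k + rank) to a document's score."""
    scores = {}
    for name, ranked_ids in rankings.items():
        for rank, doc_id in enumerate(ranked_ids, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weights[name] / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


fused = weighted_rrf(
    rankings={
        "semantic": ["a", "b", "c"],  # best semantic match first
        "keyword": ["c", "a"],        # best keyword match first
    },
    weights={"semantic": 0.7, "keyword": 0.3},
)
for doc_id, score in fused:
    print(doc_id, round(score, 5))
```

Document "c" is only third in the semantic list but overtakes "b" because it also appears in the keyword list. Because RRF uses ranks rather than raw scores, it avoids having to put cosine similarities and keyword matches on a common scale.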
Production Considerations and Optimization
Memory Management for Large Collections
When indexing millions of documents, memory becomes a concern. Qdrant supports memory-mapped storage, but you should still monitor RAM usage.
# Configure Qdrant client for memory-constrained environments
client = QdrantClient(
    host="localhost",
    port=6333,
    prefer_grpc=True,  # gRPC typically has lower serialization overhead than REST
    grpc_port=6334,
    timeout=30
)
# Use smaller batch sizes for indexing
# (index is the SemanticSearchIndex instance from Step 3)
index.index_documents(chunks, batch_size=50)  # Default is 100
Handling API Rate Limits
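OpenAI's embedding API has rate limits based on your usage tier. The figures used here, 3,000 requests per minute (RPM) and 1,000,000 tokens per minute (TPM) for Tier 1, are the ones cited for OpenAI's documentation at the time of writing; limits vary by model and change over time, so check the limits page for your own account. Implement exponential backoff and batch processing to stay within limits.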
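from typing import List

from aiolimiter import AsyncLimiter
from openai import AsyncOpenAI


class AsyncEmbeddingGenerator:
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        rpm_limit: int = 3000,
        tpm_limit: int = 1000000
    ):
        self.client = AsyncOpenAI()  # Reads OPENAI_API_KEY from the environment
        self.model = model
        self.rate_limiter = AsyncLimiter(rpm_limit, 60)  # RPM
        self.token_limiter = AsyncLimiter(tpm_limit, 60)  # TPM

    async def embed_with_rate_limit(self, texts: List[str]) -> List[List[float]]:
        # Estimate tokens (rough: 1 token ≈ 4 characters)
        estimated_tokens = max(1, sum(len(t) // 4 for t in texts))
        # Reserve one request and the estimated token budget before calling
        await self.rate_limiter.acquire()
        await self.token_limiter.acquire(estimated_tokens)
        response = await self.client.embeddings.create(model=self.model, input=texts)
        ordered = sorted(response.data, key=lambda x: x.index)
        return [item.embedding for item in ordered]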
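The exponential backoff mentioned above can be sketched with the standard library alone. This is an illustrative helper, not the tenacity-based retry used earlier: call_with_backoff and its parameters are assumed names, and it retries on any exception for brevity, whereas real code would retry only on rate-limit or transient network errors. The sleep function is injectable so the behavior can be checked without actually waiting.

```python
import random
import time


def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff and full jitter on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay ceiling doubles each attempt: 1s, 2s, 4s, ... capped at max_delay
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: wait a random time up to the ceiling so that
            # many clients do not retry in lockstep
            sleep(random.uniform(0, ceiling))


# Demo: a function that fails twice, then succeeds
attempts = {"count": 0}
recorded_delays = []


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("simulated rate limit")
    return "ok"


result = call_with_backoff(flaky, sleep=recorded_delays.append)
print(result, attempts["count"], len(recorded_delays))
```

Backoff handles the case where you hit a limit anyway; the limiter in the previous block tries to avoid hitting it in the first place. The two are complementary.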
Error Recovery and Idempotency
Network failures during indexing can leave your collection in an inconsistent state. Use idempotent point IDs (based on content hash) so that retrying the same document doesn't create duplicates.
import hashlib


def generate_point_id(text: str) -> int:
    """Generate a deterministic point ID from text content."""
    hash_bytes = hashlib.sha256(text.encode()).digest()
    # Take the first 8 bytes and mask to a non-negative 63-bit integer,
    # which fits within Qdrant's unsigned 64-bit integer IDs
    return int.from_bytes(hash_bytes[:8], 'big') & 0x7FFFFFFFFFFFFFFF
Testing the Complete Pipeline
Let's put everything together with a real example using scientific abstracts (similar to our reference papers).
import os


def main():
    # Initialize components
    embedding_gen = EmbeddingGenerator(
        api_key=os.environ["OPENAI_API_KEY"],
        model="text-embedding-3-small",
        dimensions=512  # Reduced for efficiency
    )
    index = SemanticSearchIndex(
        host="localhost",
        port=6333,
        collection_name="scientific_papers",
        vector_size=512  # Must match the embedding dimensions above
    )
    engine = SemanticSearchEngine(
        index=index,
        embedding_generator=embedding_gen,
        top_k=5,
        score_threshold=0.6  # Tune on your data; too high a value filters out every result
    )
    # Sample documents (in practice, load from files)
    documents = [
        {
            "text": "Observation of the rare B^0_s→μ^+μ^- decay from the combined analysis of CMS and LHCb data. This decay is a sensitive probe of new physics beyond the Standard Model.",
            "source": "arxiv:1411.4413"
        },
        {
            "text": "Expected Performance of the ATLAS Experiment - Detector, Trigger and Physics. This paper describes the expected performance of the ATLAS detector at the Large Hadron Collider.",
            "source": "arxiv:0901.0512"
        },
        {
            "text": "Deep Search for Joint Sources of Gravitational Waves and High-Energy Neutrinos with IceCube. We present results of a search for joint sources of gravitational waves and high-energy neutrinos.",
            "source": "arxiv:2105.13160"
        }
    ]
    # Index documents
    all_chunks = []
    for doc in documents:
        chunks = chunk_document(doc["text"], chunk_size=256, chunk_overlap=64)
        for chunk in chunks:
            chunk["source"] = doc["source"]
        all_chunks.extend(chunks)
    # Generate embeddings and index
    indexed_chunks = embedding_gen.embed_documents(all_chunks)
    num_indexed = index.index_documents(indexed_chunks)
    print(f"Indexed {num_indexed} chunks")
    # Perform search
    results = engine.search("rare particle decays beyond standard model")
    print("\nSearch Results:")
    for result in results:
        print(f"Score: {result['score']:.4f}")
        print(f"Text: {result['text'][:100]}..")
        print(f"Source: {result['source']}")
        print("---")
    # Test hybrid search
    hybrid_results = engine.hybrid_search("B^0_s→μ^+μ^-")
    print("\nHybrid Search Results:")
    for result in hybrid_results:
        print(f"RRF Score: {result['rrf_score']:.4f}")
        print(f"Text: {result['text'][:100]}..")
        print("---")


if __name__ == "__main__":
    main()
Performance Benchmarks and Scaling
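Based on production deployments documented in Qdrant's case studies, a single node with 16GB RAM can handle roughly the following. Treat these as order-of-magnitude figures: actual throughput depends on hardware, vector dimension, HNSW settings, and payload size, so benchmark your own workload.
- Indexing: ~10,000 vectors/second (1536 dimensions)
- Search: ~1,000 queries/second (p99 latency < 50ms)
- Storage: ~1 million vectors with payloads in ~4GB. This assumes reduced dimensions or quantization, since 1 million 1536-dimensional float32 vectors occupy about 6GB before any index overhead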
For larger collections, Qdrant supports sharding across multiple nodes. The HNSW index parameters (m and ef_construct) directly impact the trade-off between memory usage and search speed. Higher m values (up to 64) improve recall but increase memory consumption linearly.
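For a first sizing pass, a back-of-the-envelope estimate is often enough. The sketch below uses a rule of thumb of vectors × dimensions × 4 bytes × 1.5, where the 1.5 factor is a rough allowance for index and metadata overhead. Treat both the factor and the helper name estimate_ram_bytes as assumptions for illustration; real usage depends on m, payload size, quantization, and whether vectors are memory-mapped.

```python
def estimate_ram_bytes(num_vectors, dims, overhead=1.5, bytes_per_value=4):
    """Rough RAM estimate: raw float32 vectors times an assumed overhead factor."""
    return int(num_vectors * dims * bytes_per_value * overhead)


GIB = 1024 ** 3

# Compare the full-size vectors with the reduced sizes discussed earlier
for dims in (1536, 512, 256):
    estimate = estimate_ram_bytes(1_000_000, dims)
    print(f"{dims:>4} dims: ~{estimate / GIB:.2f} GiB for 1M vectors")
```

Estimates like this make the payoff of the dimensions parameter concrete: at the same vector count, dropping from 1536 to 512 dimensions cuts the estimated footprint to a third.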
What's Next
This semantic search engine provides a solid foundation, but production systems often require additional capabilities:
- Multi-modal search: Extend to search across images, audio, or video by using appropriate embedding models
- Real-time indexing: Implement streaming ingestion with Kafka or RabbitMQ for live document updates
- A/B testing framework: Compare different embedding models or chunking strategies using metrics like NDCG or MRR
- Feedback loop: Incorporate user click data to fine-tune ranking weights
For further reading, explore Qdrant's filtering documentation for complex metadata queries, or OpenAI's embedding guide for advanced usage of the dimensions parameter.
The combination of Qdrant's efficient vector search and OpenAI's high-quality embeddings creates a semantic search system that can scale from personal knowledge bases to enterprise document retrieval platforms. Start with the code above, monitor your retrieval metrics, and iterate on chunking strategies based on your specific domain requirements.
References