How to Automate CVE Analysis with LLMs and RAG
Practical tutorial: Automate CVE analysis with LLMs and RAG
Security teams face an overwhelming volume of Common Vulnerabilities and Exposures (CVEs) published daily. Manually analyzing each CVE for relevance, severity, and potential exploitability is no longer feasible at scale. This tutorial builds a production-ready system that automates CVE analysis using Large Language Models (LLMs) combined with Retrieval-Augmented Generation (RAG), enabling security engineers to triage vulnerabilities efficiently.
We'll construct a pipeline that ingests CVE records from the National Vulnerability Database (NVD), enriches them with MITRE CWE weakness classifications, stores embeddings in a vector database, and answers complex security queries using multi-hop reasoning. The system leverages recent research from ArXiv on automated CVE-to-CWE mapping and RAG optimization.
Real-World Use Case and Architecture
In production environments, security operations centers (SOCs) receive hundreds of CVE notifications daily. The challenge isn't just reading them—it's understanding which vulnerabilities affect your specific technology stack, how they relate to known weaknesses, and what remediation priority they deserve.
Our architecture addresses three critical problems:
- Automated CWE Mapping: As documented in the paper "Automated Mapping of CVE Vulnerability Records to MITRE CWE Weaknesses" (ArXiv, 2024), manually mapping CVEs to CWEs is error-prone and slow. Our system automates this using LLM-based classification.
- Multi-Hop Reasoning: Security analysis often requires connecting multiple pieces of information: a CVE description, affected software versions, exploit availability, and CWE taxonomy. The "MultiHop-RAG: Benchmarking Retrieval-Augmented Generation for Multi-Hop Queries" paper (ArXiv, 2024) demonstrates that standard RAG systems struggle with queries requiring multiple retrieval steps. We implement a multi-hop retrieval strategy.
- Production Reliability: Drawing from "T-RAG: Lessons from the LLM Trenches" (ArXiv, 2024), we incorporate error handling, rate limiting, and fallback mechanisms essential for production deployments.
The system architecture consists of:
- Ingestion Pipeline: Fetches CVE data from NVD API, processes JSON records
- Embedding Service: Converts CVE descriptions and CWE mappings into vector embeddings
- Vector Store: LanceDB for efficient similarity search
- RAG Engine: LangChain-based retrieval with multi-hop query decomposition
- API Layer: FastAPI endpoints for querying and analysis
Prerequisites and Environment Setup
Before diving into implementation, ensure your environment meets these requirements:
System Requirements:
- Python 3.10+
- 8GB RAM minimum (16GB recommended for embedding generation)
- GPU not required: embeddings are generated remotely through the OpenAI API (a GPU only helps if you swap in a local embedding model)
API Keys:
- OpenAI API key for LLM and embedding access (a local model would also work, but this tutorial uses OpenAI)
- NVD API key (free; without one, requests are rate-limited far more strictly)
Install Dependencies:
# Create virtual environment
python -m venv cve-rag-env
source cve-rag-env/bin/activate # Linux/Mac
# or .\cve-rag-env\Scripts\activate # Windows
# Core dependencies
pip install langchain==0.1.0
pip install langchain-openai==0.0.2
pip install lancedb==0.4.0
pip install fastapi==0.109.0
pip install uvicorn==0.27.0
pip install pydantic==2.5.0
pip install httpx==0.26.0
pip install python-dotenv==1.0.0
pip install tiktoken==0.5.0
pip install tenacity==8.2.0 # For retry logic
Create a .env file for sensitive configuration:
OPENAI_API_KEY=sk-your-key-here
NVD_API_KEY=your-nvd-key-here
LANCE_DB_PATH=./cve_vector_store
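These values can be read and validated once at startup so that a missing key fails early rather than in the middle of a request. The following is a minimal sketch that uses only the standard library; Settings and load_settings are hypothetical names, not part of the tutorial code, and it assumes the variables have already been exported or loaded by python-dotenv.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the three values defined in .env."""
    openai_api_key: str
    nvd_api_key: Optional[str]
    lance_db_path: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read settings from a mapping (os.environ by default) and fail fast."""
    openai_key = env.get("OPENAI_API_KEY", "").strip()
    if not openai_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    # The NVD key is optional: without it, requests are rate-limited harder.
    nvd_key = env.get("NVD_API_KEY", "").strip() or None
    db_path = env.get("LANCE_DB_PATH", "./cve_vector_store")
    return Settings(openai_key, nvd_key, db_path)
```

Passing the mapping as a parameter keeps the function easy to test with a plain dict instead of the real environment.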
Building the CVE Ingestion Pipeline
The ingestion pipeline is the foundation of our system. It must handle NVD's API rate limits, parse complex JSON structures, and extract relevant fields for embedding.
# ingestion.py
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CVERecord(BaseModel):
    """Pydantic model for validated CVE data."""
    id: str = Field(..., pattern=r'^CVE-\d{4}-\d{4,}$')
    description: str
    published_date: datetime
    last_modified: datetime
    cvss_score: Optional[float] = None
    cvss_severity: Optional[str] = None
    affected_software: List[str] = []
    cwe_ids: List[str] = []
    exploit_available: bool = False


class NVDIngestor:
    """Handles NVD API interactions with rate limiting and retry logic."""
    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.client = httpx.Client(
            # Only send the header when a key is configured
            headers={"apiKey": api_key} if api_key else {},
            timeout=30.0
        )
        # NVD rate limit: 5 requests per 30 seconds without key, 50 with key
        # 0.6s is ~1.6 requests/second (with key); 6.0s is 5 per 30s (no key)
        self.rate_limit_delay = 0.6 if api_key else 6.0

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def fetch_cves(self,
                   start_index: int = 0,
                   results_per_page: int = 50,
                   last_mod_start_date: Optional[str] = None) -> Dict:
        """
        Fetch CVEs from NVD with pagination support.

        Args:
            start_index: Pagination offset
            results_per_page: Page size, capped at NVD's maximum of 2000
            last_mod_start_date: ISO 8601 date for incremental updates

        Returns:
            Raw API response as dictionary
        """
        params = {
            "startIndex": start_index,
            "resultsPerPage": min(results_per_page, 2000)
        }
        if last_mod_start_date:
            # NVD requires both ends of the range to be supplied together
            params["lastModStartDate"] = last_mod_start_date
            params["lastModEndDate"] = (
                datetime.utcnow().isoformat() + "Z"
            )
        response = self.client.get(self.BASE_URL, params=params)
        response.raise_for_status()
        # Respect rate limits
        time.sleep(self.rate_limit_delay)
        return response.json()

    def parse_cve_item(self, item: Dict) -> Optional[CVERecord]:
        """
        Extract structured data from raw NVD JSON item.

        Edge case: Some CVEs lack descriptions or have malformed data.
        We handle missing fields gracefully.
        """
        try:
            cve_data = item.get("cve", {})
            cve_id = cve_data.get("id", "")

            # Extract description (prefer English)
            descriptions = cve_data.get("descriptions", [])
            description = ""
            for desc in descriptions:
                if desc.get("lang") == "en":
                    description = desc.get("value", "")
                    break
            if not description:
                logger.warning(f"No English description for {cve_id}")
                return None

            # Extract CVSS score (v3 preferred, fallback to v2)
            metrics = cve_data.get("metrics", {})
            cvss_score = None
            cvss_severity = None
            for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
                if metrics.get(version):
                    metric = metrics[version][0]
                    cvss_data = metric.get("cvssData", {})
                    cvss_score = cvss_data.get("baseScore")
                    # v3.x keeps baseSeverity inside cvssData; v2 puts it on the metric
                    cvss_severity = (cvss_data.get("baseSeverity")
                                     or metric.get("baseSeverity"))
                    break

            # Extract CWE IDs
            weaknesses = cve_data.get("weaknesses", [])
            cwe_ids = []
            for weakness in weaknesses:
                for desc in weakness.get("description", []):
                    if desc.get("value", "").startswith("CWE-"):
                        cwe_ids.append(desc["value"])

            # Extract affected software
            configurations = cve_data.get("configurations", [])
            affected_software = []
            for config in configurations:
                for node in config.get("nodes", []):
                    for match in node.get("cpeMatch", []):
                        criteria = match.get("criteria", "")
                        if criteria:
                            # Parse CPE 2.3 format: cpe:2.3:a:vendor:product:version
                            parts = criteria.split(":")
                            if len(parts) >= 5:
                                vendor = parts[3]
                                product = parts[4]
                                affected_software.append(f"{vendor}/{product}")

            return CVERecord(
                id=cve_id,
                description=description,
                published_date=datetime.fromisoformat(
                    cve_data.get("published", "").replace("Z", "+00:00")
                ),
                last_modified=datetime.fromisoformat(
                    cve_data.get("lastModified", "").replace("Z", "+00:00")
                ),
                cvss_score=cvss_score,
                cvss_severity=cvss_severity,
                affected_software=list(set(affected_software)),  # Deduplicate
                cwe_ids=cwe_ids,
                exploit_available=False  # Would require additional API
            )
        except Exception as e:
            logger.error(f"Failed to parse CVE item: {e}")
            return None

    def ingest_recent_cves(self, days_back: int = 7) -> List[CVERecord]:
        """
        Ingest CVEs modified in the last N days.

        Handles pagination automatically, stopping at a local safety cap
        of 2000 results. NVD rejects date ranges longer than 120 days.
        """
        start_date = (datetime.utcnow() - timedelta(days=days_back)).isoformat() + "Z"
        all_cves = []
        start_index = 0
        total_results = None
        while total_results is None or start_index < total_results:
            logger.info(f"Fetching CVEs starting at index {start_index}")
            response = self.fetch_cves(
                start_index=start_index,
                last_mod_start_date=start_date
            )
            total_results = response.get("totalResults", 0)
            vulnerabilities = response.get("vulnerabilities", [])
            # An empty page would otherwise never advance start_index
            if not vulnerabilities:
                break
            for item in vulnerabilities:
                cve = self.parse_cve_item(item)
                if cve:
                    all_cves.append(cve)
            start_index += len(vulnerabilities)
            # Safety cap for this tutorial. NVD's 2000 figure is the maximum
            # page size (resultsPerPage), not a limit on total results.
            if start_index >= 2000:
                logger.warning("Reached local 2000 result cap")
                break
        logger.info(f"Ingested {len(all_cves)} CVEs")
        return all_cves
Key design decisions in the ingestion pipeline:
- Tenacity for retries: NVD API can be unreliable. Exponential backoff prevents hammering the service.
- Pydantic validation: Ensures data integrity before it enters our pipeline. Malformed records are logged and skipped.
- Rate limiting: Even with an API key, we respect NVD's limits. The 0.6s delay keeps us under 50 requests/30 seconds.
- Graceful degradation: Missing fields don't crash the pipeline—we log warnings and continue.
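The 0.6s figure follows from dividing the rate-limit window by the request budget. A small sketch of that arithmetic, where min_delay is a hypothetical helper and the limits are the ones quoted in the ingestion code comments (5 requests per 30 seconds without a key, 50 with one):

```python
def min_delay(max_requests: int, window_seconds: float) -> float:
    """Smallest fixed pause between requests that stays within the budget."""
    if max_requests <= 0:
        raise ValueError("max_requests must be positive")
    return window_seconds / max_requests


with_key = min_delay(50, 30.0)     # 0.6 seconds between requests
without_key = min_delay(5, 30.0)   # 6.0 seconds between requests
```

A fixed pause is the simplest policy that cannot exceed the budget; it gives up some throughput compared with a limiter that allows short bursts.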
Setting up the LanceDB Vector Store
LanceDB provides efficient vector storage with disk-based indexing, making it suitable for large CVE datasets without requiring GPU memory. We'll store embeddings alongside metadata for filtered retrieval.
# vector_store.py
import logging
from typing import List, Optional, Dict

import lancedb
import pyarrow as pa
from langchain_openai import OpenAIEmbeddings

from ingestion import CVERecord

logger = logging.getLogger(__name__)


def _escape(value: str) -> str:
    """Escape single quotes so a value is safe inside a SQL string literal."""
    return value.replace("'", "''")


class CVEVectorStore:
    """Manages CVE embeddings in LanceDB with metadata filtering."""

    def __init__(self, db_path: str, embedding_model: str = "text-embedding-ada-002"):
        """
        Initialize LanceDB connection and embedding model.

        Args:
            db_path: Path to LanceDB database directory
            embedding_model: OpenAI embedding model name
        """
        self.db = lancedb.connect(db_path)
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.table_name = "cve_records"
        # Create table if it doesn't exist
        if self.table_name not in self.db.table_names():
            self._create_table()

    def _create_table(self):
        """Define schema and create empty table."""
        schema = pa.schema([
            pa.field("vector", pa.list_(pa.float32(), 1536)),  # Ada-002 dimension
            pa.field("cve_id", pa.string()),
            pa.field("description", pa.string()),
            pa.field("published_date", pa.timestamp("us")),
            pa.field("cvss_score", pa.float32()),
            pa.field("cvss_severity", pa.string()),
            pa.field("affected_software", pa.list_(pa.string())),
            pa.field("cwe_ids", pa.list_(pa.string())),
            pa.field("exploit_available", pa.bool_()),
            pa.field("text_chunk", pa.string()),  # For RAG context
        ])
        self.db.create_table(self.table_name, schema=schema)

    def _prepare_text_chunk(self, cve: CVERecord) -> str:
        """
        Create a searchable text chunk combining CVE fields.

        This is critical for RAG performance: we want the LLM to have
        all relevant context in a single retrievable chunk.
        """
        parts = [
            f"CVE ID: {cve.id}",
            f"Description: {cve.description}",
            f"Published: {cve.published_date.isoformat()}",
        ]
        if cve.cvss_score is not None:
            parts.append(f"CVSS Score: {cve.cvss_score} ({cve.cvss_severity})")
        if cve.cwe_ids:
            parts.append(f"CWE Weaknesses: {', '.join(cve.cwe_ids)}")
        if cve.affected_software:
            parts.append(f"Affected Software: {', '.join(cve.affected_software)}")
        return "\n".join(parts)

    def add_cves(self, cves: List[CVERecord], batch_size: int = 100):
        """
        Add CVE records to vector store in batches.

        Memory consideration: Embedding generation is memory-intensive.
        We process in batches to avoid OOM errors.
        """
        table = self.db.open_table(self.table_name)
        for i in range(0, len(cves), batch_size):
            batch = cves[i:i + batch_size]
            # Prepare text chunks for embedding
            texts = [self._prepare_text_chunk(cve) for cve in batch]
            # Generate embeddings (batched API call)
            vectors = self.embeddings.embed_documents(texts)
            # Prepare records for insertion
            records = []
            for j, cve in enumerate(batch):
                records.append({
                    "vector": vectors[j],
                    "cve_id": cve.id,
                    "description": cve.description,
                    "published_date": cve.published_date,
                    "cvss_score": cve.cvss_score or 0.0,
                    "cvss_severity": cve.cvss_severity or "NONE",
                    "affected_software": cve.affected_software,
                    "cwe_ids": cve.cwe_ids,
                    "exploit_available": cve.exploit_available,
                    "text_chunk": texts[j],
                })
            table.add(records)
            logger.info(f"Added batch {i//batch_size + 1}: {len(records)} CVEs")

    def search(self,
               query: str,
               k: int = 10,
               min_score: Optional[float] = None,
               cwe_filter: Optional[List[str]] = None,
               software_filter: Optional[List[str]] = None) -> List[Dict]:
        """
        Vector search with metadata filtering.

        Args:
            query: Natural language query
            k: Number of results
            min_score: Minimum CVSS score filter
            cwe_filter: List of CWE IDs to include
            software_filter: List of software to include

        Returns:
            List of matching CVE records with similarity scores
        """
        table = self.db.open_table(self.table_name)
        # Generate query embedding
        query_vector = self.embeddings.embed_query(query)
        # Build search with optional filters
        search_query = table.search(query_vector).limit(k * 2)  # Fetch more for filtering

        # Apply metadata filters (LanceDB supports pre-filtering).
        # All conditions go into a single where() call so that one
        # filter cannot replace another.
        conditions = []
        if min_score is not None:
            conditions.append(f"cvss_score >= {float(min_score)}")
        if cwe_filter:
            # LanceDB doesn't support list contains natively; we use a workaround
            # In production, consider flattening CWE IDs into separate rows
            cwe_conditions = " OR ".join(
                [f"cwe_ids LIKE '%{_escape(cwe)}%'" for cwe in cwe_filter]
            )
            conditions.append(f"({cwe_conditions})")
        if software_filter:
            sw_conditions = " OR ".join(
                [f"affected_software LIKE '%{_escape(sw)}%'" for sw in software_filter]
            )
            conditions.append(f"({sw_conditions})")
        if conditions:
            search_query = search_query.where(" AND ".join(conditions))

        results = search_query.to_list()

        # Post-process: deduplicate (results are already ordered by distance)
        seen_ids = set()
        unique_results = []
        for r in results:
            if r["cve_id"] not in seen_ids:
                seen_ids.add(r["cve_id"])
                r.pop("vector", None)  # Keep API responses small
                unique_results.append(r)
                if len(unique_results) >= k:
                    break
        return unique_results
Important implementation details:
- Text chunk preparation: Combining multiple CVE fields into a single chunk improves retrieval quality. The LLM gets all relevant context without needing multiple retrievals.
- Batch processing: OpenAI's embedding API has rate limits and memory constraints. Batches of 100 balance throughput and reliability.
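- Metadata filtering: LanceDB supports pre-filtering, which is more efficient than post-filtering for large datasets. The LIKE-based workaround for list fields is approximate: it matches substrings, so a filter for CWE-79 also matches CWE-790, and its behavior on list columns may vary across LanceDB versions. Flattening list values into separate rows is more robust.
- Deduplication: Re-running ingestion appends records that may already be in the table, so the same CVE can appear more than once. We track seen CVE IDs to ensure unique results.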
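Because filter strings are assembled from caller-supplied values, it helps to build them in one place, escape quotes, and join every condition into a single expression. The following is a hedged sketch that is independent of LanceDB itself; build_where is a hypothetical helper, and the column names are the ones from the schema in vector_store.py.

```python
from typing import List, Optional


def _quote(value: str) -> str:
    """Escape single quotes so the value is safe inside a SQL string literal."""
    return value.replace("'", "''")


def build_where(min_score: Optional[float] = None,
                cwe_filter: Optional[List[str]] = None,
                software_filter: Optional[List[str]] = None) -> Optional[str]:
    """Combine all filters into a single expression joined with AND."""
    clauses = []
    if min_score is not None:
        clauses.append(f"cvss_score >= {float(min_score)}")
    if cwe_filter:
        ors = " OR ".join(f"cwe_ids LIKE '%{_quote(c)}%'" for c in cwe_filter)
        clauses.append(f"({ors})")
    if software_filter:
        ors = " OR ".join(
            f"affected_software LIKE '%{_quote(s)}%'" for s in software_filter
        )
        clauses.append(f"({ors})")
    return " AND ".join(clauses) if clauses else None


print(build_where(7.0, ["CWE-79"]))
```

Returning None when no filter is set lets the caller skip the filter step entirely instead of passing an empty string.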
Implementing Multi-Hop RAG for CVE Analysis
The core of our system is the RAG engine that answers complex security queries. Standard RAG retrieves documents once and generates an answer. Multi-hop RAG decomposes complex questions into sub-questions, retrieves context for each, and synthesizes the final answer.
# rag_engine.py
import logging
import re
from typing import List, Dict, Optional

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from langchain_core.output_parsers import StrOutputParser

logger = logging.getLogger(__name__)


class MultiHopRAGEngine:
    """
    Implements multi-hop retrieval for complex CVE analysis queries.

    Based on MultiHop-RAG benchmarking (ArXiv, 2024), standard RAG
    fails on queries requiring multiple reasoning steps. This engine
    decomposes queries and aggregates results.
    """

    def __init__(self, vector_store, llm_model: str = "gpt-4-turbo-preview"):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model=llm_model,
            temperature=0.1,  # Low temperature for factual answers
            max_tokens=2000
        )
        # Prompt for query decomposition
        self.decomposition_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a security analyst specializing in CVE analysis.
Decompose the following security question into 2-4 sub-questions that,
when answered together, provide a complete answer.
Rules:
- Each sub-question should be independently answerable from CVE data
- Include specific CWE IDs, software names, or date ranges if mentioned
- Output one sub-question per line, no numbering"""),
            ("human", "{question}")
        ])
        # Prompt for final answer synthesis
        self.synthesis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a senior security engineer analyzing CVEs.
Based on the retrieved CVE records below, answer the user's question.
Context from CVE database:
{context}
Provide:
1. Direct answer to the question
2. Relevant CVE IDs with brief explanations
3. Risk assessment (Critical/High/Medium/Low)
4. Recommended actions
If the context doesn't contain enough information, state what's missing.
Base your answer ONLY on the provided context."""),
            ("human", "{question}")
        ])

    def decompose_query(self, question: str) -> List[str]:
        """
        Break complex questions into simpler sub-queries.

        Example:
            Input: "What critical CVEs affect Apache servers and have available exploits?"
            Output: [
                "CVEs with CVSS score >= 9.0 affecting Apache software",
                "CVEs with known exploits available",
                "Apache server CVEs published in the last 90 days"
            ]
        """
        chain = self.decomposition_prompt | self.llm | StrOutputParser()
        result = chain.invoke({"question": question})
        # Parse sub-questions (one per line)
        sub_questions = []
        for line in result.split("\n"):
            # Strip bullets or numbering the model may add despite the prompt,
            # rather than discarding those lines
            cleaned = re.sub(r"^\s*(?:[-*\u2022]|\d+[.)])\s*", "", line).strip()
            if cleaned:
                sub_questions.append(cleaned)
        # Fallback: if decomposition fails, use original question
        if not sub_questions:
            sub_questions = [question]
        return sub_questions[:4]  # Limit to 4 sub-questions

    def retrieve_for_sub_question(self, sub_question: str, k: int = 5) -> List[Document]:
        """
        Retrieve relevant CVE records for a single sub-question.

        Uses heuristic parsing to extract filters from the sub-question.
        """
        # Extract potential filters from sub-question
        min_score = None
        cwe_filter = None
        software_filter = None

        # Check for severity keywords
        severity_map = {
            "critical": 9.0,
            "high": 7.0,
            "medium": 4.0,
            "low": 0.1
        }
        for severity, score in severity_map.items():
            if severity in sub_question.lower():
                min_score = score
                break

        # Check for CWE mentions
        cwe_pattern = r'CWE-\d+'
        cwe_matches = re.findall(cwe_pattern, sub_question, re.IGNORECASE)
        if cwe_matches:
            cwe_filter = [cwe.upper() for cwe in cwe_matches]

        # Check for software mentions (common patterns)
        software_keywords = ["apache", "nginx", "linux", "windows", "mysql", "postgresql"]
        for sw in software_keywords:
            if sw in sub_question.lower():
                software_filter = [sw]
                break

        # Perform search with extracted filters
        results = self.vector_store.search(
            query=sub_question,
            k=k,
            min_score=min_score,
            cwe_filter=cwe_filter,
            software_filter=software_filter
        )

        # Convert to LangChain Document format
        documents = []
        for r in results:
            doc = Document(
                page_content=r["text_chunk"],
                metadata={
                    "cve_id": r["cve_id"],
                    "cvss_score": r["cvss_score"],
                    "cwe_ids": r["cwe_ids"],
                    "affected_software": r["affected_software"],
                    "source": "nvd"
                }
            )
            documents.append(doc)
        return documents

    def answer_question(self, question: str) -> Dict:
        """
        Full multi-hop RAG pipeline.

        Steps:
        1. Decompose question into sub-questions
        2. Retrieve context for each sub-question
        3. Deduplicate and combine context
        4. Generate final answer with LLM
        """
        # Step 1: Decompose
        sub_questions = self.decompose_query(question)
        logger.info(f"Decomposed into {len(sub_questions)} sub-questions")

        # Step 2: Retrieve for each sub-question
        all_documents = []
        seen_cves = set()
        for sq in sub_questions:
            docs = self.retrieve_for_sub_question(sq)
            for doc in docs:
                cve_id = doc.metadata.get("cve_id")
                if cve_id and cve_id not in seen_cves:
                    seen_cves.add(cve_id)
                    all_documents.append(doc)

        # Limit context to avoid token limits
        # GPT-4 Turbo has 128K context, but we keep it manageable
        max_docs = 20
        if len(all_documents) > max_docs:
            # Sort by CVSS score descending, keep highest severity
            all_documents.sort(
                key=lambda d: d.metadata.get("cvss_score") or 0,
                reverse=True
            )
            all_documents = all_documents[:max_docs]

        # Step 3: Prepare context
        context = "\n\n---\n\n".join([doc.page_content for doc in all_documents])

        # Step 4: Generate answer
        chain = self.synthesis_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": question
        })
        return {
            "answer": answer,
            "sub_questions": sub_questions,
            "cves_retrieved": [doc.metadata["cve_id"] for doc in all_documents],
            "total_cves_found": len(seen_cves)
        }
The multi-hop implementation addresses several production concerns:
- Query decomposition: Complex questions like "Which critical Apache vulnerabilities from 2024 have CWE-79 and available exploits?" are broken into manageable sub-queries.
- Heuristic filter extraction: We parse sub-questions for severity levels, CWE IDs, and software names to leverage metadata filtering in LanceDB.
- Context window management: Even with large context windows, we limit to 20 documents to maintain answer quality and reduce latency.
- Deduplication across hops: Multiple sub-questions might retrieve the same CVE. We track seen IDs to avoid redundant context.
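The retrieve-and-deduplicate loop can be illustrated without an LLM or a vector store. In this sketch, fake_retrieve is a hypothetical stand-in for the vector search, and the CVE IDs and scores are made-up placeholders rather than real records.

```python
from typing import Callable, Dict, List


def multi_hop_retrieve(sub_questions: List[str],
                       retrieve: Callable[[str], List[Dict]],
                       max_docs: int = 20) -> List[Dict]:
    """Run one retrieval per sub-question and merge results by CVE ID."""
    seen = set()
    merged: List[Dict] = []
    for sq in sub_questions:
        for doc in retrieve(sq):
            if doc["cve_id"] not in seen:
                seen.add(doc["cve_id"])
                merged.append(doc)
    # Only when over budget: keep the highest-severity records.
    if len(merged) > max_docs:
        merged.sort(key=lambda d: d.get("cvss_score") or 0.0, reverse=True)
        merged = merged[:max_docs]
    return merged


def fake_retrieve(sub_question: str) -> List[Dict]:
    """Hypothetical retriever returning canned, made-up records."""
    canned = {
        "apache": [{"cve_id": "CVE-0000-0001", "cvss_score": 9.8},
                   {"cve_id": "CVE-0000-0002", "cvss_score": 5.3}],
        "exploit": [{"cve_id": "CVE-0000-0001", "cvss_score": 9.8},
                    {"cve_id": "CVE-0000-0003", "cvss_score": 7.5}],
    }
    return [d for key, docs in canned.items()
            if key in sub_question.lower() for d in docs]


docs = multi_hop_retrieve(["Apache CVEs", "CVEs with exploit code"], fake_retrieve)
print([d["cve_id"] for d in docs])
```

The record returned by both hops appears once in the merged list, which is the behavior the seen-ID set is there to guarantee.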
Building the FastAPI Application
Finally, we expose our system through a FastAPI application with proper error handling and async support.
# app.py
import logging
import os
from datetime import datetime, timezone
from typing import List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from ingestion import NVDIngestor
from vector_store import CVEVectorStore
from rag_engine import MultiHopRAGEngine

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="CVE Analysis RAG API",
    description="Automated CVE analysis using LLMs and RAG",
    version="1.0.0"
)

# CORS for frontend integration
# Wildcard origins cannot be combined with credentials; list explicit
# origins in production if cookies or auth headers are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
nvd_ingestor = NVDIngestor(api_key=os.getenv("NVD_API_KEY"))
vector_store = CVEVectorStore(db_path=os.getenv("LANCE_DB_PATH", "./cve_vector_store"))
rag_engine = MultiHopRAGEngine(vector_store=vector_store)


class QueryRequest(BaseModel):
    question: str = Field(..., min_length=5, max_length=500)
    k: int = Field(default=10, ge=1, le=50)


class IngestRequest(BaseModel):
    # NVD date-range filters accept at most 120 consecutive days
    days_back: int = Field(default=7, ge=1, le=120)


@app.on_event("startup")
async def startup_event():
    """Verify components are initialized on startup."""
    logger.info("CVE Analysis RAG API starting...")
    # Check if vector store has data
    table = vector_store.db.open_table(vector_store.table_name)
    count = table.count_rows()
    logger.info(f"Vector store contains {count} CVE records")


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat()
    }


@app.post("/ingest")
async def ingest_cves(request: IngestRequest):
    """
    Ingest recent CVEs from NVD.

    Edge case: If ingestion takes too long, consider background tasks.
    For production, use Celery or similar task queue.
    """
    try:
        logger.info(f"Ingesting CVEs from last {request.days_back} days")
        cves = nvd_ingestor.ingest_recent_cves(days_back=request.days_back)
        if not cves:
            raise HTTPException(status_code=404, detail="No new CVEs found")
        vector_store.add_cves(cves)
        return {
            "message": f"Ingested {len(cves)} CVEs",
            "cve_ids": [cve.id for cve in cves[:10]],  # Sample first 10
            "total": len(cves)
        }
    except HTTPException:
        # Let deliberate HTTP errors (such as the 404 above) pass through
        raise
    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/query")
async def query_cves(request: QueryRequest):
    """
    Answer a security question using multi-hop RAG.

    Example queries:
    - "What critical vulnerabilities affect Apache servers?"
    - "Which CVEs from 2024 have CWE-79 and CVSS > 7.0?"
    - "Are there any exploited vulnerabilities in our PostgreSQL databases?"
    """
    try:
        result = rag_engine.answer_question(request.question)
        return result
    except Exception as e:
        logger.error(f"Query failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/search")
async def search_cves(
    query: str = Query(..., min_length=3),
    k: int = Query(10, ge=1, le=50),
    min_score: Optional[float] = Query(None, ge=0.0, le=10.0),
    cwe: Optional[str] = Query(None),
    software: Optional[str] = Query(None)
):
    """
    Direct vector search with optional filters.

    Useful for programmatic access or debugging.
    """
    try:
        cwe_filter = [cwe] if cwe else None
        software_filter = [software] if software else None
        results = vector_store.search(
            query=query,
            k=k,
            min_score=min_score,
            cwe_filter=cwe_filter,
            software_filter=software_filter
        )
        return {
            "query": query,
            "results_count": len(results),
            "results": results
        }
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )
Running the System
Start the API server:
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
Ingest recent CVEs:
curl -X POST http://localhost:8000/ingest \
-H "Content-Type: application/json" \
-d '{"days_back": 7}'
Query the system:
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "What critical vulnerabilities affect Apache HTTP Server with available exploits?"}'
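The same query can be issued from Python. This is a minimal standard-library sketch; build_query_request and ask are hypothetical helper names, and the default base URL assumes the server was started locally on port 8000 as shown above.

```python
import json
import urllib.request


def build_query_request(question: str,
                        base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build (but do not send) a POST request for the /query endpoint."""
    body = json.dumps({"question": question}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(question: str, timeout: float = 60.0) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(build_query_request(question), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling ask("What critical vulnerabilities affect Apache HTTP Server?") against a running instance should return the dictionary produced by answer_question, with the answer, sub_questions, cves_retrieved, and total_cves_found keys.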
Edge Cases and Production Considerations
API Rate Limits: With an API key, the NVD API allows 50 requests per rolling 30-second window. Our ingestion pipeline stays within this by pausing 0.6s between requests. For large-scale ingestion across several workers, implement distributed rate limiting using Redis.
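Before reaching for Redis, the policy can be prototyped in a single process. The following sketch is a sliding-window limiter, not a distributed one; SlidingWindowLimiter is a hypothetical class, and the injectable clock is there only to make the behavior easy to test.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_requests calls within any window_seconds span."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._stamps: Deque[float] = deque()

    def wait_time(self) -> float:
        """Seconds the caller should wait before the next request is allowed."""
        now = self.clock()
        # Drop timestamps that have left the window.
        while self._stamps and now - self._stamps[0] >= self.window_seconds:
            self._stamps.popleft()
        if len(self._stamps) < self.max_requests:
            return 0.0
        return self.window_seconds - (now - self._stamps[0])

    def acquire(self) -> None:
        """Block until a slot is free, then record the request."""
        while (delay := self.wait_time()) > 0:
            time.sleep(delay)
        self._stamps.append(self.clock())
```

Calling limiter.acquire() before each NVD request would allow short bursts while still honoring the window, which a fixed sleep does not. A shared deployment would need the timestamps stored somewhere all workers can see.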
Memory Management: Embedding generation with text-embedding-ada-002 creates 1536-dimensional vectors. For 10,000 CVEs, this requires approximately 60MB for vectors alone, plus metadata. LanceDB's disk-based storage helps, but ensure your server has adequate RAM for batch processing.
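The 60MB estimate can be checked directly: each float32 value takes 4 bytes. A short sketch of the arithmetic, where vector_bytes is a hypothetical helper and index and metadata overhead are ignored:

```python
def vector_bytes(num_vectors: int, dims: int = 1536, bytes_per_value: int = 4) -> int:
    """Raw size of float32 vectors, ignoring index and metadata overhead."""
    return num_vectors * dims * bytes_per_value


size = vector_bytes(10_000)
print(f"{size / 1_000_000:.1f} MB")  # prints "61.4 MB"
```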
Stale Data: CVEs are updated frequently. Implement incremental ingestion using lastModStartDate to only fetch recently modified records. Schedule daily updates via cron or a task scheduler.
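One way to drive incremental ingestion is to persist the timestamp of the last successful run and split longer gaps into bounded windows. In the sketch below, date_windows and to_nvd_params are hypothetical helpers, and the 120-day default reflects the maximum date range documented for the NVD date filters; treat it as an assumption to verify against the current API documentation.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def date_windows(start: datetime, end: datetime,
                 max_days: int = 120) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs covering start..end."""
    if start >= end:
        return
    step = timedelta(days=max_days)
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end


def to_nvd_params(window_start: datetime, window_end: datetime) -> dict:
    """Format one window as lastModStartDate / lastModEndDate query params."""
    fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    return {
        "lastModStartDate": window_start.strftime(fmt),
        "lastModEndDate": window_end.strftime(fmt),
    }
```

A daily cron job would normally produce a single one-day window; the splitting only matters after a long outage or on the first backfill.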
LLM Hallucination: The RAG system is only as good as its retrieval. If the vector store lacks relevant CVEs, the LLM may fabricate answers. Always include the "If the context doesn't contain enough information" instruction in prompts.
Multi-Hop Limitations: As noted in the MultiHop-RAG paper (ArXiv, 2024), complex queries requiring 3+ hops may still fail. Monitor retrieval quality and consider implementing a verification step that checks if the answer is supported by retrieved documents.
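A verification step can start very simply. The sketch below checks one narrow property: that every CVE ID cited in the generated answer was actually among the retrieved records. It does not validate the claims made about those CVEs; unsupported_cve_ids is a hypothetical helper and the IDs in the example are made-up placeholders.

```python
import re
from typing import Iterable, List

CVE_ID = re.compile(r"CVE-\d{4}-\d{4,}", re.IGNORECASE)


def unsupported_cve_ids(answer: str, retrieved_ids: Iterable[str]) -> List[str]:
    """Return CVE IDs cited in the answer that were never retrieved."""
    allowed = {cid.upper() for cid in retrieved_ids}
    cited = {m.upper() for m in CVE_ID.findall(answer)}
    return sorted(cited - allowed)


answer = "CVE-0000-0001 is critical; see also CVE-0000-0009."
print(unsupported_cve_ids(answer, ["CVE-0000-0001"]))  # ['CVE-0000-0009']
```

A non-empty result is a cheap signal that the answer should be flagged for review or regenerated with stricter instructions.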
What's Next
This system provides a foundation for automated CVE analysis, but production deployments require additional considerations:
- Continuous Learning: Implement feedback loops where security analysts rate answers, improving retrieval and generation over time.
- Exploit Intelligence Integration: Connect to exploit databases (Exploit-DB, Metasploit) to enrich CVE records with exploit availability, a critical factor in prioritization.
- Custom Embedding Models: Fine-tune embedding models on security-specific text for improved retrieval accuracy. The text-embedding-ada-002 model works well, but domain-specific models may perform better.
- Multi-Model RAG: Use different LLMs for different tasks: a smaller, faster model for query decomposition and a larger model for answer synthesis.
- Alerting Integration: Connect to SIEM systems (Splunk, Elastic) to automatically correlate CVEs with your asset inventory and generate prioritized alerts.
The combination of automated CVE ingestion, vector search, and multi-hop RAG transforms how security teams handle vulnerability intelligence. By reducing analysis time from hours to seconds, teams can focus on remediation rather than research.