
How to Automate CVE Analysis with LLMs and RAG


BlogIA Academy | June 8, 2026 | 19 min read | 3,661 words



Security teams face an overwhelming volume of Common Vulnerabilities and Exposures (CVEs) published daily. Manually analyzing each CVE for relevance, severity, and potential exploitability is no longer feasible at scale. This tutorial builds a production-ready system that automates CVE analysis using Large Language Models (LLMs) combined with Retrieval-Augmented Generation (RAG), enabling security engineers to triage vulnerabilities efficiently.

We'll construct a pipeline that ingests CVE records from the National Vulnerability Database (NVD), enriches them with MITRE CWE weakness classifications, stores embeddings [1] in a vector database, and answers complex security queries using multi-hop reasoning. The system draws on recent arXiv research on automated CVE-to-CWE mapping and RAG optimization.

Real-World Use Case and Architecture

In production environments, security operations centers (SOCs) receive hundreds of CVE notifications daily. The challenge isn't just reading them—it's understanding which vulnerabilities affect your specific technology stack, how they relate to known weaknesses, and what remediation priority they deserve.

Our architecture addresses three critical problems:

  1. Automated CWE Mapping: As documented in the paper "Automated Mapping of CVE Vulnerability Records to MITRE CWE Weaknesses" (ArXiv, 2024), manually mapping CVEs to CWEs is error-prone and slow. The pipeline below reads the CWE IDs that NVD already attaches to each record; LLM-based classification of records that arrive without a mapping is a natural extension.

  2. Multi-Hop Reasoning: Security analysis often requires connecting multiple pieces of information—a CVE description, affected software versions, exploit availability, and CWE taxonomy. The "MultiHop-RAG: Benchmarking Retrieval-Augmented Generation for Multi-Hop Queries" paper (ArXiv, 2024) demonstrates that standard RAG systems struggle with queries requiring multiple retrieval steps. We implement a multi-hop retrieval strategy.

  3. Production Reliability: Drawing from "T-RAG: Lessons from the LLM Trenches" (ArXiv, 2024), we incorporate error handling, rate limiting, and fallback mechanisms essential for production deployments.

The system architecture consists of:

  • Ingestion Pipeline: Fetches CVE data from NVD API, processes JSON records
  • Embedding Service: Converts CVE descriptions and CWE mappings into vector embeddings
  • Vector Store: LanceDB for efficient similarity search
  • RAG Engine: LangChain [9]-based retrieval with multi-hop query decomposition
  • API Layer: FastAPI endpoints for querying and analysis

Prerequisites and Environment Setup

Before diving into implementation, ensure your environment meets these requirements:

System Requirements:

  • Python 3.10+
  • 8GB RAM minimum (16GB recommended for embedding generation)
  • GPU optional but recommended for faster embeddings

API Keys:

  • OpenAI [10] API key (for LLM and embedding access). A local model would also work, but this tutorial uses OpenAI.
  • NVD API key (free; without one, NVD allows 5 requests per 30 seconds instead of 50)

Install Dependencies:

# Create virtual environment
python -m venv cve-rag-env
source cve-rag-env/bin/activate  # Linux/Mac
# or .\cve-rag-env\Scripts\activate  # Windows

# Core dependencies
pip install langchain==0.1.0
pip install langchain-openai==0.0.2
pip install lancedb==0.4.0
pip install fastapi==0.109.0
pip install uvicorn==0.27.0
pip install pydantic==2.5.0
pip install httpx==0.26.0
pip install python-dotenv==1.0.0
pip install tiktoken==0.5.0
pip install tenacity==8.2.0  # For retry logic

Create a .env file for sensitive configuration:

OPENAI_API_KEY=sk-your-key-here
NVD_API_KEY=your-nvd-key-here
LANCE_DB_PATH=./cve_vector_store
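
A missing key tends to surface late, as a confusing HTTP error deep in the pipeline. The following is a minimal sketch of failing fast at startup instead. The `Settings` class and `load_settings` function are hypothetical names introduced here for illustration; they are not part of the tutorial's modules.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    """Values read from the three variables in the .env example."""
    openai_api_key: str
    nvd_api_key: Optional[str]
    lance_db_path: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Validate configuration once, before any component is constructed."""
    openai_key = env.get("OPENAI_API_KEY", "").strip()
    if not openai_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return Settings(
        openai_api_key=openai_key,
        # The NVD key is optional: without it the stricter rate limit applies
        nvd_api_key=env.get("NVD_API_KEY") or None,
        lance_db_path=env.get("LANCE_DB_PATH", "./cve_vector_store"),
    )
```

Calling `load_settings()` after `load_dotenv()` would raise immediately if the OpenAI key is absent, while a missing NVD key simply comes back as `None`.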

Building the CVE Ingestion Pipeline

The ingestion pipeline is the foundation of our system. It must handle NVD's API rate limits, parse complex JSON structures, and extract relevant fields for embedding.

# ingestion.py
import json
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
from pydantic import BaseModel, Field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CVERecord(BaseModel):
    """Pydantic model for validated CVE data."""
    id: str = Field(..., pattern=r'^CVE-\d{4}-\d{4,}$')
    description: str
    published_date: datetime
    last_modified: datetime
    cvss_score: Optional[float] = None
    cvss_severity: Optional[str] = None
    affected_software: List[str] = []
    cwe_ids: List[str] = []
    exploit_available: bool = False

class NVDIngestor:
    """Handles NVD API interactions with rate limiting and retry logic."""

    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

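    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        # Only send the apiKey header when a key is configured
        headers = {"apiKey": api_key} if api_key else {}
        self.client = httpx.Client(headers=headers, timeout=30.0)
        # NVD rate limit: 5 requests per 30 seconds without a key, 50 with one.
        # 30 s / 50 = 0.6 s between requests with a key; 30 s / 5 = 6 s without.
        self.rate_limit_delay = 0.6 if api_key else 6.0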

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def fetch_cves(self,
                   start_index: int = 0,
                   results_per_page: int = 50,
                   last_mod_start_date: Optional[str] = None) -> Dict:
        """
        Fetch CVEs from NVD with pagination support.

        Args:
            start_index: Pagination offset
            results_per_page: Page size; NVD API 2.0 allows at most 2000
            last_mod_start_date: ISO 8601 date for incremental updates
                (NVD accepts a lastMod range of at most 120 days)

        Returns:
            Raw API response as dictionary
        """
        params = {
            "startIndex": start_index,
            "resultsPerPage": min(results_per_page, 2000)
        }

        if last_mod_start_date:
            params["lastModStartDate"] = last_mod_start_date
            params["lastModEndDate"] = (
                datetime.utcnow().isoformat() + "Z"
            )

        response = self.client.get(self.BASE_URL, params=params)
        response.raise_for_status()

        # Respect rate limits
        time.sleep(self.rate_limit_delay)

        return response.json()

    def parse_cve_item(self, item: Dict) -> Optional[CVERecord]:
        """
        Extract structured data from raw NVD JSON item.

        Edge case: Some CVEs lack descriptions or have malformed data.
        We handle missing fields gracefully.
        """
        try:
            cve_data = item.get("cve", {})
            cve_id = cve_data.get("id", "")

            # Extract description (prefer English)
            descriptions = cve_data.get("descriptions", [])
            description = ""
            for desc in descriptions:
                if desc.get("lang") == "en":
                    description = desc.get("value", "")
                    break

            if not description:
                logger.warning(f"No English description for {cve_id}")
                return None

            # Extract CVSS score (v3 preferred, fallback to v2)
            metrics = cve_data.get("metrics", {})
            cvss_score = None
            cvss_severity = None

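            for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
                if metrics.get(version):
                    metric = metrics[version][0]
                    cvss_data = metric.get("cvssData", {})
                    cvss_score = cvss_data.get("baseScore")
                    # v3.x keeps baseSeverity inside cvssData; v2 records
                    # carry it on the metric entry itself
                    cvss_severity = (
                        cvss_data.get("baseSeverity") or metric.get("baseSeverity")
                    )
                    break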

            # Extract CWE IDs
            weaknesses = cve_data.get("weaknesses", [])
            cwe_ids = []
            for weakness in weaknesses:
                for desc in weakness.get("description", []):
                    if desc.get("value", "").startswith("CWE-"):
                        cwe_ids.append(desc["value"])

            # Extract affected software
            configurations = cve_data.get("configurations", [])
            affected_software = []
            for config in configurations:
                for node in config.get("nodes", []):
                    for match in node.get("cpeMatch", []):
                        criteria = match.get("criteria", "")
                        if criteria:
                            # Parse CPE 2.3 format: cpe:2.3:a:vendor:product:version
                            parts = criteria.split(":")
                            if len(parts) >= 5:
                                vendor = parts[3]
                                product = parts[4]
                                affected_software.append(f"{vendor}/{product}")

            return CVERecord(
                id=cve_id,
                description=description,
                published_date=datetime.fromisoformat(
                    cve_data.get("published", "").replace("Z", "+00:00")
                ),
                last_modified=datetime.fromisoformat(
                    cve_data.get("lastModified", "").replace("Z", "+00:00")
                ),
                cvss_score=cvss_score,
                cvss_severity=cvss_severity,
                affected_software=list(set(affected_software)),  # Deduplicate
                cwe_ids=cwe_ids,
                exploit_available=False  # Would require additional API
            )

        except Exception as e:
            logger.error(f"Failed to parse CVE item: {e}")
            return None

    def ingest_recent_cves(self, days_back: int = 7) -> List[CVERecord]:
        """
        Ingest CVEs modified in the last N days.

        Pages through the full result set with startIndex. NVD rejects
        lastMod date ranges longer than 120 days, so larger values of
        days_back need to be split into several windows.
        """
        start_date = (datetime.utcnow() - timedelta(days=days_back)).isoformat() + "Z"

        all_cves = []
        start_index = 0
        total_results = None

        while total_results is None or start_index < total_results:
            logger.info(f"Fetching CVEs starting at index {start_index}")

            response = self.fetch_cves(
                start_index=start_index,
                last_mod_start_date=start_date
            )

            total_results = response.get("totalResults", 0)
            vulnerabilities = response.get("vulnerabilities", [])

            # An empty page would never advance start_index; stop instead
            # of looping forever
            if not vulnerabilities:
                break

            for item in vulnerabilities:
                cve = self.parse_cve_item(item)
                if cve:
                    all_cves.append(cve)

            start_index += len(vulnerabilities)

        logger.info(f"Ingested {len(all_cves)} CVEs")
        return all_cves
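
The CPE handling inside `parse_cve_item` is easier to test when pulled out on its own. The following is a small sketch of that step as a standalone function. The name `vendor_product_from_cpe` is hypothetical, and the wildcard check is an addition not present in the listing above.

```python
from typing import Optional, Tuple


def vendor_product_from_cpe(criteria: str) -> Optional[Tuple[str, str]]:
    """Extract (vendor, product) from a CPE 2.3 string, or None if it does not parse.

    Layout: cpe:2.3:<part>:<vendor>:<product>:<version>:...
    Limitation: a plain split(":") mishandles values containing escaped colons.
    """
    parts = criteria.split(":")
    if len(parts) < 5 or parts[0] != "cpe" or parts[1] != "2.3":
        return None
    vendor, product = parts[3], parts[4]
    # "*" (any) and "-" (not applicable) carry no usable name
    if vendor in ("*", "-") or product in ("*", "-"):
        return None
    return vendor, product


print(vendor_product_from_cpe("cpe:2.3:a:apache:http_server:2.4.49:*:*:*:*:*:*:*"))
```

Keeping this logic in one small function lets malformed CPE strings be covered by unit tests without building a full NVD record.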

Key design decisions in the ingestion pipeline:

  • Tenacity for retries: NVD API can be unreliable. Exponential backoff prevents hammering the service.
  • Pydantic validation: Ensures data integrity before it enters our pipeline. Malformed records are logged and skipped.
  • Rate limiting: Even with an API key, we respect NVD's limits. The 0.6s delay keeps us under 50 requests/30 seconds.
  • Graceful degradation: Missing fields don't crash the pipeline—we log warnings and continue.
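
To make the retry behaviour concrete, here is a plain-Python sketch of capped exponential backoff. It illustrates the general technique only: `backoff_schedule` is a hypothetical helper, and the exact delays tenacity produces for `wait_exponential(multiplier=1, min=2, max=10)` may differ from this formula.

```python
from typing import List


def backoff_schedule(attempts: int, base: float = 1.0,
                     floor: float = 2.0, cap: float = 10.0) -> List[float]:
    """Delay before each retry: base * 2**n, clamped to the range [floor, cap]."""
    return [max(floor, min(cap, base * 2 ** n)) for n in range(attempts)]


print(backoff_schedule(5))  # [2.0, 2.0, 4.0, 8.0, 10.0]
```

The floor keeps early retries from firing immediately after a failure, and the cap bounds the worst-case wait once the delay has doubled a few times.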

Setting up the LanceDB Vector Store

LanceDB provides efficient vector storage with disk-based indexing, making it suitable for large CVE datasets without requiring GPU memory. We'll store embeddings alongside metadata for filtered retrieval.

# vector_store.py
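import logging
import lancedb
import pyarrow as pa
from typing import List, Optional, Dict
from langchain_openai import OpenAIEmbeddings
from ingestion import CVERecord

# add_cves logs batch progress, so this module needs its own logger
logger = logging.getLogger(__name__)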

class CVEVectorStore:
    """Manages CVE embeddings in LanceDB with metadata filtering."""

    def __init__(self, db_path: str, embedding_model: str = "text-embedding-ada-002"):
        """
        Initialize LanceDB connection and embedding model.

        Args:
            db_path: Path to LanceDB database directory
            embedding_model: OpenAI embedding model name
        """
        self.db = lancedb.connect(db_path)
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.table_name = "cve_records"

        # Create table if it doesn't exist
        if self.table_name not in self.db.table_names():
            self._create_table()

    def _create_table(self):
        """Define schema and create empty table."""
        schema = pa.schema([
            pa.field("vector", pa.list_(pa.float32(), 1536)),  # Ada-002 dimension
            pa.field("cve_id", pa.string()),
            pa.field("description", pa.string()),
            pa.field("published_date", pa.timestamp("us")),
            pa.field("cvss_score", pa.float32()),
            pa.field("cvss_severity", pa.string()),
            pa.field("affected_software", pa.list_(pa.string())),
            pa.field("cwe_ids", pa.list_(pa.string())),
            pa.field("exploit_available", pa.bool_()),
            pa.field("text_chunk", pa.string()),  # For RAG context
        ])

        self.db.create_table(self.table_name, schema=schema)

    def _prepare_text_chunk(self, cve: CVERecord) -> str:
        """
        Create a searchable text chunk combining CVE fields.

        This is critical for RAG performance—we want the LLM to have
        all relevant context in a single retrievable chunk.
        """
        parts = [
            f"CVE ID: {cve.id}",
            f"Description: {cve.description}",
            f"Published: {cve.published_date.isoformat()}",
        ]

        if cve.cvss_score is not None:
            parts.append(f"CVSS Score: {cve.cvss_score} ({cve.cvss_severity})")

        if cve.cwe_ids:
            parts.append(f"CWE Weaknesses: {', '.join(cve.cwe_ids)}")

        if cve.affected_software:
            parts.append(f"Affected Software: {', '.join(cve.affected_software)}")

        return "\n".join(parts)

    def add_cves(self, cves: List[CVERecord], batch_size: int = 100):
        """
        Add CVE records to vector store in batches.

        Memory consideration: Embedding generation is memory-intensive.
        We process in batches to avoid OOM errors.
        """
        table = self.db.open_table(self.table_name)

        for i in range(0, len(cves), batch_size):
            batch = cves[i:i + batch_size]

            # Prepare text chunks for embedding
            texts = [self._prepare_text_chunk(cve) for cve in batch]

            # Generate embeddings (batched API call)
            vectors = self.embeddings.embed_documents(texts)

            # Prepare records for insertion
            records = []
            for j, cve in enumerate(batch):
                records.append({
                    "vector": vectors[j],
                    "cve_id": cve.id,
                    "description": cve.description,
                    "published_date": cve.published_date,
                    "cvss_score": cve.cvss_score or 0.0,
                    "cvss_severity": cve.cvss_severity or "NONE",
                    "affected_software": cve.affected_software,
                    "cwe_ids": cve.cwe_ids,
                    "exploit_available": cve.exploit_available,
                    "text_chunk": texts[j],
                })

            table.add(records)
            logger.info(f"Added batch {i//batch_size + 1}: {len(records)} CVEs")

    def search(self, 
               query: str, 
               k: int = 10,
               min_score: Optional[float] = None,
               cwe_filter: Optional[List[str]] = None,
               software_filter: Optional[List[str]] = None) -> List[Dict]:
        """
        Vector similarity search with metadata filtering.

        Args:
            query: Natural language query
            k: Number of results
            min_score: Minimum CVSS score filter
            cwe_filter: List of CWE IDs to include
            software_filter: List of software to include

        Returns:
            List of matching CVE records with similarity scores
        """
        table = self.db.open_table(self.table_name)

        # Generate query embedding
        query_vector = self.embeddings.embed_query(query)

        # Build search; over-fetch so deduplication can still return k rows
        search_query = table.search(query_vector).limit(k * 2)

        # Collect metadata filters and apply them in a single where() call,
        # since a later where() may replace an earlier one rather than
        # combine with it
        conditions = []
        if min_score is not None:
            conditions.append(f"cvss_score >= {float(min_score)}")

        if cwe_filter:
            # NOTE: LIKE against a list column is a workaround that may not
            # work on every LanceDB version. In production, consider
            # flattening CWE IDs into separate rows. Values are interpolated
            # into the filter string, so validate them before they get here.
            cwe_conditions = " OR ".join([f"cwe_ids LIKE '%{cwe}%'" for cwe in cwe_filter])
            conditions.append(f"({cwe_conditions})")

        if software_filter:
            sw_conditions = " OR ".join([f"affected_software LIKE '%{sw}%'" for sw in software_filter])
            conditions.append(f"({sw_conditions})")

        if conditions:
            search_query = search_query.where(" AND ".join(conditions))

        results = search_query.to_list()

        # Post-process: results arrive ordered by vector distance; keep the
        # first occurrence of each CVE ID
        seen_ids = set()
        unique_results = []
        for r in results:
            if r["cve_id"] not in seen_ids:
                seen_ids.add(r["cve_id"])
                unique_results.append(r)
                if len(unique_results) >= k:
                    break

        return unique_results
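
For intuition about what `table.search(query_vector)` does, here is a brute-force sketch of nearest-neighbour retrieval in pure Python. It is illustrative only: LanceDB uses its own index and distance metric, and the function names and toy vectors below are made up for this example.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query: Sequence[float],
          index: Dict[str, Sequence[float]],
          k: int = 2) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and keep the k best."""
    scored = [(key, cosine_similarity(query, vec)) for key, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2-dimensional "embeddings"; real ada-002 vectors have 1536 dimensions
toy_index = {"doc-a": [1.0, 0.0], "doc-b": [0.0, 1.0], "doc-c": [1.0, 1.0]}
print(top_k([1.0, 0.0], toy_index))
```

A vector database returns the same kind of ranked list, but uses an index so that it does not have to score every row.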

Important implementation details:

  • Text chunk preparation: Combining multiple CVE fields into a single chunk improves retrieval quality. The LLM gets all relevant context without needing multiple retrievals.
  • Batch processing: OpenAI's embedding API has rate limits and memory constraints. Batches of 100 balance throughput and reliability.
  • Metadata filtering: LanceDB supports pre-filtering, which is more efficient than post-filtering for large datasets. The LIKE operator workaround handles list fields.
  • Deduplication: add_cves appends rows, so re-ingesting a CVE that was modified upstream leaves several copies in the table. We track seen CVE IDs to ensure unique results.
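
Because filter values can come from user-supplied query parameters, it helps to build the filter string in one place and escape quotes there. The sketch below assumes the flattened layout suggested in the code comments, with scalar columns named `cwe_id` and `software`. Those column names, and the `build_filter` helper itself, are hypothetical and do not match the schema defined above.

```python
from typing import Optional, Sequence


def _quote(value: str) -> str:
    """Render a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_filter(min_score: Optional[float] = None,
                 cwe_ids: Sequence[str] = (),
                 software: Sequence[str] = ()) -> Optional[str]:
    """Combine optional conditions with AND; values within one field are OR-ed."""
    clauses = []
    if min_score is not None:
        clauses.append(f"cvss_score >= {float(min_score)}")
    if cwe_ids:
        clauses.append("(" + " OR ".join(f"cwe_id = {_quote(c)}" for c in cwe_ids) + ")")
    if software:
        clauses.append("(" + " OR ".join(f"software = {_quote(s)}" for s in software) + ")")
    return " AND ".join(clauses) if clauses else None


print(build_filter(7.0, ["CWE-79"], ["apache"]))
```

Returning `None` when there are no conditions lets the caller skip the filter entirely instead of passing an empty string.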

Implementing Multi-Hop RAG for CVE Analysis

The core of our system is the RAG engine that answers complex security queries. Standard RAG retrieves documents once and generates an answer. Multi-hop RAG decomposes complex questions into sub-questions, retrieves context for each, and synthesizes the final answer.
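
The control flow is easier to see with the LLM and the vector store replaced by plain callables. The sketch below shows the decompose, retrieve, and synthesize loop under that assumption. `multi_hop_answer` and its three callable parameters are illustrative names, not part of the engine in this tutorial.

```python
from typing import Callable, Dict, List


def multi_hop_answer(question: str,
                     decompose: Callable[[str], List[str]],
                     retrieve: Callable[[str], List[Dict]],
                     synthesize: Callable[[str, List[Dict]], str]) -> Dict:
    """Decompose the question, retrieve once per sub-question, then synthesize."""
    # Fall back to the original question if decomposition yields nothing
    sub_questions = decompose(question) or [question]

    # Retrieve per hop; the first occurrence of each CVE ID wins
    seen: Dict[str, Dict] = {}
    for sub_question in sub_questions:
        for record in retrieve(sub_question):
            seen.setdefault(record["cve_id"], record)

    records = list(seen.values())
    return {
        "answer": synthesize(question, records),
        "sub_questions": sub_questions,
        "cves_retrieved": [r["cve_id"] for r in records],
    }
```

In a unit test, `decompose` could be `str.split`, `retrieve` a dictionary lookup, and `synthesize` a function that counts records. That exercises the deduplication and fallback logic without any API calls.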

# rag_engine.py
import logging
import re
from typing import List, Dict, Optional
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from langchain_core.output_parsers import StrOutputParser

# answer_question logs the decomposition, so this module needs its own logger
logger = logging.getLogger(__name__)

class MultiHopRAGEngine:
    """
    Implements multi-hop retrieval for complex CVE analysis queries.

    Based on MultiHop-RAG benchmarking (ArXiv, 2024), standard RAG
    fails on queries requiring multiple reasoning steps. This engine
    decomposes queries and aggregates results.
    """

    def __init__(self, vector_store, llm_model: str = "gpt-4-turbo-preview"):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model=llm_model,
            temperature=0.1,  # Low temperature for factual answers
            max_tokens=2000
        )

        # Prompt for query decomposition
        self.decomposition_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a security analyst specializing in CVE analysis.
            Decompose the following security question into 2-4 sub-questions that,
            when answered together, provide a complete answer.

            Rules:
            - Each sub-question should be independently answerable from CVE data
            - Include specific CWE IDs, software names, or date ranges if mentioned
            - Output one sub-question per line, no numbering"""),
            ("human", "{question}")
        ])

        # Prompt for final answer synthesis
        self.synthesis_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a senior security engineer analyzing CVEs.
            Based on the retrieved CVE records below, answer the user's question.

            Context from CVE database:
            {context}

            Provide:
            1. Direct answer to the question
            2. Relevant CVE IDs with brief explanations
            3. Risk assessment (Critical/High/Medium/Low)
            4. Recommended actions

            If the context doesn't contain enough information, state what's missing.
            Base your answer ONLY on the provided context."""),
            ("human", "{question}")
        ])

    def decompose_query(self, question: str) -> List[str]:
        """
        Break complex questions into simpler sub-queries.

        Example:
        Input: "What critical CVEs affect Apache servers and have available exploits?"
        Output: [
            "CVEs with CVSS score >= 9.0 affecting Apache software",
            "CVEs with known exploits available",
            "Apache server CVEs published in the last 90 days"
        ]
        """
        chain = self.decomposition_prompt | self.llm | StrOutputParser()
        result = chain.invoke({"question": question})

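        # Parse sub-questions (one per line), stripping any bullet or
        # numbering the model adds despite the instructions
        sub_questions = []
        for line in result.split("\n"):
            cleaned = re.sub(r"^\s*(?:[-*•]|\d+[.)])\s*", "", line).strip()
            if cleaned:
                sub_questions.append(cleaned)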

        # Fallback: if decomposition fails, use original question
        if not sub_questions:
            sub_questions = [question]

        return sub_questions[:4]  # Limit to 4 sub-questions

    def retrieve_for_sub_question(self, sub_question: str, k: int = 5) -> List[Document]:
        """
        Retrieve relevant CVE records for a single sub-question.

        Uses heuristic parsing to extract filters from the sub-question.
        """
        # Extract potential filters from sub-question
        min_score = None
        cwe_filter = None
        software_filter = None

        # Check for severity keywords
        severity_map = {
            "critical": 9.0,
            "high": 7.0,
            "medium": 4.0,
            "low": 0.1
        }
        for severity, score in severity_map.items():
            # Match whole words only, so "low" does not fire on "allow"
            if re.search(rf"\b{severity}\b", sub_question, re.IGNORECASE):
                min_score = score
                break

        # Check for CWE mentions
        cwe_pattern = r'CWE-\d+'
        cwe_matches = re.findall(cwe_pattern, sub_question, re.IGNORECASE)
        if cwe_matches:
            cwe_filter = [cwe.upper() for cwe in cwe_matches]

        # Check for software mentions (common patterns)
        software_keywords = ["apache", "nginx", "linux", "windows", "mysql", "postgresql"]
        for sw in software_keywords:
            if sw in sub_question.lower():
                software_filter = [sw]
                break

        # Perform search with extracted filters
        results = self.vector_store.search(
            query=sub_question,
            k=k,
            min_score=min_score,
            cwe_filter=cwe_filter,
            software_filter=software_filter
        )

        # Convert to LangChain Document format
        documents = []
        for r in results:
            doc = Document(
                page_content=r["text_chunk"],
                metadata={
                    "cve_id": r["cve_id"],
                    "cvss_score": r["cvss_score"],
                    "cwe_ids": r["cwe_ids"],
                    "affected_software": r["affected_software"],
                    "source": "nvd"
                }
            )
            documents.append(doc)

        return documents

    def answer_question(self, question: str) -> Dict:
        """
        Full multi-hop RAG pipeline.

        Steps:
        1. Decompose question into sub-questions
        2. Retrieve context for each sub-question
        3. Deduplicate and combine context
        4. Generate final answer with LLM
        """
        # Step 1: Decompose
        sub_questions = self.decompose_query(question)
        logger.info(f"Decomposed into {len(sub_questions)} sub-questions")

        # Step 2: Retrieve for each sub-question
        all_documents = []
        seen_cves = set()

        for sq in sub_questions:
            docs = self.retrieve_for_sub_question(sq)
            for doc in docs:
                cve_id = doc.metadata.get("cve_id")
                if cve_id and cve_id not in seen_cves:
                    seen_cves.add(cve_id)
                    all_documents.append(doc)

        # Limit context to avoid token limits
        # GPT-4 Turbo has 128K context, but we keep it manageable
        max_docs = 20
        if len(all_documents) > max_docs:
            # Sort by CVSS score descending, keep highest severity
            all_documents.sort(
                key=lambda d: d.metadata.get("cvss_score", 0),
                reverse=True
            )
            all_documents = all_documents[:max_docs]

        # Step 3: Prepare context
        context = "\n\n---\n\n".join([doc.page_content for doc in all_documents])

        # Step 4: Generate answer
        chain = self.synthesis_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": question
        })

        return {
            "answer": answer,
            "sub_questions": sub_questions,
            "cves_retrieved": [doc.metadata["cve_id"] for doc in all_documents],
            "total_cves_found": len(seen_cves)
        }

The multi-hop implementation addresses several production concerns:

  • Query decomposition: Complex questions like "Which critical Apache vulnerabilities from 2024 have CWE-79 and available exploits?" are broken into manageable sub-queries.
  • Heuristic filter extraction: We parse sub-questions for severity levels, CWE IDs, and software names to leverage metadata filtering in LanceDB.
  • Context window management: Even with large context windows, we limit to 20 documents to maintain answer quality and reduce latency.
  • Deduplication across hops: Multiple sub-questions might retrieve the same CVE. We track seen IDs to avoid redundant context.
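
A fixed document count ignores how long each chunk is. One alternative is to cap the context by an estimated token budget. The sketch below assumes roughly four characters per token, which is a coarse heuristic rather than a real tokenizer count. The `select_within_budget` helper and the `text` and `cvss_score` keys are hypothetical.

```python
from typing import Dict, List


def select_within_budget(docs: List[Dict],
                         max_tokens: int = 6000,
                         chars_per_token: float = 4.0) -> List[Dict]:
    """Keep the highest-CVSS documents whose estimated token total fits the budget."""
    ranked = sorted(docs, key=lambda d: d.get("cvss_score") or 0.0, reverse=True)
    selected: List[Dict] = []
    used = 0
    for doc in ranked:
        # Rough estimate; a tokenizer such as tiktoken would give exact counts
        cost = int(len(doc["text"]) / chars_per_token) + 1
        if used + cost > max_tokens:
            continue  # Skip oversized chunks but keep trying smaller ones
        selected.append(doc)
        used += cost
    return selected
```

Skipping an oversized chunk rather than stopping at it means one very long advisory does not crowd out several short, relevant ones.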

Building the FastAPI Application

Finally, we expose our system through a FastAPI application with request validation and error handling.

# app.py
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional
import logging
from dotenv import load_dotenv
import os

from ingestion import NVDIngestor
from vector_store import CVEVectorStore
from rag_engine import MultiHopRAGEngine

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="CVE Analysis RAG API",
    description="Automated CVE analysis using LLMs and RAG",
    version="1.0.0"
)

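# CORS for frontend integration. Wildcard origins are for local development;
# restrict allow_origins to known frontends in production.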
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
nvd_ingestor = NVDIngestor(api_key=os.getenv("NVD_API_KEY"))
vector_store = CVEVectorStore(db_path=os.getenv("LANCE_DB_PATH", "./cve_vector_store"))
rag_engine = MultiHopRAGEngine(vector_store=vector_store)

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=5, max_length=500)
    k: int = Field(default=10, ge=1, le=50)

class IngestRequest(BaseModel):
    days_back: int = Field(default=7, ge=1, le=120)  # NVD rejects lastMod ranges over 120 days

@app.on_event("startup")
async def startup_event():
    """Verify components are initialized on startup."""
    logger.info("CVE Analysis RAG API starting...")
    # Check if vector store has data
    table = vector_store.db.open_table(vector_store.table_name)
    count = table.count_rows()
    logger.info(f"Vector store contains {count} CVE records")

@app.get("/health")
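async def health_check():
    """Health check endpoint."""
    # Local import keeps this handler self-contained
    from datetime import datetime, timezone
    return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}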

@app.post("/ingest")
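def ingest_cves(request: IngestRequest):
    """
    Ingest recent CVEs from NVD.

    Declared as a plain def so FastAPI runs it in its threadpool; the
    ingestor uses blocking HTTP calls and time.sleep, which would stall
    the event loop inside an async def.

    Edge case: If ingestion takes too long, consider background tasks.
    For production, use Celery or similar task queue.
    """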
    try:
        logger.info(f"Ingesting CVEs from last {request.days_back} days")
        cves = nvd_ingestor.ingest_recent_cves(days_back=request.days_back)

        if not cves:
            raise HTTPException(status_code=404, detail="No new CVEs found")

        vector_store.add_cves(cves)

        return {
            "message": f"Ingested {len(cves)} CVEs",
            "cve_ids": [cve.id for cve in cves[:10]],  # Sample first 10
            "total": len(cves)
        }
    except HTTPException:
        # Re-raise deliberate HTTP errors (such as the 404 above) unchanged
        raise
    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/query")
# Plain def: FastAPI runs the blocking LLM and embedding calls in its threadpool
def query_cves(request: QueryRequest):
    """
    Answer a security question using multi-hop RAG.

    Example queries:
    - "What critical vulnerabilities affect Apache servers?"
    - "Which CVEs from 2024 have CWE-79 and CVSS > 7.0?"
    - "Are there any exploited vulnerabilities in our PostgreSQL databases?"
    """
    try:
        result = rag_engine.answer_question(request.question)
        return result
    except Exception as e:
        logger.error(f"Query failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/search")
# Plain def: the embedding call inside vector_store.search is blocking
def search_cves(
    query: str = Query(..., min_length=3),
    k: int = Query(10, ge=1, le=50),
    min_score: Optional[float] = Query(None, ge=0.0, le=10.0),
    cwe: Optional[str] = Query(None),
    software: Optional[str] = Query(None)
):
    """
    Direct vector search with optional filters.

    Useful for programmatic access or debugging.
    """
    try:
        cwe_filter = [cwe] if cwe else None
        software_filter = [software] if software else None

        results = vector_store.search(
            query=query,
            k=k,
            min_score=min_score,
            cwe_filter=cwe_filter,
            software_filter=software_filter
        )

        return {
            "query": query,
            "results_count": len(results),
            "results": results
        }
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )

Running the System

Start the API server:

uvicorn app:app --host 0.0.0.0 --port 8000 --reload

Ingest recent CVEs:

curl -X POST http://localhost:8000/ingest \
  -H "Content-Type: application/json" \
  -d '{"days_back": 7}'

Query the system:

curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"question": "What critical vulnerabilities affect Apache HTTP Server with available exploits?"}'
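
The same query can be issued from Python using only the standard library. This is a minimal sketch that assumes the server from this tutorial is running on localhost:8000. `build_query_request` and `ask` are illustrative helper names.

```python
import json
import urllib.request


def build_query_request(question: str,
                        base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Prepare the POST /query request without sending it."""
    body = json.dumps({"question": question}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(question: str, base_url: str = "http://localhost:8000",
        timeout: float = 60.0) -> dict:
    """Send the question and decode the JSON response (requires a running server)."""
    request = build_query_request(question, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

With the API up, `ask("What critical vulnerabilities affect Apache HTTP Server?")["answer"]` would return the synthesized answer text. The generous timeout allows for several LLM calls per query.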

Edge Cases and Production Considerations

API Rate Limits: The NVD API limits to 50 requests per 30 seconds with an API key. Our ingestion pipeline respects this with a 0.6s delay. For large-scale ingestion, implement distributed rate limiting using Redis.
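
A fixed 0.6s sleep wastes time when requests are already slow. A sliding-window limiter waits only when the window is actually full. The sketch below is a single-process illustration with an injectable clock for testing. It is not the distributed Redis-based approach mentioned above, and `SlidingWindowLimiter` is a hypothetical class.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_requests calls in any window_seconds span (one process only)."""

    def __init__(self,
                 max_requests: int = 50,
                 window_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._sleep = sleep
        self._stamps: Deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have aged out of the window
        while self._stamps and now - self._stamps[0] >= self.window_seconds:
            self._stamps.popleft()

    def acquire(self) -> float:
        """Block until a slot is free; return the number of seconds waited."""
        waited = 0.0
        now = self._clock()
        self._evict(now)
        while len(self._stamps) >= self.max_requests:
            pause = max(self._stamps[0] + self.window_seconds - now, 0.0)
            self._sleep(pause)
            waited += pause
            now = self._clock()
            self._evict(now)
        self._stamps.append(now)
        return waited
```

Calling `limiter.acquire()` before each `client.get(...)` would take the place of the unconditional `time.sleep`. Passing a fake `clock` and `sleep` makes the behaviour testable without real delays.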

Memory Management: Embedding generation with text-embedding-ada-002 creates 1536-dimensional vectors. For 10,000 CVEs, this requires approximately 60MB for vectors alone, plus metadata. LanceDB's disk-based storage helps, but ensure your server has adequate RAM for batch processing.
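
The estimate follows from simple arithmetic, assuming each dimension is stored as a 4-byte float32 as in the schema above. A quick check:

```python
def vector_storage_bytes(num_vectors: int,
                         dimensions: int = 1536,
                         bytes_per_value: int = 4) -> int:
    """Raw size of the embedding column, excluding metadata and index overhead."""
    return num_vectors * dimensions * bytes_per_value


size = vector_storage_bytes(10_000)
print(f"{size:,} bytes = {size / 1e6:.1f} MB")  # 61,440,000 bytes = 61.4 MB
```

The same function gives about 1.5 GB of raw vectors for 250,000 records. That figure is for sizing only and does not account for index or metadata overhead.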

Stale Data: CVEs are updated frequently. Implement incremental ingestion using lastModStartDate to only fetch recently modified records. Schedule daily updates via cron or a task scheduler.
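
When catching up after a long gap, the date range has to be split, because NVD limits date-range queries to 120 consecutive days. Treat that limit as an assumption to confirm against the current NVD API documentation. The `date_windows` helper below is a hypothetical sketch of the splitting step.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def date_windows(start: datetime, end: datetime,
                 max_days: int = 120) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs no longer than max_days."""
    if max_days <= 0:
        raise ValueError("max_days must be positive")
    step = timedelta(days=max_days)
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        yield cursor, window_end
        cursor = window_end


for window_start, window_end in date_windows(datetime(2024, 1, 1), datetime(2024, 12, 31)):
    print(window_start.date(), "->", window_end.date())
```

Each pair can then be passed as `lastModStartDate` and `lastModEndDate` in a separate paginated fetch.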

LLM Hallucination: The RAG system is only as good as its retrieval. If the vector store lacks relevant CVEs, the LLM may fabricate answers. Always include the "If the context doesn't contain enough information" instruction in prompts.

Multi-Hop Limitations: As noted in the MultiHop-RAG paper (ArXiv, 2024), complex queries requiring 3+ hops may still fail. Monitor retrieval quality and consider implementing a verification step that checks if the answer is supported by retrieved documents.
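
One inexpensive verification step is to check that every CVE ID cited in the answer was actually retrieved. The sketch below covers only that narrow check, using a hypothetical helper named `unsupported_cve_ids`. It does not confirm that the claims made about each CVE are correct.

```python
import re
from typing import Iterable, List

CVE_ID_RE = re.compile(r"CVE-\d{4}-\d{4,}", re.IGNORECASE)


def unsupported_cve_ids(answer: str, retrieved_ids: Iterable[str]) -> List[str]:
    """Return CVE IDs mentioned in the answer that are absent from the retrieved set."""
    allowed = {cve_id.upper() for cve_id in retrieved_ids}
    cited = {match.upper() for match in CVE_ID_RE.findall(answer)}
    return sorted(cited - allowed)
```

Applied to the output of `answer_question`, a non-empty result from `unsupported_cve_ids(result["answer"], result["cves_retrieved"])` signals that the answer should be flagged for review or regenerated.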

What's Next

This system provides a foundation for automated CVE analysis, but production deployments require additional considerations:

  1. Continuous Learning: Implement feedback loops where security analysts rate answers, improving retrieval and generation over time.

  2. Exploit Intelligence Integration: Connect to exploit databases (Exploit-DB, Metasploit) to enrich CVE records with exploit availability—a critical factor in prioritization.

  3. Custom Embedding Models: Fine-tune embedding models on security-specific text for improved retrieval accuracy. The text-embedding-ada-002 model works well, but domain-specific models may perform better.

  4. Multi-Model RAG: Use different LLMs for different tasks—a smaller, faster model for query decomposition and a larger model for answer synthesis.

  5. Alerting Integration: Connect to SIEM systems (Splunk, Elastic) to automatically correlate CVEs with your asset inventory and generate prioritized alerts.

The combination of automated CVE ingestion, vector search, and multi-hop RAG transforms how security teams handle vulnerability intelligence. By reducing analysis time from hours to seconds, teams can focus on remediation rather than research.


References

1. Embedding. Wikipedia.
2. Rag. Wikipedia.
3. OpenAI. Wikipedia.
4. T-RAG: Lessons from the LLM Trenches. arXiv.
5. MultiHop-RAG: Benchmarking Retrieval-Augmented Generation for Multi-Hop Queries. arXiv.
6. fighting41love/funNLP. GitHub.
7. Shubhamsaboo/awesome-llm-apps. GitHub.
8. openai/openai-python. GitHub.
9. langchain-ai/langchain. GitHub.
10. OpenAI Pricing. OpenAI.