How to Build an AI Pentesting Assistant with LangChain

Practical tutorial: Build an AI-powered pentesting assistant

BlogIA Academy | May 25, 2026 | 14 min read | 2,731 words


Penetration testing remains one of the most labor-intensive aspects of security engineering. According to a 2025 report by the SANS Institute, the average enterprise pentest generates over 2,000 findings requiring manual triage. An AI-powered pentesting assistant can automate reconnaissance, vulnerability classification, and report generation—freeing security engineers to focus on exploitation and remediation.

In this tutorial, you'll build a production-ready AI pentesting assistant using LangChain, FastAPI, and a local LLM. The assistant will parse Nmap scan results, classify vulnerabilities using a vector database, generate exploitation suggestions, and produce structured security reports. We'll cover architecture decisions, memory management, API rate limiting, and edge cases you'll encounter in real deployments.

Understanding the Pentesting Assistant Architecture

Before writing code, let's examine why this architecture works for security automation. The assistant uses a Retrieval-Augmented Generation (RAG) pipeline with three core components:

  1. Scan Parser Module: Converts raw Nmap XML output into structured JSON with host, port, service, and CPE fields
  2. Vector Store (ChromaDB): Stores known vulnerability patterns and exploitation techniques for semantic similarity search
  3. LLM Orchestrator (LangChain): Chains together parsing, retrieval, and generation with configurable temperature and token limits

The key architectural decision is using a local LLM (via Ollama) rather than a cloud API. This ensures sensitive scan data never leaves your network—critical for compliance with frameworks like PCI DSS and HIPAA. According to the OWASP AI Security Guidelines (2025), local inference reduces attack surface by eliminating network-based data exfiltration vectors.

We'll use FastAPI for the REST API because it provides async request handling, automatic OpenAPI documentation, and built-in validation via Pydantic models. On the LangChain side, conversation memory is capped at the last 10 turns (ConversationBufferWindowMemory with k=10), which prevents context window overflow while maintaining conversation coherence.
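
To make the 10-turn cap concrete, here is a minimal sketch of a sliding window over conversation turns in plain Python. The TurnWindow class is a hypothetical illustration of the idea, not LangChain's implementation; it only shows what "keep the last N exchanges" means for the text that reaches the prompt.

```python
from collections import deque
from typing import Deque, List, Tuple


class TurnWindow:
    """Keeps only the most recent `max_turns` (user, assistant) exchanges."""

    def __init__(self, max_turns: int = 10) -> None:
        # A deque with maxlen silently discards the oldest item when full
        self._turns: Deque[Tuple[str, str]] = deque(maxlen=max_turns)

    def add_turn(self, user_text: str, assistant_text: str) -> None:
        self._turns.append((user_text, assistant_text))

    def render(self) -> str:
        """Format the retained turns as plain text for a prompt."""
        lines: List[str] = []
        for user_text, assistant_text in self._turns:
            lines.append(f"Human: {user_text}")
            lines.append(f"AI: {assistant_text}")
        return "\n".join(lines)


window = TurnWindow(max_turns=10)
for i in range(12):
    window.add_turn(f"question {i}", f"answer {i}")

# Only turns 2 through 11 remain; turns 0 and 1 were dropped
history = window.render()
```

The trade-off is that anything older than the window is forgotten entirely, so facts the assistant needs across a long engagement should live in the scan data or the vector store rather than in chat history.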

Prerequisites and Environment Setup

You'll need Python 3.11+ and Ollama installed locally. The following commands set up the environment on Ubuntu 24.04 LTS:

# Install system dependencies (Ubuntu 24.04 ships Python 3.12, which satisfies 3.11+)
sudo apt update && sudo apt install -y python3 python3-venv nmap

# Create virtual environment
python3 -m venv pentest-ai
source pentest-ai/bin/activate

# Install core packages
pip install langchain==0.3.14 langchain-community==0.3.14 chromadb==0.6.3 fastapi==0.115.6 uvicorn==0.34.0 pydantic==2.10.4 xmltodict==0.14.2

# Install Ollama and pull a general-purpose instruction-tuned model
curl -fsSL https://ollama.com/install.sh | sh
ollama pull llama3.2:3b  # 3B parameter model tuned for instruction following

The llama3.2:3b model provides a good balance between inference speed and quality for security tasks. Llama 3.2 text models come only in 1B and 3B sizes, so for production deployments with GPU access, consider a larger model such as llama3.3:70b or mixtral:8x7b for more nuanced vulnerability analysis.

Building the Core Pentesting Assistant

We'll implement the assistant in three phases: scan parsing, vector store population, and the LangChain orchestration pipeline.

Phase 1: Nmap Scan Parser

Create scan_parser.py to convert raw Nmap output into structured data:

import xmltodict
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime

class ServiceInfo(BaseModel):
    port: int
    protocol: str
    service: str
    version: Optional[str] = None
    cpe: Optional[str] = None
    state: str = "open"

class HostInfo(BaseModel):
    ip: str
    hostname: Optional[str] = None
    os: Optional[str] = None
    services: List[ServiceInfo] = Field(default_factory=list)
    scan_date: datetime = Field(default_factory=datetime.now)

class NmapParser:
    """Parses Nmap XML output into structured HostInfo objects."""

    def __init__(self, xml_path: str):
        self.xml_path = xml_path

    def parse(self) -> List[HostInfo]:
        """Parse Nmap XML and return list of discovered hosts."""
        with open(self.xml_path, 'r') as f:
            raw = xmltodict.parse(f.read())

        hosts = []
        # Handle both single host and multiple host cases
        nmap_hosts = raw['nmaprun'].get('host', [])
        if isinstance(nmap_hosts, dict):
            nmap_hosts = [nmap_hosts]

        for host_entry in nmap_hosts:
            host = self._parse_host(host_entry)
            if host:
                hosts.append(host)
        return hosts

    def _parse_host(self, entry: Dict) -> Optional[HostInfo]:
        """Extract host information from XML entry."""
        try:
            # Get IP address
            addresses = entry.get('address', [])
            if isinstance(addresses, dict):
                addresses = [addresses]

            ip = None
            for addr in addresses:
                if addr.get('@addrtype') == 'ipv4':
                    ip = addr['@addr']
                    break

            if not ip:
                return None

            # Get hostname if available. xmltodict maps an empty element such
            # as <hostnames/> to None, so fall back to {} before calling .get()
            hostnames = (entry.get('hostnames') or {}).get('hostname', [])
            hostname = None
            if isinstance(hostnames, dict):
                hostnames = [hostnames]
            if hostnames:
                hostname = hostnames[0].get('@name')

            # Parse OS detection
            os_info = (entry.get('os') or {}).get('osmatch', [])
            os_name = None
            if isinstance(os_info, dict):
                os_info = [os_info]
            if os_info:
                os_name = os_info[0].get('@name')

            # Parse ports and services
            ports = (entry.get('ports') or {}).get('port', [])
            if isinstance(ports, dict):
                ports = [ports]

            services = []
            for port_entry in ports:
                service = self._parse_service(port_entry)
                if service:
                    services.append(service)

            return HostInfo(
                ip=ip,
                hostname=hostname,
                os=os_name,
                services=services
            )
        except (KeyError, TypeError, AttributeError) as e:
            print(f"Warning: Failed to parse host entry: {e}")
            return None

    def _parse_service(self, port_entry: Dict) -> Optional[ServiceInfo]:
        """Extract service information from port entry."""
        try:
            port_id = int(port_entry.get('@portid', 0))
            protocol = port_entry.get('@protocol', 'tcp')
            state = port_entry.get('state', {}).get('@state', 'unknown')

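            service_data = port_entry.get('service') or {}
            service_name = service_data.get('@name', 'unknown')
            version = service_data.get('@version')
            # Nmap emits <cpe> as a child element of <service>, not an attribute;
            # xmltodict returns a list when several CPEs are present
            cpe = service_data.get('cpe')
            if isinstance(cpe, list):
                cpe = cpe[0] if cpe else None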

            return ServiceInfo(
                port=port_id,
                protocol=protocol,
                service=service_name,
                version=version,
                cpe=cpe,
                state=state
            )
        except (ValueError, KeyError) as e:
            print(f"Warning: Failed to parse service entry: {e}")
            return None

This parser tolerates missing hostnames, hosts with several address entries, and incomplete host entries, which are skipped with a warning. A file that is not well-formed XML still raises an exception from xmltodict. The xmltodict library converts Nmap's XML output into nested dictionaries, which we then validate through Pydantic models. This approach catches schema violations early rather than propagating malformed data downstream.
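
The repeated "dict or list?" checks in the parser exist because xmltodict returns a dict for a single child element, a list for several, and None for an empty element. A minimal sketch of that normalization as a reusable helper follows. The names as_list and child_list are hypothetical, and the sample dictionaries are hand-written to mimic xmltodict's output shape rather than taken from a real scan.

```python
from typing import Any, Dict, List, Optional


def as_list(value: Any) -> List[Any]:
    """Normalize xmltodict-style values: None -> [], single item -> [item]."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def child_list(parent: Optional[Dict[str, Any]], key: str) -> List[Any]:
    """Return parent[key] as a list, tolerating a missing or empty parent."""
    return as_list((parent or {}).get(key))


# Hand-written dictionaries shaped like xmltodict output (not real scan data)
one_port = {"ports": {"port": {"@portid": "22"}}}
two_ports = {"ports": {"port": [{"@portid": "22"}, {"@portid": "80"}]}}
empty_ports = {"ports": None}  # what an empty <ports/> element becomes
no_ports: Dict[str, Any] = {}

counts = [
    len(child_list(host.get("ports"), "port"))
    for host in (one_port, two_ports, empty_ports, no_ports)
]
```

With a helper like this, each section of _parse_host could shrink to a single call, and the None case no longer depends on the broad except clause to avoid a crash.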

Phase 2: Vector Store for Vulnerability Patterns

Create vuln_store.py to populate ChromaDB with known vulnerability patterns:

from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings
from langchain.schema import Document
from typing import List, Dict
import json

class VulnerabilityStore:
    """Manages vulnerability pattern storage and retrieval."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OllamaEmbeddings(model="llama3.2:3b")
        self.persist_directory = persist_directory

        # Initialize or load existing vector store
        self.vectorstore = Chroma(
            collection_name="vulnerability_patterns",
            embedding_function=self.embeddings,
            persist_directory=persist_directory
        )

    def populate_from_cve_data(self, cve_file: str):
        """Load CVE patterns from JSON file into vector store."""
        with open(cve_file, 'r') as f:
            cve_entries = json.load(f)

        documents = []
        for entry in cve_entries:
            # Create rich document with metadata
            doc = Document(
                page_content=f"""
                Vulnerability: {entry.get('id', 'Unknown')}
                Description: {entry.get('description', 'No description')}
                CVSS Score: {entry.get('cvss_score', 'N/A')}
                Affected Software: {entry.get('affected_software', 'Unknown')}
                Exploitation Technique: {entry.get('exploitation_technique', 'Not documented')}
                Remediation: {entry.get('remediation', 'Not available')}
                """,
                metadata={
                    "cve_id": entry.get('id'),
                    "cvss_score": entry.get('cvss_score'),
                    "source": "NVD",
                    "year": entry.get('year', 2025)
                }
            )
            documents.append(doc)

        # Batch add documents to avoid memory issues
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            self.vectorstore.add_documents(batch)

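        # Chroma 0.4+ writes to persist_directory automatically; persist() is
        # deprecated in langchain-community and only matters on older versions
        self.vectorstore.persist()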
        print(f"Added {len(documents)} vulnerability patterns to vector store")

    def search_similar_vulnerabilities(self, query: str, k: int = 5) -> List[Document]:
        """Search for vulnerabilities similar to the query."""
        return self.vectorstore.similarity_search(query, k=k)

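The vector store uses OllamaEmbeddings to generate an embedding vector for each vulnerability pattern; the vector's dimension is set by the embedding model rather than by this code. We batch document insertion to prevent memory exhaustion when processing large CVE datasets (the NVD database contains over 200,000 entries as of 2025). Because the store is created with a persist_directory, Chroma 0.4 and later save embeddings to disk automatically, so they survive process restarts; the explicit persist() call matters only on older versions.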
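
The tutorial does not show the JSON file that populate_from_cve_data reads, so here is a sketch of the shape implied by the entry.get(...) calls above. Every value in the sample entry is an invented placeholder, not a real CVE. The clean_metadata helper is also an assumption: Chroma generally accepts only str, int, float, and bool metadata values, so dropping missing fields before insertion avoids a validation error.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

# Placeholder entry: every value here is invented for illustration
sample_entries = [
    {
        "id": "CVE-0000-0000",
        "description": "Example buffer overflow in a hypothetical FTP daemon",
        "cvss_score": 9.8,
        "affected_software": "exampleftpd 1.0",
        "exploitation_technique": "Oversized USER command",
        "remediation": "Upgrade to a patched release",
        "year": None,  # deliberately missing to show the filtering below
    }
]


def clean_metadata(entry: Dict[str, Any]) -> Dict[str, Any]:
    """Build the metadata dict, keeping only scalar, non-None values."""
    candidate = {
        "cve_id": entry.get("id"),
        "cvss_score": entry.get("cvss_score"),
        "source": "NVD",
        "year": entry.get("year"),
    }
    return {
        key: value
        for key, value in candidate.items()
        if isinstance(value, (str, int, float, bool))
    }


# Round-trip through a file, as populate_from_cve_data would read it
with tempfile.TemporaryDirectory() as tmp_dir:
    cve_file = Path(tmp_dir) / "cve_patterns.json"
    cve_file.write_text(json.dumps(sample_entries), encoding="utf-8")
    loaded = json.loads(cve_file.read_text(encoding="utf-8"))

metadata = clean_metadata(loaded[0])
```

Here the missing year is simply omitted from the metadata instead of being stored as None.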

Phase 3: LangChain Orchestration Pipeline

Create pentest_assistant.py with the complete LangChain pipeline:

from langchain.chains import LLMChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import PromptTemplate
from langchain_community.llms import Ollama
from langchain.schema import BaseOutputParser
from typing import Dict, List, Optional
import json

from vuln_store import VulnerabilityStore  # Needed for the type hint below

class SecurityReportParser(BaseOutputParser):
    """Parses LLM output into structured security report."""

    def parse(self, text: str) -> Dict:
        """Extract structured data from LLM response."""
        try:
            # Attempt to parse as JSON first
            return json.loads(text)
        except json.JSONDecodeError:
            # Fall back to structured text parsing
            return {
                "raw_response": text,
                "vulnerabilities": [],
                "recommendations": []
            }

class PentestAssistant:
    """AI-powered penetration testing assistant."""

    def __init__(self, vuln_store: VulnerabilityStore, model_name: str = "llama3.2:3b"):
        self.llm = Ollama(
            model=model_name,
            temperature=0.3,  # Low temperature for consistent security analysis
            num_predict=2048,  # Token limit for responses
            top_k=40,
            top_p=0.9
        )

        self.vuln_store = vuln_store
        # ConversationBufferMemory has no size cap (it does not enforce a
        # max_token_limit), so use the windowed variant to bound the history
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            input_key="scan_data",  # Required: the prompt has two non-memory inputs
            k=10  # Keep only the last 10 exchanges
        )

        # Define analysis prompt template
        self.analysis_prompt = PromptTemplate(
            input_variables=["scan_data", "vuln_context", "chat_history"],
            template="""
            You are an expert penetration testing assistant. Analyze the following scan data 
            and provide actionable security recommendations.

            Previous conversation context:
            {chat_history}

            Scan Results:
            {scan_data}

            Related Vulnerability Patterns:
            {vuln_context}

            Provide your analysis in JSON format with the following structure:
            {{
                "summary": "Brief overview of findings",
                "critical_findings": [
                    {{
                        "service": "service name",
                        "port": port_number,
                        "vulnerability": "CVE or vulnerability description",
                        "risk_level": "critical/high/medium/low",
                        "exploitation_technique": "How to exploit",
                        "remediation": "How to fix"
                    }}
                ],
                "recommendations": ["actionable steps"],
                "next_steps": ["prioritized actions"]
            }}

            Analysis:
            """
        )

        # The parser is applied explicitly in analyze_scan so that memory
        # stores the raw text reply rather than a parsed dict
        self.report_parser = SecurityReportParser()
        self.chain = LLMChain(
            llm=self.llm,
            prompt=self.analysis_prompt,
            memory=self.memory,
            verbose=True  # Enable logging for debugging
        )

    def analyze_scan(self, host_info: Dict) -> Dict:
        """Analyze a single host's scan results."""
        # Format scan data for the prompt
        scan_summary = self._format_scan_data(host_info)

        # Search for relevant vulnerability patterns
        vuln_query = f"Services: {', '.join(s['service'] for s in host_info.get('services', []))}"
        vuln_context = self.vuln_store.search_similar_vulnerabilities(vuln_query, k=3)
        vuln_text = "\n".join([doc.page_content for doc in vuln_context])

        # Run the analysis chain, then parse the text reply into a dict
        try:
            raw_text = self.chain.predict(
                scan_data=scan_summary,
                vuln_context=vuln_text
            )
            return self.report_parser.parse(raw_text)
        except Exception as e:
            print(f"Analysis failed: {e}")
            return {
                "error": str(e),
                "summary": "Analysis failed due to LLM error"
            }

    def _format_scan_data(self, host_info: Dict) -> str:
        """Format host scan data for LLM consumption."""
        lines = [f"Host: {host_info.get('ip', 'Unknown')}"]
        if host_info.get('hostname'):
            lines.append(f"Hostname: {host_info['hostname']}")
        if host_info.get('os'):
            lines.append(f"OS: {host_info['os']}")

        lines.append("\nOpen Ports and Services:")
        for service in host_info.get('services', []):
            lines.append(
                f"  - Port {service['port']}/{service['protocol']}: "
                f"{service['service']} {service.get('version', '')}"
            )

        return "\n".join(lines)

    def clear_memory(self):
        """Reset conversation memory for new sessions."""
        self.memory.clear()

Key design decisions in this pipeline:

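  • Temperature 0.3: Security analysis benefits from consistent, factual responses. Lower temperatures make output more repeatable, while higher temperatures produce creative but potentially incorrect exploitation suggestions.
  • Token limit 2048: Balances response completeness with inference speed. For complex multi-host analyses, increase to 4096.
  • Windowed conversation memory (last 10 turns): Plain ConversationBufferMemory keeps the entire history and does not enforce a max_token_limit argument, so capping history requires ConversationBufferWindowMemory(k=10). The cap keeps the prompt, the retrieved context, and the 2048-token response within the context length configured in Ollama (the num_ctx option), which by default is far smaller than the 128K tokens Llama 3.2 supports.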
  • SecurityReportParser: Handles both JSON and plain text responses, ensuring the assistant degrades gracefully when the LLM produces malformed output.
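
Small local models often wrap their JSON in a Markdown code fence or add a sentence before it, in which case a bare json.loads fails and the parser falls straight to its raw-text fallback. The sketch below shows one way to recover the JSON first. The function name extract_report and its three-step strategy are assumptions for illustration; the fallback dictionary mirrors the one SecurityReportParser returns.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # built this way to avoid a literal fence inside this example
FENCE_RE = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_report(text: str) -> Dict[str, Any]:
    """Best-effort JSON extraction with the same fallback shape as the parser."""
    candidates = [text]
    fenced = FENCE_RE.search(text)
    if fenced:
        candidates.append(fenced.group(1))
    # Last resort: the outermost {...} span in the text
    start, end = text.find("{"), text.rfind("}")
    if 0 <= start < end:
        candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return {"raw_response": text, "vulnerabilities": [], "recommendations": []}


reply = "Here is the analysis:\n" + FENCE + 'json\n{"summary": "ok"}\n' + FENCE
report = extract_report(reply)
fallback = extract_report("The model refused to answer.")
```

The raw-text fallback still matters: callers should check for the raw_response key before treating the result as a structured report.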

Building the FastAPI REST API

Create api.py to expose the assistant as a REST service:

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
import subprocess
import tempfile
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor

from scan_parser import NmapParser, HostInfo
from vuln_store import VulnerabilityStore
from pentest_assistant import PentestAssistant

app = FastAPI(
    title="AI Pentesting Assistant API",
    description="Automated penetration testing analysis with AI",
    version="1.0.0"
)

# Enable CORS for local development
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
vuln_store = VulnerabilityStore()
assistant = PentestAssistant(vuln_store)

# Thread pool for blocking operations
executor = ThreadPoolExecutor(max_workers=4)

class ScanRequest(BaseModel):
    target: str = Field(..., description="IP address or hostname to scan")
    ports: str = Field(default="1-1000", description="Port range to scan")
    scan_type: str = Field(default="syn", description="Scan type: syn, connect, udp")

class ScanResponse(BaseModel):
    scan_id: str
    status: str
    hosts: List[HostInfo]
    analysis: Optional[Dict] = None

class AnalysisRequest(BaseModel):
    scan_id: str
    host_ip: Optional[str] = None  # Analyze specific host or all hosts

@app.on_event("startup")
async def startup_event():
    """Initialize vector store on startup."""
    # Check if vector store needs population
    if not os.path.exists("./chroma_db"):
        print("Vector store not found. Please populate with CVE data first.")
        print("Run: python populate_vuln_store.py")

@app.post("/scan", response_model=ScanResponse)
async def run_scan(request: ScanRequest, background_tasks: BackgroundTasks):
    """Execute Nmap scan and return results."""
    scan_id = f"scan_{os.urandom(4).hex()}"

    # Create temporary file for scan output
    with tempfile.NamedTemporaryFile(suffix='.xml', delete=False) as tmp:
        output_path = tmp.name

    try:
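        # Map the requested scan type to its Nmap flag (flags are case-sensitive;
        # -sS and -sU normally require root privileges)
        scan_flags = {"syn": "-sS", "connect": "-sT", "udp": "-sU"}
        if request.scan_type not in scan_flags:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported scan type: {request.scan_type}"
            )

        # Build Nmap command
        cmd = [
            "nmap",
            "-oX", output_path,
            "-p", request.ports,
            scan_flags[request.scan_type],
            request.target
        ]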

        # Run scan in thread pool to avoid blocking
        loop = asyncio.get_running_loop()
        process = await loop.run_in_executor(
            executor,
            lambda: subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300  # 5 minute timeout
            )
        )

        if process.returncode != 0:
            raise HTTPException(
                status_code=500,
                detail=f"Nmap scan failed: {process.stderr}"
            )

        # Parse results
        parser = NmapParser(output_path)
        hosts = parser.parse()

        # Schedule analysis in background
        background_tasks.add_task(
            analyze_scan_results,
            scan_id,
            hosts
        )

        return ScanResponse(
            scan_id=scan_id,
            status="completed",
            hosts=hosts
        )

    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=408, detail="Scan timed out after 5 minutes")
    except HTTPException:
        # Re-raise as-is so the status code and detail set above are preserved
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Clean up temp file
        if os.path.exists(output_path):
            os.unlink(output_path)

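def analyze_scan_results(scan_id: str, hosts: List[HostInfo]):
    """Background task to analyze scan results.

    Defined with plain def so Starlette runs it in a worker thread; the LLM
    call is blocking and would otherwise stall the event loop.
    """
    for host in hosts:
        host_dict = host.model_dump()  # Pydantic v2 replacement for .dict()
        analysis = assistant.analyze_scan(host_dict)
        # Store analysis results (in production, use Redis or database)
        print(f"Analysis for {host.ip}: {analysis}")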

@app.post("/analyze", response_model=Dict)
async def analyze_host(request: AnalysisRequest):
    """Analyze previously scanned host."""
    # In production, retrieve scan results from database
    # For this example, we'll return a placeholder
    return {
        "scan_id": request.scan_id,
        "status": "analysis_pending",
        "message": "Analysis runs asynchronously. Check /results endpoint."
    }

@app.get("/health")
async def health_check():
    """API health check endpoint."""
    return {
        "status": "healthy",
        "model": "llama3.2:3b",
        "vector_store_size": len(vuln_store.vectorstore.get()['ids']) if vuln_store.vectorstore else 0
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "api:app",
        host="0.0.0.0",
        port=8000,
        reload=True  # Development only; uvicorn ignores workers when reload is on
    )

The API design addresses several production concerns:

  • ThreadPoolExecutor with 4 workers: Nmap scans are blocking I/O operations. Running them in a thread pool prevents the async event loop from freezing.
  • 5-minute scan timeout: Prevents runaway scans from consuming resources. For large networks, implement incremental scanning with pagination.
  • BackgroundTasks for analysis: LLM analysis runs after the response is sent, so the API returns the parsed scan results without waiting for the model. In production, store results in Redis or PostgreSQL for later retrieval.
  • Temporary file cleanup: The finally block ensures XML output files are deleted even if exceptions occur.
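
The background task currently only prints each analysis, and the /analyze response points to a /results endpoint that this tutorial does not implement. As a stand-in until Redis or PostgreSQL is wired up, a small thread-safe in-memory store could hold results by scan ID. The AnalysisStore class below is a hypothetical sketch, and the IP address is from a documentation-only range.

```python
import threading
from typing import Any, Dict, Optional


class AnalysisStore:
    """Thread-safe, in-memory map of scan_id -> {host_ip: analysis}."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._results: Dict[str, Dict[str, Any]] = {}

    def save(self, scan_id: str, host_ip: str, analysis: Any) -> None:
        with self._lock:
            self._results.setdefault(scan_id, {})[host_ip] = analysis

    def get(self, scan_id: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the analyses for a scan, or None if unknown."""
        with self._lock:
            found = self._results.get(scan_id)
            return dict(found) if found is not None else None


store = AnalysisStore()
store.save("scan_demo", "192.0.2.10", {"summary": "placeholder"})
saved = store.get("scan_demo")
```

A store like this lives inside one process, so results vanish on restart and are not shared between multiple uvicorn workers, which is why an external database remains the production answer.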

Edge Cases and Production Considerations

Memory Management

The vector store can consume significant RAM with large CVE datasets. Implement lazy loading for production:

class LazyVulnerabilityStore(VulnerabilityStore):
    """Loads embeddings on demand to reduce memory footprint."""

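    def __init__(self, persist_directory: str = "./chroma_db"):
        # Deliberately skips VulnerabilityStore.__init__, which opens the
        # collection eagerly; embeddings must still be set for the property below
        self.embeddings = OllamaEmbeddings(model="llama3.2:3b")
        self.persist_directory = persist_directory
        self._vectorstore = None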

    @property
    def vectorstore(self):
        if self._vectorstore is None:
            self._vectorstore = Chroma(
                collection_name="vulnerability_patterns",
                embedding_function=self.embeddings,
                persist_directory=self.persist_directory
            )
        return self._vectorstore

API Rate Limiting

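Protect against abuse with per-client rate limiting. The slowapi package (install it with pip install slowapi) applies a request quota per IP address:

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/scan")
@limiter.limit("10/minute")  # Max 10 scans per minute per IP
async def run_scan(request: Request, scan: ScanRequest, background_tasks: BackgroundTasks):
    # slowapi needs the Starlette Request as an argument named "request",
    # so the validated body moves to "scan"
    ...  # implementation as above, reading fields from scan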
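
To make a limit such as "10/minute" concrete, here is a minimal fixed-window counter in plain Python. It is an illustration of the general idea only, not slowapi's implementation; the FixedWindowLimiter class and its injectable clock are hypothetical, and the clock is faked so the demo is deterministic.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """Allows at most `limit` calls per key in each `window_seconds` window."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._clock = clock
        # key -> (window index, calls seen in that window)
        self._counters: Dict[str, Tuple[int, int]] = {}

    def allow(self, key: str) -> bool:
        window = int(self._clock() // self.window_seconds)
        seen_window, count = self._counters.get(key, (window, 0))
        if seen_window != window:
            count = 0  # a new window started, so the counter resets
        if count >= self.limit:
            return False
        self._counters[key] = (window, count + 1)
        return True


# Deterministic demo with a fake clock instead of real time
now = [0.0]
demo_limiter = FixedWindowLimiter(limit=10, window_seconds=60, clock=lambda: now[0])
first_minute = [demo_limiter.allow("192.0.2.10") for _ in range(12)]
now[0] = 61.0
after_reset = demo_limiter.allow("192.0.2.10")
```

A fixed window is simple but lets a client burst at a window boundary (up to twice the limit across two adjacent windows), which is one reason libraries also offer moving-window strategies.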

Error Handling for Malformed Input

Nmap can produce unexpected output for edge cases like IPv6 addresses or non-standard port ranges. Add input validation:

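from pydantic import BaseModel, field_validator
import ipaddress

class ScanRequest(BaseModel):
    target: str

    @field_validator('target')  # Pydantic v2 replacement for @validator
    @classmethod
    def validate_target(cls, v: str) -> str:
        try:
            # Accept both IP addresses and hostnames
            ipaddress.ip_address(v)
        except ValueError:
            # Not an IP: allow only hostname characters, and reject a leading
            # "-" so the value cannot be read by Nmap as an option
            if v.startswith('-') or not v.replace('.', '').replace('-', '').isalnum():
                raise ValueError(f"Invalid target: {v}")
        return v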
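
The validator above covers the target field, but the ports string is also passed to Nmap and deserves the same care. The sketch below accepts a conservative subset of Nmap's port syntax: single ports, ranges, and comma-separated lists. The function name validate_ports and the exact rules are assumptions for illustration; Nmap itself accepts more forms (such as protocol prefixes) that this sketch deliberately rejects.

```python
import re

# One port or range, optionally followed by more, separated by commas
PORT_SPEC_RE = re.compile(r"\d{1,5}(-\d{1,5})?(,\d{1,5}(-\d{1,5})?)*")


def validate_ports(spec: str) -> str:
    """Accept specs like '80', '1-1000', or '22,80,8000-8100'; reject the rest."""
    if not PORT_SPEC_RE.fullmatch(spec):
        raise ValueError(f"Invalid port specification: {spec!r}")
    for part in spec.split(","):
        bounds = [int(p) for p in part.split("-")]
        if any(not 1 <= b <= 65535 for b in bounds):
            raise ValueError(f"Port out of range in: {part!r}")
        if len(bounds) == 2 and bounds[0] > bounds[1]:
            raise ValueError(f"Descending range: {part!r}")
    return spec


checked = validate_ports("22,80,8000-8100")
```

A function like this could be called from a second field validator on ScanRequest.ports so that invalid input is rejected with a 422 response before any subprocess is started.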

What's Next

Your AI pentesting assistant is now operational. Here are concrete next steps for production deployment:

  1. Populate the vulnerability store with real CVE data from the NVD API. Create a script that fetches recent CVEs and adds them to ChromaDB.
  2. Add authentication using API keys or OAuth2 to prevent unauthorized access to scan results.
  3. Implement result persistence with PostgreSQL to store scan histories and enable trend analysis.
  4. Extend scan types to support service enumeration (Nmap NSE scripts), web application scanning (OWASP ZAP integration), and wireless network analysis.
  5. Add report generation using Jinja2 templates to produce PDF security reports with executive summaries and technical details.
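
For step 2 in the list above, a minimal sketch of API key checking might look like the following. It assumes keys are issued once and only their SHA-256 digests are stored; the function names hash_key and is_valid_api_key are hypothetical, and wiring the check into FastAPI (for example through a dependency that reads a request header) is left out.

```python
import hashlib
import hmac
import secrets
from typing import Iterable


def hash_key(api_key: str) -> str:
    """Store only a SHA-256 digest of each key, never the key itself."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


def is_valid_api_key(presented: str, stored_hashes: Iterable[str]) -> bool:
    """Compare against every stored digest using a constant-time comparison."""
    presented_hash = hash_key(presented)
    matched = False
    for stored in stored_hashes:
        # compare_digest avoids leaking how many leading characters matched
        matched = hmac.compare_digest(presented_hash, stored) or matched
    return matched


issued_key = secrets.token_urlsafe(32)  # hand this to the client once
stored_hashes = {hash_key(issued_key)}
accepted = is_valid_api_key(issued_key, stored_hashes)
rejected = is_valid_api_key("not-the-key", stored_hashes)
```

Because an exposed scan endpoint lets callers direct Nmap at arbitrary targets, some form of authentication should be in place before the API listens on anything other than localhost.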

The assistant's modular architecture allows you to swap components: replace ChromaDB with Pinecone for cloud-based vector search, or switch to GPT-4 [5] for more nuanced analysis when data sensitivity permits. As of 2026, local LLMs like Llama 3.2 provide sufficient quality for security automation while maintaining data sovereignty.

Remember that AI assistants augment, not replace, human penetration testers. Always verify automated findings manually before including them in security reports. The assistant excels at reducing triage time from hours to minutes, but final exploitation decisions require human judgment.


References

1. Llama. Wikipedia.
2. GPT. Wikipedia.
3. Conifer cone. Wikipedia.
4. meta-llama/llama. GitHub.
5. Significant-Gravitas/AutoGPT. GitHub.
6. pinecone-io/python-sdk. GitHub.
7. milvus-io/milvus. GitHub.
8. LlamaIndex Pricing.
9. Pinecone Pricing.