
How to Build an AI Pentesting Assistant with LangChain

Practical tutorial: Build an AI-powered pentesting assistant

BlogIA Academy | May 22, 2026 | 13 min read | 2,433 words



Penetration testing remains one of the most labor-intensive aspects of security engineering. According to a 2025 report by the SANS Institute, the average enterprise penetration test requires 200-400 person-hours per engagement, with 60% of that time spent on reconnaissance and documentation rather than actual exploitation. An AI-powered pentesting assistant can dramatically reduce this overhead by automating reconnaissance, suggesting attack vectors, and generating structured reports.

In this tutorial, you'll build a production-ready AI pentesting assistant using LangChain, FastAPI, and a local LLM. The system will accept target specifications, perform automated reconnaissance via Shodan and DNS enumeration, suggest attack paths based on the MITRE ATT&CK framework, and generate comprehensive penetration test reports. We'll focus on real-world edge cases: API rate limiting, context window management for long reconnaissance outputs, and secure handling of sensitive findings.

Architecture Overview: Why This Matters in Production

Before writing code, understand the architectural decisions that make this assistant production-viable. The system uses a retrieval-augmented generation (RAG) pattern with three core components:

  1. Tool Orchestration Layer: LangChain agents that manage tool selection and execution order
  2. Vector Knowledge Base: ChromaDB [9] storing MITRE ATT&CK techniques, CVE descriptions, and common exploit patterns
  3. Report Generation Pipeline: Structured output parsing with Pydantic models for consistent report formatting

The key insight is that LLMs alone cannot reliably execute multi-step security workflows. They hallucinate tool outputs, forget intermediate results, and fail to respect rate limits. By wrapping each tool in a LangChain BaseTool with explicit error handling and retry logic, we create a deterministic execution layer that the LLM orchestrates but cannot break.

For production deployments, consider running the LLM locally using Ollama with Mistral [8] 7B or Llama 3 8B. This avoids sending sensitive target data to third-party APIs. The vector store also runs locally, ensuring all findings remain within your infrastructure.

Prerequisites and Environment Setup

You'll need Python 3.11+, Ollama [7] for local LLM inference, and API keys for Shodan and CIRCL (optional for passive DNS). Install the required packages:

# Create virtual environment
python3.11 -m venv pentest-ai
source pentest-ai/bin/activate

# Core dependencies
pip install langchain==0.3.0 langchain-community==0.3.0 langchain-ollama==0.2.0
pip install fastapi==0.115.0 uvicorn==0.30.0 pydantic==2.9.0
pip install chromadb==0.5.0 shodan==1.31.0 dnspython==2.6.0
pip install httpx==0.27.0 python-dotenv==1.0.0

# Pull local LLM
ollama pull mistral:7b-instruct-v0.3-q4_K_M

The langchain-ollama package provides direct integration with Ollama models, avoiding the need for OpenAI API calls. For production, you might use vLLM or TGI for better throughput, but Ollama works well for development and small-scale deployments.

Building the Core Pentesting Assistant

Step 1: Define the Tool Set

Each security tool becomes a LangChain BaseTool with input validation, rate limiting, and error handling. Here's the Shodan reconnaissance tool:

import ipaddress
import socket
import time
from datetime import datetime, timedelta
from typing import Any, List, Type

import shodan
from langchain.tools import BaseTool
from pydantic import BaseModel, Field

class ShodanInput(BaseModel):
    target: str = Field(description="IP address or domain to scan")
    max_results: int = Field(default=10, ge=1, le=100, description="Maximum results to return")

class ShodanReconTool(BaseTool):
    name: str = "shodan_recon"
    description: str = "Perform Shodan reconnaissance on a target IP or domain. Returns open ports, services, and banners."
    args_schema: Type[BaseModel] = ShodanInput

    # BaseTool is a Pydantic model, so instance state must be declared as fields;
    # assigning undeclared attributes in __init__ raises an error.
    api: Any = None
    rate_limit: int = 30
    request_timestamps: List[datetime] = Field(default_factory=list)

    def __init__(self, api_key: str, rate_limit_per_minute: int = 30, **kwargs: Any):
        super().__init__(
            api=shodan.Shodan(api_key),
            rate_limit=rate_limit_per_minute,
            **kwargs,
        )

    def _check_rate_limit(self):
        """Enforce rate limiting by tracking request timestamps."""
        now = datetime.now()
        cutoff = now - timedelta(minutes=1)
        self.request_timestamps = [t for t in self.request_timestamps if t > cutoff]

        if len(self.request_timestamps) >= self.rate_limit:
            sleep_time = 60 - (now - self.request_timestamps[0]).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)

        # Record the time the request is actually sent, after any sleep
        self.request_timestamps.append(datetime.now())

    def _run(self, target: str, max_results: int = 10) -> str:
        """Execute Shodan search with error handling."""
        self._check_rate_limit()

        try:
            # Handle both IP and domain lookups
            try:
                ipaddress.ip_address(target)  # Accepts IPv4 and IPv6 literals
                ip = target
            except ValueError:
                # Domain lookup via DNS resolution
                ip = socket.gethostbyname(target)
            result = self.api.host(ip)

            # Format output for LLM consumption
            output = f"Shodan results for {target}:\n"
            output += f"Organization: {result.get('org', 'N/A')}\n"
            output += f"ISP: {result.get('isp', 'N/A')}\n"
            output += f"Country: {result.get('country_name', 'N/A')}\n"
            output += f"Open ports: {len(result.get('ports', []))}\n\n"

            for service in result.get('data', [])[:max_results]:
                port = service.get('port', 'unknown')
                transport = service.get('transport', 'unknown')
                product = service.get('product', 'unknown')
                version = service.get('version', '')
                output += f"Port {port}/{transport}: {product} {version}\n"

                # Include banner snippet if available
                if 'data' in service:
                    banner = service['data'][:200]
                    output += f"  Banner: {banner}\n"

            return output

        except shodan.APIError as e:
            return f"Shodan API error: {str(e)}"
        except socket.gaierror:
            return f"DNS resolution failed for {target}"
        except Exception as e:
            return f"Unexpected error: {str(e)}"

The rate limiting implementation is critical. Shodan's free tier allows 50 queries per month, while the paid tier supports 1 query per second. The _check_rate_limit method maintains a sliding window of timestamps and sleeps once the per-minute limit (30 requests by default) is reached. For production, you'd want a distributed rate limiter backed by a shared store such as Redis, but this works for single-instance deployments.
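The sliding-window idea can also be isolated from the tool so that it is easy to test. The following is a minimal sketch, not part of the original assistant: the class name SlidingWindowLimiter and its injectable clock and sleep parameters are assumptions made for illustration.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any window_seconds span (hypothetical helper)."""

    def __init__(
        self,
        max_calls: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock
        self._sleep = sleep
        self._calls: Deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.window_seconds:
            self._calls.popleft()

    def acquire(self) -> float:
        """Block until a slot is free and return the seconds spent waiting."""
        now = self._clock()
        self._evict(now)
        waited = 0.0
        if len(self._calls) >= self.max_calls:
            # Wait until the oldest recorded call leaves the window.
            waited = self.window_seconds - (now - self._calls[0])
            self._sleep(waited)
            now = self._clock()
            self._evict(now)
        # Record when the call actually proceeds, not when it was requested.
        self._calls.append(now)
        return waited
```

Passing the clock and sleep functions in as parameters lets a test drive the limiter with a fake clock instead of waiting a real minute. A monotonic clock is used by default so that wall-clock adjustments do not distort the window.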

Step 2: Create the MITRE ATT&CK Knowledge Base

We'll populate a ChromaDB vector store with MITRE ATT&CK techniques. This allows the LLM to retrieve relevant attack patterns based on discovered services:

import chromadb
from chromadb.utils import embedding_functions
import json
from typing import List, Dict

class MitreKnowledgeBase:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.embedding_fn = embedding_functions.DefaultEmbeddingFunction()

        # Create or get collection
        self.collection = self.client.get_or_create_collection(
            name="mitre_techniques",
            embedding_function=self.embedding_fn,
            metadata={"hnsw:space": "cosine"}
        )

    def populate_from_file(self, mitre_json_path: str):
        """Load MITRE ATT&CK techniques from a JSON export.

        Expected format: List of dicts with keys:
        - technique_id (e.g., T1190)
        - name (e.g., Exploit Public-Facing Application)
        - description
        - tactics (list of strings)
        - platforms (list of strings)
        """
        with open(mitre_json_path, 'r') as f:
            techniques = json.load(f)

        documents = []
        metadatas = []
        ids = []

        for tech in techniques:
            # Create rich document for semantic search
            doc = f"Technique {tech['technique_id']}: {tech['name']}\n"
            doc += f"Tactics: {', '.join(tech['tactics'])}\n"
            doc += f"Platforms: {', '.join(tech['platforms'])}\n"
            doc += f"Description: {tech['description']}"

            documents.append(doc)
            metadatas.append({
                "technique_id": tech['technique_id'],
                "name": tech['name'],
                "tactics": json.dumps(tech['tactics']),
                "platforms": json.dumps(tech['platforms'])
            })
            ids.append(tech['technique_id'])

        # Batch add to avoid memory issues
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            self.collection.add(
                documents=documents[i:i+batch_size],
                metadatas=metadatas[i:i+batch_size],
                ids=ids[i:i+batch_size]
            )

    def query_techniques(self, query: str, n_results: int = 5) -> List[Dict]:
        """Retrieve relevant techniques based on service/port information."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )

        techniques = []
        for i, doc in enumerate(results['documents'][0]):
            techniques.append({
                'id': results['ids'][0][i],
                'content': doc,
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i]
            })

        return techniques

The vector store uses cosine similarity for retrieval. When the LLM discovers an Apache 2.4.49 server, it can query for "Apache vulnerability exploit" and retrieve T1190 (Exploit Public-Facing Application) with relevant CVE mappings. The hnsw:space parameter configures the HNSW index for cosine distance, which works well with the default embedding function.
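To make the distance value returned by query_techniques easier to interpret, here is a small stdlib sketch of cosine distance (1 minus cosine similarity), where lower values mean a closer match. The three-dimensional vectors are invented toy values, not real embeddings, and the helper names are assumptions for illustration.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity; 0.0 means the vectors point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / (norm_a * norm_b)


def rank_by_distance(
    query: Sequence[float], candidates: Dict[str, Sequence[float]]
) -> List[Tuple[str, float]]:
    """Sort candidate ids from closest to farthest."""
    scored = [(key, cosine_distance(query, vec)) for key, vec in candidates.items()]
    return sorted(scored, key=lambda item: item[1])


# Toy vectors only; real embeddings have hundreds of dimensions.
toy_vectors = {
    "T1190": [0.9, 0.1, 0.0],
    "T1566": [0.0, 0.2, 0.9],
    "T1110": [0.5, 0.5, 0.1],
}
ranking = rank_by_distance([1.0, 0.0, 0.0], toy_vectors)
```

Because the measure depends only on direction, a long technique description and a short query can still score as close neighbours when they cover the same topic.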

Step 3: Build the Agent with Context Management

The LangChain agent orchestrates tools and manages the conversation context. We need special handling for long reconnaissance outputs that might exceed the LLM's context window:

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import SystemMessage

class PentestAssistant:
    def __init__(self, shodan_api_key: str, model_name: str = "mistral:7b-instruct-v0.3-q4_K_M"):
        # Initialize LLM with conservative context window
        self.llm = ChatOllama(
            model=model_name,
            temperature=0.3,  # Low temperature for deterministic security analysis
            num_predict=4096,  # Max tokens to generate
            num_ctx=8192,      # Context window size
            stop=["<|im_end|>", "Observation:"]  # Stop sequences for clean tool output
        )

        # Initialize tools
        self.tools = [
            ShodanReconTool(api_key=shodan_api_key),
            DnsEnumerationTool(),  # Implement similarly to ShodanReconTool
            PortScanTool(),        # Wrapper around nmap or masscan
            CveLookupTool(),       # Query CIRCL or NVD API
        ]

        # Create agent with memory
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            k=5,  # Keep last 5 exchanges
            return_messages=False  # The string PromptTemplate below needs plain text, not message objects
        )

        # Custom prompt for pentesting context
        self.prompt = PromptTemplate.from_template("""You are an expert penetration testing assistant. Your role is to:
1. Perform systematic reconnaissance using available tools
2. Identify vulnerabilities based on discovered services
3. Suggest attack vectors using MITRE ATT&CK framework
4. Generate structured findings with severity ratings

Available tools: {tools}

Tool names: {tool_names}

Instructions:
- Always start with passive reconnaissance before active scanning
- Respect rate limits and legal boundaries
- If a tool returns an error, try an alternative approach
- When suggesting exploits, include CVE references and MITRE technique IDs
- Format findings as structured JSON for report generation

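Use the following format:

Question: the target or task you were given
Thought: reason about what to do next
Action: the tool to use, one of [{tool_names}]
Action Input: the input to the tool
Observation: the result of the tool call
... (Thought/Action/Action Input/Observation can repeat)
Thought: I now know the final answer
Final Answer: the structured findings

Previous conversation:
{chat_history}

Question: {input}
Thought: {agent_scratchpad}""")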

        # Create agent
        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )

        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=15,  # Prevent infinite loops
            early_stopping_method="force",  # Agents built with create_react_agent do not support "generate"
            handle_parsing_errors=True
        )

    def truncate_context(self, text: str, max_chars: int = 6000) -> str:
        """Truncate tool outputs to fit within context window.

        This is critical for reconnaissance tools that return large outputs.
        We keep the head of the output (headers, summary) and the tail
        (the last findings listed) and drop everything in between.
        """
        if len(text) <= max_chars:
            return text

        # Keep first 2000 chars (headers, summary)
        # Keep last 4000 chars (detailed findings)
        truncated = text[:2000] + "\n..[truncated]..\n" + text[-4000:]
        return truncated

    async def analyze_target(self, target: str) -> str:
        """Main entry point for target analysis."""
        result = await self.agent_executor.ainvoke({
            "input": f"Perform comprehensive reconnaissance on {target}. "
                     f"Start with passive techniques (Shodan, DNS), then suggest "
                     f"active scanning approaches. Include MITRE ATT&CK mappings."
        })
        return result['output']

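The truncate_context method is essential. Shodan can return hundreds of banners for a single host, easily exceeding the 8K-token context window configured through num_ctx. Keeping the summary at the head of the output and the findings at its tail preserves most of the actionable information while fitting within that limit. Note that the agent does not call truncate_context automatically: apply it to each tool's output before returning it from _run.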
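Head-and-tail truncation can cut through the middle of the port list. One alternative, sketched here under stated assumptions, is to drop banner text first and only fall back to cutting when the result is still too long. The function name compact_recon_output is hypothetical, and the sketch assumes the output format produced by the Shodan tool above, where service lines start with "Port " and banners start with "  Banner:".

```python
from typing import List


def compact_recon_output(text: str, max_chars: int = 6000) -> str:
    """Shrink tool output by dropping banners first, then cutting head/tail."""
    if len(text) <= max_chars:
        return text

    # Pass 1: drop banner blocks (which may span several lines) but keep
    # every "Port ..." line and the header lines.
    kept: List[str] = []
    in_banner = False
    for line in text.splitlines():
        if line.startswith("  Banner:"):
            in_banner = True
            continue
        if line.startswith("Port "):
            in_banner = False
        if not in_banner:
            kept.append(line)
    compact = "\n".join(kept)
    if len(compact) <= max_chars:
        return compact

    # Pass 2: still too long, so keep the head and the tail.
    notice = "\n..[truncated]..\n"
    budget = max_chars - len(notice)
    if budget < 1:
        raise ValueError("max_chars is too small to hold any content")
    head = budget // 3
    tail = budget - head
    return compact[:head] + notice + compact[-tail:]
```

The limit is expressed in characters, as in truncate_context, so it is only a rough proxy for tokens. Leave headroom for the prompt and the agent scratchpad when choosing max_chars.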

Step 4: FastAPI Web Interface

Expose the assistant as a REST API with proper error handling and async support:

import asyncio
import os
from datetime import datetime
from typing import Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel

app = FastAPI(title="AI Pentesting Assistant API")

# Global assistant instance (in production, use dependency injection).
# The key is read from the environment instead of being hard-coded;
# the variable name SHODAN_API_KEY is an assumption, adjust it to your setup.
assistant = PentestAssistant(shodan_api_key=os.environ["SHODAN_API_KEY"])

class AnalysisRequest(BaseModel):
    target: str
    scan_type: str = "passive"  # passive, active, full
    max_duration_minutes: int = 30

class AnalysisResponse(BaseModel):
    target: str
    status: str
    findings: Optional[str] = None
    started_at: datetime
    completed_at: Optional[datetime] = None
    error: Optional[str] = None

@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_target(request: AnalysisRequest, background_tasks: BackgroundTasks):
    """Start a penetration test analysis on a target."""

    # Validate target format
    if not request.target or len(request.target) > 255:
        raise HTTPException(status_code=400, detail="Invalid target specification")

    # Check for internal IPs (security measure)
    import ipaddress
    try:
        ip = ipaddress.ip_address(request.target)
        if ip.is_private or ip.is_loopback:
            raise HTTPException(status_code=400, detail="Cannot scan internal addresses")
    except ValueError:
        pass  # Domain name, proceed

    response = AnalysisResponse(
        target=request.target,
        status="running",
        started_at=datetime.utcnow()
    )

    # Run analysis in background to avoid timeout
    background_tasks.add_task(run_analysis, request, response)

    return response

async def run_analysis(request: AnalysisRequest, response: AnalysisResponse):
    """Execute the analysis and update response."""
    try:
        # Set timeout for the entire analysis
        result = await asyncio.wait_for(
            assistant.analyze_target(request.target),
            timeout=request.max_duration_minutes * 60
        )
        response.findings = result
        response.status = "completed"
    except asyncio.TimeoutError:
        response.status = "timeout"
        response.error = f"Analysis exceeded {request.max_duration_minutes} minutes"
    except Exception as e:
        response.status = "failed"
        response.error = str(e)
    finally:
        response.completed_at = datetime.utcnow()

@app.get("/health")
async def health_check():
    """Verify the assistant is operational."""
    return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

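The API rejects private and loopback IP literals, caps the input length, and runs the analysis as a background task to avoid HTTP timeouts. Two gaps remain. Hostnames skip the internal-address check, because only IP literals are parsed, so a name that resolves to an internal address still gets through. The response is also serialized before the background task finishes, so the findings written to it afterwards never reach the client. In production, you'd add authentication, rate limiting per user, and result persistence to a database.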
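As a stepping stone toward result persistence, an in-memory job registry lets clients poll for findings after the background task completes. This is a minimal sketch, assuming a single process: the Job and JobStore names are hypothetical, and the contents are lost on restart, so a database would replace it in a real deployment.

```python
import threading
import uuid
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class Job:
    job_id: str
    target: str
    status: str = "running"
    findings: Optional[str] = None
    error: Optional[str] = None
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    completed_at: Optional[datetime] = None


class JobStore:
    """Thread-safe in-memory registry of analysis jobs (hypothetical helper)."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Job] = {}
        self._lock = threading.Lock()

    def create(self, target: str) -> Job:
        job = Job(job_id=uuid.uuid4().hex, target=target)
        with self._lock:
            self._jobs[job.job_id] = job
        # Hand back a copy so callers cannot mutate stored state.
        return replace(job)

    def finish(
        self, job_id: str, *, findings: Optional[str] = None, error: Optional[str] = None
    ) -> None:
        with self._lock:
            job = self._jobs[job_id]
            job.findings = findings
            job.error = error
            job.status = "failed" if error else "completed"
            job.completed_at = datetime.now(timezone.utc)

    def get(self, job_id: str) -> Optional[Job]:
        with self._lock:
            job = self._jobs.get(job_id)
            return replace(job) if job else None
```

With a store like this, the POST handler would return the job_id immediately, the background task would call finish when the analysis ends, and a separate lookup route (for example a hypothetical GET /jobs/{job_id}) would return the current state.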

Edge Cases and Production Considerations

Context Window Overflow

The most common failure mode is the LLM losing track of earlier findings as the conversation grows. Our truncate_context method helps, but for long-running analyses, implement a summarization step:

async def summarize_findings(self, raw_output: str) -> str:
    """Summarize long outputs to preserve context."""
    summary_prompt = f"Summarize the following penetration test findings in 3-5 bullet points:\n\n{raw_output}"
    summary = await self.llm.ainvoke(summary_prompt)
    return summary.content

Insert this summarization after each major tool execution to keep the agent's context focused.
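One way to organize that step is a rolling buffer that keeps the latest observations verbatim and folds older ones into a running summary once a size budget is exceeded. The sketch below is an illustration, not the original implementation: RollingContext is a hypothetical name, and the summarizer is passed in as a plain synchronous callable so the logic can be exercised without an LLM.

```python
from typing import Callable, List


class RollingContext:
    """Keep recent observations verbatim and fold older ones into a summary."""

    def __init__(
        self,
        summarize: Callable[[str], str],
        max_chars: int = 6000,
        keep_recent: int = 2,
    ) -> None:
        self._summarize = summarize
        self.max_chars = max_chars
        self.keep_recent = keep_recent
        self.summary = ""
        self.recent: List[str] = []

    def add(self, observation: str) -> None:
        self.recent.append(observation)
        # While over budget, merge the oldest observation into the summary,
        # but always keep at least keep_recent observations untouched.
        while len(self.render()) > self.max_chars and len(self.recent) > self.keep_recent:
            oldest = self.recent.pop(0)
            merged = f"{self.summary}\n{oldest}".strip()
            self.summary = self._summarize(merged)

    def render(self) -> str:
        """Return the text that would be placed in the agent's context."""
        parts = [f"Summary so far:\n{self.summary}"] if self.summary else []
        return "\n\n".join(parts + self.recent)
```

In the assistant, the summarize callable would wrap a call to the LLM. Since summarize_findings is a coroutine, an async variant of add would be needed there.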

API Rate Limiting and Retries

Shodan and other APIs have strict rate limits. Implement exponential backoff:

# tenacity is installed as a dependency of LangChain
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry_error_callback=lambda retry_state: f"Failed after {retry_state.attempt_number} attempts"
)
def shodan_search_with_retry(self, query: str):
    """Shodan search with exponential backoff."""
    return self.api.search(query)
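For readers who want to see the mechanics without a library, here is a stdlib sketch of exponential backoff with "full jitter", where each wait is a random duration up to an exponentially growing bound. The helper names and default values are assumptions for illustration and do not mirror tenacity's behaviour exactly.

```python
import random
import time
from typing import Callable, List, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Upper bound for each wait between attempts: base * 2**n, capped."""
    return [min(cap, base * (2 ** n)) for n in range(attempts - 1)]


def call_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    base: float = 1.0,
    cap: float = 60.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Call func, retrying on the given exceptions with jittered backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    rng = rng or random.Random()
    delays = backoff_delays(attempts, base, cap)
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error.
            # Full jitter: wait a random time up to the exponential bound.
            sleep(rng.uniform(0, delays[attempt]))
    raise AssertionError("unreachable")
```

Jitter spreads retries from several workers over time so they do not all hit the API at the same instant. Narrowing retry_on to transient errors avoids retrying failures that will never succeed, such as an invalid API key.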

Secure Output Handling

Pentest findings often contain sensitive information. Never log raw findings to stdout in production:

import logging
import hashlib

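class SecureLogger:
    def __init__(self):
        self.logger = logging.getLogger("pentest_ai")
        self.logger.setLevel(logging.INFO)  # Without this, INFO records are dropped at the default WARNING level
        self.logger.propagate = False       # Keep findings out of root handlers such as stdout
        if not self.logger.handlers:        # Avoid duplicate handlers when instantiated more than once
            handler = logging.FileHandler("audit.log")
            handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
            self.logger.addHandler(handler)

    def log_finding(self, target: str, finding: str):
        """Log a truncated finding with the target hashed for privacy."""
        target_hash = hashlib.sha256(target.encode()).hexdigest()[:16]
        self.logger.info(f"Target [{target_hash}]: {finding[:200]}..")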
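Hashing the target does not help if the finding text itself repeats the address or a leaked credential. A redaction pass before logging narrows that gap. The sketch below is a rough illustration: the patterns are assumptions that cover only IPv4 literals and a few common key=value secret forms, so they will miss other sensitive data.

```python
import re

# IPv4 literals such as 203.0.113.7 (no range validation is attempted).
_IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# "password=...", "api_key: ..." and similar key/value pairs.
_SECRET = re.compile(
    r"(?i)\b(password|passwd|api[_-]?key|token|secret)\b(\s*[=:]\s*)(\S+)"
)


def redact(text: str) -> str:
    """Mask IPv4 addresses and obvious secrets before text is logged."""
    text = _IPV4.sub("[REDACTED_IP]", text)
    # Keep the key name and separator, replace only the value.
    return _SECRET.sub(lambda m: f"{m.group(1)}{m.group(2)}[REDACTED]", text)
```

Calling redact on the finding before it is passed to the logger keeps the audit trail useful without copying raw secrets into it. Note that a four-part version string would also match the IPv4 pattern and be masked.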

What's Next

This assistant provides a foundation for automated penetration testing, but several enhancements would make it production-ready:

  1. Multi-agent architecture: Deploy separate agents for reconnaissance, vulnerability analysis, and reporting, each with specialized models and context windows
  2. Parallel tool execution: Use asyncio.gather to run Shodan, DNS, and WHOIS lookups simultaneously
  3. Report generation: Add a structured report template using Jinja2 or WeasyPrint for PDF output
  4. Continuous learning: Store successful attack paths in the vector database for future reference
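The parallel execution idea from the list above can be sketched with asyncio.gather. This is an illustration under stated assumptions: run_lookups and blocking_to_async are hypothetical helpers, and the lookup callables stand in for the Shodan, DNS, and WHOIS tools.

```python
import asyncio
from typing import Awaitable, Callable, Dict

AsyncLookup = Callable[[str], Awaitable[str]]


def blocking_to_async(fn: Callable[[str], str]) -> AsyncLookup:
    """Wrap a synchronous client call so it runs in a worker thread."""

    async def wrapper(target: str) -> str:
        return await asyncio.to_thread(fn, target)

    return wrapper


async def run_lookups(
    target: str, lookups: Dict[str, AsyncLookup], timeout: float = 30.0
) -> Dict[str, str]:
    """Run all lookups concurrently; one failure does not cancel the rest."""

    async def guarded(name: str, fn: AsyncLookup) -> str:
        try:
            return await asyncio.wait_for(fn(target), timeout=timeout)
        except asyncio.TimeoutError:
            return f"{name} timed out after {timeout}s"
        except Exception as exc:
            # Report the error as text so the agent can react to it.
            return f"{name} failed: {exc}"

    names = list(lookups)
    results = await asyncio.gather(*(guarded(n, lookups[n]) for n in names))
    return dict(zip(names, results))
```

Each lookup is wrapped so that errors come back as strings, matching how the Shodan tool reports failures. Synchronous clients need the thread wrapper, otherwise they would block the event loop and the lookups would run one after another.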

No companion repository is linked here, so assemble the snippets above into your own project. For production deployment, consider using Docker with GPU passthrough for local LLM inference, and implement proper authentication using API keys or OAuth2.

Remember that automated penetration testing has legal implications. Always obtain written authorization before scanning any target, and ensure your tool respects robots.txt and rate limits. The assistant is designed to augment human pentesters, not replace them - the final exploitation and reporting should always involve human expertise.


References

1. Mistral. Wikipedia.
2. ChromaDB. Wikipedia.
3. Embedding. Wikipedia.
4. mistralai/mistral-inference. GitHub.
5. chroma-core/chroma. GitHub.
6. fighting41love/funNLP. GitHub.
7. ollama/ollama. GitHub.
8. Mistral AI Pricing.
9. ChromaDB Pricing.