How to Build an AI Pentesting Assistant with LangChain
Practical tutorial: Build an AI-powered pentesting assistant
Penetration testing remains one of the most labor-intensive aspects of security engineering. According to a 2025 report by the SANS Institute, the average enterprise penetration test requires 200-400 person-hours per engagement, with 60% of that time spent on reconnaissance and documentation rather than actual exploitation. An AI-powered pentesting assistant can dramatically reduce this overhead by automating reconnaissance, suggesting attack vectors, and generating structured reports.
In this tutorial, you'll build a production-ready AI pentesting assistant using LangChain, FastAPI, and a local LLM. The system will accept target specifications, perform automated reconnaissance via Shodan and DNS enumeration, suggest attack paths based on the MITRE ATT&CK framework, and generate comprehensive penetration test reports. We'll focus on real-world edge cases: API rate limiting, context window management for long reconnaissance outputs, and secure handling of sensitive findings.
Architecture Overview: Why This Matters in Production
Before writing code, understand the architectural decisions that make this assistant production-viable. The system uses a retrieval-augmented generation (RAG) pattern with three core components:
- Tool Orchestration Layer: LangChain agents that manage tool selection and execution order
- Vector Knowledge Base: ChromaDB [9] storing MITRE ATT&CK techniques, CVE descriptions, and common exploit patterns
- Report Generation Pipeline: Structured output parsing with Pydantic models for consistent report formatting
The key insight is that LLMs alone cannot reliably execute multi-step security workflows. They hallucinate tool outputs, forget intermediate results, and fail to respect rate limits. By wrapping each tool in a LangChain BaseTool with explicit error handling and retry logic, we create a deterministic execution layer that the LLM orchestrates but cannot break.
For production deployments, consider running the LLM locally using Ollama with Mistral [8] 7B or Llama 3 8B. This avoids sending sensitive target data to third-party APIs. The vector store also runs locally, ensuring all findings remain within your infrastructure.
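To make the report generation component more concrete, here is a minimal sketch of turning an LLM's JSON findings into typed records. It uses standard-library dataclasses as a stand-in for the Pydantic models mentioned above, and the `Finding` schema and `parse_findings` helper are assumptions made for illustration, not part of this tutorial's code.

```python
import json
from dataclasses import dataclass, field
from typing import List

ALLOWED_SEVERITIES = {"low", "medium", "high", "critical"}


@dataclass
class Finding:
    """One report entry (hypothetical schema for illustration)."""
    title: str
    severity: str
    technique_ids: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Reject severities the report template would not know how to render
        if self.severity not in ALLOWED_SEVERITIES:
            raise ValueError(f"unknown severity: {self.severity!r}")


def parse_findings(raw_json: str) -> List[Finding]:
    """Parse a JSON array emitted by the model, skipping malformed entries."""
    try:
        items = json.loads(raw_json)
    except json.JSONDecodeError:
        return []  # The model produced non-JSON text; the caller can re-prompt
    if not isinstance(items, list):
        return []
    findings = []
    for item in items:
        try:
            findings.append(Finding(**item))
        except (TypeError, ValueError):
            continue  # Wrong keys, wrong type, or bad severity: drop the entry
    return findings
```

Validating at this boundary means a hallucinated or half-formed finding is dropped before it reaches the report, rather than surfacing as a rendering error later.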
Prerequisites and Environment Setup
You'll need Python 3.11+, Ollama [7] for local LLM inference, and API keys for Shodan and CIRCL (optional for passive DNS). Install the required packages:
# Create virtual environment
python3.11 -m venv pentest-ai
source pentest-ai/bin/activate
# Core dependencies
pip install langchain==0.3.0 langchain-community==0.3.0 langchain-ollama==0.2.0
pip install fastapi==0.115.0 uvicorn==0.30.0 pydantic==2.9.0
pip install chromadb==0.5.0 shodan==1.31.0 dnspython==2.6.0
pip install httpx==0.27.0 python-dotenv==1.0.0
# Pull local LLM
ollama pull mistral:7b-instruct-v0.3-q4_K_M
The langchain-ollama package provides direct integration with Ollama models, avoiding the need for OpenAI API calls. For production, you might use vLLM or TGI for better throughput, but Ollama works well for development and small-scale deployments.
Building the Core Pentesting Assistant
Step 1: Define the Tool Set
Each security tool becomes a LangChain BaseTool with input validation, rate limiting, and error handling. Here's the Shodan reconnaissance tool:
import ipaddress
import socket
import time
from datetime import datetime, timedelta
from typing import Any, List, Type

import shodan
from langchain.tools import BaseTool
from pydantic import BaseModel, Field


class ShodanInput(BaseModel):
    target: str = Field(description="IP address or domain to scan")
    max_results: int = Field(default=10, ge=1, le=100, description="Maximum results to return")


class ShodanReconTool(BaseTool):
    name: str = "shodan_recon"
    description: str = "Perform Shodan reconnaissance on a target IP or domain. Returns open ports, services, and banners."
    args_schema: Type[BaseModel] = ShodanInput

    # BaseTool is a Pydantic model, so runtime state must be declared as fields
    api: Any = Field(default=None, exclude=True)
    rate_limit: int = 30
    request_timestamps: List[datetime] = Field(default_factory=list)

    def __init__(self, api_key: str, rate_limit_per_minute: int = 30):
        super().__init__(
            api=shodan.Shodan(api_key),
            rate_limit=rate_limit_per_minute,
        )

    def _check_rate_limit(self) -> None:
        """Enforce rate limiting by tracking request timestamps."""
        now = datetime.now()
        cutoff = now - timedelta(minutes=1)
        self.request_timestamps = [t for t in self.request_timestamps if t > cutoff]
        if len(self.request_timestamps) >= self.rate_limit:
            sleep_time = 60 - (now - self.request_timestamps[0]).total_seconds()
            if sleep_time > 0:
                time.sleep(sleep_time)
        # Record when the request is actually sent, i.e. after any sleep
        self.request_timestamps.append(datetime.now())

    def _run(self, target: str, max_results: int = 10) -> str:
        """Execute Shodan search with error handling."""
        self._check_rate_limit()
        try:
            # Handle both IP and domain lookups
            try:
                ip = str(ipaddress.ip_address(target))
            except ValueError:
                # Not an IP literal: resolve the domain via DNS
                ip = socket.gethostbyname(target)
            result = self.api.host(ip)

            # Format output for LLM consumption
            output = f"Shodan results for {target}:\n"
            output += f"Organization: {result.get('org', 'N/A')}\n"
            output += f"ISP: {result.get('isp', 'N/A')}\n"
            output += f"Country: {result.get('country_name', 'N/A')}\n"
            output += f"Open ports: {len(result.get('ports', []))}\n\n"
            for service in result.get('data', [])[:max_results]:
                port = service.get('port', 'unknown')
                transport = service.get('transport', 'unknown')
                product = service.get('product', 'unknown')
                version = service.get('version', '')
                output += f"Port {port}/{transport}: {product} {version}\n"
                # Include banner snippet if available
                if 'data' in service:
                    banner = service['data'][:200]
                    output += f" Banner: {banner}\n"
            return output
        except shodan.APIError as e:
            return f"Shodan API error: {str(e)}"
        except socket.gaierror:
            return f"DNS resolution failed for {target}"
        except Exception as e:
            return f"Unexpected error: {str(e)}"
The rate limiting implementation is critical. Shodan's free tier allows 50 queries per month, while the paid tier supports 1 query per second. The _check_rate_limit method maintains a sliding window of timestamps and sleeps once the window is full, until the oldest request is more than a minute old. This state lives in one process, so it only works for single-instance deployments; for production with several workers, use a distributed rate limiter backed by a shared store such as Redis.
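The sliding-window idea can be isolated from the Shodan tool and tested without waiting in real time. The sketch below is one possible standalone version; the `SlidingWindowLimiter` class and its injectable `clock` and `sleep` parameters are assumptions introduced here for testability, not part of the tool above.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `limit` calls per `window` seconds (illustrative helper)."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.sleep = sleep
        self.calls: Deque[float] = deque()

    def _evict(self, now: float) -> None:
        # Drop timestamps that have left the window
        while self.calls and now - self.calls[0] >= self.window:
            self.calls.popleft()

    def acquire(self) -> float:
        """Block until a slot is free; return the seconds slept."""
        now = self.clock()
        self._evict(now)
        waited = 0.0
        if len(self.calls) >= self.limit:
            # Wait until the oldest call falls out of the window
            waited = self.window - (now - self.calls[0])
            self.sleep(waited)
            now = self.clock()
            self._evict(now)
        self.calls.append(now)
        return waited
```

Using a monotonic clock avoids surprises when the system time is adjusted, and passing fake `clock` and `sleep` callables lets a unit test drive the limiter deterministically.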
Step 2: Create the MITRE ATT&CK Knowledge Base
We'll populate a ChromaDB vector store with MITRE ATT&CK techniques. This allows the LLM to retrieve relevant attack patterns based on discovered services:
import json
from typing import Dict, List

import chromadb
from chromadb.utils import embedding_functions


class MitreKnowledgeBase:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.embedding_fn = embedding_functions.DefaultEmbeddingFunction()
        # Create or get collection
        self.collection = self.client.get_or_create_collection(
            name="mitre_techniques",
            embedding_function=self.embedding_fn,
            metadata={"hnsw:space": "cosine"}
        )

    def populate_from_file(self, mitre_json_path: str):
        """Load MITRE ATT&CK techniques from a JSON export.

        Expected format: List of dicts with keys:
        - technique_id (e.g., T1190)
        - name (e.g., Exploit Public-Facing Application)
        - description
        - tactics (list of strings)
        - platforms (list of strings)
        """
        with open(mitre_json_path, 'r') as f:
            techniques = json.load(f)

        documents = []
        metadatas = []
        ids = []
        for tech in techniques:
            # Create rich document for semantic search
            doc = f"Technique {tech['technique_id']}: {tech['name']}\n"
            doc += f"Tactics: {', '.join(tech['tactics'])}\n"
            doc += f"Platforms: {', '.join(tech['platforms'])}\n"
            doc += f"Description: {tech['description']}"
            documents.append(doc)
            # Metadata values must be scalars, so lists are stored as JSON strings
            metadatas.append({
                "technique_id": tech['technique_id'],
                "name": tech['name'],
                "tactics": json.dumps(tech['tactics']),
                "platforms": json.dumps(tech['platforms'])
            })
            ids.append(tech['technique_id'])

        # Write in batches to avoid memory issues; upsert keeps re-runs
        # idempotent instead of failing on IDs that already exist
        batch_size = 100
        for i in range(0, len(documents), batch_size):
            self.collection.upsert(
                documents=documents[i:i+batch_size],
                metadatas=metadatas[i:i+batch_size],
                ids=ids[i:i+batch_size]
            )

    def query_techniques(self, query: str, n_results: int = 5) -> List[Dict]:
        """Retrieve relevant techniques based on service/port information."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results
        )
        techniques = []
        for i, doc in enumerate(results['documents'][0]):
            techniques.append({
                'id': results['ids'][0][i],
                'content': doc,
                'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i]
            })
        return techniques
The vector store uses cosine distance for retrieval. When the assistant discovers an Apache 2.4.49 server, a query such as "Apache vulnerability exploit" should rank T1190 (Exploit Public-Facing Application) near the top. Note that the schema above stores techniques only, so CVE details have to come from a separate lookup. The hnsw:space parameter configures the HNSW index for cosine distance, which works well with the default embedding function.
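To show what the `distance` values returned by `query_techniques` mean, here is a small pure-Python sketch of cosine distance and ranking. The three-dimensional vectors are made up for illustration; real embeddings have hundreds of dimensions and are produced by the embedding function, not written by hand.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity; 0.0 means the vectors point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 1.0  # Treat zero vectors as unrelated
    return 1.0 - dot / (norm_a * norm_b)


def rank(query: Sequence[float],
         docs: Dict[str, Sequence[float]]) -> List[Tuple[str, float]]:
    """Sort document ids by ascending distance to the query vector."""
    scored = [(doc_id, cosine_distance(query, vec)) for doc_id, vec in docs.items()]
    return sorted(scored, key=lambda pair: pair[1])


# Toy 3-dimensional "embeddings" (invented values, for illustration only)
docs = {
    "T1190": [0.9, 0.1, 0.0],
    "T1078": [0.1, 0.9, 0.0],
    "T1566": [0.0, 0.2, 0.9],
}
ranking = rank([1.0, 0.0, 0.0], docs)
```

Smaller distances mean closer matches, so a caller can discard results above a chosen threshold instead of always trusting the top `n_results`.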
Step 3: Build the Agent with Context Management
The LangChain agent orchestrates tools and manages the conversation context. We need special handling for long reconnaissance outputs that might exceed the LLM's context window:
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain_ollama import ChatOllama
from langchain.memory import ConversationBufferWindowMemory


class PentestAssistant:
    def __init__(self, shodan_api_key: str, model_name: str = "mistral:7b-instruct-v0.3-q4_K_M"):
        # Initialize LLM with conservative context window
        self.llm = ChatOllama(
            model=model_name,
            temperature=0.3,   # Low temperature for more repeatable security analysis
            num_predict=4096,  # Max tokens to generate
            num_ctx=8192,      # Context window size
            stop=["<|im_end|>", "Observation:"]  # Stop sequences for clean tool output
        )

        # Initialize tools
        self.tools = [
            ShodanReconTool(api_key=shodan_api_key),
            DnsEnumerationTool(),  # Not shown: implement similarly to ShodanReconTool
            PortScanTool(),        # Not shown: wrapper around nmap or masscan
            CveLookupTool(),       # Not shown: query CIRCL or NVD API
        ]

        # Create agent with memory. The prompt below is a plain string template,
        # so history is injected as text rather than as message objects.
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            k=5,  # Keep last 5 exchanges
            return_messages=False
        )

        # Custom prompt for pentesting context. The Thought/Action block is
        # required: the ReAct output parser expects exactly this format.
        self.prompt = PromptTemplate.from_template("""You are an expert penetration testing assistant. Your role is to:
1. Perform systematic reconnaissance using available tools
2. Identify vulnerabilities based on discovered services
3. Suggest attack vectors using MITRE ATT&CK framework
4. Generate structured findings with severity ratings

Available tools: {tools}

Instructions:
- Always start with passive reconnaissance before active scanning
- Respect rate limits and legal boundaries
- If a tool returns an error, try an alternative approach
- When suggesting exploits, include CVE references and MITRE technique IDs
- Format findings as structured JSON for report generation

Use the following format:
Thought: reason about what to do next
Action: the tool to use, one of [{tool_names}]
Action Input: the input to the tool
Observation: the result of the tool
... (Thought/Action/Action Input/Observation can repeat)
Thought: I now have enough information
Final Answer: the findings for the original request

{chat_history}
Human: {input}
{agent_scratchpad}""")

        # Create agent
        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=15,  # Prevent infinite loops
            early_stopping_method="force",  # "generate" is not supported by this agent type
            handle_parsing_errors=True
        )

    def truncate_context(self, text: str, max_chars: int = 6000) -> str:
        """Truncate tool outputs to fit within context window.

        This is critical for reconnaissance tools that return large outputs.
        The head (headers, summary) and the tail (latest findings) are kept;
        the middle is dropped regardless of what it contains.
        """
        if len(text) <= max_chars:
            return text
        # Keep first 2000 chars (headers, summary)
        # Keep last 4000 chars (detailed findings)
        truncated = text[:2000] + "\n..[truncated]..\n" + text[-4000:]
        return truncated

    async def analyze_target(self, target: str) -> str:
        """Main entry point for target analysis."""
        result = await self.agent_executor.ainvoke({
            "input": f"Perform comprehensive reconnaissance on {target}. "
                     "Start with passive techniques (Shodan, DNS), then suggest "
                     "active scanning approaches. Include MITRE ATT&CK mappings."
        })
        return result['output']
The truncate_context method is essential, and each tool should pass its output through it before returning. Shodan can return hundreds of banners for a single host, easily exceeding the 8K context window configured above via num_ctx. By keeping the summary at the head and the most recent findings at the tail, we preserve most of the actionable information while fitting within that limit.
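Head-and-tail truncation is blind to content, so it can discard port lines while keeping banner text. One possible refinement, sketched below, drops banner lines first and only then falls back to cutting the middle. The `truncate_recon_output` function is a hypothetical helper, and it assumes banners appear on lines starting with "Banner:", as in the Shodan tool's output format.

```python
def truncate_recon_output(text: str, max_chars: int = 6000) -> str:
    """Shrink tool output by dropping banner lines before cutting anything else."""
    if len(text) <= max_chars:
        return text

    # Pass 1: remove lines that look like banner snippets
    kept = [line for line in text.splitlines()
            if not line.lstrip().startswith("Banner:")]
    reduced = "\n".join(kept)
    if len(reduced) <= max_chars:
        return reduced

    # Pass 2: fall back to keeping the head and tail of what remains
    marker = "\n..[truncated]..\n"
    head = max_chars // 3
    tail = max_chars - head - len(marker)
    if tail <= 0:
        return reduced[:max_chars]  # Budget too small for head + marker + tail
    return reduced[:head] + marker + reduced[-tail:]
```

Because banners are the bulkiest and least structured part of the output, removing them first often brings the text under budget without losing any port or service lines.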
Step 4: FastAPI Web Interface
Expose the assistant as a REST API with proper error handling and async support:
import asyncio
import ipaddress
import os
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel

app = FastAPI(title="AI Pentesting Assistant API")

# Global assistant instance (in production, use dependency injection).
# The key is read from the environment rather than hard-coded in source.
assistant = PentestAssistant(shodan_api_key=os.environ["SHODAN_API_KEY"])


class AnalysisRequest(BaseModel):
    target: str
    scan_type: str = "passive"  # passive, active, full
    max_duration_minutes: int = 30


class AnalysisResponse(BaseModel):
    target: str
    status: str
    findings: Optional[str] = None
    started_at: datetime
    completed_at: Optional[datetime] = None
    error: Optional[str] = None


@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_target(request: AnalysisRequest, background_tasks: BackgroundTasks):
    """Start a penetration test analysis on a target."""
    # Validate target format
    if not request.target or len(request.target) > 255:
        raise HTTPException(status_code=400, detail="Invalid target specification")

    # Check for internal IPs (security measure). This only covers IP literals:
    # a domain name that resolves to an internal address is not caught here.
    try:
        ip = ipaddress.ip_address(request.target)
    except ValueError:
        ip = None  # Domain name, proceed
    if ip is not None and (ip.is_private or ip.is_loopback):
        raise HTTPException(status_code=400, detail="Cannot scan internal addresses")

    response = AnalysisResponse(
        target=request.target,
        status="running",
        started_at=datetime.now(timezone.utc)
    )
    # Run analysis in background to avoid timeout. The client receives only
    # this initial "running" snapshot; later updates stay in server memory
    # until they are persisted and exposed through a status endpoint.
    background_tasks.add_task(run_analysis, request, response)
    return response


async def run_analysis(request: AnalysisRequest, response: AnalysisResponse):
    """Execute the analysis and update response."""
    try:
        # Set timeout for the entire analysis
        result = await asyncio.wait_for(
            assistant.analyze_target(request.target),
            timeout=request.max_duration_minutes * 60
        )
        response.findings = result
        response.status = "completed"
    except asyncio.TimeoutError:
        response.status = "timeout"
        response.error = f"Analysis exceeded {request.max_duration_minutes} minutes"
    except Exception as e:
        response.status = "failed"
        response.error = str(e)
    finally:
        response.completed_at = datetime.now(timezone.utc)


@app.get("/health")
async def health_check():
    """Verify the assistant is operational."""
    return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
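The API includes basic security measures: rejecting internal IP addresses, input length validation, and background task execution to prevent HTTP timeouts. Because the endpoint returns before the analysis finishes, the caller only sees the initial "running" status; retrieving the findings requires storing each job and exposing it through a status endpoint. In production, you'd also add authentication, rate limiting per user, and result persistence to a database.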
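One way to make background results retrievable is a small job registry keyed by an ID that the POST endpoint returns and a GET endpoint looks up. The sketch below shows the idea with the standard library only; `Job`, `JobRegistry`, and the injected `analyze` coroutine are hypothetical names, and the in-memory dictionary is a stand-in for a real database.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional


@dataclass
class Job:
    target: str
    status: str = "running"
    findings: Optional[str] = None
    error: Optional[str] = None


class JobRegistry:
    """In-memory job store; a database would replace this in production."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Job] = {}

    def create(self, target: str) -> str:
        """Register a new job and return its ID for later polling."""
        job_id = uuid.uuid4().hex
        self._jobs[job_id] = Job(target=target)
        return job_id

    def get(self, job_id: str) -> Optional[Job]:
        return self._jobs.get(job_id)

    async def run(self, job_id: str,
                  analyze: Callable[[str], Awaitable[str]],
                  timeout: float) -> None:
        """Run the analysis and record the outcome on the stored job."""
        job = self._jobs[job_id]
        try:
            job.findings = await asyncio.wait_for(analyze(job.target), timeout)
            job.status = "completed"
        except asyncio.TimeoutError:
            job.status = "timeout"
            job.error = f"Analysis exceeded {timeout} seconds"
        except Exception as exc:  # Record unexpected failures on the job
            job.status = "failed"
            job.error = str(exc)
```

In the API, the POST handler would call `create`, schedule `run` as the background task, and return the job ID, while a GET handler would return `get(job_id)` or a 404 when the ID is unknown.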
Edge Cases and Production Considerations
Context Window Overflow
The most common failure mode is the LLM losing track of earlier findings as the conversation grows. Our truncate_context method helps, but for long-running analyses, implement a summarization step:
# Method of PentestAssistant
async def summarize_findings(self, raw_output: str) -> str:
    """Summarize long outputs to preserve context."""
    summary_prompt = (
        "Summarize the following penetration test findings "
        f"in 3-5 bullet points:\n\n{raw_output}"
    )
    summary = await self.llm.ainvoke(summary_prompt)
    return summary.content
Insert this summarization after each major tool execution to keep the agent's context focused.
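Summarizing after every tool call costs an extra LLM round trip each time, so it can help to summarize only when the accumulated observations exceed a budget. The following sketch shows one such policy; `compact_observations` and its parameters are assumptions for illustration, and `summarize` stands for any callable that condenses text, such as a synchronous wrapper around the summarization method above.

```python
from typing import Callable, List


def compact_observations(observations: List[str],
                         summarize: Callable[[str], str],
                         budget_chars: int = 4000,
                         keep_recent: int = 2) -> List[str]:
    """Fold older observations into one summary once the budget is exceeded."""
    total = sum(len(obs) for obs in observations)
    if total <= budget_chars or len(observations) <= keep_recent:
        return observations  # Still within budget, or nothing old enough to fold

    # Split into older entries (to be summarized) and recent ones (kept verbatim)
    split = len(observations) - keep_recent
    older, recent = observations[:split], observations[split:]
    summary = summarize("\n\n".join(older))
    return [f"Summary of earlier findings: {summary}"] + recent
```

Keeping the most recent observations verbatim preserves the detail the agent is most likely to act on next, while older results survive only in condensed form.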
API Rate Limiting and Retries
Shodan and other APIs have strict rate limits. Implement exponential backoff:
from tenacity import retry, stop_after_attempt, wait_exponential


# Method of ShodanReconTool (uses the tool's self.api client)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    # Return a message instead of raising once all attempts are used up
    retry_error_callback=lambda retry_state: f"Failed after {retry_state.attempt_number} attempts"
)
def shodan_search_with_retry(self, query: str):
    """Shodan search with exponential backoff."""
    return self.api.search(query)
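For readers who want to see the mechanics without a library, here is a hand-rolled sketch of exponential backoff with random jitter. It is not how tenacity is implemented internally; `backoff_delay` and `call_with_backoff` are hypothetical helpers, and the "full jitter" choice (a uniform delay between zero and the exponential bound) is an assumption made here to spread out retries from concurrent workers.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)]."""
    source = rng if rng is not None else random
    return source.uniform(0, min(cap, base * (2 ** attempt)))


def call_with_backoff(func: Callable[[], T], max_attempts: int = 3,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying on any exception with jittered exponential delays."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            sleep(backoff_delay(attempt))
    raise RuntimeError("max_attempts must be at least 1")
```

In real use, narrow the `except` clause to the errors that are worth retrying, such as rate-limit or timeout responses, so that authentication failures surface immediately.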
Secure Output Handling
Pentest findings often contain sensitive information. Never log raw findings to stdout in production:
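import hashlib
import logging


class SecureLogger:
    def __init__(self, log_path: str = "audit.log"):
        self.logger = logging.getLogger("pentest_ai")
        # Without an explicit level, the default (WARNING) drops info() calls
        self.logger.setLevel(logging.INFO)
        # Keep findings away from root handlers, which may write to stdout
        self.logger.propagate = False
        # Attach the file handler once, even if the class is instantiated again
        if not self.logger.handlers:
            handler = logging.FileHandler(log_path)
            handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
            self.logger.addHandler(handler)

    def log_finding(self, target: str, finding: str):
        """Log findings with target hashed for privacy."""
        target_hash = hashlib.sha256(target.encode()).hexdigest()[:16]
        # Only a short prefix of the finding is written to the audit log
        self.logger.info(f"Target [{target_hash}]: {finding[:200]}...")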
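Hashing the target does not help if the finding text itself repeats the IP address or a leaked credential. A possible complement is to redact such values before logging, as sketched below. The `redact` function and both regular expressions are illustrative assumptions; they catch only IPv4 literals and simple `key=value` secrets and are far from exhaustive.

```python
import re

# IPv4 literals and simple key=value secrets (illustrative, not exhaustive)
IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
SECRET_RE = re.compile(r"(?i)\b(password|passwd|token|api[_-]?key)\s*[=:]\s*\S+")


def redact(text: str) -> str:
    """Mask IP addresses and obvious credentials before text reaches a log."""
    text = IPV4_RE.sub("[REDACTED_IP]", text)
    # Keep the key name so the log still shows what kind of secret was found
    return SECRET_RE.sub(lambda m: f"{m.group(1)}=[REDACTED]", text)
```

Applied to a finding before it is logged, this keeps the audit trail useful for debugging while reducing what an attacker could learn from the log file.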
What's Next
This assistant provides a foundation for automated penetration testing. Several enhancements would harden it further for production use:
- Multi-agent architecture: Deploy separate agents for reconnaissance, vulnerability analysis, and reporting, each with specialized models and context windows
- Parallel tool execution: Use asyncio.gather to run Shodan, DNS, and WHOIS lookups simultaneously
- Report generation: Add a structured report template using Jinja2 or WeasyPrint for PDF output
- Continuous learning: Store successful attack paths in the vector database for future reference
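The parallel execution idea from the list above can be sketched with `asyncio.gather`. The three lookup coroutines here are stubs standing in for real Shodan, DNS, and WHOIS clients, and `run_lookups` is a hypothetical helper, so treat this as an outline of the pattern rather than working reconnaissance code.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_lookups(target: str,
                      lookups: Dict[str, Callable[[str], Awaitable[str]]]) -> Dict[str, str]:
    """Run all lookups concurrently; a failure in one does not cancel the others."""
    names = list(lookups)
    results = await asyncio.gather(
        *(lookups[name](target) for name in names),
        return_exceptions=True,  # Collect errors instead of raising the first one
    )
    return {
        name: f"error: {res}" if isinstance(res, Exception) else res
        for name, res in zip(names, results)
    }


# Stub coroutines standing in for real Shodan, DNS, and WHOIS clients
async def fake_shodan(target: str) -> str:
    await asyncio.sleep(0.01)
    return f"ports for {target}"


async def fake_dns(target: str) -> str:
    await asyncio.sleep(0.01)
    return f"records for {target}"


async def fake_whois(target: str) -> str:
    raise TimeoutError("whois timed out")


report = asyncio.run(run_lookups(
    "example.com",
    {"shodan": fake_shodan, "dns": fake_dns, "whois": fake_whois},
))
```

Blocking clients, such as the synchronous `shodan` library used earlier, would need to be wrapped with `asyncio.to_thread` before they can run concurrently this way.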
For production deployment, consider using Docker with GPU passthrough for local LLM inference, and implement proper authentication using API keys or OAuth2.
Remember that automated penetration testing has legal implications. Always obtain written authorization before scanning any target, and ensure your tool respects robots.txt and rate limits. The assistant is designed to augment human pentesters, not replace them - the final exploitation and reporting should always involve human expertise.