How to Analyze Security Logs with DeepSeek Locally
Practical tutorial: Analyze security logs with DeepSeek locally
How to Analyze Security Logs with DeepSeek Locally
Table of Contents
- How to Analyze Security Logs with DeepSeek Locally
- Core dependencies
- Install huggingface [4]-cli if needed
- Download DeepSeek-7B (this may take 10-30 minutes depending on bandwidth)
- security_analyzer.py
- FastAPI server for production deployment
📺 Watch: Neural Networks Explained
Video by 3Blue1Brown
Security log analysis is a critical function for any production infrastructure, yet most teams rely on either expensive SIEM solutions or cloud-based LLM APIs that send sensitive data to third parties. Running DeepSeek locally offers a compelling alternative: you get the analytical power of a large language model while keeping all log data on your own hardware. In this tutorial, I'll walk through building a production-ready security log analyzer using DeepSeek that runs entirely on your local machine, processes real log formats, and generates actionable security insights.
Why Local Log Analysis Matters in Production
When you're handling security logs, you're dealing with the most sensitive data in your organization—authentication attempts, network flows, system calls, and potential breach indicators. Sending this data to cloud APIs introduces compliance risks under regulations like GDPR, HIPAA, and SOC 2. According to research on "The Everyday Security of Living with Conflict" (ArXiv, 2024), security monitoring systems face unique challenges when operating in contested environments where data sovereignty is paramount. Running DeepSeek locally eliminates data exfiltration risks entirely.
The architecture we'll build processes logs through a pipeline: ingestion → parsing → vector embedding [3] → LLM analysis → alert generation. DeepSeek handles the natural language reasoning—identifying patterns, correlating events, and explaining anomalies in plain English. This approach scales from a single laptop analyzing a few thousand logs to a server processing millions of events daily.
Prerequisites and Environment Setup
Before writing any code, let's establish the environment. You'll need:
- Python 3.10 or later
- At least 8GB RAM (16GB recommended for larger models)
- A machine with AVX2 support (most x86 CPUs from 2013 onward)
- DeepSeek model weights (we'll use the 7B parameter version)
First, create a virtual environment and install dependencies:
python -m venv deepseek-security
source deepseek-security/bin/activate
# Core dependencies
pip install torch==2.1.2 transformers [4]==4.36.2 sentence-transformers==2.2.2
pip install pandas==2.1.4 numpy==1.26.2 pyyaml==6.0.1
pip install fastapi==0.108.0 uvicorn==0.25.0
pip install chromadb [8]==0.4.22 langchain==0.1.0
The transformers library provides the DeepSeek model interface, while sentence-transformers handles embedding generation for semantic search. We use chromadb as our vector store—it's lightweight, runs locally, and supports persistent storage. langchain provides the orchestration layer for chaining together embedding, retrieval, and LLM inference.
Download the DeepSeek model weights. As of early 2026, the 7B parameter model requires approximately 14GB of disk space:
# Install huggingface-cli if needed
pip install huggingface-hub
# Download DeepSeek-7B (this may take 10-30 minutes depending on bandwidth)
huggingface-cli download deepseek-ai/deepseek-llm-7b-chat --local-dir ./models/deepseek-7b
If you're resource-constrained, the 1.3B parameter version works for basic analysis but produces less nuanced security explanations. The 7B model strikes the best balance between accuracy and hardware requirements for production use.
Building the Security Log Analyzer
Core Architecture and Data Flow
The analyzer consists of four components that communicate through a pipeline pattern:
- Log Parser: Converts raw log lines into structured dictionaries
- Embedding Engine: Generates vector representations of log entries
- Vector Store: Enables semantic similarity search across historical logs
- DeepSeek Analyzer: Performs natural language reasoning on log patterns
Here's the complete implementation:
# security_analyzer.py
import json
import re
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
@dataclass
class LogEntry:
"""Structured representation of a parsed security log entry."""
raw_text: str
timestamp: datetime
source_ip: Optional[str] = None
destination_ip: Optional[str] = None
event_type: str = "unknown"
severity: str = "info"
user: Optional[str] = None
process: Optional[str] = None
message: str = ""
hash_id: str = field(init=False)
def __post_init__(self):
# Generate a deterministic hash for deduplication
content = f"{self.timestamp.isoformat()}|{self.source_ip}|{self.event_type}|{self.message}"
self.hash_id = hashlib.sha256(content.encode()).hexdigest()[:16]
class LogParser:
"""
Parses various security log formats into structured LogEntry objects.
Supports syslog, Apache/Nginx access logs, auth logs, and custom JSON formats.
"""
# Common log patterns
SYSLOG_PATTERN = re.compile(
r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(\S+)\[(\d+)\]:\s+(.*)$'
)
AUTH_LOG_PATTERN = re.compile(
r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(\S+):\s+(Failed|Accepted)\s+(password|publickey)\s+for\s+(\S+)\s+from\s+(\S+)\s+port\s+\d+\s+\S+$'
)
APACHE_COMBINED = re.compile(
r'^(\S+)\s+(\S+)\s+(\S+)\s+\[([^\]]+)\]\s+"([^"]+)"\s+(\d+)\s+(\d+)\s+"([^"]*)"\s+"([^"]*)"$'
)
def __init__(self):
self.parsers = [
self._parse_auth_log,
self._parse_syslog,
self._parse_apache,
self._parse_json,
]
def parse_line(self, line: str) -> Optional[LogEntry]:
"""Try each parser in order, return first successful parse."""
for parser in self.parsers:
try:
result = parser(line)
if result is not None:
return result
except Exception:
continue
# Fallback: create generic entry
return LogEntry(
raw_text=line,
timestamp=datetime.now(),
message=line[:500], # Truncate very long lines
event_type="unparsed"
)
def _parse_auth_log(self, line: str) -> Optional[LogEntry]:
"""Parse SSH authentication log entries."""
match = self.AUTH_LOG_PATTERN.match(line)
if not match:
return None
timestamp_str = match.group(1)
hostname = match.group(2)
service = match.group(3)
auth_result = match.group(4)
auth_method = match.group(5)
username = match.group(6)
source_ip = match.group(7)
# Parse timestamp (assumes current year)
timestamp = datetime.strptime(
f"{datetime.now().year} {timestamp_str}",
"%Y %b %d %H:%M:%S"
)
severity = "warning" if auth_result == "Failed" else "info"
return LogEntry(
raw_text=line,
timestamp=timestamp,
source_ip=source_ip,
event_type=f"ssh_{auth_result.lower()}",
severity=severity,
user=username,
process=f"{service}[{hostname}]",
message=f"SSH {auth_result} {auth_method} for {username} from {source_ip}"
)
def _parse_syslog(self, line: str) -> Optional[LogEntry]:
"""Parse standard syslog format entries."""
match = self.SYSLOG_PATTERN.match(line)
if not match:
return None
timestamp_str = match.group(1)
hostname = match.group(2)
process = match.group(3)
pid = match.group(4)
message = match.group(5)
timestamp = datetime.strptime(
f"{datetime.now().year} {timestamp_str}",
"%Y %b %d %H:%M:%S"
)
# Determine severity from message content
severity = "info"
if any(kw in message.lower() for kw in ['error', 'fail', 'denied', 'invalid']):
severity = "error"
elif any(kw in message.lower() for kw in ['warn', 'unauthorized']):
severity = "warning"
return LogEntry(
raw_text=line,
timestamp=timestamp,
event_type="syslog",
severity=severity,
process=f"{process}[{pid}]",
message=message
)
def _parse_apache(self, line: str) -> Optional[LogEntry]:
"""Parse Apache combined log format."""
match = self.APACHE_COMBINED.match(line)
if not match:
return None
source_ip = match.group(1)
timestamp_str = match.group(4)
request = match.group(5)
status_code = match.group(6)
user_agent = match.group(9)
# Apache timestamp format: 01/Jan/2024:12:00:00 +0000
timestamp = datetime.strptime(
timestamp_str.split()[0],
"%d/%b/%Y:%H:%M:%S"
)
severity = "info"
if status_code.startswith('4'):
severity = "warning"
elif status_code.startswith('5'):
severity = "error"
return LogEntry(
raw_text=line,
timestamp=timestamp,
source_ip=source_ip,
event_type="http_request",
severity=severity,
message=f"{request} -> {status_code} [{user_agent}]"
)
def _parse_json(self, line: str) -> Optional[LogEntry]:
"""Parse JSON-formatted log entries."""
try:
data = json.loads(line)
except json.JSONDecodeError:
return None
timestamp = datetime.fromisoformat(
data.get('timestamp', datetime.now().isoformat())
)
return LogEntry(
raw_text=line,
timestamp=timestamp,
source_ip=data.get('src_ip'),
destination_ip=data.get('dst_ip'),
event_type=data.get('event_type', 'json_log'),
severity=data.get('severity', 'info'),
user=data.get('user'),
process=data.get('process'),
message=data.get('message', '')
)
class SecurityAnalyzer:
"""
Core analysis engine that uses DeepSeek for log interpretation.
Maintains a vector store for semantic similarity search across historical logs.
"""
def __init__(
self,
model_path: str = "./models/deepseek-7b",
embedding_model: str = "all-MiniLM-L6-v2",
db_path: str = "./chroma_db",
device: str = "cpu"
):
self.device = device
# Initialize embedding model for semantic search
print("Loading embedding model..")
self.embedder = SentenceTransformer(embedding_model)
self.embedder.to(device)
# Initialize DeepSeek for analysis
print("Loading DeepSeek model (this may take several minutes)..")
self.tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True
)
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
device_map="auto",
trust_remote_code=True
)
# Initialize vector store
self.chroma_client = chromadb.PersistentClient(
path=db_path,
settings=Settings(anonymized_telemetry=False)
)
self.collection = self.chroma_client.get_or_create_collection(
name="security_logs",
metadata={"hnsw:space": "cosine"}
)
self.parser = LogParser()
self.log_buffer: List[LogEntry] = []
def ingest_logs(self, log_lines: List[str], batch_size: int = 100) -> int:
"""
Parse and index log entries. Returns count of successfully processed entries.
Handles edge cases:
- Empty lines are skipped silently
- Malformed lines are parsed as generic entries
- Duplicate detection via hash_id
"""
processed = 0
batch_entries = []
batch_embeddings = []
batch_ids = []
for line in log_lines:
line = line.strip()
if not line:
continue
entry = self.parser.parse_line(line)
if entry is None:
continue
# Check for duplicates in current batch
if entry.hash_id in batch_ids:
continue
# Generate embedding for semantic search
embedding_text = f"{entry.event_type} {entry.severity} {entry.message}"
embedding = self.embedder.encode(embedding_text).tolist()
batch_entries.append(entry)
batch_embeddings.append(embedding)
batch_ids.append(entry.hash_id)
processed += 1
# Flush batch to vector store
if len(batch_entries) >= batch_size:
self._flush_batch(batch_entries, batch_embeddings, batch_ids)
batch_entries = []
batch_embeddings = []
batch_ids = []
# Flush remaining entries
if batch_entries:
self._flush_batch(batch_entries, batch_embeddings, batch_ids)
self.log_buffer.extend(batch_entries)
return processed
def _flush_batch(
self,
entries: List[LogEntry],
embeddings: List[List[float]],
ids: List[str]
):
"""Write batch to ChromaDB with metadata."""
metadatas = []
documents = []
for entry in entries:
metadatas.append({
"timestamp": entry.timestamp.isoformat(),
"event_type": entry.event_type,
"severity": entry.severity,
"source_ip": entry.source_ip or "",
"user": entry.user or "",
})
documents.append(entry.message)
self.collection.add(
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
ids=ids
)
def find_similar_events(
self,
query: str,
n_results: int = 10,
filter_dict: Optional[Dict] = None
) -> List[Dict]:
"""
Semantic search for events similar to the query.
Supports metadata filtering (e.g., {"severity": "error"}).
"""
query_embedding = self.embedder.encode(query).tolist()
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=filter_dict
)
return [
{
"id": results["ids"][0][i],
"document": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i]
}
for i in range(len(results["ids"][0]))
]
def analyze_logs(
self,
query: str,
context_logs: List[LogEntry],
max_context_length: int = 2000
) -> str:
"""
Use DeepSeek to analyze a set of logs and answer a security question.
The prompt engineering is critical here. We structure the input to:
1. Provide clear context about the logs
2. Ask a specific analytical question
3. Request structured output (JSON) for programmatic consumption
"""
# Build context string from logs
context_parts = []
total_length = 0
for entry in context_logs:
entry_str = (
f"[{entry.timestamp.isoformat()}] "
f"{entry.event_type}/{entry.severity}: "
f"{entry.message[:200]}"
)
entry_length = len(entry_str) + 1
if total_length + entry_length > max_context_length:
break
context_parts.append(entry_str)
total_length += entry_length
context = "\n".join(context_parts)
# Construct the prompt
prompt = f"""You are a security log analyst. Analyze the following log entries and answer the question.
Security Logs:
{context}
Question: {query}
Provide your analysis in JSON format with these fields:
- "summary": Brief summary of findings (1-2 sentences)
- "threat_level": "low", "medium", "high", or "critical"
- "key_findings": List of specific observations
- "recommendations": List of actionable steps
- "confidence": Your confidence in this analysis (0.0 to 1.0)
Analysis:"""
# Generate response with DeepSeek
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=512,
temperature=0.3, # Low temperature for analytical tasks
do_sample=True,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
return response.strip()
def detect_anomalies(
self,
window_minutes: int = 60,
threshold: float = 2.0
) -> List[Dict]:
"""
Detect anomalous log patterns using statistical analysis.
Uses a simple z-score approach on event frequencies. In production,
you'd want more sophisticated methods (isolation forest, LSTM, etc.).
"""
if not self.log_buffer:
return []
# Convert to DataFrame for analysis
records = []
for entry in self.log_buffer:
records.append({
'timestamp': entry.timestamp,
'event_type': entry.event_type,
'severity': entry.severity,
'source_ip': entry.source_ip,
})
df = pd.DataFrame(records)
# Group by event type and count in time windows
df['window'] = df['timestamp'].dt.floor(f'{window_minutes}min')
event_counts = df.groupby(['window', 'event_type']).size().reset_index(name='count')
# Calculate z-scores for each event type
anomalies = []
for event_type in event_counts['event_type'].unique():
type_data = event_counts[event_counts['event_type'] == event_type]
mean_count = type_data['count'].mean()
std_count = type_data['count'].std()
if std_count == 0:
continue
for _, row in type_data.iterrows():
z_score = (row['count'] - mean_count) / std_count
if z_score > threshold:
anomalies.append({
'timestamp': row['window'],
'event_type': event_type,
'count': int(row['count']),
'z_score': float(z_score),
'severity': 'high' if z_score > 3.0 else 'medium'
})
return sorted(anomalies, key=lambda x: x['z_score'], reverse=True)
# FastAPI server for production deployment
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(title="Security Log Analyzer API")
analyzer = SecurityAnalyzer()
class LogBatch(BaseModel):
logs: List[str]
class AnalysisRequest(BaseModel):
query: str
n_context_logs: int = 20
filter: Optional[Dict] = None
@app.post("/ingest")
async def ingest_logs(batch: LogBatch):
"""Ingest a batch of log lines for analysis."""
count = analyzer.ingest_logs(batch.logs)
return {"processed": count, "status": "success"}
@app.post("/analyze")
async def analyze_logs(request: AnalysisRequest):
"""Analyze logs using DeepSeek with semantic context."""
# Find relevant context logs
similar = analyzer.find_similar_events(
request.query,
n_results=request.n_context_logs,
filter_dict=request.filter
)
# Convert to LogEntry objects for analysis
context_entries = []
for result in similar:
entry = LogEntry(
raw_text=result['document'],
timestamp=datetime.fromisoformat(result['metadata']['timestamp']),
event_type=result['metadata']['event_type'],
severity=result['metadata']['severity'],
source_ip=result['metadata'].get('source_ip'),
user=result['metadata'].get('user'),
message=result['document']
)
context_entries.append(entry)
analysis = analyzer.analyze_logs(request.query, context_entries)
return {
"analysis": analysis,
"context_logs_count": len(context_entries),
"similarity_scores": [r['distance'] for r in similar]
}
@app.get("/anomalies")
async def get_anomalies(window_minutes: int = 60, threshold: float = 2.0):
"""Detect anomalous log patterns."""
anomalies = analyzer.detect_anomalies(window_minutes, threshold)
return {"anomalies": anomalies, "count": len(anomalies)}
Running the Analyzer
Start the API server:
uvicorn security_analyzer:app --host 0.0.0.0 --port 8000 --reload
Ingest some sample logs:
import requests
# Sample security logs
sample_logs = [
'Jan 15 10:23:45 server sshd[1234]: Failed password for root from 192.168.1.100 port 22 ssh2',
'Jan 15 10:23:46 server sshd[1234]: Failed password for root from 192.168.1.100 port 22 ssh2',
'Jan 15 10:23:47 server sshd[1234]: Failed password for root from 192.168.1.100 port 22 ssh2',
'Jan 15 10:24:00 server sshd[1235]: Accepted publickey for admin from 10.0.0.50 port 44322 ssh2',
'192.168.1.100 - - [15/Jan/2024:10:25:00 +0000] "GET /wp-admin/admin-ajax.php HTTP/1.1" 404 1234 "-" "Mozilla/5.0"',
'{"timestamp": "2024-01-15T10:26:00", "event_type": "firewall_block", "src_ip": "185.220.101.1", "message": "Blocked inbound connection on port 445"}',
]
response = requests.post(
"http://localhost:8000/ingest",
json={"logs": sample_logs}
)
print(response.json())
Now run an analysis:
analysis = requests.post(
"http://localhost:8000/analyze",
json={
"query": "Is there evidence of a brute force attack? What patterns do you see?",
"n_context_logs": 10
}
)
print(analysis.json()['analysis'])
DeepSeek will produce output like:
{
"summary": "Multiple failed SSH authentication attempts from 192.168.1.100 targeting root account, followed by a successful login from a different IP, indicating a potential brute force attack.",
"threat_level": "high",
"key_findings": [
"Three failed root login attempts within 2 seconds from 192.168.1.100",
"Successful SSH login from 10.0.0.50 shortly after failed attempts",
"HTTP request to WordPress admin endpoint from the attacking IP",
"Firewall block on SMB port 445 from external IP"
],
"recommendations": [
"Block 192.168.1.100 at the firewall immediately",
"Enable fail2ban or similar rate-limiting for SSH",
"Audit the successful login from 10.0.0.50 for compromise indicators",
"Review WordPress admin access logs for unauthorized activity"
],
"confidence": 0.92
}
Production Considerations and Edge Cases
Memory Management
DeepSeek 7B requires approximately 14GB of RAM when loaded with float32 precision. On systems with limited memory, you have options:
-
Quantization: Load the model in 8-bit or 4-bit precision using
bitsandbytes:from transformers import BitsAndBytesConfig quantization_config = BitsAndBytesConfig( load_in_8bit=True, llm_int8_threshold=6.0 ) model = AutoModelForCausalLM.from_pretrained( model_path, quantization_config=quantization_config, device_map="auto" )This reduces memory usage to approximately 8GB for 8-bit or 4GB for 4-bit, with minimal accuracy loss for analytical tasks.
-
Model offloading: Use
device_map="auto"to split the model across CPU and GPU if available. -
Batch processing: Process logs in batches of 100-1000 to avoid memory spikes during embedding generation.
Handling Malformed Logs
In production, you'll encounter every imaginable log format variant. The parser handles this through graceful degradation:
- Unknown formats get tagged as "unparsed" with the raw text preserved
- Timestamp parsing failures fall back to current time
- Missing fields are set to
Nonerather than raising exceptions - Lines exceeding 500 characters are truncated to prevent embedding dimension issues
Vector Store Scaling
ChromaDB stores embeddings and metadata locally. For large-scale deployments (millions of logs), consider:
- Sharding: Split logs by date or source into separate collections
- TTL-based cleanup: Implement a retention policy that deletes old entries
- Batch indexing: Use the
batch_sizeparameter to control memory during ingestion
The hnsw:space parameter set to "cosine" provides good semantic search results for security logs. For exact nearest neighbor search, change to "l2" (Euclidean distance), but expect slower query times on large datasets.
API Rate Limiting and Concurrency
The FastAPI server uses synchronous model inference, which blocks the event loop. For production, wrap the model calls in a thread pool:
from concurrent.futures import ThreadPoolExecutor
import asyncio
executor = ThreadPoolExecutor(max_workers=1) # Only one model instance
@app.post("/analyze")
async def analyze_logs(request: AnalysisRequest):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor,
analyzer.analyze_logs,
request.query,
context_entries,
2000
)
return {"analysis": result}
This prevents the server from blocking while DeepSeek generates responses.
Conclusion
Running DeepSeek locally for security log analysis gives you the analytical power of a large language model without compromising data sovereignty. The system we built handles real-world log formats, performs semantic search across historical events, and generates actionable security insights with quantified confidence levels.
The key architectural decisions—using ChromaDB for vector storage, sentence-transformers for embeddings, and careful prompt engineering for DeepSeek—create a pipeline that's both accurate and maintainable. As of 2026, this approach is production-ready for organizations processing up to millions of log events daily on a single server.
Research on "Formal Modelling and Security Analysis of Bitcoin's Payment Protocol" (ArXiv, 2024) demonstrates that formal methods can enhance security analysis, and while our system uses statistical and semantic approaches rather than formal verification, the combination of vector search with LLM reasoning provides practical, actionable results for security teams.
What's Next
- Extend the parser: Add support for Windows Event Logs, AWS CloudTrail, and Kubernetes audit logs
- Implement alerting: Integrate with Slack, PagerDuty, or email for real-time notifications
- Add temporal correlation: Use the timestamp metadata to build time-series anomaly detection
- Explore fine-tuning: Fine-tune DeepSeek on your specific security log formats for improved accuracy
- Scale horizontally: Deploy behind a load balancer with shared ChromaDB storage for multi-node processing
The complete code is available on GitHub. For more tutorials on local LLM deployment and security automation, check out our guides on building production AI pipelines and securing your infrastructure with open-source tools.
References
Was this article helpful?
Let us know to improve our AI generation.
Related Articles
How to Build a Claude 3.5 Artifact Generator with Python
Practical tutorial: Build a Claude 3.5 artifact generator
How to Build a RAG Pipeline with LanceDB and LangChain
Practical tutorial: It addresses a common issue with AI usage but lacks broad industry impact.
How to Build an AI Agent with CrewAI and DeepSeek-V3
Practical tutorial: Build an autonomous AI agent with CrewAI and DeepSeek-V3