Introduction
How many log files does your VPS generate daily?
Nginx access logs, system kern.log, Docker container logs, application error logs… These files pile up mountains high, and when you’re woken at 3 AM by a monitoring alert, you’re staring at hundreds of thousands of log lines trying to find the one root cause of the service outage.
Traditional log analysis tools (grep, awk, ELK) excel at pattern matching, but they don’t understand semantics. They know “there were 503 errors,” but they don’t know “this is a cascading failure caused by the upstream database connection pool being exhausted.”
This article shows how to build an intelligent log analysis system using LLMs, automating the transformation from raw logs to actionable insights.
Why Traditional Log Analysis Falls Short
Three Key Pain Points
| Pain Point | Traditional Approach | AI-Enhanced Approach |
|---|---|---|
| No semantic understanding | Regex keyword matching | LLM understands context and meaning |
| Cross-source correlation is hard | Manual investigation across files | Automatic event correlation across log sources |
| Root causes are vague | Alerts only say “something broke” | Provides specific causes and fix recommendations |
Here’s a concrete example:
Traditional alert: Nginx returning a large number of 503 errors
AI analysis: Nginx 503s are caused by backend Node.js app timing out
connecting to PostgreSQL (PostgreSQL logs show
connection pool exhausted)
Root cause: pgbouncer max_db_connections=100, currently 98 active connections
Fix: Increase pgbouncer max_db_connections to 200 and check the app
for connection leaks
The second version tells you what happened, why it happened, and exactly how to fix it.
Architecture Design: AI Log Analysis Pipeline
┌──────────────┐     ┌───────────────┐     ┌──────────────┐
│  Log Ingest  │────▶│ Preprocessing │────▶│  AI Engine   │
│  Fluent Bit  │     │  Filter/Agg   │     │  LLM + RAG   │
└──────────────┘     └───────────────┘     └──────┬───────┘
                                                  │
                                        ┌─────────▼──────────┐
                                        │    Insights Out    │
                                        │ Slack/Email/Ticket │
                                        └────────────────────┘
1. Log Collection Layer
We recommend Fluent Bit as a lightweight log collector — it’s more resource-efficient than Fluentd, ideal for VPS environments:
# fluent-bit.conf
[SERVICE]
    Flush      5
    Log_Level  info
    Daemon     off

[INPUT]
    Name             tail
    Path             /var/log/nginx/access.log, /var/log/nginx/error.log
    Tag              nginx.*
    Rotate_Wait      30
    Mem_Buf_Limit    5MB
    Skip_Long_Lines  Yes

[INPUT]
    Name  tail
    Path  /var/log/syslog, /var/log/kern.log
    Tag   system.*
    DB    /var/log/flb_sys.db

# Docker's default json-file log driver writes container logs to
# /var/lib/docker/containers/<id>/<id>-json.log
[INPUT]
    Name    tail
    Path    /var/lib/docker/containers/*/*.log
    Tag     docker.*
    Parser  docker

# The stdout plugin accepts msgpack, json, json_lines, or json_stream
[OUTPUT]
    Name    stdout
    Match   *
    Format  json_lines
Installation:
# Ubuntu/Debian
curl https://fluentbit.io/install.sh | bash
sudo systemctl enable fluent-bit
sudo systemctl start fluent-bit
2. Preprocessing Pipeline
Before sending logs to the LLM, preprocess them to reduce token consumption and improve analysis quality:
# preprocess_logs.py
import re
from datetime import timedelta


class LogPreprocessor:
    """Log preprocessing: deduplication, aggregation, context enrichment"""

    # Common noise patterns. They are not anchored with ^ because an Nginx
    # access log line starts with the client IP, not the request method.
    NOISE_PATTERNS = [
        r'\bGET /healthz HTTP',   # Health checks
        r'\bGET /favicon\.ico',   # Icon requests
        r'\bOPTIONS /',           # CORS preflight
        r'Server-Timing:.*',      # Performance headers
    ]

    def __init__(self, window_minutes=15):
        self.window = timedelta(minutes=window_minutes)
        self.patterns = [re.compile(p) for p in self.NOISE_PATTERNS]

    def is_noise(self, line):
        """Check if a log line is noise"""
        return any(p.search(line) for p in self.patterns)

    def extract_error_context(self, error_line, all_lines, before=5, after=5):
        """Extract context around the first occurrence of an error line"""
        try:
            idx = all_lines.index(error_line)
            start = max(0, idx - before)
            end = min(len(all_lines), idx + after + 1)
            return all_lines[start:end]
        except ValueError:
            return [error_line]

    def aggregate_similar_errors(self, error_lines):
        """Aggregate similar errors to reduce redundancy"""
        groups = {}
        for line in error_lines:
            # Generalize dynamic parts of error messages. UUIDs are masked
            # first so the /<NUM> rule cannot split one that follows a slash.
            key = re.sub(
                r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
                '<UUID>', line)
            key = re.sub(r'\d+\.\d+\.\d+\.\d+', '<IP>', key)
            key = re.sub(r'/\d+', '/<NUM>', key)
            groups.setdefault(key, []).append(line)
        return {k: v[:3] for k, v in groups.items()}  # Max 3 per group
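The preprocessor stores a `window` but none of the methods shown applies it. One way the time window could be used is sketched below. It assumes each log entry is a dict with an ISO 8601 `timestamp` string (the same key the analysis engine reads later); `filter_recent` is a hypothetical helper, not part of the class above.

```python
from datetime import datetime, timedelta


def filter_recent(entries, now, window=timedelta(minutes=15)):
    """Keep entries whose timestamp falls within `window` before `now`.

    Assumes each entry is a dict with an ISO 8601 "timestamp" string.
    Entries with a missing or unparseable timestamp are dropped, as are
    entries whose timezone awareness does not match `now`.
    """
    cutoff = now - window
    recent = []
    for entry in entries:
        try:
            ts = datetime.fromisoformat(entry["timestamp"])
            in_window = cutoff <= ts <= now
        except (KeyError, TypeError, ValueError):
            continue
        if in_window:
            recent.append(entry)
    return recent
```

Passing `now` explicitly, rather than calling `datetime.now()` inside the function, keeps the filter easy to test and lets you replay an old incident against its original time window.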
3. AI Analysis Engine
This is the core of the system. We use a locally deployed LLM (such as Llama 3.1 or Qwen 2.5 served by Ollama) so that log data stays on your VPS:
# ai_log_analyzer.py
import json
import requests
from datetime import datetime


class AILogAnalyzer:
    """LLM-powered log analysis engine"""

    SYSTEM_PROMPT = """You are a senior SRE engineer and log analysis expert.
Your task is to extract key information from VPS logs, identify anomalies,
determine root causes, and provide remediation recommendations.
Follow these principles:
1. Prioritize ERROR/WARN/FATAL level logs
2. Cross-reference multiple log sources (Nginx, Docker, System)
3. Distinguish between transient errors and persistent failures
4. Provide specific, actionable fix steps with commands and config examples
5. If the root cause is uncertain, list likely hypotheses and verification methods"""

    def __init__(self, ollama_url="http://localhost:11434"):
        self.url = ollama_url

    def analyze(self, log_groups, metadata=None):
        """Analyze log groups and return the parsed Ollama response"""
        analysis_request = {
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {},
            "error_groups": {}
        }
        for group_name, entries in log_groups.items():
            analysis_request["error_groups"][group_name] = {
                "count": len(entries),
                "sample_entries": entries[:5],
                "time_range": {
                    "first": entries[0].get("timestamp", ""),
                    "last": entries[-1].get("timestamp", "")
                } if entries else {}
            }

        # Build the prompt
        prompt = f"""Analyze the following VPS log anomalies:
{json.dumps(analysis_request, indent=2, ensure_ascii=False)}
Please provide:
1. **Summary**: One-sentence overview of the problem
2. **Root Cause Analysis**: Detailed explanation of the causal chain
3. **Impact Assessment**: Which services/users are affected
4. **Remediation Steps**: Specific actions with commands
5. **Prevention**: How to avoid similar issues in the future"""

        response = requests.post(
            f"{self.url}/api/generate",
            json={
                "model": "qwen2.5:7b",
                # Send the system prompt; it was defined but never used
                "system": self.SYSTEM_PROMPT,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": 0.3,
                    # Ollama's output token limit is num_predict;
                    # max_tokens is not an Ollama option
                    "num_predict": 2048
                }
            },
            timeout=60
        )
        response.raise_for_status()
        return response.json()
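With `"stream": False`, the generated text arrives in the `response` field of the returned JSON. Since the prompt asks for five named sections, that text can be split into a dict before it is routed to notifications. The sketch below assumes the model follows the requested `**Heading**` format, which an LLM does not guarantee; `split_sections` is a hypothetical helper, and any section the model omits is simply absent from the result.

```python
import re

# Section names requested in the analysis prompt
SECTION_NAMES = [
    "Summary",
    "Root Cause Analysis",
    "Impact Assessment",
    "Remediation Steps",
    "Prevention",
]


def split_sections(text):
    """Split model output into {section name: body} using bold headings.

    Accepts headings such as "1. **Summary**: ..." or "**Summary:**".
    Text before the first recognized heading is ignored.
    """
    names = "|".join(re.escape(name) for name in SECTION_NAMES)
    heading = re.compile(
        rf"^\s*(?:\d+\.\s*)?\*\*({names}):?\*\*:?[ \t]*", re.MULTILINE
    )
    matches = list(heading.finditer(text))
    sections = {}
    for i, match in enumerate(matches):
        # A section body runs until the next heading or the end of the text
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        sections[match.group(1)] = text[match.end():end].strip()
    return sections
```

A caller might use `split_sections(result.get("response", ""))` and fall back to sending the raw text when the returned dict is empty.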
4. Insight Delivery Layer
After analysis, push results to the appropriate channels:
# notifier.py
import smtplib
import time
from email.mime.text import MIMEText

import requests


class InsightNotifier:
    """Multi-channel notification sender"""

    def __init__(self, config):
        self.slack_webhook = config.get("slack_webhook")
        self.email_smtp = config.get("email_smtp")
        self.email_to = config.get("email_to")
        # email_from and email_password are assumed config keys, read from
        # config so that credentials are not hardcoded in the source
        self.email_from = config.get("email_from", "monitor@yourvps.com")
        self.email_password = config.get("email_password")

    def send_slack(self, title, content, severity="info"):
        """Send Slack notification"""
        color_map = {
            "critical": "#ff0000",
            "warning": "#ffaa00",
            "info": "#07c160"
        }
        payload = {
            "attachments": [{
                "color": color_map.get(severity, "#07c160"),
                "title": title,
                "text": content,
                "ts": int(time.time())
            }]
        }
        if self.slack_webhook:
            requests.post(self.slack_webhook, json=payload, timeout=10)

    def send_email(self, subject, body):
        """Send email notification"""
        if self.email_smtp:
            msg = MIMEText(body, 'html', 'utf-8')
            msg['Subject'] = subject
            msg['From'] = self.email_from
            msg['To'] = self.email_to
            # The context manager closes the connection even on failure
            with smtplib.SMTP(self.email_smtp, 587) as smtp:
                smtp.starttls()
                smtp.login(self.email_from, self.email_password)
                smtp.send_message(msg)
Hands-On: Complete Automated Diagnosis Script
Here’s a complete script you can run directly on your VPS, covering the full pipeline from log collection to AI analysis:
#!/bin/bash
# ai_log_diagnosis.sh - One-click AI log diagnosis
# Usage: sudo bash ai_log_diagnosis.sh
set -euo pipefail

LOG_DIR="/var/log"
OUTPUT_DIR="/tmp/log_analysis"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
OLLAMA_URL="http://localhost:11434"
MODEL="qwen2.5:7b"
REPORT="$OUTPUT_DIR/diagnosis_${TIMESTAMP}.txt"

mkdir -p "$OUTPUT_DIR"
# Remove logs collected by a previous run so they are not merged again
rm -f "$OUTPUT_DIR"/*.log

echo "=== AI Log Diagnosis Started ==="
echo "Timestamp: $TIMESTAMP"

# Step 1: Collect recent logs
echo "[1/4] Collecting logs..."
tail_lines=500

# Nginx error log (last $tail_lines lines)
if [ -f "$LOG_DIR/nginx/error.log" ]; then
    tail -n "$tail_lines" "$LOG_DIR/nginx/error.log" > "$OUTPUT_DIR/nginx_errors.log"
fi

# System logs (--quiet suppresses informational messages)
journalctl --quiet --since "15 minutes ago" --priority=err \
    > "$OUTPUT_DIR/system_errors.log" 2>/dev/null || true

# Docker container logs. 2>&1 keeps the container's stderr stream,
# which is where many applications write their errors.
for container in $(docker ps --format '{{.Names}}' 2>/dev/null); do
    docker logs --since 15m --tail 200 "$container" 2>&1 \
        | grep -iE "error|fatal|panic|exception" \
        > "$OUTPUT_DIR/container_${container}.log" 2>/dev/null || true
done

# PostgreSQL logs (the glob must stay outside the quotes to expand)
for pg_log in "$LOG_DIR"/postgresql/postgresql-*.log; do
    if [ -f "$pg_log" ]; then
        tail -n "$tail_lines" "$pg_log" >> "$OUTPUT_DIR/pg_errors.log"
    fi
done

# Step 2: Merge all error logs
echo "[2/4] Merging logs..."
cat "$OUTPUT_DIR"/*.log 2>/dev/null | sort -u > "$OUTPUT_DIR/all_errors.txt"
TOTAL_ERRORS=$(wc -l < "$OUTPUT_DIR/all_errors.txt")
echo "Found $TOTAL_ERRORS error log lines"
if [ "$TOTAL_ERRORS" -eq 0 ]; then
    echo "✅ No anomalies detected"
    exit 0
fi

# Step 3: Call LLM for analysis
echo "[3/4] AI analysis in progress..."
PROMPT=$(cat <<EOF
You are a senior SRE engineer. Here are the error logs from my VPS
in the last 15 minutes:
$(head -200 "$OUTPUT_DIR/all_errors.txt")
Please analyze:
1. What is the root cause of these errors?
2. Is there a causal relationship between them?
3. What commands should I run to fix this?
4. How can I prevent this from happening again?
Please answer in English with specific command examples.
EOF
)

# Build the request body with json.dumps so that quotes and newlines
# in the logs cannot break the JSON payload
PAYLOAD=$(printf '%s' "$PROMPT" | MODEL="$MODEL" python3 -c '
import json, os, sys
print(json.dumps({
    "model": os.environ["MODEL"],
    "prompt": sys.stdin.read(),
    "stream": False,
}))
')

RESPONSE=$(curl -s "$OLLAMA_URL/api/generate" -d "$PAYLOAD" 2>/dev/null \
    | python3 -c "
import sys, json
data = json.load(sys.stdin)
print(data.get('response', 'Analysis failed'))
" 2>/dev/null || echo "LLM unavailable, please check if Ollama is running")

# Step 4: Print the result and save it to the report file
echo "[4/4] Outputting results..."
echo "=========================="
echo "$RESPONSE" | tee "$REPORT"
echo "=========================="
echo "Detailed report saved to: $REPORT"
echo "=== Diagnosis Complete ==="
Advanced: Building a RAG Knowledge Base for Diagnostics
When your VPS runs a complex application stack, logs alone may not be enough to pinpoint problems. This is where RAG (Retrieval-Augmented Generation) comes in — injecting historical incident reports, runbooks, and knowledge base articles into the analysis context:
# rag_diagnoser.py
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA


class RAGDiagnoser:
    """Intelligent diagnosis system with RAG"""

    def __init__(self, llm, knowledge_dir="./knowledge_base"):
        # llm must be a LangChain-compatible LLM instance supplied by the
        # caller; the chain cannot be built with llm=None
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vectorstore = Chroma(
            persist_directory=f"{knowledge_dir}/chroma_db",
            embedding_function=self.embeddings
        )
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}
            )
        )

    def diagnose_with_knowledge(self, error_description):
        """Diagnose combining knowledge base"""
        query = f"""Based on the following error description and historical
incident reports, analyze possible root causes and solutions:
Error description: {error_description}
Reference similar cases from the knowledge base and provide
diagnostic recommendations."""
        # RetrievalQA takes its input under "query" and returns the
        # answer under "result"
        result = self.qa_chain.invoke({"query": query})
        return result["result"]
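To make the retrieval step less opaque, here is a dependency-free sketch of what "retrieve, then stuff into the prompt" amounts to. It swaps the embedding model for plain word-count vectors with cosine similarity, which is a much cruder notion of similarity than sentence embeddings, and all function names are hypothetical.

```python
import math
import re
from collections import Counter


def tokenize(text):
    """Lowercase word tokens; a crude stand-in for an embedding model."""
    return re.findall(r"[a-z0-9_]+", text.lower())


def cosine(a, b):
    """Cosine similarity between two term-frequency Counters."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(query, documents, k=2):
    """Return up to k documents most similar to the query, best first."""
    q = Counter(tokenize(query))
    scored = [(cosine(q, Counter(tokenize(doc))), doc) for doc in documents]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    # Documents with no overlap at all are not worth sending to the LLM
    return [doc for score, doc in scored[:k] if score > 0]


def build_prompt(error_description, documents, k=2):
    """Place the retrieved snippets ahead of the error in one prompt."""
    context = "\n---\n".join(retrieve(error_description, documents, k))
    return f"Past incidents:\n{context}\n\nCurrent error: {error_description}"
```

The "stuff" chain type used above works on the same principle: the top-k retrieved chunks are concatenated into a single prompt, so `k` and the chunk size together determine how much context the model has to read.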
Building the Knowledge Base
# 1. Organize historical incident reports
mkdir -p knowledge_base/articles
cp /var/log/incident_reports/*.md knowledge_base/articles/

# 2. Create vector index
python3 << 'PYEOF'
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

loader = DirectoryLoader(
    'knowledge_base/articles',
    glob="**/*.md",
    loader_cls=TextLoader,
    loader_kwargs={'encoding': 'utf-8'}
)
documents = loader.load()

splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(documents)

# Use the same embedding model as rag_diagnoser.py; indexing and querying
# with different models yields vectors that cannot be compared
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2"
)
vectorstore = Chroma.from_documents(chunks, embeddings,
                                    persist_directory='knowledge_base/chroma_db')
vectorstore.persist()
print(f"Indexed {len(chunks)} text chunks")
PYEOF
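The `chunk_size=500, chunk_overlap=50` settings mean that consecutive chunks share 50 characters, so a sentence cut at a chunk boundary still appears whole in one of them. The sketch below shows only that sliding-window arithmetic. It is a simplification: `RecursiveCharacterTextSplitter` also tries to break on paragraph and sentence separators, which this hypothetical `chunk_text` ignores.

```python
def chunk_text(text, chunk_size=500, chunk_overlap=50):
    """Split text into fixed-size character chunks that overlap.

    Each chunk starts (chunk_size - chunk_overlap) characters after the
    previous one, so neighbours share chunk_overlap characters.
    """
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a chunk reaches the end of the text, to avoid
        # emitting a trailing chunk that is pure overlap
        if start + chunk_size >= len(text):
            break
    return chunks
```

A larger overlap reduces the chance of splitting a key sentence but increases the number of chunks to embed and store.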
Performance Optimization: Reducing Analysis Latency
LLM analysis takes time. In production environments, consider these optimizations:
1. Smart Sampling Strategy
Don’t send all logs to the LLM:
def smart_sample(log_lines, max_tokens=3000):
    """Smart sampling: preserve critical logs, thin out the rest"""
    keywords = ('ERROR', 'WARN', 'FATAL', 'CRIT')
    priority, normal = [], []
    for line in log_lines:
        if any(kw in line.upper() for kw in keywords):
            priority.append(line)
        else:
            normal.append(line)

    # Prioritize errors and warnings, then add every 10th normal line
    candidates = priority[:50] + normal[::10][:100]

    # Truncate to the token budget. Tokens are estimated at about four
    # characters each, a rough heuristic rather than a real tokenizer.
    sampled = []
    token_count = 0
    for line in candidates:
        estimate = len(line) // 4 + 1
        if token_count + estimate > max_tokens:
            break
        sampled.append(line)
        token_count += estimate
    return sampled
2. Local Model Selection
| Model | Parameters | Memory Needed | Best For |
|---|---|---|---|
| Qwen2.5-7B | 7B | ~6GB RAM | General log analysis |
| Phi-3-mini | 3.8B | ~4GB RAM | Resource-constrained VPS |
| Llama-3.1-8B | 8B | ~8GB RAM | High-precision analysis |
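By these figures, Phi-3-mini is the realistic choice for a VPS with about 4GB of RAM, and Qwen2.5-7B needs 6GB or more. A 2GB VPS is too small for any of the three models.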
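The table can be turned into a small selection helper. The sketch below uses the RAM figures from the table as thresholds and picks the largest model that fits. The `phi3:mini` and `llama3.1:8b` tags are assumed names that should be checked against the Ollama model library; only `qwen2.5:7b` appears elsewhere in this article. Total RAM is also an optimistic input, since the OS and your services need memory too.

```python
# (minimum RAM in GB, Ollama model tag), largest first.
# RAM figures come from the table above; the phi3 and llama3.1 tags are
# assumptions to verify against the Ollama model library.
MODEL_TIERS = [
    (8.0, "llama3.1:8b"),
    (6.0, "qwen2.5:7b"),
    (4.0, "phi3:mini"),
]


def pick_model(ram_gb):
    """Return the largest model tag that fits in ram_gb, or None."""
    for min_gb, tag in MODEL_TIERS:
        if ram_gb >= min_gb:
            return tag
    return None


def total_ram_gb(meminfo_path="/proc/meminfo"):
    """Read MemTotal from /proc/meminfo (Linux only), converted from kB to GB."""
    with open(meminfo_path) as f:
        for line in f:
            if line.startswith("MemTotal:"):
                return int(line.split()[1]) / (1024 * 1024)
    raise RuntimeError("MemTotal not found in " + meminfo_path)
```

On the VPS itself, `pick_model(total_ram_gb())` gives a starting point; a `None` result means none of the listed models is expected to fit.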
3. Caching Mechanism
Cache LLM analysis results for identical error patterns:
import hashlib
import json
import time


class AnalysisCache:
    def __init__(self, cache_file="/tmp/log_analysis_cache.json"):
        self.cache_file = cache_file
        self.cache = self._load()

    def _load(self):
        try:
            with open(self.cache_file) as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    def _hash(self, text):
        return hashlib.sha256(text.encode()).hexdigest()[:16]

    def get(self, text):
        key = self._hash(text)
        entry = self.cache.get(key)
        # Entries expire after one hour
        if entry and (time.time() - entry['time']) < 3600:
            return entry['result']
        return None

    def put(self, text, result):
        key = self._hash(text)
        self.cache[key] = {'result': result, 'time': time.time()}
        # Evict the oldest entry once the cache exceeds 1000 items
        if len(self.cache) > 1000:
            oldest = min(self.cache, key=lambda k: self.cache[k]['time'])
            del self.cache[oldest]
        with open(self.cache_file, 'w') as f:
            json.dump(self.cache, f)
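Because the cache key is a hash of the raw text, two occurrences of the same error that differ only in timestamp or client IP would miss the cache. One possible refinement is to mask those volatile values before hashing. The sketch below illustrates the idea with a plain dict standing in for the cache; `pattern_key` and `analyze_cached` are hypothetical names, and the masking rules are examples rather than a complete set.

```python
import hashlib
import re


def pattern_key(text):
    """Hash a log excerpt after masking values that vary between occurrences."""
    # Order matters: timestamps and IPs are masked before bare numbers
    masked = re.sub(r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}\S*", "<TS>", text)
    masked = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "<IP>", masked)
    masked = re.sub(r"\d+", "<NUM>", masked)
    return hashlib.sha256(masked.encode()).hexdigest()[:16]


def analyze_cached(text, cache, analyze):
    """Return the cached result for this error pattern, or compute it.

    `cache` is a plain dict here; `analyze` is any callable that takes
    the log text and returns an analysis (for example an LLM call).
    """
    key = pattern_key(text)
    if key not in cache:
        cache[key] = analyze(text)
    return cache[key]
```

The trade-off is that masking numbers also merges errors that differ in a meaningful value, such as a port or an HTTP status code, so the rules are worth tuning to your own logs.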
Complete Deployment Summary
1. Install Ollama + choose appropriate model
curl -fsSL https://ollama.com/install.sh | sh
ollama pull qwen2.5:7b
2. Install Fluent Bit for log collection
curl https://fluentbit.io/install.sh | bash
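3. Deploy the analysis script to /opt/scripts (the path the cron entry in step 4 expects)
sudo mkdir -p /opt/scripts
sudo wget -P /opt/scripts https://raw.githubusercontent.com/.../ai_log_diagnosis.sh
sudo chmod +x /opt/scripts/ai_log_diagnosis.sh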
4. Configure cron job (auto-analysis every 15 minutes)
crontab -e
*/15 * * * * /opt/scripts/ai_log_diagnosis.sh >> /var/log/ai_diag.log 2>&1
5. Configure notification channels (Slack / Email / Telegram)
# Fill in the webhook URL in the script
Conclusion
AI-powered log analysis doesn’t replace traditional ELK/Grafana monitoring — it adds a layer of intelligent understanding on top of it.
Traditional monitoring tells you “what happened.” AI analysis tells you “why it happened” and “what to do about it.” Together, they form a complete observability loop.
For individual developers and small teams, the unique value of this approach lies in:
- No API fees: Everything runs locally with no third-party AI services, so the only cost is the RAM and CPU the model uses on your VPS
- Privacy and security: Log data never leaves your VPS
- Plug-and-play: The scripts above can be copied and run immediately
- Continuous improvement: Diagnostic accuracy improves as the knowledge base grows
Next time your VPS alarms at midnight, you won’t need to sift through megabytes of logs manually — let AI find the real problem for you.
