Introduction
As a VPS operator, you’ve probably experienced scenarios like these:
- At 3 AM, you get a vague alert SMS. You SSH into the server and see pages of dense logs — you have no idea what went wrong
- The website slows down. After three hours of digging through Nginx error logs, you discover it was just an expired SSL certificate
- Disk space fills up from some service’s debug logs, but you can’t spot the pattern, so you resort to brute-force deletion
The bottleneck of traditional log analysis is that humans are bad at finding patterns in massive unstructured text.
LLMs excel at exactly that — understanding semantics, identifying anomalies, and summarizing findings. This article shows you how to combine LLMs + RAG + a lightweight rule engine to build an intelligent log analysis system on your VPS, achieving:
- 🔍 Anomaly Detection: Automatically identify error patterns and abnormal behaviors in logs
- 🧠 Root Cause Analysis: Leverage LLM context understanding to infer causes and suggest fixes
- 📝 Alert Summarization: Compress hundreds of log lines into a readable summary
- ⚡ Real-time Response: Receive structured reports the moment an issue occurs
1. Architecture Overview
┌──────────────────────────────────────────────────────────────┐
│                           Your VPS                           │
│                                                              │
│  ┌──────────┐    ┌───────────────┐    ┌─────────────────┐    │
│  │ Apps/    │───▶│ Log Collector │───▶│ Pattern Matcher │    │
│  │ Services │    │ (Vector,      │    │ (Regex +        │    │
│  │ (Nginx,  │    │  journald)    │    │  Thresholds)    │    │
│  │  Docker) │    └───────────────┘    └────────┬────────┘    │
│  └──────────┘                                  │             │
│                                     ┌──────────▼──────────┐  │
│                                     │ LLM Analysis Engine │  │
│                                     │ (Local/Cloud API)   │  │
│                                     └──────────┬──────────┘  │
│                                                │             │
│                                     ┌──────────▼──────────┐  │
│                                     │ Alert Notification  │  │
│                                     │ (Telegram/DingTalk) │  │
│                                     └─────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
Core flow:
- Log Collection Layer: Use Vector or Fluent Bit to unify log ingestion from all services
- Fast Filtering Layer: Apply regex and thresholds for preliminary screening, reducing data sent to LLM
- AI Analysis Layer: Send filtered log snippets to LLM for semantic analysis and root cause inference
- Notification Layer: Push AI-generated analysis results to you in a structured format
2. Log Collection: Unified Pipeline with Vector
Vector is a high-performance, low-resource log pipeline tool — lighter than Fluentd, more feature-rich than Fluent Bit.
Install Vector
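# Ubuntu/Debian: add the Vector apt repository, then install the package.
# The package ships the systemd unit that the systemctl commands rely on.
bash -c "$(curl -L https://setup.vector.dev)"
sudo apt-get install -y vector
sudo systemctl enable vector
sudo systemctl start vector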
Configure Vector to Collect Multi-Source Logs
Create /etc/vector/vector.toml:
# Source: Nginx error logs
[sources.nginx_errors]
type = "file"
include = ["/var/log/nginx/error.log"]
read_from = "beginning"
# Source: Docker container logs
[sources.docker_logs]
type = "docker_logs"
# Source: systemd journal
[sources.journald]
type = "journald"
journal_directory = "/var/log/journal"
# Transform: Flag important logs
[transforms.flag_important]
type = "remap"
inputs = ["nginx_errors", "docker_logs", "journald"]
source = '''
# Fall back to an empty string when .message is missing or not a string
msg = string(.message) ?? ""
.important = contains(msg, "error") || contains(msg, "fatal") || contains(msg, "panic")
.important = .important || contains(msg, "OOM") || contains(msg, "timeout") || contains(msg, "connection refused")
.important = .important || .level == "err" || .level == "critical"
'''
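# Transform: Drop events that were not flagged as important
[transforms.only_important]
type = "filter"
inputs = ["flag_important"]
condition = '.important == true'
# Transform: Collapse repeated messages within a time window
[transforms.windowed_logs]
type = "reduce"
inputs = ["only_important"]
group_by = ["host", "message"]
expire_after_ms = 30000 # flush a group after 30 seconds without new events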
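# Output: Shape events for the AI service, then send them to its local HTTP endpoint
[transforms.to_ai_service]
type = "remap"
inputs = ["windowed_logs"]
source = '''
service = if exists(.service) { .service } else { "unknown" }
level = if exists(.level) { .level } else { "info" }
# Replace the event with the flat payload that the Python service reads
. = {
  "timestamp": now(),
  "service": service,
  "level": level,
  "message": .message,
  "host": .host
}
'''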
[sinks.http_ai]
type = "http"
inputs = ["to_ai_service"]
uri = "http://127.0.0.1:8080/logs"
method = "post"
encoding.codec = "json"
Vector's resource overhead is small: roughly 10-20 MB of RAM for a small pipeline like this one, though the exact figure depends on the Vector version and log throughput.
3. Fast Pre-filtering: Reducing LLM Call Costs
Sending all logs to an LLM is neither economical nor efficient. We need a pre-filtering layer to identify which logs actually require AI intervention.
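To make the cost argument concrete, here is a back-of-the-envelope estimate. Every number in it (lines per day, tokens per line, price per million tokens, the share of lines that pass the filter) is a hypothetical input chosen for illustration, not a measurement or any provider's real price.

```python
def estimate_daily_cost(lines_per_day: int,
                        tokens_per_line: float,
                        usd_per_million_tokens: float,
                        pass_rate: float = 1.0) -> float:
    """Rough daily input-token cost in USD if `pass_rate` of the lines reach the LLM."""
    tokens = lines_per_day * tokens_per_line * pass_rate
    return tokens / 1_000_000 * usd_per_million_tokens


# Hypothetical workload: 500,000 lines/day, ~30 tokens per line, $0.50 per million tokens
unfiltered = estimate_daily_cost(500_000, 30, 0.50)
# Same workload if only 1% of the lines survive pre-filtering
filtered = estimate_daily_cost(500_000, 30, 0.50, pass_rate=0.01)
print(f"unfiltered: ${unfiltered:.2f}/day, filtered: ${filtered:.4f}/day")
```

Plug in your own log volume and your provider's current pricing; the point is only that cost scales linearly with the share of lines that reach the model.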
3.1 Rule-based Quick Filter
# filters.py - Quick rule engine
import re
import time
from collections import defaultdict, deque

# Predefined error patterns
ERROR_PATTERNS = [
    re.compile(r'(?i)(error|fail|fatal|panic|crash)'),
    re.compile(r'(?i)(OOM|out of memory|killed process)'),
    re.compile(r'(?i)(connection refused|timeout|deadline exceeded)'),
    re.compile(r'(?i)(SSL handshake failed|certificate.*expired)'),
    re.compile(r'(?i)(disk full|no space left|quota exceeded)'),
    re.compile(r'(?i)(permission denied|access forbidden)'),
    re.compile(r'(?i)(segmentation fault|core dumped)'),
]

# Frequency thresholds: trigger once a category reaches N occurrences in 5 min.
# The keys must match the labels returned by _classify_severity().
WINDOW_SECONDS = 300
FREQ_THRESHOLD = {
    'critical': 1,  # fatal/panic/core dump: escalate immediately
    'oom': 1,
    'error': 5,
    'warning': 5,
}


class QuickFilter:
    def __init__(self, window_seconds: int = WINDOW_SECONDS):
        self.window = window_seconds
        # key -> arrival times (monotonic seconds) of recent matching entries
        self.pattern_times = defaultdict(deque)

    def needs_ai_analysis(self, log_entry: dict) -> bool:
        """Determine if a log entry needs AI deep analysis"""
        message = str(log_entry.get('message', ''))
        # Check against error patterns
        if not any(p.search(message) for p in ERROR_PATTERNS):
            return False
        # Count frequency inside the sliding window
        severity = self._classify_severity(message)
        key = f"{severity}:{log_entry.get('service', 'unknown')}"
        now = time.monotonic()
        times = self.pattern_times[key]
        times.append(now)
        while times and now - times[0] > self.window:
            times.popleft()
        # Trigger when the threshold is reached, then reset the counter so a
        # sustained burst does not call the LLM for every further line
        if len(times) >= FREQ_THRESHOLD.get(severity, 5):
            times.clear()
            return True
        return False

    def _classify_severity(self, message: str) -> str:
        if re.search(r'(?i)(fatal|panic|core dump)', message):
            return 'critical'
        if re.search(r'(?i)(OOM|killed process)', message):
            return 'oom'
        if re.search(r'(?i)(error|fail)', message):
            return 'error'
        return 'warning'
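Frequency thresholds decide when a pattern is worth escalating, but a long incident can still cross the threshold again and again. One possible complement is a per-key cooldown that suppresses repeat escalations for a while. The sketch below is an illustration only: the `Cooldown` class and its `allow` method are hypothetical names, not part of filters.py.

```python
import time


class Cooldown:
    """Allow one action per key every `seconds`; suppress repeats in between."""

    def __init__(self, seconds: float, clock=time.monotonic):
        self.seconds = seconds
        self.clock = clock  # injectable so the behaviour can be tested without sleeping
        self._last = {}     # key -> time of the last allowed action

    def allow(self, key: str) -> bool:
        now = self.clock()
        last = self._last.get(key)
        if last is not None and now - last < self.seconds:
            return False  # still cooling down for this key
        self._last[key] = now
        return True


# Example: at most one escalation per "severity:service" key every 5 minutes
cooldown = Cooldown(300)
if cooldown.allow("error:myapp"):
    print("escalate to the LLM")
```

Calling `cooldown.allow(key)` right before the LLM request (or before sending the alert) keeps a single noisy service from consuming the whole API budget.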
3.2 Sliding Window Aggregation
For bursts of identical logs (e.g., a crashing service producing 100 errors per second), we don’t make 100 LLM calls. Instead, we aggregate into one representative sample:
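# aggregator.py
from collections import defaultdict
import hashlib
import time


class LogAggregator:
    def __init__(self, window_seconds=60):
        self.window = window_seconds
        # bucket_key -> list of (arrival_time, log_entry)
        self.buckets = defaultdict(list)

    def add(self, log_entry: dict):
        # md5 serves only as a cheap grouping key here, not as a security measure
        msg_hash = hashlib.md5(
            str(log_entry.get('message', '')).encode()
        ).hexdigest()[:8]
        bucket_key = f"{msg_hash}:{log_entry.get('service', 'unknown')}"
        self.buckets[bucket_key].append((time.monotonic(), log_entry))

    def get_aggregated(self) -> list:
        """Return aggregated log summaries for entries seen within the window"""
        cutoff = time.monotonic() - self.window
        results = []
        for key in list(self.buckets):
            # Drop entries older than the window so stale logs are not reported again
            recent = [(t, e) for t, e in self.buckets[key] if t >= cutoff]
            if not recent:
                del self.buckets[key]
                continue
            self.buckets[key] = recent
            entries = [e for _, e in recent]
            stamps = [str(e['timestamp']) for e in entries if e.get('timestamp')]
            results.append({
                'bucket_key': key,
                'count': len(entries),
                'first_seen': min(stamps, default=None),
                'last_seen': max(stamps, default=None),
                'sample': entries[-1],
                'messages': list({str(e.get('message', '')) for e in entries}),
            })
        return results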
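Hashing the raw message only groups lines that are byte-for-byte identical, so messages that differ in a PID, port, or retry counter end up in separate buckets. A possible refinement, sketched below, masks the variable parts before hashing. The `normalize` and `bucket_hash` helpers and the placeholder tokens are assumptions made for illustration, not part of aggregator.py.

```python
import hashlib
import re

# Order matters: mask IPs and hex values before masking bare numbers
_PATTERNS = [
    (re.compile(r'\b\d{1,3}(?:\.\d{1,3}){3}\b'), '<ip>'),
    (re.compile(r'\b0x[0-9a-fA-F]+\b'), '<hex>'),
    (re.compile(r'\d+'), '<n>'),
]


def normalize(message: str) -> str:
    """Replace IPs, hex values, and numbers with placeholders."""
    for pattern, placeholder in _PATTERNS:
        message = pattern.sub(placeholder, message)
    return message


def bucket_hash(message: str) -> str:
    """Short grouping key computed from the normalized message."""
    return hashlib.md5(normalize(message).encode()).hexdigest()[:8]


print(normalize("Connection refused to 10.0.0.5:5432 (attempt 2/3)"))
```

With this in place, "Killed process 1234 (java)" and "Killed process 987 (java)" share one bucket, so the LLM sees one sample with a count instead of two unrelated entries.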
4. AI Analysis Engine: LLM-Powered Smart Diagnostics
This is the core of the system. We use a lightweight HTTP service to receive filtered logs and invoke an LLM for analysis.
4.1 Choose Your LLM Backend
Options:
- Cloud API: OpenRouter, Together AI, Google Gemini, etc.
- Local Deployment: Ollama + Llama 3 / Qwen 2.5 (a model in the 7B-8B range is recommended)
The example below uses the OpenRouter API. Switching to local Ollama is straightforward.
4.2 Analysis Service Code
# ai_service.py - AI log analysis service
import http.server
import json
import os
import requests
from filters import QuickFilter
from aggregator import LogAggregator

# LLM Configuration
LLM_API_URL = "https://openrouter.ai/api/v1/chat/completions"
# Read the key from the environment when available instead of hard-coding it
LLM_API_KEY = os.environ.get("LLM_API_KEY", "sk-or-xxxxx")  # Replace with your API Key
LLM_MODEL = "qwen/qwen-2.5-7b-instruct:free"  # Free tier available
# For local Ollama, use its OpenAI-compatible endpoint so the response
# has the same "choices" structure that _call_llm parses:
# LLM_API_URL = "http://localhost:11434/v1/chat/completions"
FILTER = QuickFilter()
AGGREGATOR = LogAggregator(window_seconds=60)
SYSTEM_PROMPT = """You are a professional VPS operations expert specializing in log analysis and troubleshooting.
Your responsibilities:
1. **Identify Anomalies**: Find genuine anomalous events from provided logs
2. **Root Cause Analysis**: Infer the most likely cause based on error messages, service dependencies, and timeline
3. **Risk Assessment**: Judge the severity and urgency of the issue
4. **Fix Recommendations**: Provide concrete, actionable remediation steps
Reply in the following JSON format:
{
"summary": "One-sentence problem summary",
"severity": "critical|high|medium|low|info",
"root_cause": "Detailed cause analysis",
"affected_services": ["List of affected components"],
"recommendations": ["Specific remediation steps"],
"keywords": ["Tags for future search"]
}
Note: Return ONLY JSON, no additional explanatory text."""
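class LogAnalysisHandler(http.server.BaseHTTPRequestHandler):
    def do_POST(self):
        if self.path != '/logs':
            self.send_response(404)
            self.end_headers()
            return
        content_length = int(self.headers.get('Content-Length', 0))
        try:
            payload = json.loads(self.rfile.read(content_length))
        except json.JSONDecodeError:
            self.send_response(400)
            self.end_headers()
            return
        # Vector's http sink may batch events into a JSON array, so accept both shapes
        entries = payload if isinstance(payload, list) else [payload]
        needs_ai = False
        for log_entry in entries:
            if not isinstance(log_entry, dict):
                continue
            AGGREGATOR.add(log_entry)
            # Evaluate every entry so the frequency counters stay accurate
            needs_ai = FILTER.needs_ai_analysis(log_entry) or needs_ai
        if not needs_ai:
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({"status": "filtered"}).encode())
            return
        aggregated = AGGREGATOR.get_aggregated()
        user_content = self._build_prompt(aggregated)
        try:
            response = self._call_llm(user_content)
        except (requests.RequestException, KeyError, IndexError, ValueError) as exc:
            # An LLM outage or malformed reply should not crash the handler
            self.log_error("LLM call failed: %s", exc)
            response = None
        if response and self._should_alert(response):
            self._send_alert(response)
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({
            "status": "analyzed",
            "analysis": response
        }).encode())
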
    def _build_prompt(self, aggregated):
        log_snippets = []
        for item in aggregated:
            sample = item['sample']
            count = item['count']
            snippet = f"[{item['last_seen']}] [{sample.get('service', '?')}] " \
                      f"(occurred {count} times): {sample.get('message', '')}"
            log_snippets.append(snippet)
        return "Here are the collected anomalous logs:\n\n" + "\n".join(log_snippets) + \
               "\n\nPlease analyze these logs and provide a diagnostic report."

    def _call_llm(self, user_content: str) -> dict:
        payload = {
            "model": LLM_MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_content}
            ],
            "response_format": {"type": "json_object"},
            "temperature": 0.3
        }
        headers = {
            "Authorization": f"Bearer {LLM_API_KEY}",
            "Content-Type": "application/json"
        }
        resp = requests.post(LLM_API_URL, json=payload, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        content = data['choices'][0]['message']['content']
        return json.loads(content)

    def _should_alert(self, analysis: dict) -> bool:
        return analysis.get('severity') in ('critical', 'high')

    def _send_alert(self, analysis: dict):
        bot_token = "YOUR_BOT_TOKEN"
        chat_id = "YOUR_CHAT_ID"
        severity_emoji = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🔵'}
        emoji = severity_emoji.get(analysis.get('severity', 'low'), '🔵')
        alert_text = (
            f"{emoji} *VPS Anomaly Alert*\n\n"
            f"*Summary*: {analysis.get('summary', 'N/A')}\n"
            f"*Severity*: {analysis.get('severity', 'unknown')}\n\n"
            f"*Root Cause*: {analysis.get('root_cause', 'N/A')}\n\n"
            f"*Recommendations*:\n"
        )
        for i, rec in enumerate(analysis.get('recommendations', []), 1):
            alert_text += f"{i}. {rec}\n"
        telegram_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        requests.post(telegram_url, json={
            "chat_id": chat_id,
            "text": alert_text,
            "parse_mode": "Markdown"
        }, timeout=10)


if __name__ == '__main__':
    server = http.server.HTTPServer(('127.0.0.1', 8080), LogAnalysisHandler)
    print("AI Log Analysis service started on 127.0.0.1:8080")
    server.serve_forever()
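Even when the prompt asks for JSON only, models sometimes wrap the reply in a Markdown code fence or invent a severity label. A defensive parser along the following lines could sit between the raw reply and the alerting logic. This is a sketch under stated assumptions: `parse_analysis`, `ALLOWED_SEVERITIES`, and the fallback report are hypothetical names and choices, not part of ai_service.py.

```python
import json
import re

ALLOWED_SEVERITIES = {"critical", "high", "medium", "low", "info"}
FENCE = "`" * 3  # Markdown code-fence marker, built here to keep this listing fence-safe


def parse_analysis(content: str) -> dict:
    """Parse model output into a report dict, tolerating common formatting slips."""
    text = content.strip()
    # Strip a surrounding Markdown code fence (optionally tagged "json")
    fenced = re.match(rf"^{FENCE}(?:json)?\s*(.*?)\s*{FENCE}$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        # Fall back to a low-severity report instead of raising inside the handler
        return {"summary": "Unparseable LLM response", "severity": "info",
                "root_cause": content[:500], "recommendations": []}
    severity = str(data.get("severity", "")).lower()
    data["severity"] = severity if severity in ALLOWED_SEVERITIES else "info"
    data.setdefault("recommendations", [])
    return data


print(parse_analysis('{"summary": "db down", "severity": "HIGH"}')["severity"])
```

Mapping unknown severities to "info" is a conservative choice that avoids false alarms; map them to "high" instead if you would rather be paged on anything the parser cannot classify.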
4.3 Local Deployment with Ollama (Zero API Cost)
If you want to avoid API costs entirely, run a local model with Ollama:
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Pull a model suitable for log analysis (Qwen 2.5 7B handles both English and Chinese text well)
ollama pull qwen2.5:7b
# Create a custom Modelfile
cat > Modelfile << 'EOF'
FROM qwen2.5:7b
SYSTEM """You are a professional VPS operations expert specializing in log analysis and troubleshooting.
Respond in JSON format with summary, severity, root_cause, and recommendations fields."""
EOF
ollama create ai-log-analyzer -f Modelfile
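# On Linux the installer normally registers and starts Ollama as a systemd service.
# If the server is not running (for example, on a system without systemd), start it
# manually before the pull/create commands above:
# ollama serve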
Update ai_service.py:
LLM_API_URL = "http://localhost:11434/v1/chat/completions"  # Ollama's OpenAI-compatible endpoint
LLM_MODEL = "ai-log-analyzer"
Resource requirements: A 7B model needs approximately 4-6 GB RAM, so an 8GB+ VPS is recommended. For 2GB VPS instances, use cloud APIs with free tiers instead.
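One detail to watch when switching backends: OpenAI-style endpoints return the text under `choices[0].message.content`, while Ollama's native `/api/chat` endpoint (called with `"stream": false`, since it streams by default) returns it under `message.content`. A small helper like the one below could accept either shape. `extract_content` is a hypothetical name introduced for this sketch.

```python
def extract_content(data: dict) -> str:
    """Return the assistant text from either response shape."""
    if "choices" in data:
        # OpenAI-style response (OpenRouter, or Ollama's /v1/chat/completions)
        return data["choices"][0]["message"]["content"]
    if "message" in data:
        # Ollama native /api/chat response with "stream": false
        return data["message"]["content"]
    raise ValueError("Unrecognized LLM response shape")


openai_style = {"choices": [{"message": {"role": "assistant", "content": "{}"}}]}
ollama_native = {"model": "ai-log-analyzer",
                 "message": {"role": "assistant", "content": "{}"},
                 "done": True}
print(extract_content(openai_style) == extract_content(ollama_native))
```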
5. Alert Notification Integration
5.1 Telegram Bot Alerts
import telebot  # provided by the pyTelegramBotAPI package (pip install pyTelegramBotAPI)

TELEGRAM_BOT_TOKEN = "your_bot_token"
TELEGRAM_CHAT_ID = "your_chat_id"


def send_telegram_alert(summary: str, analysis: dict):
    bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN)
    severity_map = {
        'critical': '🔴 Critical',
        'high': '🟠 High',
        'medium': '🟡 Medium',
        'low': '🔵 Low',
    }
    text = (
        f"🖥️ *VPS Log Analysis Report*\n\n"
        f"⚠️ *Status*: {severity_map.get(analysis.get('severity'), 'Unknown')}\n"
        f"📋 *Summary*: {summary}\n\n"
        f"*Root Cause*:\n{analysis.get('root_cause', 'N/A')}\n\n"
        f"*Recommended Actions*:\n"
    )
    for i, rec in enumerate(analysis.get('recommendations', []), 1):
        text += f"{i}. {rec}\n"
    bot.send_message(TELEGRAM_CHAT_ID, text, parse_mode="Markdown")
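With `parse_mode="Markdown"`, Telegram treats `_`, `*`, a backtick, and `[` as markup, so LLM-generated text such as `pg_stat_activity` can produce an unbalanced entity and cause the message to be rejected. A minimal sketch of escaping the dynamic fields first, assuming the legacy Markdown mode used above (`escape_markdown` is a hypothetical helper name):

```python
import re


def escape_markdown(text: str) -> str:
    """Escape the characters that Telegram's legacy Markdown mode treats as markup."""
    return re.sub(r'([_*`\[])', r'\\\1', str(text))


print(escape_markdown("Check pg_stat_activity and restart *postgres*"))
```

Apply it to the summary, root cause, and each recommendation, but not to the template's own `*bold*` markers.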
5.2 DingTalk Robot Alerts
import hmac
import hashlib
import base64
import time
import urllib.parse
import requests

DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
SECRET = "SECxxxxxxxx"


def send_dingtalk_alert(analysis: dict):
    # Sign "<timestamp>\n<secret>" with HMAC-SHA256, then base64- and URL-encode it
    timestamp = str(round(time.time() * 1000))
    string_to_sign = f"{timestamp}\n{SECRET}"
    hmac_code = hmac.new(
        SECRET.encode(), string_to_sign.encode(), digestmod=hashlib.sha256
    ).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    url = f"{DINGTALK_WEBHOOK}&timestamp={timestamp}&sign={sign}"
    severity_text = {
        'critical': '🔴 Critical Alert',
        'high': '🟠 High Alert',
        'medium': '🟡 Medium Alert',
        'low': '🔵 Low Warning',
    }
    label = severity_text.get(analysis.get('severity', ''), 'Unknown')
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "title": f"VPS Anomaly Detected - {label}",
            "text": (
                f"### {label}\n\n"
                f"**Summary**: {analysis.get('summary', 'N/A')}\n\n"
                f"**Root Cause**: {analysis.get('root_cause', 'N/A')}\n\n"
                f"**Recommendations**:\n"
            ) + "".join(
                f"{i}. {rec}\n"
                for i, rec in enumerate(analysis.get('recommendations', []), 1)
            ),
        }
    }
    requests.post(url, json=payload, timeout=10)
6. Complete Deployment Guide
6.1 One-Click Deployment Script
#!/bin/bash
# deploy.sh - One-click deployment of AI log analysis system
set -euo pipefail
echo "🚀 Deploying AI log analysis system..."
# 1. Install Vector from its apt repository (the package includes the systemd unit)
echo "📦 Installing Vector..."
bash -c "$(curl -L https://setup.vector.dev)"
apt-get install -y vector
systemctl enable vector
systemctl start vector
# 2. Install Python dependencies
echo "🐍 Installing Python dependencies..."
# The "telebot" module used in section 5.1 comes from pyTelegramBotAPI
pip3 install requests pyTelegramBotAPI
# 3. Create AI analysis service directory
echo "🤖 Creating AI analysis service..."
mkdir -p ~/ai-log-analyzer
cd ~/ai-log-analyzer
# Copy filters.py, aggregator.py, ai_service.py here
# (create from the code blocks above) before step 4 runs;
# otherwise the service has nothing to execute and will keep restarting
# 4. Create systemd service
cat > /etc/systemd/system/ai-log-analyzer.service << 'EOF'
[Unit]
Description=AI Log Analysis Service
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/ai-log-analyzer
ExecStart=/usr/bin/python3 /root/ai-log-analyzer/ai_service.py
Restart=always
RestartSec=10
Environment="PATH=/usr/local/bin:/usr/bin:/bin"
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable ai-log-analyzer
systemctl start ai-log-analyzer
# 5. Configure Vector to output to AI service
# (minimal Docker-only config; this overwrites any existing /etc/vector/vector.toml)
cat > /etc/vector/vector.toml << 'EOF'
[sources.all_logs]
type = "docker_logs"
[transforms.filter_errors]
type = "remap"
inputs = ["all_logs"]
source = """
msg = string(.message) ?? ""
.important = contains(msg, "error") || contains(msg, "fatal") || contains(msg, "panic") || contains(msg, "OOM")
"""
[sinks.http_ai]
type = "http"
inputs = ["filter_errors"]
uri = "http://127.0.0.1:8080/logs"
method = "post"
encoding.codec = "json"
EOF
systemctl restart vector
echo "✅ Deployment complete!"
echo "📊 Vector log collection: systemd service 'vector'"
echo "🤖 AI analysis service: http://127.0.0.1:8080/logs"
echo "📱 Alert notifications: Telegram / DingTalk integrated"
6.2 Verify the Deployment
# Check Vector is running
systemctl status vector
# Check AI service is running
systemctl status ai-log-analyzer
# Simulate sending a test log
curl -X POST http://127.0.0.1:8080/logs \
-H "Content-Type: application/json" \
-d '{
"message": "Connection refused to database at 127.0.0.1:5432",
"service": "myapp",
"level": "error",
"timestamp": "'$(date -Iseconds)'",
"host": "vps-001"
}'
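# A single request returns {"status": "filtered"}: with the default threshold,
# analysis starts only once 5 matching entries for the same service have arrived.
# Repeat the request until the reply is {"status": "analyzed", "analysis": {...}}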
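Because a single log line stays below the frequency threshold, it can be handy to send a short burst when testing. The sketch below does that with the standard library only. `build_test_entry` and `post_entries` are hypothetical helper names, and the usage lines are left as comments because they need the service from section 4 to be running.

```python
import json
import urllib.request
from datetime import datetime, timezone


def build_test_entry(service: str = "myapp") -> dict:
    """Build one synthetic log entry in the shape the /logs endpoint expects."""
    return {
        "message": "Connection refused to database at 127.0.0.1:5432",
        "service": service,
        "level": "error",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "host": "vps-001",
    }


def post_entries(url: str, count: int = 5) -> list:
    """POST `count` test entries one by one and return the decoded replies."""
    replies = []
    for _ in range(count):
        body = json.dumps(build_test_entry()).encode()
        req = urllib.request.Request(
            url, data=body, headers={"Content-Type": "application/json"}
        )
        # A generous timeout, since the final request may wait on the LLM
        with urllib.request.urlopen(req, timeout=60) as resp:
            replies.append(json.loads(resp.read() or b"{}"))
    return replies


# Usage (requires the analysis service to be listening locally):
#   for reply in post_entries("http://127.0.0.1:8080/logs"):
#       print(reply.get("status"))
```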
7. Advanced Optimizations
7.1 Historical Log RAG (Retrieval-Augmented Generation)
When the LLM is asked “what is this problem?”, giving it past incidents and their fixes as context can noticeably improve its answers. A small SQLite table is enough to start with; a true vector store can replace it later if you need semantic search:
# rag_store.py - Simple SQLite knowledge base
import hashlib
import re
import sqlite3


class LogKnowledgeBase:
    def __init__(self, db_path="/root/ai-log-analyzer/knowledge.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS knowledge (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                problem_hash TEXT UNIQUE,
                symptom TEXT,
                root_cause TEXT,
                solution TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                usage_count INTEGER DEFAULT 0
            )
        ''')
        self.conn.commit()

    def store_solution(self, symptom: str, root_cause: str, solution: str):
        """Store a complete troubleshooting record"""
        problem_hash = hashlib.sha256(symptom.encode()).hexdigest()[:16]
        existing = self.conn.execute(
            'SELECT id FROM knowledge WHERE problem_hash = ?',
            (problem_hash,)
        ).fetchone()
        if existing:
            # Same symptom seen again: bump its usage counter
            self.conn.execute(
                'UPDATE knowledge SET usage_count = usage_count + 1 WHERE id = ?',
                (existing[0],)
            )
        else:
            self.conn.execute(
                'INSERT INTO knowledge (problem_hash, symptom, root_cause, solution) VALUES (?, ?, ?, ?)',
                (problem_hash, symptom, root_cause, solution)
            )
        self.conn.commit()

    def search_similar(self, query: str, limit=3):
        """Search for similar historical incidents by keyword overlap"""
        query_words = set(re.findall(r'[a-z0-9_]+', query.lower()))
        rows = self.conn.execute(
            'SELECT symptom, root_cause, solution, usage_count FROM knowledge'
        ).fetchall()
        scored = []
        for row in rows:
            symptom_words = set(re.findall(r'[a-z0-9_]+', row[0].lower()))
            overlap = len(query_words & symptom_words)
            if overlap:
                scored.append((overlap, row[3], row))
        # Best keyword match first; usage_count breaks ties
        scored.sort(key=lambda s: (s[0], s[1]), reverse=True)
        return [row for _, _, row in scored[:limit]]
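Inject historical cases into the system prompt for the current request:
knowledge_base = LogKnowledgeBase()


def build_system_prompt(user_log_content: str) -> str:
    """Return SYSTEM_PROMPT plus any similar historical cases"""
    similar_cases = knowledge_base.search_similar(user_log_content)
    if not similar_cases:
        return SYSTEM_PROMPT
    cases_text = "\n\n".join(
        f"Historical case - Symptom: {r[0]}, Cause: {r[1]}, Solution: {r[2]}"
        for r in similar_cases
    )
    # Build a fresh string per request; appending to the global SYSTEM_PROMPT
    # would make it grow with every call
    return SYSTEM_PROMPT + f"\n\nRefer to similar historical cases:\n{cases_text}"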
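To show what similarity-based retrieval looks like without pulling in an embedding model, the sketch below ranks stored symptoms by cosine similarity over plain term-frequency vectors. It is a stand-in under stated assumptions: a real vector store would use learned embeddings, and `term_vector`, `cosine_similarity`, and `rank_cases` are hypothetical names, not part of rag_store.py.

```python
import math
import re
from collections import Counter


def term_vector(text: str) -> Counter:
    """Bag-of-words term frequencies for a piece of text."""
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine of the angle between two term-frequency vectors (0.0 if either is empty)."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * \
           math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def rank_cases(query: str, symptoms: list, limit: int = 3) -> list:
    """Return up to `limit` (score, symptom) pairs, most similar first, skipping zero scores."""
    q = term_vector(query)
    scored = [(cosine_similarity(q, term_vector(s)), s) for s in symptoms]
    return sorted((p for p in scored if p[0] > 0), reverse=True)[:limit]


history = [
    "connection refused to postgres",
    "disk full on /var",
    "ssl certificate expired",
]
for score, symptom in rank_cases("myapp: Connection refused to postgres:5432", history):
    print(f"{score:.2f}  {symptom}")
```

Term-frequency matching misses paraphrases ("database unreachable" versus "connection refused"), which is the gap that embedding-based search closes.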
7.2 Scheduled Daily Log Review
Beyond real-time analysis, set up a daily scheduled review:
# crontab -e
# Analyze yesterday's log summary every day at 9 AM
0 9 * * * cd ~/ai-log-analyzer && python3 daily_report.py
# daily_report.py - Daily log review report
import subprocess
from datetime import datetime, timedelta


def generate_daily_report():
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    # "yesterday" and "today" are journalctl keywords for 00:00 of each day
    result = subprocess.run(
        ['journalctl', '--since', 'yesterday', '--until', 'today',
         '-p', 'err', '--no-pager'],
        capture_output=True, text=True
    )
    logs = result.stdout
    if not logs.strip():
        print("No error logs yesterday; everything is fine ✅")
        return
    # Send to LLM for summarization
    # ... (reuse the LLM calling logic from ai_service.py to produce `analysis`)
    # Send daily report via Telegram (import send_telegram_alert from section 5.1
    # and uncomment once `analysis` is produced above)
    # send_telegram_alert(f"📊 Yesterday's Log Review ({yesterday})", analysis)


if __name__ == '__main__':
    generate_daily_report()
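A full day of error logs can exceed the model's context window, so the daily report needs some cap on what it sends. One simple approach, sketched here, keeps the most recent lines that fit a character budget. The `fit_to_budget` helper and the 12,000-character default are illustrative assumptions; pick a limit that suits your model.

```python
def fit_to_budget(logs: str, max_chars: int = 12000) -> str:
    """Keep the most recent lines whose combined length fits within `max_chars`."""
    lines = logs.splitlines()
    kept, used = [], 0
    for line in reversed(lines):
        cost = len(line) + 1  # +1 for the newline
        if used + cost > max_chars:
            break
        kept.append(line)
        used += cost
    omitted = len(lines) - len(kept)
    # Tell the model that the input was truncated
    header = [f"[{omitted} earlier lines omitted]"] if omitted else []
    return "\n".join(header + kept[::-1])


sample = "\n".join(f"line {i}" for i in range(10))
print(fit_to_budget(sample, max_chars=21))
```

Keeping the tail favors the latest events. For incidents that start early in the day, sampling per service or summarizing in chunks may work better.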
7.3 Resource Optimization
| Component | Memory Usage | Notes |
|---|---|---|
| Vector | 10-20 MB | Log collection pipeline |
| Python Service | 30-50 MB | Filtering + aggregation + HTTP server |
| LLM API Calls | ~0 (cloud) | On-demand, no resident memory |
| Ollama Local Model | 4-6 GB | 7B model, optional |
Recommended configurations: A 2GB VPS can run Cloud API + Vector + Python service. An 8GB+ VPS can deploy a local Ollama model for fully offline analysis.
8. Real-World Example
Suppose your Nginx and Docker produce these logs:
2026-06-16 02:15:33 [nginx] error: upstream timed out (110: Connection timed out)
2026-06-16 02:15:34 [nginx] error: 1 upstream server temporarily disabled
2026-06-16 02:15:35 [docker] myapp: Connection refused to postgres:5432
2026-06-16 02:15:36 [docker] myapp: Retrying connection (attempt 2/3)
2026-06-16 02:15:38 [kernel] Out of memory: Killed process 1234 (java)
The LLM might return:
{
"summary": "PostgreSQL connection timeout triggered a cascading failure, ultimately causing OOM kill of the Java process",
"severity": "critical",
"root_cause": "PostgreSQL became unreachable (possibly due to connection exhaustion or database crash), causing myapp retry storms, memory buildup, and triggering the kernel OOM killer",
"affected_services": ["nginx", "myapp", "postgres", "java"],
"recommendations": [
"Check PostgreSQL connections: SELECT count(*) FROM pg_stat_activity;",
"Verify database is running: systemctl status postgresql",
"Add connection pool timeouts and circuit breakers to myapp",
"Review Java process memory limits; consider increasing VPS RAM or adjusting JVM params",
"Enable PostgreSQL slow query log to check for table locks or full table scans"
],
"keywords": ["postgres-connection", "oom-killer", "retry-storm", "upstream-timeout"]
}
Then you receive a structured alert message via Telegram — crystal clear at a glance.
Summary
Introducing AI into VPS log analysis is essentially leveraging machine semantic understanding to compensate for human attention bottlenecks. The core value of this system:
- 🎯 Precision Filtering: The rule engine screens out the bulk of irrelevant logs, so the LLM is only called for genuinely important anomalies
- 🧠 Intelligent Diagnosis: LLM understands causal relationships between errors, providing analysis far beyond regex matching
- 💰 Cost Control: Daily LLM calls may number only a few to dozens — API costs remain minimal
- 🔧 Easy Extension: Can integrate with Metrics and Traces for a complete observability stack
Next steps:
- Get the Vector → Filter → AI pipeline running on a test VPS
- Gradually onboard more service logs
- Accumulate historical cases to build your own troubleshooting knowledge base
- Evolve from “reactive firefighting” to “proactive early warning”
Logs don’t lie, but humans get tired. Let AI keep watch while you sleep.
