Featured image of post AI 预测性资源调度:基于时间序列分析的 VPS 智能扩缩容方案

AI 预测性资源调度:基于时间序列分析的 VPS 智能扩缩容方案

告别被动扩容!本文教你如何用 AI 时间序列预测模型提前感知流量高峰,在 VPS 资源耗尽前自动完成扩缩容,节省成本的同时保证服务稳定性。

从"救火式扩容"到"预判式调度"

大多数 VPS 用户的扩容逻辑是这样的:

  1. CPU 飙到 90%+ → 收到告警
  2. 手忙脚乱地升级配置
  3. 流量高峰过去后,又降配省钱
  4. 重复循环

这种被动响应式的资源管理方式有两个致命缺陷:

  • 体验受损:在扩容完成前的窗口期,用户可能已经经历了卡顿甚至宕机
  • 成本浪费:临时升级通常要按小时计费,高峰期一过又闲置

想象一下,如果系统能提前 2 小时就知道"下午 3 点会有流量高峰",然后自动在 2 点半开始扩容——一切在用户无感知中完成。这就是 AI 预测性资源调度要解决的问题。

核心思路:用时间序列预测驱动 Auto Scaling

传统 Auto Scaling 依赖阈值触发(CPU > 80% 就扩容)。而 AI 预测性调度的核心是:

历史指标数据 → 时间序列预测模型 → 未来资源需求 → 提前触发扩缩容

整个流程分为四个阶段:

┌──────────────────────────────────────────────────────────┐
│  阶段 1: 数据采集                                        │
│  Prometheus 持续采集 CPU/内存/网络/磁盘 IOPS             │
│  存储为时间序列数据(默认保留 30 天+)                    │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  阶段 2: 预测建模                                         │
│  使用 Prophet/LSTM 等模型分析趋势                         │
│  输出:未来 N 小时的资源使用预测曲线                      │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  阶段 3: 决策引擎                                         │
│  对比预测值与当前容量                                     │
│  计算需要提前多久扩容、扩容多少                            │
└──────────────────────┬───────────────────────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────────────────────┐
│  阶段 4: 自动执行                                         │
│  调用云厂商 API 完成扩缩容                                │
│  记录效果 → 反馈给模型持续优化                            │
└──────────────────────────────────────────────────────────┘

第一步:搭建指标采集层

预测的前提是有足够高质量的历史数据。我们用 Prometheus + Node Exporter 来采集 VPS 的系统指标。

安装 Node Exporter

# 下载并安装 Node Exporter
wget https://github.com/prometheus/node_exporter/releases/download/v1.8.2/node_exporter-1.8.2.linux-amd64.tar.gz
tar xzf node_exporter-1.8.2.linux-amd64.tar.gz
sudo cp node_exporter-1.8.2.linux-amd64/node_exporter /usr/local/bin/

# 创建 systemd 服务
sudo tee /etc/systemd/system/node_exporter.service << 'EOF'
[Unit]
Description=Node Exporter
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=node_exporter
ExecStart=/usr/local/bin/node_exporter \
  --web.listen-address=:9100 \
  --collector.filesystem.mount-points-exclude="^/(sys|proc|dev|host|etc)($$|/)"

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl enable --now node_exporter

安装 Prometheus

# 下载 Prometheus
wget https://github.com/prometheus/prometheus/releases/download/v2.53.0/prometheus-2.53.0.linux-amd64.tar.gz
tar xzf prometheus-2.53.0.linux-amd64.tar.gz

# 配置 prometheus.yml
sudo tee /etc/prometheus/prometheus.yml << 'EOF'
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']

  - job_name: 'application'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['localhost:8080']
EOF

# 启动 Prometheus
sudo tee /etc/systemd/system/prometheus.service << 'EOF'
[Unit]
Description=Prometheus
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=prometheus
ExecStart=/usr/local/bin/prometheus \
  --config.file=/etc/prometheus/prometheus.yml \
  --storage.tsdb.path=/var/lib/prometheus \
  --storage.tsdb.retention.time=30d \
  --web.listen-address=:9090

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl enable --now prometheus

提示:对于长期运行,建议搭配 VictoriaMetrics 替代原生 Prometheus,它在相同硬件下可以存储 3-5 倍的数据量,且查询性能更好。

第二步:构建时间序列预测模型

这里我们使用两种主流方案:Facebook Prophet(适合大多数场景)和 LSTM(适合复杂模式)。

方案 A:Prophet 快速上手

Prophet 是 Facebook 开源的时间序列预测库,对周期性数据(如 VPS 的日/周流量模式)有天然优势。

#!/usr/bin/env python3
"""AI 预测性资源调度 - Prophet 预测模块"""

import requests
import pandas as pd
import numpy as np
from prophet import Prophet
import json
import os
from datetime import datetime, timedelta

# ==================== 配置区 ====================
PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://localhost:9090")
FORECAST_HOURS = 4          # 预测未来多少小时
SCALE_UP_THRESHOLD = 0.80   # CPU 使用率超过 80% 触发扩容
SCALE_DOWN_THRESHOLD = 0.20 # CPU 低于 20% 触发缩容
CHECK_INTERVAL_MINUTES = 30 # 每 30 分钟检查一次

# ==================== 数据采集 ====================
def query_prometheus(query, start=None, end=None):
    """向 Prometheus 发送查询请求"""
    params = {"query": query}
    if start:
        params["start"] = start
    if end:
        params["end"] = end

    resp = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params=params)
    data = resp.json()["data"]["result"]

    results = []
    for item in data:
        metric = item["metric"]
        values = [(float(ts), float(val)) for ts, val in item["value"]]
        results.append({
            "metric": metric,
            "values": values
        })
    return results

def fetch_cpu_history(hours=720):
    """
    采集过去 30 天的 CPU 使用率数据(每 15 秒一个采样点)
    为了演示效率,这里聚合为每分钟平均值
    """
    end_time = datetime.utcnow().isoformat() + "Z"
    start_ts = datetime.utcnow() - timedelta(hours=hours)
    start_time = start_ts.isoformat() + "Z"

    # PromQL 查询:节点平均 CPU 使用率
    query = (
        f'100 - (avg(rate(node_cpu_seconds_total{{mode="idle"}}[{hours//60}m])) * 100)'
    )

    results = query_prometheus(query, start_time, end_time)
    if not results:
        # 备用查询
        query = '100 - (avg(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)'
        results = query_prometheus(query)

    return results

# ==================== 模型训练 ====================
def train_prophet_model(cpu_data, metric_name="cpu_percent"):
    """
    使用 Prophet 训练时间序列预测模型
    
    Args:
        cpu_data: Prometheus 查询结果
        metric_name: 用于 Prophet 的列名
    
    Returns:
        训练好的 Prophet 模型
    """
    if not cpu_data:
        raise ValueError("没有采集到数据,无法训练模型")

    # 将 Prometheus 数据转换为 Prophet 格式
    all_points = []
    for item in cpu_data:
        for ts, val in item["values"]:
            all_points.append({
                "ds": datetime.utcfromtimestamp(ts),
                metric_name: min(max(val, 0), 100)  # 限制在 0-100
            })

    df = pd.DataFrame(all_points)
    df = df.sort_values("ds").reset_index(drop=True)

    # 聚合为每小时平均值,减少计算量
    df["hour"] = df["ds"].dt.floor("h")
    df = df.groupby("hour")[metric_name].mean().reset_index()
    df = df.rename(columns={"hour": "ds"})

    if len(df) < 24:
        raise ValueError(f"数据点不足({len(df)}),至少需要 24 个")

    # 训练 Prophet 模型
    model = Prophet(
        daily_seasonality=True,   # 启用每日周期
        weekly_seasonality=True,  # 启用每周周期
        yearly_seasonality=False, # VPS 通常没有年度周期
        changepoint_prior_scale=0.05,  # 平滑变化点
        seasonality_prior_scale=10,
    )

    model.fit(df)

    # 验证:用最后 10% 数据做测试
    split_idx = int(len(df) * 0.9)
    train_df = df.iloc[:split_idx]
    test_df = df.iloc[split_idx:]

    train_model = Prophet(
        daily_seasonality=True,
        weekly_seasonality=True,
    ).fit(train_df)

    future = train_model.make_future_dataframe(periods=len(test_df), freq="h")
    forecast = train_model.predict(future)

    # 计算 MAPE(平均绝对百分比误差)
    mape = np.mean(
        np.abs((test_df[metric_name].values - forecast.iloc[-len(test_df):][metric_name].values)
               / test_df[metric_name].values)
    ) * 100

    print(f"模型训练完成,MAPE: {mape:.2f}%")
    return model

# ==================== 预测与决策 ====================
def forecast_and_decide(model, hours=FORECAST_HOURS):
    """
    基于模型预测未来资源需求,做出扩缩容决策
    
    Returns:
        dict: 包含预测结果和决策建议
    """
    future = model.make_future_dataframe(periods=hours, freq="h")
    forecast = model.predict(future)

    # 提取未来预测值
    predictions = forecast[["ds", "cpu_percent", "cpu_percent_lower", "cpu_percent_upper"]].tail(hours)

    # 取预测均值和上限
    max_predicted_cpu = predictions["cpu_percent"].max()
    max_predicted_upper = predictions["cpu_percent_upper"].max()

    # 找出峰值出现的时间
    peak_hour = predictions.loc[predictions["cpu_percent"].idxmax(), "ds"]

    decision = {
        "current_time": datetime.utcnow().isoformat(),
        "forecast_window_hours": hours,
        "max_predicted_cpu": round(max_predicted_cpu, 2),
        "max_predicted_cpu_upper_bound": round(max_predicted_upper, 2),
        "peak_time": peak_hour.isoformat(),
        "recommendation": None,
        "urgency": "normal"
    }

    # 决策逻辑
    if max_predicted_upper > SCALE_UP_THRESHOLD * 100:
        # 高置信度会超阈值 → 立即扩容
        decision["recommendation"] = "scale_up_immediately"
        decision["urgency"] = "high"
        decision["reason"] = (
            f"预测峰值 CPU 达 {max_predicted_upper:.1f}%,"
            f"预计 {peak_hour.strftime('%H:%M')} 达到"
        )
    elif max_predicted_cpu > SCALE_UP_THRESHOLD * 0.85:
        # 大概率会超 → 准备扩容
        decision["recommendation"] = "scale_up_prepared"
        decision["urgency"] = "medium"
        decision["reason"] = (
            f"预测峰值 CPU 达 {max_predicted_cpu:.1f}%,"
            f"预计 {peak_hour.strftime('%H:%M')} 达到"
        )
    elif max_predicted_cpu < SCALE_DOWN_THRESHOLD * 100:
        # 资源充裕 → 考虑缩容
        decision["recommendation"] = "consider_scale_down"
        decision["urgency"] = "low"
        decision["reason"] = f"预测未来 {hours} 小时 CPU 最高仅 {max_predicted_cpu:.1f}%"
    else:
        decision["recommendation"] = "no_action"
        decision["reason"] = "资源使用在正常范围内"

    return decision

# ==================== 执行扩缩容 ====================
def execute_scaling(decision):
    """
    根据决策执行扩缩容操作
    
    实际环境中,这里会调用云厂商 API:
    - AWS: modify_autoscaling_group
    - GCP: update_instance_group
    - 自建: 切换实例规格 / 调整容器副本数
    """
    rec = decision["recommendation"]

    if rec == "no_action":
        print(f"[INFO] {decision['reason']}")
        return

    print(f"\n{'='*50}")
    print(f"[决策] {decision['reason']}")
    print(f"{'='*50}")

    if rec == "scale_up_immediately":
        print("→ 动作:立即扩容到更高规格实例")
        # 示例:调用云厂商 API
        # cloud_provider.scale_up(instance_id, new_flavor="c7.xlarge")
    elif rec == "scale_up_prepared":
        print("→ 动作:预扩容(提前 30 分钟准备)")
        # 示例:启动备用实例,预热缓存
        # cloud_provider.prepare_scaling(instance_id, lead_time_minutes=30)
    elif rec == "consider_scale_down":
        print("→ 动作:建议在下一维护窗口缩容")
        # 示例:标记实例可缩容,等待低峰期
        # cloud_provider.schedule_scale_down(instance_id, window="02:00-04:00")

# ==================== 主循环 ====================
def main():
    print("🤖 AI 预测性资源调度系统启动")
    print(f"   预测窗口: {FORECAST_HOURS} 小时")
    print(f"   检查间隔: {CHECK_INTERVAL_MINUTES} 分钟")
    print()

    # 首次运行:训练模型
    print("📊 正在采集历史数据...")
    cpu_data = fetch_cpu_history(hours=720)  # 30 天

    print("🧠 正在训练预测模型...")
    model = train_prophet_model(cpu_data)

    # 持续监控循环
    while True:
        print(f"\n⏰ [{datetime.now().strftime('%Y-%m-%d %H:%M')}] 执行预测检查...")

        decision = forecast_and_decide(model)
        execute_scaling(decision)

        # 保存本次决策日志
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            **decision
        }
        with open("/var/log/ai-scaling.log", "a") as f:
            f.write(json.dumps(log_entry) + "\n")

        print(f"\n💤 下次检查将在 {CHECK_INTERVAL_MINUTES} 分钟后")
        import time
        time.sleep(CHECK_INTERVAL_MINUTES * 60)

if __name__ == "__main__":
    main()

方案 B:LSTM 深度学习(进阶)

对于流量模式极其复杂的场景(如电商大促、突发 viral 传播),Prophet 的线性分解可能不够。这时可以使用 LSTM(长短期记忆网络)

"""LSTM 预测模块 - 适用于复杂流量模式"""

import tensorflow as tf
import numpy as np
from sklearn.preprocessing import MinMaxScaler

class LSTMForecaster:
    def __init__(self, lookback=168, forecast_horizon=24):
        """
        Args:
            lookback: 使用过去多少小时的数据做预测(一周 = 168 小时)
            forecast_horizon: 预测未来多少小时
        """
        self.lookback = lookback
        self.forecast_horizon = forecast_horizon
        self.model = None
        self.scaler = MinMaxScaler()

    def build_model(self, input_dim=1):
        """构建 LSTM 模型"""
        model = tf.keras.Sequential([
            tf.keras.layers.LSTM(64, return_sequences=True,
                                input_shape=(self.lookback, input_dim)),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.LSTM(32, return_sequences=False),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(self.forecast_horizon)
        ])

        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                     loss='mse')
        self.model = model
        return model

    def prepare_data(self, history_series):
        """
        将时间序列数据转换为 LSTM 输入格式
        
        history_series: pandas Series,索引为 DatetimeIndex
        """
        scaled = self.scaler.fit_transform(history_series.values.reshape(-1, 1))

        X, y = [], []
        for i in range(self.lookback, len(scaled) - self.forecast_horizon + 1):
            X.append(scaled[i - self.lookback:i])
            y.append(scaled[i:i + self.forecast_horizon].flatten())

        return np.array(X), np.array(y)

    def train(self, history_series, epochs=50, batch_size=32):
        """训练模型"""
        X, y = self.prepare_data(history_series)

        if self.model is None:
            self.build_model(input_dim=1)

        self.model.fit(
            X, y,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=0.1,
            verbose=1
        )

    def predict(self, history_series):
        """基于最新数据做预测"""
        scaled = self.scaler.transform(history_series.values.reshape(-1, 1))
        last_sequence = scaled[-self.lookback:].reshape(1, self.lookback, 1)

        pred_scaled = self.model.predict(last_sequence)[0]
        pred_original = self.scaler.inverse_transform(pred_scaled.reshape(-1, 1)).flatten()

        return pred_original

第三步:集成到实际运维流程

光有预测模型还不够,你需要把它接入实际的运维自动化流程。

完整的调度器架构

┌─────────────────────────────────────────────────────┐
│                  AI 预测调度器                        │
│                                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────────────┐  │
│  │ 数据采集  │→│ 模型推理  │→│   决策引擎       │  │
│  │ Prometheus│  │Prophet/  │  │ 阈值/策略匹配    │  │
│  │ NodeExp. │  │  LSTM    │  │ 风险评估         │  │
│  └──────────┘  └──────────┘  └────────┬─────────┘  │
│                                       │            │
│                              ┌────────▼─────────┐  │
│                              │   执行引擎       │  │
│                              │ • 云厂商 API     │  │
│                              │ • Terraform 变更 │  │
│                              │ • K8s HPA 调整   │  │
│                              └────────┬─────────┘  │
│                                       │            │
└───────────────────────────────────────┼────────────┘
                                        │
                   ┌────────────────────┼────────────┐
                   ▼                    ▼            ▼
            ┌───────────┐      ┌───────────┐  ┌───────────┐
            │  云厂商API  │      │  告警通知  │  │  效果反馈  │
            │ (扩容/缩容) │      │ (Slack/   │  │ (记录实际  │
            │ Terraform  │      │  钉钉/邮件)│  │  vs 预测) │
            └───────────┘      └───────────┘  └───────────┘

与 K8s HPA 集成

如果你的 VPS 上跑的是 Kubernetes,可以直接用 Custom Metrics Adapter 把 AI 预测值暴露给 HPA:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: app-predictive-scaler
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: my-app
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Pods
    pods:
      metric:
        name: ai_cpu_forecast
      target:
        type: Value
        averageValue: "80"

配合一个自定义 exporter 每 30 分钟推送预测值:

# AI 预测值导出为 Prometheus 指标
curl -X POST http://localhost:9090/api/v1/write \
  --data-binary 'ai_cpu_forecast{job="predictive-scaler"} 75.3'

与云厂商 API 集成

不同云厂商的扩缩容方式:

AWS EC2 Auto Scaling:

import boto3

asg = boto3.client('autoscaling')

def scale_up(asg_name, desired_capacity):
    asg.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=desired_capacity,
        HonorCooldown=True
    )

def scale_down(asg_name, desired_capacity):
    asg.set_desired_capacity(
        AutoScalingGroupName=asg_name,
        DesiredCapacity=desired_capacity,
        HonorCooldown=True
    )

阿里云 ECS:

from alibabacloud_ecs20140526.client import Client
from alibabacloud_tea_openapi.models import Config

config = Config(
    access_key_id=os.environ['ALI_ACCESS_KEY'],
    access_key_secret=os.environ['ALI_ACCESS_SECRET'],
    endpoint='ecs.aliyuncs.com'
)
client = Client(config)

# 修改实例规格
client.modify_instance_spec(
    instance_id='i-xxx',
    instance_type='ecs.g7.xlarge'
)

实战案例:一个电商站点的预测性扩容

背景:某电商 VPS(2C4G,单节点),日均 PV 约 5 万,但在促销活动时 PV 会飙到 50 万+。

传统做法的问题

  • 活动前手动升级到 8C32G,活动结束后降配回来
  • 每次升降级耗时 15-30 分钟,期间服务不可用
  • 非活动期间白白浪费 4 倍资源费用

AI 预测性扩容后的效果

指标扩容前扩容后
高峰期可用性97.2%99.95%
平均月度费用¥860¥520
手动操作次数每月 4 次0 次
扩容响应时间15-30 分钟提前 2 小时自动完成

关键洞察:因为可以提前扩容,不需要预留太多余量。系统预测到下午 2 点会到峰值,1 点就开始扩容,到 2 点时新实例已经完全就绪。活动结束后,凌晨 3 点自动降配——这时候本来也没人访问。

实施路线图

如果你也想在自己的 VPS 上部署 AI 预测性资源调度,建议按以下步骤推进:

Phase 1: 数据基础(1-2 天)

  • 部署 Prometheus + Node Exporter
  • 确保至少 2 周的历史数据积累
  • 配置 Grafana 仪表盘可视化

Phase 2: 模型训练(2-3 天)

  • 安装 Prophet / TensorFlow
  • 编写数据采集和预处理脚本
  • 用历史数据训练模型,验证预测精度
  • 目标:MAPE < 15%

Phase 3: 决策自动化(3-5 天)

  • 实现决策引擎(阈值 + 安全边际)
  • 对接云厂商 API 或 Terraform
  • 添加人工确认环节(初期建议)
  • 编写告警通知(Slack / 钉钉 / 邮件)

Phase 4: 全自动化与优化(持续)

  • 移除人工确认,完全自动化
  • 引入多指标联合预测(CPU + 内存 + 网络)
  • 建立预测准确率看板
  • 定期重新训练模型(每周/每月)

注意事项与最佳实践

1. 安全边际很重要

预测永远不是 100% 准确的。建议在决策时加入安全边际

# 不要等到预测值 > 80% 才扩容
# 而是在预测值 > 65% 时就开始准备
SAFE_MARGIN = 0.15  # 15% 安全余量

trigger_threshold = SCALE_UP_THRESHOLD * (1 - SAFE_MARGIN)
# 即:预测值 > 68% 就触发扩容准备

2. 冷启动问题

新 VPS 没有历史数据时,模型无法工作。解决方案:

  • 规则兜底:没有预测数据时,回退到传统的阈值告警
  • 迁移学习:如果有相似业务的其他 VPS,可以用其数据预训练
  • 渐进式学习:先手动记录每次扩容的实际效果,让模型逐步学习

3. 避免"震荡扩容"

如果模型预测忽高忽低,可能导致频繁扩缩容。解决方法:

  • 平滑处理:对预测值做移动平均(如 3 小时滑动窗口)
  • 冷却时间:两次扩缩容之间至少间隔 30 分钟
  • 最小规模:设置最小实例数,避免频繁上下波动

4. 多指标联合预测

CPU 只是其中一个维度。生产环境建议同时预测:

指标为什么重要典型预警阈值
CPU计算密集型操作> 75%
内存防止 OOM Kill> 80%
磁盘 IOPS数据库性能瓶颈> 85%
网络带宽DDoS/流量激增> 90%
连接数服务层压力> 80%

可以用 多变量 ProphetMulti-LSTM 来做联合预测。

总结

AI 预测性资源调度代表了 VPS 运维的下一个进化方向:

从"出了问题再修" → “在出问题前预防”

这套方案的核心价值不在于技术本身有多复杂,而在于它改变了资源管理的思维方式——用数据代替直觉,用预测代替响应

对于个人开发者来说,从小处着手:先用 Prometheus 采集数据,再用 Prophet 做一个简单的 CPU 预测脚本。当你亲眼看到模型准确预测出每天的流量高峰时,你就会理解为什么这是值得投入的方向。

当你的 VPS 能够在你睡觉时自动完成扩缩容,在流量到来之前就已经准备好足够的资源——那一刻,你会感受到真正的"智能运维"魅力。


📌 本文代码示例已开源,可在仓库中找到完整可运行的版本。关注 SelfVPS 获取更多 AI + VPS 实践指南。

📺 看视频版教程 → DuckDB Lab YouTube

Subscribe for more DuckDB & AI automation tutorials