AI Agent Disaster Recovery: What to Do When the AI "Goes Down"?



一、Opening: The 3 a.m. Alert Call

Hi, I'm Lao Jin.

Last Wednesday at 3 a.m., an alert call woke me up: the AI Agent service was completely down.

I dragged myself up and checked the logs:

  • The LLM API was returning 429 errors (rate limited)
  • The database connection pool was exhausted
  • The vector database was down
  • Service responses were timing out

Users were complaining in the group chat: "Why is the AI down again?"

While firefighting, I kept thinking: with a solid disaster-recovery plan, it would not have been this painful.

Today I'll walk through a complete disaster-recovery strategy for AI Agents, from prevention all the way to recovery.

二、Types of AI Agent Disasters

Disaster categories

┌────────────────────────────────────────────────────────┐
│              AI Agent disaster categories              │
├────────────────────────────────────────────────────────┤
│                                                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │ Dependency failures                             │   │
│  │ • LLM API unavailable                           │   │
│  │ • Vector database down                          │   │
│  │ • External tool services failing                │   │
│  └─────────────────────────────────────────────────┘   │
│                                                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │ Resource exhaustion                             │   │
│  │ • Token quota used up                           │   │
│  │ • DB connection pool exhausted                  │   │
│  │ • Out of memory                                 │   │
│  │ • CPU overload                                  │   │
│  └─────────────────────────────────────────────────┘   │
│                                                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │ Quality disasters                               │   │
│  │ • Sudden drop in output quality                 │   │
│  │ • Hallucinations at scale                       │   │
│  │ • Malicious prompt attacks                      │   │
│  └─────────────────────────────────────────────────┘   │
│                                                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │ Data disasters                                  │   │
│  │ • Knowledge base accidentally deleted           │   │
│  │ • Configuration changed by mistake              │   │
│  │ • User data lost                                │   │
│  └─────────────────────────────────────────────────┘   │
│                                                        │
└────────────────────────────────────────────────────────┘

Impact levels

| Level | Description | Scope | Recovery time objective |
| --- | --- | --- | --- |
| P0 | Completely unavailable | All users | < 15 minutes |
| P1 | Core features unavailable | Some users | < 1 hour |
| P2 | Some features degraded | A few users | < 4 hours |
| P3 | Minor impact | Individual users | |

三、LLM API Failure Handling

When the LLM API is down, prompts we have seen before can still be answered from a response cache. Cache entries are keyed by a hash of the prompt:

```python
import hashlib

def _cache_key(self, prompt: str) -> str:
    """Generate a cache key from the prompt text."""
    return hashlib.md5(prompt.encode()).hexdigest()
```
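A response cache like this pairs naturally with multi-provider failover: try each provider in order, and fall back to the last cached answer only when all of them fail. A minimal sketch, assuming hypothetical provider client objects with an async `complete` method (the client classes and names below are illustrative, not a real SDK):

```python
import asyncio
import hashlib

class AllProvidersFailedError(Exception):
    pass

class FailoverLLM:
    """Try providers in order; serve from cache when all of them fail."""

    def __init__(self, providers):
        self.providers = providers          # list of (name, client) pairs
        self.cache = {}                     # prompt-hash -> last good answer

    def _cache_key(self, prompt: str) -> str:
        return hashlib.md5(prompt.encode()).hexdigest()

    async def complete(self, prompt: str) -> str:
        key = self._cache_key(prompt)
        for name, client in self.providers:
            try:
                answer = await client.complete(prompt)
                self.cache[key] = answer    # refresh the cache on success
                return answer
            except Exception:
                continue                    # try the next provider
        if key in self.cache:
            return self.cache[key]          # degraded: stale but useful
        raise AllProvidersFailedError(prompt)

# Hypothetical stub clients for illustration:
class DownClient:
    async def complete(self, prompt): raise RuntimeError("503")

class EchoClient:
    async def complete(self, prompt): return f"echo: {prompt}"

llm = FailoverLLM([("primary", DownClient()), ("backup", EchoClient())])
print(asyncio.run(llm.complete("hi")))   # falls over to the backup provider
```

The cache here only grows; in production you would bound it (LRU or TTL) and decide how stale an answer you are willing to serve during an outage.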

四、Vector Database Failure Handling

Multi-replica architecture

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class VectorStoreFailover:
    """Vector-store failover between a primary and a replica."""

    def __init__(self, primary, replica):
        self.primary = primary
        self.replica = replica
        self.primary_healthy = True

    async def search(self, query_vector, top_k: int = 10):
        """Search, failing over to the replica when the primary errors."""
        if self.primary_healthy:
            try:
                return await self.primary.search(query_vector, top_k)
            except Exception as e:
                logger.error(f"Primary vector store failed: {e}")
                self.primary_healthy = False
                # Start probing the primary in the background
                asyncio.create_task(self._check_primary_health())

        # Serve from the replica
        return await self.replica.search(query_vector, top_k)

    async def add(self, vectors, metadata):
        """Add vectors (write to primary, replicate asynchronously)."""
        try:
            # Write to the primary
            await self.primary.add(vectors, metadata)

            # Replicate to the replica asynchronously
            asyncio.create_task(
                self.replica.add(vectors, metadata)
            )
        except Exception as e:
            logger.error(f"Failed to add vectors: {e}")
            raise

    async def _check_primary_health(self):
        """Probe the primary until it responds, then mark it healthy again."""
        while True:
            await asyncio.sleep(30)
            try:
                await self.primary.ping()
                self.primary_healthy = True
                logger.info("Primary vector store recovered")
                return
            except Exception:
                continue
```
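To see the failover path in isolation, here is a stripped-down sketch with in-memory stand-ins for the two stores (the stub classes are illustrative, not a real vector-store client):

```python
import asyncio

class FailingStore:
    """Stub primary that is hard down."""
    async def search(self, query_vector, top_k):
        raise RuntimeError("primary down")

class MemoryStore:
    """Stub replica backed by a plain list."""
    def __init__(self, items):
        self.items = items

    async def search(self, query_vector, top_k):
        return self.items[:top_k]

async def search_with_failover(primary, replica, query_vector, top_k=2):
    # Same shape as VectorStoreFailover.search: primary first, replica on error.
    try:
        return await primary.search(query_vector, top_k)
    except Exception:
        return await replica.search(query_vector, top_k)

hits = asyncio.run(search_with_failover(FailingStore(), MemoryStore(["a", "b", "c"]), None))
# hits == ["a", "b"]
```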

Offline mode

```python
import json
import logging
import math
from pathlib import Path

logger = logging.getLogger(__name__)

def cosine_similarity(a, b):
    """Cosine similarity between two plain-list vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class OfflineVectorStore:
    """Offline vector store (degraded mode) backed by a local snapshot."""

    def __init__(self, snapshot_path: str):
        self.snapshot_path = Path(snapshot_path)
        self.local_index = None
        self._load_snapshot()

    def _load_snapshot(self):
        """Load the local snapshot."""
        snapshot_file = self.snapshot_path / "latest.json"
        if snapshot_file.exists():
            with open(snapshot_file, 'r') as f:
                self.local_index = json.load(f)
            logger.info(f"Loaded offline snapshot: {len(self.local_index)} vectors")

    async def search(self, query_vector, top_k: int = 10):
        """Local search (degraded mode)."""
        if not self.local_index:
            raise ValueError("No offline snapshot available")

        # Brute-force cosine-similarity search over the snapshot
        similarities = []
        for item in self.local_index:
            similarity = cosine_similarity(query_vector, item["vector"])
            similarities.append((similarity, item))

        # Sort by similarity and return the top_k hits
        similarities.sort(reverse=True, key=lambda x: x[0])
        return [item for _, item in similarities[:top_k]]
```

五、Handling Resource Exhaustion

Token quota management

```python
from datetime import datetime

class QuotaExceededError(Exception):
    pass

class QuotaManager:
    """Daily token-quota manager."""

    def __init__(self, daily_limit: int, alert_threshold: float = 0.8):
        self.daily_limit = daily_limit
        self.alert_threshold = alert_threshold
        self.usage = {"input": 0, "output": 0}
        self.usage_date = datetime.now().date()

    def record_usage(self, input_tokens: int, output_tokens: int):
        """Record token usage."""
        self._check_date()

        self.usage["input"] += input_tokens
        self.usage["output"] += output_tokens

        total = self.usage["input"] + self.usage["output"]

        # Alert once usage crosses the warning threshold
        if total > self.daily_limit * self.alert_threshold:
            self._send_alert(total)

        # Hard stop once the daily limit is exceeded
        if total > self.daily_limit:
            raise QuotaExceededError(
                f"Daily quota exceeded: {total}/{self.daily_limit}"
            )

    def get_remaining(self) -> int:
        """Remaining quota for today."""
        self._check_date()
        total = self.usage["input"] + self.usage["output"]
        return max(0, self.daily_limit - total)

    def _check_date(self):
        """Reset counters when the date rolls over."""
        today = datetime.now().date()
        if today != self.usage_date:
            self.usage = {"input": 0, "output": 0}
            self.usage_date = today

    def _send_alert(self, current_usage: int):
        """Send a quota alert."""
        message = f"""
        ⚠️ Token quota alert

        Current usage: {current_usage:,}
        Daily limit: {self.daily_limit:,}
        Utilization: {current_usage/self.daily_limit*100:.1f}%

        Please keep an eye on quota consumption.
        """
        send_alert(message)  # external notification hook
```
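Quota exhaustion usually shows up first as 429 responses from the provider, exactly the error in the opening incident. Before degrading, it is worth retrying with exponential backoff plus jitter. A generic sketch; `RateLimitError` is a stand-in for whatever your client library actually raises:

```python
import random
import time

class RateLimitError(Exception):
    """Stand-in for the client library's 429 error."""

def call_with_backoff(fn, max_retries=5, base_delay=0.01):
    """Retry fn on RateLimitError, sleeping base_delay * 2^attempt plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise                      # out of retries: surface the error
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(delay)

# Simulated flaky endpoint: fails twice, then succeeds
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError()
    return "ok"

result = call_with_backoff(flaky)
# result == "ok" after 3 attempts
```

The jitter term matters: without it, many workers retrying in lockstep hammer the provider at the same instants and stay rate-limited longer.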

Graceful degradation

```python
from types import SimpleNamespace

class DegradationManager:
    """Degradation manager."""

    def __init__(self):
        # The two quota thresholds below are illustrative placeholders;
        # tune them to your own daily limit.
        self.degradation_rules = {
            "token_quota_low": {
                "trigger": lambda ctx: ctx.remaining_quota < 10_000,
                "action": "switch_to_cheap_model"
            },
            "token_quota_critical": {
                "trigger": lambda ctx: ctx.remaining_quota < 1_000,
                "action": "enable_cache_mode"
            },
            "latency_high": {
                "trigger": lambda ctx: ctx.p99_latency > 30,
                "action": "reduce_concurrency"
            },
            "error_rate_high": {
                "trigger": lambda ctx: ctx.error_rate > 0.1,
                "action": "enable_fallback_mode"
            }
        }

        self.degradation_state = "normal"  # normal, degraded, minimal

    def check_and_degrade(self, context: dict):
        """Evaluate the rules and degrade when any of them fires."""
        ctx = SimpleNamespace(**context)
        for rule_name, rule in self.degradation_rules.items():
            if rule["trigger"](ctx):
                action = rule["action"]
                self._execute_action(action)
                self.degradation_state = "degraded"
                return action

        return None

    def _execute_action(self, action: str):
        """Execute a degradation action (the helpers are external hooks)."""
        if action == "switch_to_cheap_model":
            # Switch to a cheaper model
            update_model("gpt-3.5-turbo")
            logger.warning("Switched to cheaper model due to quota")

        elif action == "enable_cache_mode":
            # Prefer cached responses
            enable_cache_priority()
            logger.warning("Enabled cache priority mode")

        elif action == "reduce_concurrency":
            # Lower the concurrency ceiling
            set_max_concurrency(5)
            logger.warning("Reduced concurrency due to high latency")

        elif action == "enable_fallback_mode":
            # Serve static fallback responses
            enable_static_responses()
            logger.warning("Enabled fallback mode due to errors")

    def recover(self):
        """Return to normal mode."""
        self.degradation_state = "normal"
        restore_normal_settings()
        logger.info("Recovered to normal mode")
```

六、Data Disaster Recovery

Knowledge-base backup strategy

```python
import gzip
import json
import logging
import shutil
from datetime import datetime, timedelta
from pathlib import Path

logger = logging.getLogger(__name__)

class KnowledgeBaseBackup:
    """Knowledge-base backup."""

    def __init__(self, vector_store, backup_dir: str = "backups"):
        self.vector_store = vector_store
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    async def create_backup(self, collection: str):
        """Create a backup."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_id = f"{collection}_{timestamp}"

        # Export the collection
        data = await self.vector_store.export_collection(collection)

        # Write the backup file
        backup_file = self.backup_dir / f"{backup_id}.json"
        backup_data = {
            "id": backup_id,
            "collection": collection,
            "created_at": datetime.now().isoformat(),
            "document_count": len(data),
            "data": data
        }

        with open(backup_file, 'w') as f:
            json.dump(backup_data, f)

        # Compress it
        self._compress_backup(backup_file)

        # Prune old backups (keep the last 7 days)
        self._cleanup_old_backups(collection, keep_days=7)

        return backup_id

    async def restore_backup(self, backup_id: str):
        """Restore a backup."""
        backup_file = self.backup_dir / f"{backup_id}.json"

        if not backup_file.exists():
            # Try decompressing the .gz copy first
            self._decompress_backup(backup_id)

        if not backup_file.exists():
            raise ValueError(f"Backup {backup_id} not found")

        with open(backup_file, 'r') as f:
            backup_data = json.load(f)

        # Clear the current collection
        await self.vector_store.clear_collection(backup_data["collection"])

        # Re-import the data
        await self.vector_store.import_data(
            backup_data["collection"],
            backup_data["data"]
        )

        logger.info(f"Restored backup {backup_id}: {backup_data['document_count']} documents")

        return {
            "status": "restored",
            "backup_id": backup_id,
            "document_count": backup_data["document_count"]
        }

    def list_backups(self, collection: str = None) -> list:
        """List backups, both compressed and uncompressed."""
        backups = []

        for backup_file in self.backup_dir.glob("*.json*"):
            opener = gzip.open if backup_file.suffix == ".gz" else open
            with opener(backup_file, 'rt') as f:
                data = json.load(f)

            if collection is None or data["collection"] == collection:
                backups.append({
                    "id": data["id"],
                    "collection": data["collection"],
                    "created_at": data["created_at"],
                    "document_count": data["document_count"]
                })

        return sorted(backups, key=lambda x: x["created_at"], reverse=True)

    def _compress_backup(self, backup_file: Path):
        """Compress a backup with gzip."""
        with open(backup_file, 'rb') as f_in:
            with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        backup_file.unlink()  # remove the uncompressed copy

    def _decompress_backup(self, backup_id: str):
        """Decompress a .gz backup back to plain JSON."""
        gz_file = self.backup_dir / f"{backup_id}.json.gz"
        if gz_file.exists():
            with gzip.open(gz_file, 'rb') as f_in:
                with open(self.backup_dir / f"{backup_id}.json", 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

    def _cleanup_old_backups(self, collection: str, keep_days: int):
        """Delete backups older than keep_days."""
        cutoff = datetime.now() - timedelta(days=keep_days)

        for backup_file in self.backup_dir.glob(f"{collection}_*.json.gz"):
            # Parse the timestamp out of the file name
            # ("<collection>_YYYYMMDD_HHMMSS.json.gz")
            parts = backup_file.name.split('.')[0].split('_')
            backup_date = datetime.strptime(parts[-2] + parts[-1], "%Y%m%d%H%M%S")

            if backup_date < cutoff:
                backup_file.unlink()
                logger.info(f"Deleted old backup: {backup_file}")
```
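The compress-then-restore cycle is easy to verify end to end with nothing but the standard library; a minimal round-trip sketch, with no vector store involved (function names are illustrative):

```python
import gzip
import json
import shutil
import tempfile
from pathlib import Path

def write_compressed_backup(directory: Path, backup_id: str, payload: dict) -> Path:
    """Write payload as JSON, gzip it, and drop the plain copy."""
    plain = directory / f"{backup_id}.json"
    plain.write_text(json.dumps(payload))
    gz = directory / f"{backup_id}.json.gz"
    with open(plain, "rb") as f_in, gzip.open(gz, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    plain.unlink()
    return gz

def read_compressed_backup(gz_path: Path) -> dict:
    """Read a gzipped JSON backup back into memory."""
    with gzip.open(gz_path, "rt") as f:
        return json.load(f)

with tempfile.TemporaryDirectory() as tmp:
    gz = write_compressed_backup(Path(tmp), "kb_20240101_020000", {"docs": [1, 2, 3]})
    restored = read_compressed_backup(gz)
# restored == {"docs": [1, 2, 3]}
```

Running a restore drill like this regularly is the only way to know a backup is actually restorable; an untested backup is a hope, not a plan.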

Automatic backup scheduling

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

class BackupScheduler:
    """Backup scheduler."""

    def __init__(self, kb_backup: KnowledgeBaseBackup):
        self.kb_backup = kb_backup
        self.scheduler = AsyncIOScheduler()

    def start(self):
        """Start the schedules."""
        # Daily backup at 02:00
        self.scheduler.add_job(
            self._daily_backup,
            'cron',
            hour=2,
            minute=0
        )

        # Full backup every Sunday at 03:00
        self.scheduler.add_job(
            self._weekly_full_backup,
            'cron',
            day_of_week='sun',
            hour=3,
            minute=0
        )

        self.scheduler.start()

    async def _daily_backup(self):
        """Daily backup."""
        collections = ["customer_service_kb", "product_kb"]

        for collection in collections:
            try:
                backup_id = await self.kb_backup.create_backup(collection)
                logger.info(f"Daily backup created: {backup_id}")
            except Exception as e:
                logger.error(f"Daily backup failed for {collection}: {e}")

    async def _weekly_full_backup(self):
        """Weekly full backup."""
        # Export every collection
        # Upload to cloud storage
        pass
```

七、Fault Recovery Process

Standard recovery workflow

```python
class DisasterRecovery:
    """Disaster recovery."""

    RECOVERY_STEPS = [
        {
            "phase": "detection",
            "steps": [
                "identify fault type",
                "assess impact scope",
                "set recovery priority"
            ]
        },
        {
            "phase": "containment",
            "steps": [
                "isolate faulty component",
                "enable degraded mode",
                "notify stakeholders"
            ]
        },
        {
            "phase": "recovery",
            "steps": [
                "execute recovery actions",
                "verify service health",
                "ramp traffic back up"
            ]
        },
        {
            "phase": "postmortem",
            "steps": [
                "analyze root cause",
                "update runbooks",
                "improve monitoring"
            ]
        }
    ]

    async def execute_recovery(self, incident: dict):
        """Run the recovery workflow."""
        recovery_log = []

        for phase in self.RECOVERY_STEPS:
            phase_result = await self._execute_phase(phase, incident)
            recovery_log.append(phase_result)

            if not phase_result["success"]:
                break

        return {
            "incident": incident,
            "recovery_log": recovery_log,
            "status": "recovered" if phase_result["success"] else "failed"
        }

    async def _execute_phase(self, phase: dict, incident: dict):
        """Run one recovery phase."""
        results = []

        for step in phase["steps"]:
            result = await self._execute_step(step, incident)
            results.append({
                "step": step,
                "result": result
            })

        return {
            "phase": phase["phase"],
            "success": all(r["result"]["success"] for r in results),
            "results": results
        }

    async def _execute_step(self, step: str, incident: dict):
        """Run one recovery step."""
        # Dispatch on the step name
        if step == "identify fault type":
            return await self._identify_fault_type(incident)
        elif step == "isolate faulty component":
            return await self._isolate_faulty_component(incident)
        # ... remaining steps

        return {"success": True, "message": f"Executed: {step}"}
```
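The phase/step structure is easy to exercise with plain functions standing in for real recovery actions. A synchronous sketch (the handler names are illustrative):

```python
def run_phases(phases, handlers):
    """Run named steps phase by phase; stop at the first failing phase."""
    log = []
    for phase in phases:
        results = [(step, handlers.get(step, lambda: True)()) for step in phase["steps"]]
        ok = all(passed for _, passed in results)
        log.append({"phase": phase["phase"], "success": ok, "results": results})
        if not ok:
            break                 # do not proceed past a failed phase
    return log

phases = [
    {"phase": "detection", "steps": ["identify fault type"]},
    {"phase": "recovery", "steps": ["restart service", "verify health"]},
]
handlers = {
    "identify fault type": lambda: True,
    "restart service": lambda: True,
    "verify health": lambda: False,   # simulate a failed verification
}
log = run_phases(phases, handlers)
# The run stops in the "recovery" phase with success == False
```

Stopping at the first failed phase is deliberate: ramping traffic back up before health verification passes just turns one incident into two.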

Recovery runbook

# AI Agent Fault Recovery Runbook

## Scenario 1: LLM API unavailable

### Symptoms
- API returns 5xx errors
- Timeout errors increase
- Response times grow

### Recovery steps
1. Check the provider's status page
2. Switch to the backup provider
   ```bash
   python scripts/switch_llm_provider.py --provider anthropic
   ```
3. Enable cache mode
   ```bash
   python scripts/enable_cache_mode.py
   ```
4. Notify users of the degraded service

### Expected recovery time
- Automatic switch: 1-2 minutes
- Manual intervention: 5-10 minutes

## Scenario 2: Vector database failure

### Symptoms
- RAG retrieval fails
- Searches return empty results

### Recovery steps
1. Switch to the replica
   ```bash
   python scripts/switch_vector_store.py --target replica
   ```
2. Enable offline mode
   ```bash
   python scripts/enable_offline_mode.py
   ```
3. Restore data (if needed)
   ```bash
   python scripts/restore_backup.py --backup-id <backup-id>
   ```

### Expected recovery time
- Replica switch: 1 minute
- Data restore: 5-30 minutes

## Scenario 3: Token quota exhausted

### Symptoms
- API returns quota errors
- Requests are rejected

### Recovery steps
1. Switch to a cheaper model
   ```bash
   python scripts/switch_model.py --model gpt-3.5-turbo
   ```
2. Enable cache-first mode
3. Top up the quota urgently
4. Request a quota increase

### Expected recovery time
- Model switch: immediate
- Quota top-up: 10-30 minutes

八、Drills and Testing

Fault-injection testing

```python
class ChaosTesting:
    """Chaos testing."""

    async def inject_llm_failure(self):
        """Inject an LLM failure."""
        # Simulate an API timeout
        mock_llm.set_timeout(60)

        # Verify that failover kicks in
        response = await agent.run("test message")

        assert response is not None
        assert response.provider != "primary"

    async def inject_vector_store_failure(self):
        """Inject a vector-store failure."""
        # Stop the vector store
        await vector_store.stop()

        # Verify degraded mode
        response = await agent.run("search products")

        assert response is not None
        assert "cached" in response or "degraded" in response

    async def inject_high_load(self):
        """Inject high load."""
        # Simulate 100 concurrent requests
        tasks = [agent.run(f"test message {i}") for i in range(100)]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        success_count = sum(1 for r in results if not isinstance(r, Exception))
        assert success_count >= 80  # at least 80% must succeed
```

Regular drills

```python
# Drill plan
DRILL_SCHEDULE = {
    "monthly": [
        {
            "name": "LLM failover drill",
            "procedure": "Manually trigger an LLM fault; verify automatic failover",
            "duration": "15 minutes",
            "impact": "none"
        },
        {
            "name": "Backup restore drill",
            "procedure": "Restore the knowledge base from backup",
            "duration": "30 minutes",
            "impact": "staging environment"
        }
    ],
    "quarterly": [
        {
            "name": "Full-chain failure drill",
            "procedure": "Simulate an end-to-end failure scenario",
            "duration": "2 hours",
            "impact": "low-traffic window"
        }
    ]
}
```

九、Summary and Checklist

Disaster-recovery checklist

| Check item | Status | Last verified |
| --- | --- | --- |
| Multi-provider LLM configuration | | |
| Vector-store replicas | | |
| Automatic backups | | |
| Degraded-mode configuration | | |
| Failover tested | | |
| Recovery docs complete | | |
| Alerting in place | | |
| Regular drills held | | |
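A checklist like this stays honest only if it is verified automatically. A small sketch that maps each item to a check function; the checks themselves are illustrative placeholders:

```python
from datetime import date

def run_readiness_checks(checks):
    """Run each named check; return a checklist-style report."""
    report = []
    for name, check in checks.items():
        try:
            passed = bool(check())
        except Exception:
            passed = False   # a crashing check counts as a failure
        report.append({"item": name, "ok": passed, "verified": date.today().isoformat()})
    return report

# Illustrative placeholder checks
checks = {
    "multi-provider LLM configured": lambda: True,
    "vector-store replica reachable": lambda: True,
    "latest backup fresh enough": lambda: False,  # simulate a stale backup
}
report = run_readiness_checks(checks)
failed = [r["item"] for r in report if not r["ok"]]
# failed == ["latest backup fresh enough"]
```

Wiring a runner like this into the alerting pipeline turns the checklist's "Last verified" column from a manual chore into a daily byproduct.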

Key metrics

| Metric | Target | Current |
| --- | --- | --- |
| Mean time to recovery (MTTR) | | |
| Availability | > 99% | |
| Failover success rate | > 95% | |
| Drill pass rate | 100% | |

Next time

Tomorrow we'll talk about an AI Agent compliance guide: the legal red lines of enterprise deployment!


Copyright notice: original article by 技术老金 (Lao Jin), published on 2026-03-30, about 12,162 characters.
Reprint notice: unless stated otherwise, articles on this site are published under the CC 4.0 license; please credit the source when reprinting.