AI Agent灾难恢复:当AI“挂了”怎么办?
一、开场:凌晨三点的告警电话
大家好,我是老金。
上周三凌晨3点,被告警电话吵醒:AI Agent服务全面不可用。
爬起来看日志:
- LLM API返回429错误(超限)
- 数据库连接池耗尽
- 向量数据库挂了
- 服务响应超时
用户在群里骂:“AI怎么又挂了?”
我一边处理,一边想:如果有完善的灾难恢复方案,就不会这么狼狈了。
今天分享AI Agent的灾难恢复策略,从预防到恢复的完整方案。
二、AI Agent的灾难类型
灾难分类
┌─────────────────────────────────────────────────────────┐
│ AI Agent灾难分类 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 依赖故障 │ │
│ │ • LLM API不可用 │ │
│ │ • 向量数据库宕机 │ │
│ │ • 外部工具服务故障 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 资源耗尽 │ │
│ │ • Token配额用尽 │ │
│ │ • 数据库连接池耗尽 │ │
│ │ • 内存溢出 │ │
│ │ • CPU过载 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 质量灾难 │ │
│ │ • 输出质量骤降 │ │
│ │ • 大量幻觉输出 │ │
│ │ • 恶意Prompt攻击 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 数据灾难 │ │
│ │ • 知识库被误删 │ │
│ │ • 配置被错误修改 │ │
│ │ • 用户数据丢失 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
影响等级
| 等级 | 描述 | 影响范围 | 恢复时间目标 |
|---|---|---|---|
| P0 | 完全不可用 | 全部用户 | < 15分钟 |
| P1 | 核心功能不可用 | 部分用户 | < 1小时 |
| P2 | 部分功能降级 | 少量用户 | < 4小时 |
| P3 | 轻微影响 | 个别用户 | < 24小时 |

## 三、LLM API故障处理

(本节正文与大部分代码在排版中丢失,仅存缓存键生成片段:)

```python
def _cache_key(self, prompt: str) -> str:
    """生成缓存键"""
    return hashlib.md5(prompt.encode()).hexdigest()
```
## 四、向量数据库故障处理
### 多副本架构
```python
class VectorStoreFailover:
    """Vector-store access with automatic primary-to-replica failover.

    Reads go to the primary while it is considered healthy; on the first
    failure the instance flips to the replica and spawns a background probe
    that flips back once the primary answers ``ping()`` again. Writes always
    target the primary and are mirrored to the replica asynchronously.
    """

    def __init__(self, primary, replica):
        # primary/replica are duck-typed: must expose async search/add/ping.
        self.primary = primary
        self.replica = replica
        self.primary_healthy = True

    async def search(self, query_vector, top_k: int = 10):
        """Search with failover: try the primary first, fall back to the replica."""
        if self.primary_healthy:
            try:
                return await self.primary.search(query_vector, top_k)
            except Exception as e:
                logger.error(f"Primary vector store failed: {e}")
                self.primary_healthy = False
                # Start a background probe so we fail back automatically.
                asyncio.create_task(self._check_primary_health())
        # Degraded path: serve reads from the replica.
        return await self.replica.search(query_vector, top_k)

    async def add(self, vectors, metadata):
        """Add vectors: write the primary synchronously, mirror to the replica async.

        NOTE(review): errors in the fire-and-forget replica write are never
        observed here, so the replica may silently drift — confirm acceptable.
        """
        try:
            # Primary write must succeed before we report success.
            await self.primary.add(vectors, metadata)
            # Mirror to the replica without blocking the caller.
            asyncio.create_task(
                self.replica.add(vectors, metadata)
            )
        except Exception as e:
            logger.error(f"Failed to add vectors: {e}")
            raise

    async def _check_primary_health(self):
        """Probe the primary every 30s; mark it healthy on the first success."""
        while True:
            await asyncio.sleep(30)
            try:
                await self.primary.ping()
                self.primary_healthy = True
                logger.info("Primary vector store recovered")
                return
            except Exception:
                # Fix: the original bare `except:` also swallowed
                # KeyboardInterrupt/SystemExit (and, pre-3.8, task
                # cancellation); catch Exception only so cancellation
                # and interpreter shutdown still propagate.
                continue
```

### 离线模式

```python
class OfflineVectorStore:
    """Degraded-mode vector store backed by a local JSON snapshot on disk."""

    def __init__(self, snapshot_path: str):
        self.snapshot_path = Path(snapshot_path)
        self.local_index = None
        self._load_snapshot()

    def _load_snapshot(self):
        """Load the most recent snapshot file, if one exists."""
        candidate = self.snapshot_path / "latest.json"
        if candidate.exists():
            with open(candidate, 'r') as fh:
                self.local_index = json.load(fh)
            logger.info(f"Loaded offline snapshot: {len(self.local_index)} vectors")

    async def search(self, query_vector, top_k: int = 10):
        """Brute-force cosine-similarity search over the local snapshot."""
        if not self.local_index:
            raise ValueError("No offline snapshot available")
        # Score every entry, then keep the top_k best matches.
        scored = [
            (cosine_similarity(query_vector, entry["vector"]), entry)
            for entry in self.local_index
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored[:top_k]]
五、资源耗尽处理
Token配额管理
class QuotaManager:
    """Tracks daily token usage against a fixed limit.

    Counters reset automatically when the calendar date changes. An alert is
    sent once usage crosses ``alert_threshold`` of the limit, and
    QuotaExceededError is raised once the limit itself is exceeded.
    """

    def __init__(self, daily_limit: int, alert_threshold: float = 0.8):
        self.daily_limit = daily_limit
        self.alert_threshold = alert_threshold
        self.usage = {"input": 0, "output": 0}
        self.usage_date = datetime.now().date()

    def record_usage(self, input_tokens: int, output_tokens: int):
        """Accumulate token counts; alert and/or raise when thresholds are hit."""
        self._check_date()
        self.usage["input"] += input_tokens
        self.usage["output"] += output_tokens
        total = self.usage["input"] + self.usage["output"]
        # Warn first (soft threshold), then enforce the hard limit.
        if total > self.daily_limit * self.alert_threshold:
            self._send_alert(total)
        if total > self.daily_limit:
            raise QuotaExceededError(
                f"Daily quota exceeded: {total}/{self.daily_limit}"
            )

    def get_remaining(self) -> int:
        """Return today's remaining token budget (never negative)."""
        self._check_date()
        spent = self.usage["input"] + self.usage["output"]
        return max(0, self.daily_limit - spent)

    def _check_date(self):
        """Reset the counters when the calendar date rolls over."""
        today = datetime.now().date()
        if today == self.usage_date:
            return
        self.usage = {"input": 0, "output": 0}
        self.usage_date = today

    def _send_alert(self, current_usage: int):
        """Push a usage alert through the shared alerting channel."""
        message = f"""
⚠️ Token配额告警
当前使用:{current_usage:,}
日配额:{self.daily_limit:,}
使用率:{current_usage/self.daily_limit*100:.1f}%
请关注配额使用情况。
"""
        send_alert(message)
优雅降级
class DegradationManager:
    """Rule-driven graceful degradation for the agent service."""

    def __init__(self):
        # NOTE(review): the first rule's threshold was garbled in the
        # original source ("ctx.remaining_quota 5" — the comparison was
        # eaten by markdown), and "switch_to_cheap_model" was handled below
        # but had no rule mapping to it. The two quota rules here are a
        # reconstruction — confirm the thresholds against the ops runbook.
        self.degradation_rules = {
            "token_quota_critical": {
                "trigger": lambda ctx: ctx.remaining_quota < 10000,
                "action": "enable_cache_mode"
            },
            "token_quota_low": {
                "trigger": lambda ctx: ctx.remaining_quota < 100000,
                "action": "switch_to_cheap_model"
            },
            "latency_high": {
                "trigger": lambda ctx: ctx.p99_latency > 30,
                "action": "reduce_concurrency"
            },
            "error_rate_high": {
                "trigger": lambda ctx: ctx.error_rate > 0.1,
                "action": "enable_fallback_mode"
            }
        }
        self.degradation_state = "normal"  # normal, degraded, minimal

    def check_and_degrade(self, context: dict) -> str:
        """Evaluate the rules against `context`; run and return the first match.

        `context` is a plain dict wrapped in a throwaway class so the lambda
        triggers can use attribute access. Returns the executed action name,
        or None when no rule fires (despite the `-> str` annotation, which is
        kept for interface compatibility).
        """
        for rule_name, rule in self.degradation_rules.items():
            if rule["trigger"](type('Context', (), context)):
                action = rule["action"]
                self._execute_action(action)
                self.degradation_state = "degraded"
                return action
        return None

    def _execute_action(self, action: str):
        """Dispatch one degradation action to its operational hook."""
        if action == "switch_to_cheap_model":
            update_model("gpt-3.5-turbo")
            logger.warning("Switched to cheaper model due to quota")
        elif action == "enable_cache_mode":
            enable_cache_priority()
            logger.warning("Enabled cache priority mode")
        elif action == "reduce_concurrency":
            set_max_concurrency(5)
            logger.warning("Reduced concurrency due to high latency")
        elif action == "enable_fallback_mode":
            enable_static_responses()
            logger.warning("Enabled fallback mode due to errors")

    def recover(self):
        """Return the service to normal, undoing any degradation."""
        self.degradation_state = "normal"
        restore_normal_settings()
        logger.info("Recovered to normal mode")
六、数据灾难恢复
知识库备份策略
class KnowledgeBaseBackup:
    """Create, list, restore and prune gzip-compressed knowledge-base backups.

    Backups are written as ``<collection>_<YYYYmmdd_HHMMSS>.json`` and then
    compressed to ``.json.gz``; only the last ``keep_days`` days are kept.
    """

    def __init__(self, vector_store, backup_dir: str = "backups"):
        self.vector_store = vector_store
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    async def create_backup(self, collection: str):
        """Export `collection`, persist it compressed, prune old backups."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_id = f"{collection}_{timestamp}"
        data = await self.vector_store.export_collection(collection)
        backup_file = self.backup_dir / f"{backup_id}.json"
        backup_data = {
            "id": backup_id,
            "collection": collection,
            "created_at": datetime.now().isoformat(),
            "document_count": len(data),
            "data": data
        }
        with open(backup_file, 'w') as f:
            json.dump(backup_data, f)
        self._compress_backup(backup_file)
        # Retention policy: keep the most recent 7 days for this collection.
        self._cleanup_old_backups(collection, keep_days=7)
        return backup_id

    async def restore_backup(self, backup_id: str):
        """Restore `backup_id` into its collection, replacing current contents."""
        backup_file = self.backup_dir / f"{backup_id}.json"
        if not backup_file.exists():
            # Backups are stored compressed; inflate on demand.
            self._decompress_backup(backup_id)
        if not backup_file.exists():
            raise ValueError(f"Backup {backup_id} not found")
        with open(backup_file, 'r') as f:
            backup_data = json.load(f)
        # Destructive restore: clear first, then import the snapshot.
        await self.vector_store.clear_collection(backup_data["collection"])
        await self.vector_store.import_data(
            backup_data["collection"],
            backup_data["data"]
        )
        logger.info(f"Restored backup {backup_id}: {backup_data['document_count']} documents")
        return {
            "status": "restored",
            "backup_id": backup_id,
            "document_count": backup_data["document_count"]
        }

    def list_backups(self, collection: str = None) -> list:
        """List backup metadata, newest first.

        Fix: the original globbed only ``*.json`` and therefore never saw
        backups that create_backup had already compressed to ``.json.gz``;
        scan both forms.
        """
        import gzip
        backups = []
        for backup_file in self.backup_dir.iterdir():
            if backup_file.name.endswith(".json"):
                with open(backup_file, 'r') as f:
                    data = json.load(f)
            elif backup_file.name.endswith(".json.gz"):
                with gzip.open(backup_file, 'rt') as f:
                    data = json.load(f)
            else:
                continue
            if collection is None or data["collection"] == collection:
                backups.append({
                    "id": data["id"],
                    "collection": data["collection"],
                    "created_at": data["created_at"],
                    "document_count": data["document_count"]
                })
        return sorted(backups, key=lambda x: x["created_at"], reverse=True)

    def _compress_backup(self, backup_file: Path):
        """Gzip `backup_file` in place and remove the uncompressed original."""
        import gzip
        import shutil
        with open(backup_file, 'rb') as f_in:
            with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        backup_file.unlink()  # only keep the compressed copy

    def _decompress_backup(self, backup_id: str):
        """Inflate ``<backup_id>.json.gz`` back to ``<backup_id>.json``.

        Fix: this method was called by restore_backup but never defined.
        A missing archive is ignored; restore_backup re-checks existence.
        """
        import gzip
        import shutil
        gz_file = self.backup_dir / f"{backup_id}.json.gz"
        if not gz_file.exists():
            return
        with gzip.open(gz_file, 'rb') as f_in:
            with open(self.backup_dir / f"{backup_id}.json", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

    def _cleanup_old_backups(self, collection: str, keep_days: int):
        """Delete this collection's compressed backups older than `keep_days`.

        Fix: the original parsed the timestamp from ``backup_file.stem``,
        which for ``*.json.gz`` still ends in ``.json`` and made strptime
        raise; strip the full ``.json.gz`` suffix before parsing instead.
        """
        cutoff = datetime.now() - timedelta(days=keep_days)
        for backup_file in self.backup_dir.glob(f"{collection}_*.json.gz"):
            base = backup_file.name[:-len(".json.gz")]
            parts = base.split('_')
            # The timestamp is the last two underscore-separated fields.
            date_str = parts[-2] + parts[-1]
            try:
                backup_date = datetime.strptime(date_str, "%Y%m%d%H%M%S")
            except ValueError:
                continue  # not one of our timestamped backups; leave it alone
            if backup_date < cutoff:
                backup_file.unlink()
                logger.info(f"Deleted old backup: {backup_file}")
自动备份调度
from apscheduler.schedulers.asyncio import AsyncIOScheduler
class BackupScheduler:
    """Schedules recurring knowledge-base backups via APScheduler."""

    def __init__(self, kb_backup: KnowledgeBaseBackup):
        self.kb_backup = kb_backup
        self.scheduler = AsyncIOScheduler()

    def start(self):
        """Register the daily and weekly cron jobs, then start the scheduler."""
        # Daily incremental backup at 02:00.
        self.scheduler.add_job(self._daily_backup, 'cron', hour=2, minute=0)
        # Weekly full backup on Sunday at 03:00.
        self.scheduler.add_job(
            self._weekly_full_backup,
            'cron',
            day_of_week='sun',
            hour=3,
            minute=0
        )
        self.scheduler.start()

    async def _daily_backup(self):
        """Back up each known collection; log failures without aborting the rest."""
        for collection in ["customer_service_kb", "product_kb"]:
            try:
                backup_id = await self.kb_backup.create_backup(collection)
                logger.info(f"Daily backup created: {backup_id}")
            except Exception as e:
                logger.error(f"Daily backup failed for {collection}: {e}")

    async def _weekly_full_backup(self):
        """Weekly full backup: export all collections and upload to cloud storage (TODO)."""
        pass
七、故障恢复流程
标准恢复流程
class DisasterRecovery:
    """Drives the four-phase incident-recovery playbook.

    Phases run in declaration order; a failed phase aborts the remainder.
    """

    RECOVERY_STEPS = [
        {
            "phase": "detection",
            "steps": [
                "确认故障类型",
                "评估影响范围",
                "确定恢复优先级"
            ]
        },
        {
            "phase": "containment",
            "steps": [
                "隔离故障组件",
                "启动降级模式",
                "通知相关方"
            ]
        },
        {
            "phase": "recovery",
            "steps": [
                "执行恢复操作",
                "验证服务恢复",
                "逐步恢复流量"
            ]
        },
        {
            "phase": "postmortem",
            "steps": [
                "分析故障原因",
                "更新文档",
                "改进监控"
            ]
        }
    ]

    async def execute_recovery(self, incident: dict):
        """Run every phase for `incident`; stop at the first failed phase.

        Fix: the original read the loop variable ``phase_result`` after the
        loop to compute the status, which is unbound if RECOVERY_STEPS is
        ever empty; track an explicit success flag instead.
        """
        recovery_log = []
        succeeded = True
        for phase in self.RECOVERY_STEPS:
            phase_result = await self._execute_phase(phase, incident)
            recovery_log.append(phase_result)
            if not phase_result["success"]:
                succeeded = False
                break
        return {
            "incident": incident,
            "recovery_log": recovery_log,
            "status": "recovered" if succeeded else "failed"
        }

    async def _execute_phase(self, phase: dict, incident: dict):
        """Run all steps of one phase; the phase succeeds only if every step does."""
        results = []
        for step in phase["steps"]:
            result = await self._execute_step(step, incident)
            results.append({
                "step": step,
                "result": result
            })
        return {
            "phase": phase["phase"],
            "success": all(r["result"]["success"] for r in results),
            "results": results
        }

    async def _execute_step(self, step: str, incident: dict):
        """Dispatch a single named recovery step to its handler.

        NOTE(review): _identify_fault_type / _isolate_faulty_component are
        referenced but not defined in this snippet — presumably implemented
        elsewhere. Unknown steps fall through to a generic success result.
        """
        if step == "确认故障类型":
            return await self._identify_fault_type(incident)
        elif step == "隔离故障组件":
            return await self._isolate_faulty_component(incident)
        # ... other steps ...
        return {"success": True, "message": f"Executed: {step}"}
恢复操作手册
# AI Agent故障恢复手册
## 场景1:LLM API不可用
### 症状
- API返回5xx错误
- 超时错误增加
- 响应时间变长
### 恢复步骤
1. 检查API状态页面
2. 切换到备用供应商
```bash
python scripts/switch_llm_provider.py --provider anthropic
```
3. 启用缓存模式
```bash
python scripts/enable_cache_mode.py
```
4. 通知用户服务降级
### 预计恢复时间
- 自动切换:1-2分钟
- 手动干预:5-10分钟
场景2:向量数据库故障
症状
- RAG检索失败
- 搜索返回空结果
恢复步骤
1. 切换到副本
```bash
python scripts/switch_vector_store.py --target replica
```
2. 启用离线模式
```bash
python scripts/enable_offline_mode.py
```
3. 恢复数据(如果需要)
```bash
python scripts/restore_backup.py --backup-id <备份ID>
```
预计恢复时间
- 切换副本:1分钟
- 数据恢复:5-30分钟
场景3:Token配额耗尽
症状
- API返回配额错误
- 服务拒绝请求
恢复步骤
1. 切换到便宜模型
```bash
python scripts/switch_model.py --model gpt-3.5-turbo
```
2. 启用缓存优先
3. 紧急充值配额
4. 申请增加配额
预计恢复时间
- 切换模型:即时
- 充值配额:10-30分钟
八、演练与测试
故障注入测试
class ChaosTesting:
    """Fault-injection drills for the agent's failover and degradation paths."""

    async def inject_llm_failure(self):
        """Force LLM timeouts and confirm the fallback provider answers."""
        # Simulate a hanging primary API.
        mock_llm.set_timeout(60)
        reply = await agent.run("测试消息")
        assert reply is not None
        assert reply.provider != "primary"

    async def inject_vector_store_failure(self):
        """Stop the vector store and confirm the degraded path still responds."""
        await vector_store.stop()
        reply = await agent.run("搜索产品")
        assert reply is not None
        assert "缓存" in reply or "降级" in reply

    async def inject_high_load(self):
        """Fire 100 concurrent requests; require at least 80% to succeed."""
        tasks = [agent.run(f"测试消息{i}") for i in range(100)]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        ok = sum(1 for r in outcomes if not isinstance(r, Exception))
        assert ok >= 80  # at least 80% success under load
定期演练
# 演练计划
# Recurring chaos-drill calendar, keyed by cadence. Each entry names the
# drill, its manual procedure, expected duration, and blast radius; it is
# consumed by a drill runner not shown in this file.
DRILL_SCHEDULE = {
    "monthly": [
        {
            "name": "LLM切换演练",
            "procedure": "手动触发LLM故障,验证自动切换",
            "duration": "15分钟",
            "impact": "无"
        },
        {
            "name": "备份恢复演练",
            "procedure": "从备份恢复知识库",
            "duration": "30分钟",
            "impact": "测试环境"
        }
    ],
    "quarterly": [
        {
            "name": "全链路故障演练",
            "procedure": "模拟完整故障场景",
            "duration": "2小时",
            "impact": "低流量时段"
        }
    ]
}
九、总结与检查清单
灾难恢复检查清单
| 检查项 | 状态 | 最后验证 |
|---|---|---|
| LLM多供应商配置 | ☐ | |
| 向量库多副本 | ☐ | |
| 自动备份机制 | ☐ | |
| 降级模式配置 | ☐ | |
| 故障切换测试 | ☐ | |
| 恢复文档完整 | ☐ | |
| 告警配置完善 | ☐ | |
| 定期演练执行 | ☐ | |
关键指标
| 指标 | 目标 | 当前 |
|---|---|---|
| 平均恢复时间(MTTR) | < 30分钟 | |
| 服务可用性 | > 99% | |
| 故障切换成功率 | > 95% | |
| 演练通过率 | 100% | |
下期预告
明天聊聊AI Agent合规指南——企业级部署的法律红线!
往期回顾
正文完