AI Agent版本管理:从Prompt版本到模型版本
一、开场:改了一个提示词,出问题了
大家好,我是老金。
上周出了个生产事故:改了一行系统提示词,结果AI回答质量骤降。
用户投诉:AI怎么变傻了?
我一看日志,发现是提示词改得太激进了。想回滚——但是上一个版本是什么来着?
找了一圈,发现提示词是硬编码在代码里的,没有版本记录。
最后只能凭记忆重新写了一遍,花了半天时间。
这让我意识到:AI Agent也需要版本管理,而且是多层次的版本管理。
今天聊聊Prompt版本、配置版本、模型版本的完整管理方案。
二、为什么AI Agent版本管理复杂?
多层版本依赖
AI Agent的"版本"包括:
1. 代码版本 → Git管理
2. Prompt版本 → 需要专门管理
3. 模型版本 → OpenAI/Anthropic控制
4. 工具版本 → API版本
5. 配置版本 → 参数配置
6. 知识库版本 → 向量库快照
版本变更的影响
| 变更类型 | 影响 | 回滚难度 |
|---|---|---|
| 代码变更 | 功能变化 | Git回滚 |
| Prompt变更 | 行为变化 | 需要记录 |
| 模型升级 | 能力变化 | 无法回滚 |
| 参数调整 | 性能变化 | 需要配置管理 |
常见问题
# ❌ Common anti-patterns (illustrative fragments, not meant to run as-is)
# Problem 1: the prompt is hard-coded
SYSTEM_PROMPT = """你是一个客服助手...""" # edits leave no record behind
# Problem 2: parameters are hard-coded
def __init__(self):
self.temperature = 0.7 # why 0.7? when was it last changed?
# Problem 3: the model name is hard-coded
model = "gpt-4" # switching to gpt-4-turbo means a global search-and-replace?
# Problem 4: the knowledge base has no version
vector_store.add(docs) # which knowledge-base version is this?
三、Prompt版本管理
Prompt即代码
# prompts/__init__.py
from prompt_manager import PromptVersion
# Every version of the customer-service system prompt is kept side by side;
# the "current" argument below names the version that is active.
SYSTEM_PROMPT = PromptVersion(
id="customer_service_system",
versions={
"v1.0": """你是一个客服助手,负责回答用户问题。""",
"v1.1": """你是一个专业的客服助手。
- 回答要准确、简洁
- 语气要友好
- 不确定时请承认""", # 2024-01-15: added behavior guidelines
"v1.2": """你是一个专业的客服助手。
- 回答要准确、简洁
- 语气要友好、有同理心
- 不确定时请承认
- 优先使用工具查询信息""", # 2024-01-20: emphasized tool usage
"v2.0": """你是一个智能客服助手,代表公司的服务品质。
## 核心职责
- 准确回答用户问题
- 必要时使用工具查询
- 处理异常情况
## 行为准则
1. 回答准确简洁,避免废话
2. 语气友好有同理心
3. 不确定时明确说明
4. 涉及订单/退款等操作需确认
## 工具使用
- 查询订单:使用query_order工具
- 查询商品:使用search_product工具
- 取消订单:需用户二次确认
""", # 2024-02-01: structured rewrite to improve stability
},
current="v2.0"
)
Prompt管理器
import json
from pathlib import Path
from datetime import datetime
class PromptManager:
    """File-backed store of versioned prompts.

    Every registered version is written to
    ``<storage_path>/<prompt_id>_<version_id>.json``. ``registry.json`` in the
    same directory maps each prompt id to its list of version ids and to the
    version that is currently active. Rollbacks are appended to
    ``operations.log``. All files are read and written as UTF-8 because the
    JSON is dumped with ``ensure_ascii=False``.
    """

    def __init__(self, storage_path: str = "prompts/registry"):
        """Open the registry under *storage_path*, creating the directory if needed."""
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.registry = self._load_registry()

    def register(self, prompt_id: str, content: str,
                 description: str = "", tags: list = None):
        """Store *content* as a new version of *prompt_id* and make it current.

        Args:
            prompt_id: Identifier of the prompt the version belongs to.
            content: Full prompt text.
            description: Free-form note describing the change.
            tags: Optional list of labels; stored as ``[]`` when omitted.

        Returns:
            The id of the newly created version.
        """
        version_id = self._generate_version_id()
        # The generated id only has one-second resolution; add a numeric suffix
        # so that two registrations within the same second cannot overwrite
        # each other's version file.
        known = self.registry.get(prompt_id, {}).get("versions", [])
        base_id, attempt = version_id, 1
        while version_id in known:
            attempt += 1
            version_id = f"{base_id}_{attempt}"

        version = {
            "id": version_id,
            "prompt_id": prompt_id,
            "content": content,
            "description": description,
            "tags": tags or [],
            "created_at": datetime.now().isoformat(),
            "created_by": get_current_user(),
        }

        # Persist the version itself first, then the registry that points at it.
        self._version_file(prompt_id, version_id).write_text(
            json.dumps(version, ensure_ascii=False, indent=2), encoding="utf-8"
        )

        if prompt_id not in self.registry:
            self.registry[prompt_id] = {"versions": [], "current": None}
        self.registry[prompt_id]["versions"].append(version_id)
        self.registry[prompt_id]["current"] = version_id
        self._save_registry()
        return version_id

    def get(self, prompt_id: str, version: str = None) -> str:
        """Return the text of *prompt_id* at *version* (current version if omitted).

        Raises:
            ValueError: If the prompt is unknown or the version file is missing.
        """
        if prompt_id not in self.registry:
            raise ValueError(f"Prompt {prompt_id} not found")
        version_id = version or self.registry[prompt_id]["current"]
        version_file = self._version_file(prompt_id, version_id)
        if not version_file.exists():
            raise ValueError(f"Version {version_id} not found")
        version_data = json.loads(version_file.read_text(encoding="utf-8"))
        return version_data["content"]

    def rollback(self, prompt_id: str, version: str):
        """Make an already registered *version* the current one and log the rollback.

        Raises:
            ValueError: If the prompt or the version is not in the registry.
        """
        if prompt_id not in self.registry:
            raise ValueError(f"Prompt {prompt_id} not found")
        if version not in self.registry[prompt_id]["versions"]:
            raise ValueError(f"Version {version} not found")
        self.registry[prompt_id]["current"] = version
        self._save_registry()
        self._log_operation("rollback", prompt_id, version)

    def diff(self, prompt_id: str, version1: str, version2: str) -> str:
        """Return a unified diff of the prompt text between two versions."""
        import difflib

        content1 = self.get(prompt_id, version1)
        content2 = self.get(prompt_id, version2)
        diff = difflib.unified_diff(
            content1.splitlines(keepends=True),
            content2.splitlines(keepends=True),
            fromfile=version1,
            tofile=version2,
        )
        return ''.join(diff)

    def list_versions(self, prompt_id: str) -> list:
        """List stored versions of *prompt_id*, newest first.

        Versions whose file is missing on disk are skipped. An unknown prompt
        yields an empty list.
        """
        if prompt_id not in self.registry:
            return []
        current = self.registry[prompt_id]["current"]
        versions = []
        for version_id in self.registry[prompt_id]["versions"]:
            version_file = self._version_file(prompt_id, version_id)
            if not version_file.exists():
                continue
            version_data = json.loads(version_file.read_text(encoding="utf-8"))
            versions.append({
                "id": version_id,
                "created_at": version_data["created_at"],
                "description": version_data["description"],
                "is_current": version_id == current,
            })
        return sorted(versions, key=lambda x: x["created_at"], reverse=True)

    def _version_file(self, prompt_id: str, version_id: str):
        """Return the path of the JSON file holding one prompt version."""
        return self.storage_path / f"{prompt_id}_{version_id}.json"

    def _generate_version_id(self) -> str:
        """Return a timestamp-based version id such as ``v20240115_090000``."""
        return datetime.now().strftime("v%Y%m%d_%H%M%S")

    def _load_registry(self) -> dict:
        """Read ``registry.json``; an absent file means an empty registry."""
        registry_file = self.storage_path / "registry.json"
        if registry_file.exists():
            return json.loads(registry_file.read_text(encoding="utf-8"))
        return {}

    def _save_registry(self):
        """Write the in-memory registry back to ``registry.json``."""
        registry_file = self.storage_path / "registry.json"
        registry_file.write_text(
            json.dumps(self.registry, ensure_ascii=False, indent=2), encoding="utf-8"
        )

    def _log_operation(self, operation: str, prompt_id: str, version: str):
        """Append one ``timestamp | operation | prompt_id | version`` line to the log."""
        log_file = self.storage_path / "operations.log"
        log_entry = f"{datetime.now().isoformat()} | {operation} | {prompt_id} | {version}\n"
        # Path.write_text() cannot append (it has no mode argument), so open the
        # file in append mode explicitly.
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(log_entry)
# Usage example
# NOTE: the literal version ids used below are illustrative; real ids are
# whatever register() returned at the time each version was created.
prompt_manager = PromptManager()
# Register a new version
version_id = prompt_manager.register(
prompt_id="customer_service_system",
content="你是一个智能客服助手...",
description="增加工具使用说明",
tags=["customer-service", "v2"]
)
# Fetch the current version
prompt = prompt_manager.get("customer_service_system")
# Fetch one specific version
prompt_v1 = prompt_manager.get("customer_service_system", "v20240101_120000")
# Roll back to an earlier version
prompt_manager.rollback("customer_service_system", "v20240115_090000")
# Show the differences between two versions
diff = prompt_manager.diff(
"customer_service_system",
"v20240101_120000",
"v20240115_090000"
)
Prompt版本测试
class PromptVersionTester:
    """Run test cases against prompt versions and compare their pass rates.

    The coroutines ``_run_with_prompt`` and ``_evaluate_response`` are called
    here but are not defined in the class as shown; they have to be provided
    elsewhere (for example by a subclass).
    """

    def __init__(self, prompt_manager: "PromptManager"):
        """Keep a reference to the manager used to look up prompt text."""
        self.prompt_manager = prompt_manager

    async def test_version(self, prompt_id: str, version: str, test_cases: list):
        """Evaluate one prompt version against *test_cases*.

        Each case must expose ``id``, ``input`` and ``expected`` attributes.

        Returns:
            A dict with ``prompt_id``, ``version``, the per-case ``results``
            and the overall ``pass_rate`` (``0.0`` when there are no cases).
        """
        prompt = self.prompt_manager.get(prompt_id, version)
        results = []
        for case in test_cases:
            response = await self._run_with_prompt(prompt, case.input)
            evaluation = await self._evaluate_response(response, case.expected)
            results.append({
                "case_id": case.id,
                "input": case.input,
                "response": response,
                "evaluation": evaluation,
            })

        passed = sum(1 for r in results if r["evaluation"]["passed"])
        # An empty test suite used to raise ZeroDivisionError here.
        pass_rate = passed / len(results) if results else 0.0
        return {
            "prompt_id": prompt_id,
            "version": version,
            "results": results,
            "pass_rate": pass_rate,
        }

    async def compare_versions(self, prompt_id: str,
                               version1: str, version2: str, test_cases: list):
        """Run the same cases on two versions and report the pass-rate change.

        ``improvement`` is ``version2``'s pass rate minus ``version1``'s.
        """
        result1 = await self.test_version(prompt_id, version1, test_cases)
        result2 = await self.test_version(prompt_id, version2, test_cases)
        return {
            "version1": {"id": version1, "pass_rate": result1["pass_rate"]},
            "version2": {"id": version2, "pass_rate": result2["pass_rate"]},
            "improvement": result2["pass_rate"] - result1["pass_rate"],
        }
四、配置版本管理
配置文件结构
# config/agent_config.yaml
# Version information
version: "2.1.0"
config_id: "agent_20240201_v1"
# Model settings
model:
provider: "openai"
name: "gpt-4-turbo"
temperature: 0.7
max_tokens: 2000
# Prompt references (values look like "<prompt_id>@<version>" - confirm against the loader)
prompts:
system: "customer_service_system@v2.0"
few_shot_examples: "customer_service_examples@v1.2"
# Tool settings
tools:
- name: "query_order"
enabled: true
version: "v1.0"
- name: "search_product"
enabled: true
version: "v2.0"
- name: "cancel_order"
enabled: true
version: "v1.0"
require_confirmation: true
# Concurrency settings
concurrency:
max_concurrent: 10
rate_limit: 100
timeout: 30
# Knowledge-base settings
knowledge_base:
vector_store: "milvus"
collection: "customer_service_kb_v3"
embedding_model: "bge-large-zh"
top_k: 5
配置管理器
import yaml
from dataclasses import dataclass
from typing import Any, Dict, Optional
@dataclass
class AgentConfig:
    """A single versioned agent configuration.

    The fields correspond one-to-one to the top-level keys of an agent
    configuration document; all of them are required.
    """

    # Identification of this configuration.
    version: str
    config_id: str

    # Model section (provider, model name and sampling parameters).
    model: Dict[str, Any]

    # Prompt references, keyed by role.
    prompts: Dict[str, str]

    # One entry per tool.
    tools: list

    # Concurrency section.
    concurrency: Dict[str, Any]

    # Knowledge-base section.
    knowledge_base: Dict[str, Any]
class ConfigManager:
    """Load, activate, derive and compare versioned agent configurations.

    Every ``*.yaml`` file in ``config_dir`` is parsed into an ``AgentConfig``
    and indexed by its ``config_id``. The id of the active configuration is
    stored in ``current.txt``; activations are appended to ``activation.log``.
    """

    def __init__(self, config_dir: str = "config"):
        """Index every configuration found in *config_dir*."""
        self.config_dir = Path(config_dir)
        self.configs = {}
        self._load_all_configs()

    def _load_all_configs(self):
        """Parse each ``*.yaml`` file in the directory into ``self.configs``."""
        for config_file in self.config_dir.glob("*.yaml"):
            config_data = yaml.safe_load(config_file.read_text(encoding="utf-8"))
            config_id = config_data["config_id"]
            self.configs[config_id] = AgentConfig(**config_data)

    def get_config(self, config_id: str = None) -> "Optional[AgentConfig]":
        """Return the configuration with *config_id*, or the active one if omitted.

        Returns ``None`` when the id is unknown.
        """
        if config_id:
            return self.configs.get(config_id)
        return self.get_current_config()

    def get_current_config(self) -> "Optional[AgentConfig]":
        """Return the active configuration.

        The id stored in ``current.txt`` wins. Without that file the
        configuration with the highest version is returned, or ``None`` when
        no configuration is loaded at all.
        """
        current_file = self.config_dir / "current.txt"
        if current_file.exists():
            config_id = current_file.read_text(encoding="utf-8").strip()
            return self.configs.get(config_id)
        if not self.configs:
            return None
        # Compare versions numerically: as plain strings "2.10.0" < "2.9.0".
        return max(self.configs.values(), key=lambda c: self._version_key(c.version))

    def activate_config(self, config_id: str):
        """Mark *config_id* as active and log the activation.

        Raises:
            ValueError: If *config_id* is not a loaded configuration.
        """
        if config_id not in self.configs:
            raise ValueError(f"Config {config_id} not found")
        current_file = self.config_dir / "current.txt"
        current_file.write_text(config_id, encoding="utf-8")
        self._log_activation(config_id)

    def create_config(self, base_config_id: str, changes: dict) -> str:
        """Derive a new configuration from an existing one and save it.

        *changes* replaces top-level fields of the base configuration (nested
        dictionaries are replaced wholesale, not merged). ``version`` and
        ``config_id`` are always generated and cannot be set through *changes*.

        Returns:
            The id of the new configuration.

        Raises:
            ValueError: If the base configuration does not exist.
        """
        from dataclasses import asdict

        base = self.get_config(base_config_id)
        if not base:
            raise ValueError(f"Base config {base_config_id} not found")

        new_config_dict = asdict(base)
        new_config_dict.update(changes)
        new_config_dict["version"] = self._increment_version(base.version)
        new_config_dict["config_id"] = self._next_config_id()

        new_config = AgentConfig(**new_config_dict)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        config_file = self.config_dir / f"{new_config.config_id}.yaml"
        config_file.write_text(
            yaml.dump(new_config_dict, allow_unicode=True), encoding="utf-8"
        )
        self.configs[new_config.config_id] = new_config
        return new_config.config_id

    def diff_configs(self, config_id1: str, config_id2: str) -> dict:
        """Return ``{field: {"old": ..., "new": ...}}`` for every differing field.

        Raises:
            ValueError: If either configuration cannot be found.
        """
        from dataclasses import fields

        config1 = self.get_config(config_id1)
        config2 = self.get_config(config_id2)
        if config1 is None:
            raise ValueError(f"Config {config_id1} not found")
        if config2 is None:
            raise ValueError(f"Config {config_id2} not found")

        differences = {}
        for field in fields(config1):
            val1 = getattr(config1, field.name)
            val2 = getattr(config2, field.name)
            if val1 != val2:
                differences[field.name] = {"old": val1, "new": val2}
        return differences

    def _next_config_id(self) -> str:
        """Return the first unused ``agent_<YYYYMMDD>_v<N>`` id for today.

        The suffix used to be fixed at ``v1``, so a second configuration
        created on the same day overwrote the first one.
        """
        prefix = f"agent_{datetime.now().strftime('%Y%m%d')}_v"
        number = 1
        while (f"{prefix}{number}" in self.configs
               or (self.config_dir / f"{prefix}{number}.yaml").exists()):
            number += 1
        return f"{prefix}{number}"

    @staticmethod
    def _version_key(version) -> tuple:
        """Turn a dotted version into a tuple that orders numerically.

        Numeric components compare as integers; non-numeric components sort
        after numeric ones and compare as text.
        """
        return tuple(
            (0, int(part), "") if part.isdigit() else (1, 0, part)
            for part in str(version).split(".")
        )

    def _increment_version(self, version: str) -> str:
        """Increase the last dotted component of *version* by one.

        Raises:
            ValueError: If the last component is not an integer.
        """
        parts = version.split(".")
        parts[-1] = str(int(parts[-1]) + 1)
        return ".".join(parts)

    def _log_activation(self, config_id: str):
        """Append one ``timestamp | config_id | user`` line to ``activation.log``."""
        log_file = self.config_dir / "activation.log"
        log_entry = f"{datetime.now().isoformat()} | {config_id} | {get_current_user()}\n"
        # Path.write_text() has no append mode; open the file for appending instead.
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(log_entry)
五、模型版本管理
模型版本追踪
class ModelVersionManager:
    """In-memory catalogue of known models, grouped by provider."""

    def __init__(self):
        """Build the static provider -> model -> metadata registry."""

        def stable(release_date, aliases=None):
            # Assemble one registry entry; the "aliases" key is present only
            # for models that actually have aliases.
            entry = {"release_date": release_date, "status": "stable"}
            if aliases is not None:
                entry["aliases"] = aliases
            entry["deprecation_date"] = None
            return entry

        self.model_registry = {
            "openai": {
                "gpt-4": stable("2023-03-14", ["gpt-4-0314"]),
                "gpt-4-turbo": stable(
                    "2024-01-25", ["gpt-4-0125-preview", "gpt-4-1106-preview"]
                ),
                "gpt-3.5-turbo": stable("2023-03-01", ["gpt-3.5-turbo-0125"]),
            },
            "anthropic": {
                "claude-3-opus": stable("2024-03-04"),
                "claude-3-sonnet": stable("2024-03-04"),
            },
        }

    def get_model_info(self, provider: str, model: str) -> dict:
        """Look up a model by canonical name or alias.

        Returns:
            The registry entry plus a ``name`` key holding the canonical name.

        Raises:
            ValueError: If the provider or the model is unknown.
        """
        if provider not in self.model_registry:
            raise ValueError(f"Unknown provider: {provider}")

        for canonical, meta in self.model_registry[provider].items():
            accepted_names = [canonical, *meta.get("aliases", [])]
            if model in accepted_names:
                return {"name": canonical, **meta}

        raise ValueError(f"Unknown model: {model}")

    def check_deprecation(self, provider: str, model: str) -> dict:
        """Report whether a deprecation date is recorded for the model.

        Returns ``{"deprecated": False}`` when no date is set. Otherwise the
        result carries the date, the number of days from now until that date
        and the recommended alternative (``None`` if the entry has none).
        """
        info = self.get_model_info(provider, model)
        cutoff = info.get("deprecation_date")
        if not cutoff:
            return {"deprecated": False}

        remaining = datetime.fromisoformat(cutoff) - datetime.now()
        return {
            "deprecated": True,
            "deprecation_date": cutoff,
            "days_until_deprecation": remaining.days,
            "recommended_alternative": info.get("recommended_alternative"),
        }

    def list_available_models(self, provider: str = None) -> list:
        """List models.

        With *provider*: the names of all of that provider's models (an empty
        list for an unknown provider). Without it: one dict per model whose
        status is ``stable``, across every provider.
        """
        if provider:
            return list(self.model_registry.get(provider, {}).keys())

        return [
            {
                "provider": prov,
                "model": model_name,
                "release_date": info["release_date"],
            }
            for prov, models in self.model_registry.items()
            for model_name, info in models.items()
            if info["status"] == "stable"
        ]
模型迁移策略
class ModelMigration:
    """Move one agent from one model to another using a chosen strategy.

    Several collaborators are called here but not defined in the class as
    shown (``_update_traffic_split``, ``_monitor_migration``,
    ``_get_canary_users``, ``_process_with_model``, ``_evaluate_response``,
    ``_log_migration``, and the module-level ``get_user_request`` and
    ``update_agent_model``); they have to be supplied elsewhere.
    """

    # Share of traffic (percent) sent to the new model at each gradual step.
    GRADUAL_STEPS = (10, 30, 50, 100)
    # Seconds to wait after each gradual step before moving to the next one.
    STABILIZE_SECONDS = 3600

    def __init__(self, agent_id: str):
        """Remember which agent is being migrated."""
        self.agent_id = agent_id
        self.migrations = []

    async def migrate(self, from_model: str, to_model: str,
                      strategy: str = "gradual"):
        """Migrate from *from_model* to *to_model*.

        *strategy* is ``"gradual"`` or ``"canary"``; any other value performs
        an immediate switch.
        """
        if strategy == "gradual":
            return await self._gradual_migration(from_model, to_model)
        elif strategy == "canary":
            return await self._canary_migration(from_model, to_model)
        else:
            return await self._immediate_migration(from_model, to_model)

    async def _gradual_migration(self, from_model: str, to_model: str):
        """Shift traffic step by step, monitoring and pausing after each step."""
        # Imported here because asyncio is not imported at module level in the
        # visible source, which made the sleep below fail with NameError.
        import asyncio

        for percentage in self.GRADUAL_STEPS:
            self._update_traffic_split(from_model, to_model, percentage)
            await self._monitor_migration(from_model, to_model)
            await asyncio.sleep(self.STABILIZE_SECONDS)
        return {"status": "completed", "from": from_model, "to": to_model}

    async def _canary_migration(self, from_model: str, to_model: str):
        """Try the new model on canary users first; switch everyone if all pass.

        Returns a ``failed`` result carrying the evaluation's reason as soon as
        one canary response does not pass.
        """
        canary_users = self._get_canary_users()
        for user_id in canary_users:
            response = await self._process_with_model(
                user_id,
                to_model,
                get_user_request(user_id)
            )
            evaluation = await self._evaluate_response(response)
            if not evaluation["passed"]:
                return {"status": "failed", "reason": evaluation["reason"]}
        # Every canary passed: roll the new model out to all users.
        return await self._immediate_migration(from_model, to_model)

    async def _immediate_migration(self, from_model: str, to_model: str):
        """Point the agent at the new model right away and log the migration."""
        update_agent_model(self.agent_id, to_model)
        self._log_migration(from_model, to_model)
        return {"status": "completed", "from": from_model, "to": to_model}
六、知识库版本管理
class KnowledgeBaseVersionManager:
    """Create, restore and list JSON snapshots of vector-store collections.

    Snapshots are stored as ``kb_snapshots/<snapshot_id>.json`` and are read
    and written as UTF-8. ``_register_snapshot`` is called by
    ``create_snapshot`` but is not defined in the class as shown; it has to be
    supplied elsewhere.
    """

    def __init__(self, vector_store):
        """Bind the vector store and make sure the snapshot directory exists."""
        self.vector_store = vector_store
        self.snapshots_dir = Path("kb_snapshots")
        self.snapshots_dir.mkdir(exist_ok=True)

    def create_snapshot(self, collection: str, description: str = ""):
        """Export *collection* into a new snapshot file.

        Returns:
            The id of the snapshot that was written.
        """
        base_id = datetime.now().strftime("snapshot_%Y%m%d_%H%M%S")
        # The timestamp has one-second resolution; add a numeric suffix so a
        # second snapshot taken within the same second cannot overwrite the
        # first one.
        snapshot_id, attempt = base_id, 1
        while (self.snapshots_dir / f"{snapshot_id}.json").exists():
            attempt += 1
            snapshot_id = f"{base_id}_{attempt}"

        vectors = self.vector_store.export_collection(collection)
        snapshot_data = {
            "id": snapshot_id,
            "collection": collection,
            "description": description,
            "created_at": datetime.now().isoformat(),
            "document_count": len(vectors),
            "vectors": vectors,
        }
        snapshot_file = self.snapshots_dir / f"{snapshot_id}.json"
        # ensure_ascii=False emits raw non-ASCII text, so the encoding must not
        # be left to the platform default.
        snapshot_file.write_text(
            json.dumps(snapshot_data, ensure_ascii=False), encoding="utf-8"
        )
        self._register_snapshot(snapshot_id, collection, description)
        return snapshot_id

    def restore_snapshot(self, snapshot_id: str):
        """Replace the snapshot's collection with the snapshot's contents.

        The collection is cleared before the stored vectors are imported.

        Raises:
            ValueError: If no snapshot file exists for *snapshot_id*.
        """
        snapshot_file = self.snapshots_dir / f"{snapshot_id}.json"
        if not snapshot_file.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")
        snapshot_data = json.loads(snapshot_file.read_text(encoding="utf-8"))

        self.vector_store.clear_collection(snapshot_data["collection"])
        self.vector_store.import_vectors(
            snapshot_data["collection"],
            snapshot_data["vectors"]
        )
        return {
            "status": "restored",
            "snapshot_id": snapshot_id,
            "document_count": snapshot_data["document_count"],
        }

    def list_snapshots(self, collection: str = None) -> list:
        """Return snapshot metadata (without vectors), newest first.

        When *collection* is given, only that collection's snapshots are listed.
        """
        snapshots = []
        for snapshot_file in self.snapshots_dir.glob("snapshot_*.json"):
            data = json.loads(snapshot_file.read_text(encoding="utf-8"))
            if collection is not None and data["collection"] != collection:
                continue
            snapshots.append({
                "id": data["id"],
                "collection": data["collection"],
                "description": data["description"],
                "created_at": data["created_at"],
                "document_count": data["document_count"],
            })
        return sorted(snapshots, key=lambda x: x["created_at"], reverse=True)
七、版本发布流程
发布检查清单
class ReleaseManager:
    """Gate, execute and roll back releases of one agent.

    Several collaborators are used here but not defined in the class as shown:
    ``ReleaseContext``, ``_canary_release``, ``_rolling_release``,
    ``_deploy_version``, ``_verify_version``, ``_switch_traffic``,
    ``_log_rollback``, and the module-level ``get_version_config``,
    ``prompt_manager``, ``config_manager`` and ``kb_manager``.
    """

    # Each entry pairs a human-readable item with a predicate over the release
    # context; a release is ready only when every predicate is truthy.
    RELEASE_CHECKLIST = [
        {
            "item": "Prompt版本测试通过",
            "check": lambda ctx: ctx.prompt_test_passed
        },
        {
            "item": "配置变更已审核",
            "check": lambda ctx: ctx.config_reviewed
        },
        {
            "item": "回滚方案已准备",
            "check": lambda ctx: ctx.rollback_ready
        },
        {
            "item": "监控告警已配置",
            "check": lambda ctx: ctx.monitoring_configured
        },
        {
            "item": "文档已更新",
            "check": lambda ctx: ctx.docs_updated
        }
    ]

    def __init__(self, agent_id: str):
        """Remember which agent this manager releases."""
        self.agent_id = agent_id
        self.release_history = []

    async def prepare_release(self, version: str, changes: dict):
        """Run the release checklist for *version*.

        Returns:
            A dict with the version, the per-item ``checklist`` results, the
            proposed ``changes`` and ``ready`` (true only if every item passed).
        """
        release_context = ReleaseContext(
            version=version,
            changes=changes,
            agent_id=self.agent_id
        )
        checklist_results = [
            {"item": item["item"], "passed": item["check"](release_context)}
            for item in self.RELEASE_CHECKLIST
        ]
        all_passed = all(r["passed"] for r in checklist_results)
        return {
            "version": version,
            "ready": all_passed,
            "checklist": checklist_results,
            "changes": changes
        }

    async def release(self, version: str, strategy: str = "blue-green"):
        """Release *version* using ``blue-green``, ``canary`` or ``rolling``.

        Raises:
            ValueError: If *strategy* is not one of the supported names. An
                unknown strategy used to fall through and return ``None``
                without releasing anything.
        """
        if strategy == "blue-green":
            return await self._blue_green_release(version)
        elif strategy == "canary":
            return await self._canary_release(version)
        elif strategy == "rolling":
            return await self._rolling_release(version)
        raise ValueError(f"Unknown release strategy: {strategy}")

    async def _blue_green_release(self, version: str):
        """Deploy to the green slot, verify, then switch traffic to it.

        When verification fails, traffic is pointed at the blue slot and a
        ``failed`` result is returned.
        """
        await self._deploy_version(version, "green")
        if await self._verify_version(version):
            await self._switch_traffic("green")
            return {"status": "success", "version": version}
        else:
            await self._switch_traffic("blue")
            return {"status": "failed", "reason": "verification failed"}

    async def rollback(self, version: str):
        """Restore the prompt, configuration and knowledge base of *version*.

        The knowledge base is restored only when the version's configuration
        names a snapshot.
        """
        config = get_version_config(version)
        prompt_manager.rollback(config.prompt_id, config.prompt_version)
        config_manager.activate_config(config.config_id)
        if config.kb_snapshot:
            kb_manager.restore_snapshot(config.kb_snapshot)
        self._log_rollback(version)
        return {"status": "rolled_back", "version": version}
八、版本对比与审计
版本对比报告
def generate_version_comparison_report(from_version: str, to_version: str) -> str:
    """Build a Markdown report comparing two released versions.

    The report covers configuration, prompt, model, tool and knowledge-base
    changes followed by a risk assessment. It starts and ends with a newline.
    """
    old_cfg = get_version_config(from_version)
    new_cfg = get_version_config(to_version)

    # One list element per output line; the empty first and last elements
    # produce the leading and trailing newline of the report.
    report_lines = [
        "",
        "# 版本对比报告",
        "## 版本信息",
        f"- 源版本: {from_version}",
        f"- 目标版本: {to_version}",
        "## 配置变更",
        f"{format_config_diff(old_cfg, new_cfg)}",
        "## Prompt变更",
        f"{prompt_manager.diff(old_cfg.prompt_id, old_cfg.prompt_version, new_cfg.prompt_version)}",
        "## 模型变更",
        f"- 源模型: {old_cfg.model}",
        f"- 目标模型: {new_cfg.model}",
        "## 工具变更",
        f"{format_tool_changes(old_cfg.tools, new_cfg.tools)}",
        "## 知识库变更",
        f"- 源知识库版本: {old_cfg.kb_snapshot}",
        f"- 目标知识库版本: {new_cfg.kb_snapshot}",
        f"- 文档数量变化: {new_cfg.kb_doc_count - old_cfg.kb_doc_count}",
        "## 风险评估",
        f"{assess_risk(old_cfg, new_cfg)}",
        "",
    ]
    return "\n".join(report_lines)
审计日志
class AuditLogger:
    """Append-only audit trail stored as one JSON object per line (UTF-8)."""

    def __init__(self, log_file: str = "audit.log"):
        """Use *log_file* as the audit trail; it is created on first write."""
        self.log_file = Path(log_file)

    def log_change(self, change_type: str, details: dict):
        """Append one change record with the current time and user."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "change_type": change_type,
            "user": get_current_user(),
            "details": details
        }
        with open(self.log_file, 'a', encoding="utf-8") as f:
            # Each record must end with a real newline, otherwise the records
            # run together and can no longer be parsed line by line.
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    def get_change_history(self, filter_type: str = None,
                           start_date: str = None, end_date: str = None):
        """Return logged changes, oldest first, optionally filtered.

        Args:
            filter_type: Keep only records with this ``change_type``.
            start_date: Drop records whose timestamp sorts before this value.
            end_date: Drop records whose timestamp sorts after this value.

        The date bounds are compared with the stored ISO-8601 timestamps as
        plain strings, so they should be ISO-8601 strings themselves.

        Returns:
            A list of record dicts; empty when nothing has been logged yet.
        """
        if not self.log_file.exists():
            return []

        history = []
        with open(self.log_file, 'r', encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                entry = json.loads(line)
                if filter_type and entry["change_type"] != filter_type:
                    continue
                if start_date and entry["timestamp"] < start_date:
                    continue
                if end_date and entry["timestamp"] > end_date:
                    continue
                history.append(entry)
        return history
九、总结与最佳实践
版本管理原则
- 一切皆版本:Prompt、配置、知识库都要有版本
- 变更可追溯:谁、何时、改了什么
- 回滚有能力:任何变更都能回滚
- 测试先行:新版本必须经过测试
- 渐进发布:避免一次性全量切换
检查清单
| 层级 | 管理项 | 状态 |
|---|---|---|
| Prompt | 版本注册 | ☐ |
| Prompt | 版本测试 | ☐ |
| 配置 | YAML版本化 | ☐ |
| 配置 | 变更审批 | ☐ |
| 模型 | 版本追踪 | ☐ |
| 模型 | 迁移策略 | ☐ |
| 知识库 | 快照机制 | ☐ |
| 发布 | 检查清单 | ☐ |
| 发布 | 回滚方案 | ☐ |
下期预告
明天聊聊AI Agent灾难恢复——当AI“挂了”怎么办?
往期回顾
正文完