AI Agent版本管理:从Prompt版本到模型版本

17次阅读
没有评论

AI Agent版本管理:从Prompt版本到模型版本

一、开场:改了一个提示词,出问题了

大家好,我是老金。

上周出了个生产事故:改了一行系统提示词,结果AI回答质量骤降。

用户投诉:AI怎么变傻了?

我一看日志,发现是提示词改得太激进了。想回滚——但是上一个版本是什么来着?

找了一圈,发现提示词是硬编码在代码里的,没有版本记录。

最后只能凭记忆重新写了一遍,花了半天时间。

这让我意识到:AI Agent也需要版本管理,而且是多层次的版本管理。

今天聊聊Prompt版本、配置版本、模型版本的完整管理方案。

二、为什么AI Agent版本管理复杂?

多层版本依赖

AI Agent的"版本"包括:

1. 代码版本 → Git管理
2. Prompt版本 → 需要专门管理
3. 模型版本 → OpenAI/Anthropic控制
4. 工具版本 → API版本
5. 配置版本 → 参数配置
6. 知识库版本 → 向量库快照

版本变更的影响

变更类型 影响 回滚难度
代码变更 功能变化 Git回滚
Prompt变更 行为变化 需要记录
模型升级 能力变化 无法回滚
参数调整 性能变化 需要配置管理

常见问题

# ❌ Common anti-patterns (illustrative only; do not copy)

# Problem 1: hard-coded prompt
SYSTEM_PROMPT = """你是一个客服助手..."""  # edited in place, no change history

# Problem 2: hard-coded parameters
def __init__(self):
    self.temperature = 0.7  # why 0.7? when was it last changed?

# Problem 3: hard-coded model name
model = "gpt-4"  # switching to gpt-4-turbo means a global search-and-replace?

# Problem 4: knowledge base without versions
vector_store.add(docs)  # which version of the knowledge base is this?

三、Prompt版本管理

Prompt即代码

# prompts/__init__.py
from prompt_manager import PromptVersion

# Define the prompt together with its full version history.
# Each key under `versions` is a version label; `current` names the active one.
SYSTEM_PROMPT = PromptVersion(
    id="customer_service_system",
    versions={
        "v1.0": """你是一个客服助手,负责回答用户问题。""",

        "v1.1": """你是一个专业的客服助手。
- 回答要准确、简洁
- 语气要友好
- 不确定时请承认""",  # 2024-01-15: added behavior guidelines

        "v1.2": """你是一个专业的客服助手。
- 回答要准确、简洁
- 语气要友好、有同理心
- 不确定时请承认
- 优先使用工具查询信息""",  # 2024-01-20: emphasized tool usage

        "v2.0": """你是一个智能客服助手,代表公司的服务品质。

## 核心职责
- 准确回答用户问题
- 必要时使用工具查询
- 处理异常情况

## 行为准则
1. 回答准确简洁,避免废话
2. 语气友好有同理心
3. 不确定时明确说明
4. 涉及订单/退款等操作需确认

## 工具使用
- 查询订单:使用query_order工具
- 查询商品:使用search_product工具
- 取消订单:需用户二次确认
""",  # 2024-02-01: structured rewrite to improve stability
    },
    current="v2.0"
)

Prompt管理器

import json
from pathlib import Path
from datetime import datetime

class PromptManager:
    """File-backed registry of versioned prompts.

    Every registered version is written to
    ``<storage_path>/<prompt_id>_<version_id>.json``. ``registry.json`` in the
    same directory maps each prompt id to its list of version ids and to the
    id that is currently active. Rollbacks are appended to ``operations.log``.

    All files are read and written as UTF-8 so that non-ASCII prompt text
    round-trips regardless of the platform's default encoding.
    """

    def __init__(self, storage_path: str = "prompts/registry"):
        """Create ``storage_path`` if needed and load the existing registry."""
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.registry = self._load_registry()

    def register(self, prompt_id: str, content: str,
                 description: str = "", tags: list = None):
        """Store ``content`` as a new version of ``prompt_id`` and make it current.

        Args:
            prompt_id: Logical name of the prompt.
            content: Full prompt text for the new version.
            description: Free-form note describing the change.
            tags: Optional list of labels; stored as an empty list when omitted.

        Returns:
            The generated version id.
        """
        version_id = self._generate_version_id()

        # Generated ids only have one-second resolution. Without this guard a
        # second registration within the same second would overwrite the first
        # version file and list the same id twice in the registry.
        known_ids = self.registry.get(prompt_id, {}).get("versions", [])
        if version_id in known_ids:
            suffix = 1
            while f"{version_id}_{suffix}" in known_ids:
                suffix += 1
            version_id = f"{version_id}_{suffix}"

        version = {
            "id": version_id,
            "prompt_id": prompt_id,
            "content": content,
            "description": description,
            "tags": tags or [],
            "created_at": datetime.now().isoformat(),
            # get_current_user() is not defined in this class; it is expected
            # to be provided by the surrounding module.
            "created_by": get_current_user()
        }

        # Persist the version record.
        version_file = self.storage_path / f"{prompt_id}_{version_id}.json"
        version_file.write_text(
            json.dumps(version, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        # Update the registry: append the id and mark it as current.
        if prompt_id not in self.registry:
            self.registry[prompt_id] = {
                "versions": [],
                "current": None
            }
        self.registry[prompt_id]["versions"].append(version_id)
        self.registry[prompt_id]["current"] = version_id
        self._save_registry()

        return version_id

    def get(self, prompt_id: str, version: str = None) -> str:
        """Return the prompt text for ``version`` (default: the current one).

        Raises:
            ValueError: If the prompt id is unknown or the version file is
                missing.
        """
        if prompt_id not in self.registry:
            raise ValueError(f"Prompt {prompt_id} not found")

        version_id = version or self.registry[prompt_id]["current"]
        version_file = self.storage_path / f"{prompt_id}_{version_id}.json"

        if not version_file.exists():
            raise ValueError(f"Version {version_id} not found")

        version_data = json.loads(version_file.read_text(encoding="utf-8"))
        return version_data["content"]

    def rollback(self, prompt_id: str, version: str):
        """Make an already registered ``version`` the current one.

        Raises:
            ValueError: If the prompt id or the version is not in the registry.
        """
        if prompt_id not in self.registry:
            raise ValueError(f"Prompt {prompt_id} not found")

        if version not in self.registry[prompt_id]["versions"]:
            raise ValueError(f"Version {version} not found")

        self.registry[prompt_id]["current"] = version
        self._save_registry()

        # Record the rollback in the operations log.
        self._log_operation("rollback", prompt_id, version)

    def diff(self, prompt_id: str, version1: str, version2: str) -> str:
        """Return a unified diff between the texts of two versions."""
        content1 = self.get(prompt_id, version1)
        content2 = self.get(prompt_id, version2)

        import difflib
        diff = difflib.unified_diff(
            content1.splitlines(keepends=True),
            content2.splitlines(keepends=True),
            fromfile=version1,
            tofile=version2
        )
        return ''.join(diff)

    def list_versions(self, prompt_id: str) -> list:
        """List metadata of all stored versions, newest first.

        Returns an empty list for an unknown prompt id. Versions whose file
        is missing on disk are skipped.
        """
        if prompt_id not in self.registry:
            return []

        versions = []
        for version_id in self.registry[prompt_id]["versions"]:
            version_file = self.storage_path / f"{prompt_id}_{version_id}.json"
            if version_file.exists():
                version_data = json.loads(version_file.read_text(encoding="utf-8"))
                versions.append({
                    "id": version_id,
                    "created_at": version_data["created_at"],
                    "description": version_data["description"],
                    "is_current": version_id == self.registry[prompt_id]["current"]
                })

        return sorted(versions, key=lambda x: x["created_at"], reverse=True)

    def _generate_version_id(self) -> str:
        """Build a timestamp-based id such as ``v20240101_120000``."""
        return datetime.now().strftime("v%Y%m%d_%H%M%S")

    def _load_registry(self) -> dict:
        """Read ``registry.json``; return an empty dict if it does not exist."""
        registry_file = self.storage_path / "registry.json"
        if registry_file.exists():
            return json.loads(registry_file.read_text(encoding="utf-8"))
        return {}

    def _save_registry(self):
        """Write the in-memory registry back to ``registry.json``."""
        registry_file = self.storage_path / "registry.json"
        registry_file.write_text(
            json.dumps(self.registry, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def _log_operation(self, operation: str, prompt_id: str, version: str):
        """Append one ``timestamp | operation | prompt_id | version`` line."""
        log_file = self.storage_path / "operations.log"
        log_entry = f"{datetime.now().isoformat()} | {operation} | {prompt_id} | {version}\n"
        # Path.write_text() has no append mode (and would truncate the log),
        # so open the file explicitly in append mode.
        with log_file.open("a", encoding="utf-8") as fh:
            fh.write(log_entry)


# Usage example
prompt_manager = PromptManager()

# Register a new version (it becomes the current one)
version_id = prompt_manager.register(
    prompt_id="customer_service_system",
    content="你是一个智能客服助手...",
    description="增加工具使用说明",
    tags=["customer-service", "v2"]
)

# Fetch the current version
prompt = prompt_manager.get("customer_service_system")

# Fetch a specific version
prompt_v1 = prompt_manager.get("customer_service_system", "v20240101_120000")

# Roll back to an earlier version
prompt_manager.rollback("customer_service_system", "v20240115_090000")

# Show the differences between two versions
diff = prompt_manager.diff(
    "customer_service_system", 
    "v20240101_120000", 
    "v20240115_090000"
)

Prompt版本测试

class PromptVersionTester:
    """Run test cases against specific prompt versions and compare them.

    ``_run_with_prompt`` and ``_evaluate_response`` are not defined in this
    class; they are expected to be supplied elsewhere (e.g. by a subclass).
    ``_evaluate_response`` must return a mapping with a ``"passed"`` entry.
    """

    def __init__(self, prompt_manager: "PromptManager"):
        """Keep a reference to the manager used to look up prompt text."""
        self.prompt_manager = prompt_manager

    async def test_version(self, prompt_id: str, version: str, test_cases: list):
        """Run every test case against one prompt version.

        Args:
            prompt_id: Prompt to test.
            version: Version id to fetch from the prompt manager.
            test_cases: Objects exposing ``id``, ``input`` and ``expected``.

        Returns:
            A dict with ``prompt_id``, ``version``, the per-case ``results``
            and the overall ``pass_rate`` (``0.0`` when there are no cases).
        """
        prompt = self.prompt_manager.get(prompt_id, version)

        results = []
        for case in test_cases:
            response = await self._run_with_prompt(prompt, case.input)

            evaluation = await self._evaluate_response(response, case.expected)
            results.append({
                "case_id": case.id,
                "input": case.input,
                "response": response,
                "evaluation": evaluation
            })

        # Guard against an empty test suite, which would otherwise raise
        # ZeroDivisionError.
        passed_count = sum(1 for r in results if r["evaluation"]["passed"])
        pass_rate = passed_count / len(results) if results else 0.0

        return {
            "prompt_id": prompt_id,
            "version": version,
            "results": results,
            "pass_rate": pass_rate
        }

    async def compare_versions(self, prompt_id: str,
                               version1: str, version2: str, test_cases: list):
        """Run the same cases on two versions and report the pass-rate delta.

        ``improvement`` is ``version2``'s pass rate minus ``version1``'s.
        """
        result1 = await self.test_version(prompt_id, version1, test_cases)
        result2 = await self.test_version(prompt_id, version2, test_cases)

        return {
            "version1": {"id": version1, "pass_rate": result1["pass_rate"]},
            "version2": {"id": version2, "pass_rate": result2["pass_rate"]},
            "improvement": result2["pass_rate"] - result1["pass_rate"]
        }

四、配置版本管理

配置文件结构

# config/agent_config.yaml

# Version information
version: "2.1.0"
config_id: "agent_20240201_v1"

# Model settings
model:
  provider: "openai"
  name: "gpt-4-turbo"
  temperature: 0.7
  max_tokens: 2000

# Prompt settings (values use the form "<prompt_id>@<version>")
prompts:
  system: "customer_service_system@v2.0"
  few_shot_examples: "customer_service_examples@v1.2"

# Tool settings
tools:
  - name: "query_order"
    enabled: true
    version: "v1.0"
  - name: "search_product"
    enabled: true
    version: "v2.0"
  - name: "cancel_order"
    enabled: true
    version: "v1.0"
    require_confirmation: true

# Concurrency settings
concurrency:
  max_concurrent: 10
  rate_limit: 100
  timeout: 30

# Knowledge base settings
knowledge_base:
  vector_store: "milvus"
  collection: "customer_service_kb_v3"
  embedding_model: "bge-large-zh"
  top_k: 5

配置管理器

import yaml
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class AgentConfig:
    """One complete agent configuration.

    ConfigManager builds instances with ``AgentConfig(**config_data)`` from a
    parsed YAML file, so the field names must match the file's top-level keys.
    """
    version: str  # configuration version string, e.g. "2.1.0"
    config_id: str  # identifier used as the key in ConfigManager.configs
    model: Dict[str, Any]  # contents of the `model` section
    prompts: Dict[str, str]  # contents of the `prompts` section
    tools: list  # contents of the `tools` section
    concurrency: Dict[str, Any]  # contents of the `concurrency` section
    knowledge_base: Dict[str, Any]  # contents of the `knowledge_base` section

class ConfigManager:
    """Load, select, derive and compare agent configurations stored as YAML.

    Every ``*.yaml`` file in ``config_dir`` is parsed into an ``AgentConfig``
    and indexed by its ``config_id``. The active configuration is named in
    ``current.txt``; activations are appended to ``activation.log``.
    """

    def __init__(self, config_dir: str = "config"):
        """Remember ``config_dir`` and load every configuration found in it."""
        self.config_dir = Path(config_dir)
        self.configs = {}
        self._load_all_configs()

    def _load_all_configs(self):
        """Parse every ``*.yaml`` file in the config directory."""
        for config_file in self.config_dir.glob("*.yaml"):
            config_data = yaml.safe_load(config_file.read_text(encoding="utf-8"))
            config_id = config_data["config_id"]
            self.configs[config_id] = AgentConfig(**config_data)

    def get_config(self, config_id: str = None) -> Optional["AgentConfig"]:
        """Return the configuration with ``config_id``, or the current one.

        Returns ``None`` when the requested id is unknown.
        """
        if config_id:
            return self.configs.get(config_id)
        return self.get_current_config()

    def get_current_config(self) -> Optional["AgentConfig"]:
        """Return the active configuration.

        If ``current.txt`` exists, the configuration it names is returned
        (``None`` if that id is not loaded). Otherwise the configuration with
        the highest version is returned, or ``None`` when nothing is loaded.
        """
        current_file = self.config_dir / "current.txt"
        if current_file.exists():
            config_id = current_file.read_text().strip()
            return self.configs.get(config_id)

        if not self.configs:
            return None

        # Compare versions numerically: as plain strings "2.9.0" would sort
        # above "2.10.0".
        return max(self.configs.values(),
                   key=lambda c: self._version_key(c.version))

    def activate_config(self, config_id: str):
        """Mark ``config_id`` as active and log the activation.

        Raises:
            ValueError: If ``config_id`` is not a loaded configuration.
        """
        if config_id not in self.configs:
            raise ValueError(f"Config {config_id} not found")

        current_file = self.config_dir / "current.txt"
        current_file.write_text(config_id)

        # Record the activation.
        self._log_activation(config_id)

    def create_config(self, base_config_id: str, changes: dict) -> str:
        """Derive a new configuration from an existing one and save it.

        ``changes`` replaces top-level fields wholesale (a shallow update): a
        partial ``model`` dict replaces the whole ``model`` section.

        Returns:
            The ``config_id`` of the new configuration.

        Raises:
            ValueError: If the base configuration cannot be found.
        """
        from dataclasses import asdict

        base = self.get_config(base_config_id)
        if not base:
            raise ValueError(f"Base config {base_config_id} not found")

        # Build the new configuration from a copy of the base.
        new_config_dict = asdict(base)
        new_config_dict.update(changes)
        new_config_dict["version"] = self._increment_version(base.version)

        # Pick the first free sequence number for today so that creating
        # several configs on one day does not overwrite the earlier ones.
        date_tag = datetime.now().strftime('%Y%m%d')
        sequence = 1
        while (f"agent_{date_tag}_v{sequence}" in self.configs
               or (self.config_dir / f"agent_{date_tag}_v{sequence}.yaml").exists()):
            sequence += 1
        new_config_dict["config_id"] = f"agent_{date_tag}_v{sequence}"

        # Persist and index the new configuration.
        new_config = AgentConfig(**new_config_dict)
        config_file = self.config_dir / f"{new_config.config_id}.yaml"
        config_file.write_text(
            yaml.dump(new_config_dict, allow_unicode=True),
            encoding="utf-8",
        )

        self.configs[new_config.config_id] = new_config
        return new_config.config_id

    def diff_configs(self, config_id1: str, config_id2: str) -> dict:
        """Return the top-level fields that differ between two configurations.

        Returns:
            ``{field: {"old": value_in_1, "new": value_in_2}}`` for every
            field whose values are not equal.

        Raises:
            ValueError: If either configuration cannot be found.
        """
        from dataclasses import asdict

        config1 = self.get_config(config_id1)
        config2 = self.get_config(config_id2)
        if config1 is None:
            raise ValueError(f"Config {config_id1} not found")
        if config2 is None:
            raise ValueError(f"Config {config_id2} not found")

        differences = {}
        for key in asdict(config1):
            val1 = getattr(config1, key)
            val2 = getattr(config2, key)
            if val1 != val2:
                differences[key] = {
                    "old": val1,
                    "new": val2
                }

        return differences

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Sort key that orders dotted versions numerically.

        Numeric parts compare as integers and sort before non-numeric parts,
        so mixed versions never raise when compared.
        """
        return tuple(
            (0, int(part), "") if part.isdigit() else (1, 0, part)
            for part in str(version).split(".")
        )

    def _increment_version(self, version: str) -> str:
        """Increase the last dot-separated component of ``version`` by one."""
        parts = version.split(".")
        parts[-1] = str(int(parts[-1]) + 1)
        return ".".join(parts)

    def _log_activation(self, config_id: str):
        """Append ``timestamp | config_id | user`` to ``activation.log``."""
        log_file = self.config_dir / "activation.log"
        # get_current_user() is not defined in this class; it is expected to
        # be provided by the surrounding module.
        log_entry = f"{datetime.now().isoformat()} | {config_id} | {get_current_user()}\n"
        # Path.write_text() has no append mode, so open the file explicitly.
        with log_file.open("a", encoding="utf-8") as fh:
            fh.write(log_entry)

五、模型版本管理

模型版本追踪

class ModelVersionManager:
    """In-memory catalogue of provider models and their lifecycle metadata."""

    def __init__(self):
        """Populate ``model_registry`` with the built-in provider catalogue."""

        def entry(release_date, aliases=None):
            # Key order is release_date, status, [aliases], deprecation_date.
            record = {"release_date": release_date, "status": "stable"}
            if aliases is not None:
                record["aliases"] = aliases
            record["deprecation_date"] = None
            return record

        self.model_registry = {
            "openai": {
                "gpt-4": entry("2023-03-14", ["gpt-4-0314"]),
                "gpt-4-turbo": entry(
                    "2024-01-25",
                    ["gpt-4-0125-preview", "gpt-4-1106-preview"],
                ),
                "gpt-3.5-turbo": entry("2023-03-01", ["gpt-3.5-turbo-0125"]),
            },
            "anthropic": {
                "claude-3-opus": entry("2024-03-04"),
                "claude-3-sonnet": entry("2024-03-04"),
            },
        }

    def get_model_info(self, provider: str, model: str) -> dict:
        """Look up a model by canonical name or alias.

        Returns the registry entry with the canonical name added under
        ``"name"``. Raises ``ValueError`` for an unknown provider or model.
        """
        catalogue = self.model_registry.get(provider)
        if catalogue is None:
            raise ValueError(f"Unknown provider: {provider}")

        # Accept either the canonical name or any registered alias.
        for canonical, meta in catalogue.items():
            if model != canonical and model not in meta.get("aliases", []):
                continue
            return {"name": canonical, **meta}

        raise ValueError(f"Unknown model: {model}")

    def check_deprecation(self, provider: str, model: str) -> dict:
        """Report whether a deprecation date is recorded for the model.

        Without a date the result is ``{"deprecated": False}``; otherwise it
        carries the date, the whole days remaining until it (negative once it
        has passed) and the recommended alternative, if one is recorded.
        """
        meta = self.get_model_info(provider, model)
        sunset = meta.get("deprecation_date")
        if not sunset:
            return {"deprecated": False}

        remaining = datetime.fromisoformat(sunset) - datetime.now()
        return {
            "deprecated": True,
            "deprecation_date": sunset,
            "days_until_deprecation": remaining.days,
            "recommended_alternative": meta.get("recommended_alternative"),
        }

    def list_available_models(self, provider: str = None) -> list:
        """List models.

        With ``provider``: the model names registered for it (empty list if
        the provider is unknown). Without: one dict per model whose status is
        ``"stable"``, across all providers.
        """
        if provider:
            return list(self.model_registry.get(provider, {}))

        return [
            {
                "provider": vendor,
                "model": name,
                "release_date": meta["release_date"],
            }
            for vendor, catalogue in self.model_registry.items()
            for name, meta in catalogue.items()
            if meta["status"] == "stable"
        ]

模型迁移策略

class ModelMigration:
    """Move an agent from one model to another using a chosen strategy.

    The helpers ``_update_traffic_split``, ``_monitor_migration``,
    ``_get_canary_users``, ``_process_with_model``, ``_evaluate_response`` and
    ``_log_migration``, as well as the module-level ``get_user_request`` and
    ``update_agent_model``, are not defined here and are expected to be
    provided elsewhere.
    """

    # Share of traffic (percent) routed to the new model at each gradual step.
    GRADUAL_STEPS = (10, 30, 50, 100)

    def __init__(self, agent_id: str):
        """Remember which agent is being migrated."""
        self.agent_id = agent_id
        self.migrations = []

    async def migrate(self, from_model: str, to_model: str,
                      strategy: str = "gradual"):
        """Migrate to ``to_model``.

        ``strategy`` may be ``"gradual"`` or ``"canary"``. NOTE(review): any
        other value, including a typo, falls through to an immediate switch.
        """
        if strategy == "gradual":
            return await self._gradual_migration(from_model, to_model)
        elif strategy == "canary":
            return await self._canary_migration(from_model, to_model)
        else:
            return await self._immediate_migration(from_model, to_model)

    async def _gradual_migration(self, from_model: str, to_model: str,
                                 step_wait_seconds: float = 3600):
        """Shift traffic in steps: 10% -> 30% -> 50% -> 100%.

        After each step the migration is monitored and then paused for
        ``step_wait_seconds`` (default one hour) to let the system settle.
        """
        # Imported locally because this snippet's module does not import it.
        import asyncio

        for percentage in self.GRADUAL_STEPS:
            # Update the traffic split.
            self._update_traffic_split(from_model, to_model, percentage)

            # Monitor metrics at the new split.
            await self._monitor_migration(from_model, to_model)

            # Wait for things to stabilise.
            await asyncio.sleep(step_wait_seconds)

        return {"status": "completed", "from": from_model, "to": to_model}

    async def _canary_migration(self, from_model: str, to_model: str):
        """Try the new model on canary users first, then switch everyone.

        Stops and reports ``"failed"`` at the first response whose evaluation
        does not pass.
        """
        # Only a subset of users gets the new model at first.
        canary_users = self._get_canary_users()

        for user_id in canary_users:
            # Serve this user's request with the new model.
            response = await self._process_with_model(
                user_id,
                to_model,
                get_user_request(user_id)
            )

            # Evaluate the response quality.
            evaluation = await self._evaluate_response(response)

            if not evaluation["passed"]:
                # Abort: the new model is not rolled out any further.
                return {"status": "failed", "reason": evaluation["reason"]}

        # Every canary passed; roll out to all users.
        return await self._immediate_migration(from_model, to_model)

    async def _immediate_migration(self, from_model: str, to_model: str):
        """Switch the agent to ``to_model`` in one step and log the change."""
        # Update the agent's configuration.
        update_agent_model(self.agent_id, to_model)

        # Record the migration.
        self._log_migration(from_model, to_model)

        return {"status": "completed", "from": from_model, "to": to_model}

六、知识库版本管理

class KnowledgeBaseVersionManager:
    """Create, restore and list JSON snapshots of vector-store collections.

    Snapshots are stored as ``kb_snapshots/<snapshot_id>.json`` and read and
    written as UTF-8. ``vector_store`` must provide ``export_collection``,
    ``clear_collection`` and ``import_vectors``. ``_register_snapshot`` is not
    defined in this class and is expected to be provided elsewhere.
    """

    def __init__(self, vector_store):
        """Keep the vector store and make sure the snapshot directory exists."""
        self.vector_store = vector_store
        self.snapshots_dir = Path("kb_snapshots")
        self.snapshots_dir.mkdir(exist_ok=True)

    def create_snapshot(self, collection: str, description: str = ""):
        """Export ``collection`` and save it as a new snapshot.

        Returns:
            The id of the snapshot that was written.
        """
        # Ids have one-second resolution; add a numeric suffix instead of
        # silently overwriting a snapshot taken within the same second.
        base_id = datetime.now().strftime("snapshot_%Y%m%d_%H%M%S")
        snapshot_id = base_id
        suffix = 1
        while (self.snapshots_dir / f"{snapshot_id}.json").exists():
            snapshot_id = f"{base_id}_{suffix}"
            suffix += 1

        # Export the vector data.
        vectors = self.vector_store.export_collection(collection)

        # Write the snapshot file.
        snapshot_file = self.snapshots_dir / f"{snapshot_id}.json"
        snapshot_data = {
            "id": snapshot_id,
            "collection": collection,
            "description": description,
            "created_at": datetime.now().isoformat(),
            "document_count": len(vectors),
            "vectors": vectors
        }
        snapshot_file.write_text(
            json.dumps(snapshot_data, ensure_ascii=False),
            encoding="utf-8",
        )

        # Update the snapshot registry.
        self._register_snapshot(snapshot_id, collection, description)

        return snapshot_id

    def restore_snapshot(self, snapshot_id: str):
        """Replace a collection's contents with those stored in a snapshot.

        NOTE(review): the collection is cleared before the import, so a
        failing import leaves it empty; this is not atomic.

        Raises:
            ValueError: If no snapshot file exists for ``snapshot_id``.
        """
        snapshot_file = self.snapshots_dir / f"{snapshot_id}.json"

        if not snapshot_file.exists():
            raise ValueError(f"Snapshot {snapshot_id} not found")

        snapshot_data = json.loads(snapshot_file.read_text(encoding="utf-8"))

        # Empty the current collection.
        self.vector_store.clear_collection(snapshot_data["collection"])

        # Load the snapshot's vectors back in.
        self.vector_store.import_vectors(
            snapshot_data["collection"],
            snapshot_data["vectors"]
        )

        return {
            "status": "restored",
            "snapshot_id": snapshot_id,
            "document_count": snapshot_data["document_count"]
        }

    def list_snapshots(self, collection: str = None) -> list:
        """List snapshot metadata, newest first.

        When ``collection`` is given, only its snapshots are returned. Each
        snapshot file is parsed in full (vectors included) to read its
        metadata.
        """
        snapshots = []

        for snapshot_file in self.snapshots_dir.glob("snapshot_*.json"):
            data = json.loads(snapshot_file.read_text(encoding="utf-8"))
            if collection is None or data["collection"] == collection:
                snapshots.append({
                    "id": data["id"],
                    "collection": data["collection"],
                    "description": data["description"],
                    "created_at": data["created_at"],
                    "document_count": data["document_count"]
                })

        return sorted(snapshots, key=lambda x: x["created_at"], reverse=True)

七、版本发布流程

发布检查清单

class ReleaseManager:
    """Gate, perform and roll back releases of an agent version.

    ``ReleaseContext``, ``get_version_config``, ``prompt_manager``,
    ``config_manager`` and ``kb_manager`` are module-level names this class
    relies on but does not define. The same holds for the helper methods
    ``_deploy_version``, ``_verify_version``, ``_switch_traffic``,
    ``_canary_release``, ``_rolling_release`` and ``_log_rollback``.
    """

    # Each entry pairs a human-readable item with a predicate on the context.
    RELEASE_CHECKLIST = [
        {
            "item": "Prompt版本测试通过",
            "check": lambda ctx: ctx.prompt_test_passed
        },
        {
            "item": "配置变更已审核",
            "check": lambda ctx: ctx.config_reviewed
        },
        {
            "item": "回滚方案已准备",
            "check": lambda ctx: ctx.rollback_ready
        },
        {
            "item": "监控告警已配置",
            "check": lambda ctx: ctx.monitoring_configured
        },
        {
            "item": "文档已更新",
            "check": lambda ctx: ctx.docs_updated
        }
    ]

    def __init__(self, agent_id: str):
        """Remember the agent this manager releases."""
        self.agent_id = agent_id
        self.release_history = []

    async def prepare_release(self, version: str, changes: dict):
        """Evaluate the release checklist for ``version``.

        Returns:
            A dict with the ``version``, whether it is ``ready`` (every item
            passed), the per-item ``checklist`` results and the ``changes``.
        """
        release_context = ReleaseContext(
            version=version,
            changes=changes,
            agent_id=self.agent_id
        )

        # Run every checklist predicate against the context.
        checklist_results = []
        for item in self.RELEASE_CHECKLIST:
            passed = item["check"](release_context)
            checklist_results.append({
                "item": item["item"],
                "passed": passed
            })

        all_passed = all(r["passed"] for r in checklist_results)

        return {
            "version": version,
            "ready": all_passed,
            "checklist": checklist_results,
            "changes": changes
        }

    async def release(self, version: str, strategy: str = "blue-green"):
        """Release ``version`` using the given strategy.

        Args:
            version: Version to release.
            strategy: ``"blue-green"``, ``"canary"`` or ``"rolling"``.

        Raises:
            ValueError: If ``strategy`` is not one of the supported values.
                Previously an unknown strategy silently returned ``None``
                without releasing anything.
        """
        if strategy == "blue-green":
            return await self._blue_green_release(version)
        elif strategy == "canary":
            return await self._canary_release(version)
        elif strategy == "rolling":
            return await self._rolling_release(version)
        raise ValueError(f"Unknown release strategy: {strategy}")

    async def _blue_green_release(self, version: str):
        """Deploy to green, verify it, then switch traffic or fall back to blue."""
        # Deploy the new version to the green environment.
        await self._deploy_version(version, "green")

        # Verify before exposing it to traffic.
        if await self._verify_version(version):
            # Switch traffic to green.
            await self._switch_traffic("green")
            return {"status": "success", "version": version}
        else:
            # Keep (or return) traffic on blue.
            await self._switch_traffic("blue")
            return {"status": "failed", "reason": "verification failed"}

    async def rollback(self, version: str):
        """Restore the prompt, configuration and knowledge base of ``version``."""
        # Look up what the target version consisted of.
        config = get_version_config(version)

        # Restore the prompt.
        prompt_manager.rollback(config.prompt_id, config.prompt_version)

        # Restore the configuration.
        config_manager.activate_config(config.config_id)

        # Restore the knowledge base, if the version recorded a snapshot.
        if config.kb_snapshot:
            kb_manager.restore_snapshot(config.kb_snapshot)

        # Record the rollback.
        self._log_rollback(version)

        return {"status": "rolled_back", "version": version}

八、版本对比与审计

版本对比报告

def generate_version_comparison_report(from_version: str, to_version: str) -> str:
    """Render a Markdown report comparing two released versions.

    Both version configs are fetched with ``get_version_config``; the report
    covers configuration, prompt, model, tool and knowledge-base changes plus
    a risk assessment.
    """
    old_cfg = get_version_config(from_version)
    new_cfg = get_version_config(to_version)

    # Sections are computed in the order in which they appear in the report.
    config_section = format_config_diff(old_cfg, new_cfg)
    prompt_section = prompt_manager.diff(
        old_cfg.prompt_id, old_cfg.prompt_version, new_cfg.prompt_version
    )
    old_model = old_cfg.model
    new_model = new_cfg.model
    tool_section = format_tool_changes(old_cfg.tools, new_cfg.tools)
    old_snapshot = old_cfg.kb_snapshot
    new_snapshot = new_cfg.kb_snapshot
    doc_delta = new_cfg.kb_doc_count - old_cfg.kb_doc_count
    risk_section = assess_risk(old_cfg, new_cfg)

    return f"""
# 版本对比报告

## 版本信息
- 源版本: {from_version}
- 目标版本: {to_version}

## 配置变更
{config_section}

## Prompt变更
{prompt_section}

## 模型变更
- 源模型: {old_model}
- 目标模型: {new_model}

## 工具变更
{tool_section}

## 知识库变更
- 源知识库版本: {old_snapshot}
- 目标知识库版本: {new_snapshot}
- 文档数量变化: {doc_delta}

## 风险评估
{risk_section}
    """

审计日志

class AuditLogger:
    """Append-only audit trail stored as one JSON object per line (UTF-8)."""

    def __init__(self, log_file: str = "audit.log"):
        """Remember the path of the audit log file."""
        self.log_file = Path(log_file)

    def log_change(self, change_type: str, details: dict):
        """Append one change record to the log.

        The record holds the current timestamp (ISO format), ``change_type``,
        the acting user and ``details``. ``get_current_user()`` is not defined
        in this class; it is expected to be provided by the surrounding module.
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "change_type": change_type,
            "user": get_current_user(),
            "details": details
        }

        with open(self.log_file, 'a', encoding="utf-8") as f:
            # One record per line; the reader parses the file line by line.
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    def get_change_history(self, filter_type: str = None,
                          start_date: str = None, end_date: str = None):
        """Return logged records, optionally filtered.

        Args:
            filter_type: Keep only records with this ``change_type``.
            start_date: Drop records whose timestamp sorts before this value.
            end_date: Drop records whose timestamp sorts after this value.

        Timestamps are compared as ISO-8601 strings, so a date-only
        ``end_date`` such as ``"2024-01-31"`` excludes records written during
        that day; pass a full timestamp to include them.

        Returns:
            The matching records in file order; an empty list if the log file
            does not exist yet.
        """
        if not self.log_file.exists():
            return []

        history = []

        with open(self.log_file, 'r', encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines
                entry = json.loads(line)

                if filter_type and entry["change_type"] != filter_type:
                    continue

                if start_date and entry["timestamp"] < start_date:
                    continue

                if end_date and entry["timestamp"] > end_date:
                    continue

                history.append(entry)

        return history

九、总结与最佳实践

版本管理原则

  1. 一切皆版本:Prompt、配置、知识库都要有版本
  2. 变更可追溯:谁、何时、改了什么
  3. 回滚有能力:任何变更都能回滚
  4. 测试先行:新版本必须经过测试
  5. 渐进发布:避免一次性全量切换

检查清单

层级 管理项 状态
Prompt 版本注册
Prompt 版本测试
配置 YAML版本化
配置 变更审批
模型 版本追踪
模型 迁移策略
知识库 快照机制
发布 检查清单
发布 回滚方案

下期预告

明天聊聊AI Agent灾难恢复——当AI“挂了”怎么办?


往期回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-03-30发表,共计17591字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)