AI Agent Development in Practice (10): Operations Monitoring to Keep Your Agent Running Reliably


1. Introduction: Going Live Is Just the Beginning

Hi everyone, I'm 老金.

Shipping your Agent is not the end of the project; it is the beginning.

From that point on you need continuous monitoring of:

  • Performance metrics
  • Error rate
  • User satisfaction
  • Cost

Today we cover operations monitoring for Agents.

2. The Monitoring Stack

2.1 Monitoring Architecture

┌─────────────────────────────────────────────────────────┐
│            Agent Ops Monitoring Architecture            │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │              Data Collection Layer              │    │
│  │  • Application logs                             │    │
│  │  • Metrics                                      │    │
│  │  • Distributed traces                           │    │
│  │  • User behavior                                │    │
│  └────────────────────┬────────────────────────────┘    │
│                       ↓                                  │
│  ┌─────────────────────────────────────────────────┐    │
│  │                  Storage Layer                  │    │
│  │  • Prometheus (metrics)                         │    │
│  │  • Elasticsearch (logs)                         │    │
│  │  • Jaeger (traces)                              │    │
│  │  • ClickHouse (analytics)                       │    │
│  └────────────────────┬────────────────────────────┘    │
│                       ↓                                  │
│  ┌─────────────────────────────────────────────────┐    │
│  │               Visualization Layer               │    │
│  │  • Grafana                                      │    │
│  │  • Kibana                                       │    │
│  │  • Custom dashboards                            │    │
│  └────────────────────┬────────────────────────────┘    │
│                       ↓                                  │
│  ┌─────────────────────────────────────────────────┐    │
│  │                 Alerting Layer                  │    │
│  │  • AlertManager                                 │    │
│  │  • PagerDuty                                    │    │
│  │  • WeCom / DingTalk                             │    │
│  └─────────────────────────────────────────────────┘    │
│                                                         │
└─────────────────────────────────────────────────────────┘
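The layers above assume Prometheus scrapes the agent service and forwards firing rules to AlertManager. A minimal sketch of that wiring, assuming the agent exposes /metrics on port 8000 and that the hostnames `agent` and `alertmanager` resolve in your environment (both are placeholders):

```yaml
# prometheus.yml (sketch; hostnames and ports are assumptions)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "agent"
    static_configs:
      - targets: ["agent:8000"]   # the agent's /metrics endpoint

rule_files:
  - "alerting_rules.yml"          # the alert rules from section 5.1

alerting:
  alertmanagers:
    - static_configs:
        - targets: ["alertmanager:9093"]
```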

3. Metrics Monitoring

3.1 Core Metrics

# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps
from typing import Callable, Any

# ============ Application metrics ============

# Request count
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total request count',
    ['method', 'endpoint', 'status', 'user_type']
)

# Request latency
REQUEST_LATENCY = Histogram(
    'agent_request_latency_seconds',
    'Request latency in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)

# Active sessions
ACTIVE_SESSIONS = Gauge(
    'agent_active_sessions',
    'Number of active sessions',
    ['agent_type']
)

# ============ LLM metrics ============

# Token usage
LLM_TOKENS = Counter(
    'agent_llm_tokens_total',
    'Total LLM tokens used',
    ['provider', 'model', 'type']  # type: input/output/total
)

# LLM call latency
LLM_LATENCY = Histogram(
    'agent_llm_latency_seconds',
    'LLM API call latency',
    ['provider', 'model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

# LLM errors
LLM_ERRORS = Counter(
    'agent_llm_errors_total',
    'Total LLM errors',
    ['provider', 'model', 'error_type']
)

# ============ Agent metrics ============

# Agent execution count
AGENT_EXECUTIONS = Counter(
    'agent_executions_total',
    'Total agent executions',
    ['agent_type', 'status']  # status: success/failure/timeout
)

# Agent execution duration
AGENT_DURATION = Histogram(
    'agent_duration_seconds',
    'Agent execution duration',
    ['agent_type'],
    buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0]
)

# Tool calls
TOOL_CALLS = Counter(
    'agent_tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status']
)

# ============ Cost metrics ============

# API cost
API_COST = Counter(
    'agent_api_cost_dollars',
    'Total API cost in dollars',
    ['provider', 'model']
)

# Decorator: record metrics automatically
def monitor_agent(agent_type: str):
    """Agent monitoring decorator"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"

            try:
                result = await func(*args, **kwargs)
                return result
            except Exception:
                status = "failure"
                raise
            finally:
                duration = time.time() - start_time

                # Record metrics
                AGENT_EXECUTIONS.labels(
                    agent_type=agent_type,
                    status=status
                ).inc()

                AGENT_DURATION.labels(
                    agent_type=agent_type
                ).observe(duration)

        return wrapper
    return decorator

# Usage example
@monitor_agent(agent_type="chat")
async def run_chat_agent(message: str):
    # Agent logic goes here
    pass
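One note on the Histogram buckets above: quantiles such as the P95 used for alerting are later estimated from these cumulative bucket counts, with linear interpolation inside the bucket that contains the target rank (this is what PromQL's histogram_quantile does). A small stdlib-only sketch of the idea (function name is mine):

```python
def histogram_quantile(q: float, buckets: list[float], cumulative: list[int]) -> float:
    """Estimate the q-quantile from cumulative bucket counts,
    interpolating linearly inside the target bucket (PromQL-style)."""
    total = cumulative[-1]
    rank = q * total
    prev_edge, prev_count = 0.0, 0
    for edge, count in zip(buckets, cumulative):
        if count >= rank:
            if count == prev_count:   # empty bucket: nothing to interpolate
                return edge
            return prev_edge + (edge - prev_edge) * (rank - prev_count) / (count - prev_count)
        prev_edge, prev_count = edge, count
    return buckets[-1]                # rank falls beyond the last bucket

# 100 requests: 50 under 0.1s, 30 more under 0.3s, 15 more under 0.5s, 5 more under 1.0s
p95 = histogram_quantile(0.95, [0.1, 0.3, 0.5, 1.0], [50, 80, 95, 100])
print(round(p95, 3))  # 0.5
```

The practical takeaway: a quantile can never be resolved more finely than the bucket it lands in, so place bucket edges near the latency thresholds you alert on.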

3.2 A Custom Metrics Collector

# src/monitoring/collector.py
from typing import Dict, Any, List
from datetime import datetime
import asyncio

class MetricsCollector:
    """Metrics collector"""

    def __init__(self):
        self.buffer: List[Dict[str, Any]] = []
        self.flush_interval = 60  # seconds
        self._task = None

    async def start(self):
        """Start the collector"""
        self._task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop the collector"""
        if self._task:
            self._task.cancel()

    def record(
        self,
        metric_name: str,
        value: Any,
        tags: Dict[str, str] = None,
        timestamp: datetime = None
    ):
        """Record a metric"""
        entry = {
            "metric": metric_name,
            "value": value,
            "tags": tags or {},
            "timestamp": timestamp or datetime.now()
        }

        self.buffer.append(entry)

    async def _flush_loop(self):
        """Flush periodically"""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def _flush(self):
        """Flush to storage"""
        if not self.buffer:
            return

        # Batch write
        entries = self.buffer.copy()
        self.buffer.clear()

        # Write to storage (e.g. InfluxDB, ClickHouse)
        await self._write_to_storage(entries)

    async def _write_to_storage(self, entries: List[Dict]):
        """Write to storage"""
        # Implement the actual storage write here
        pass

# Agent stats collection
class AgentStatsCollector(MetricsCollector):
    """Agent stats collector"""

    def record_conversation(
        self,
        agent_type: str,
        user_id: str,
        duration: float,
        tokens: int,
        tool_calls: int,
        success: bool
    ):
        """Record conversation stats"""
        self.record(
            metric_name="agent.conversation",
            value=1,
            tags={
                "agent_type": agent_type,
                "user_id": user_id,
                "success": str(success)
            }
        )

        self.record(
            metric_name="agent.duration",
            value=duration,
            tags={"agent_type": agent_type}
        )

        self.record(
            metric_name="agent.tokens",
            value=tokens,
            tags={"agent_type": agent_type}
        )

        self.record(
            metric_name="agent.tool_calls",
            value=tool_calls,
            tags={"agent_type": agent_type}
        )
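_write_to_storage is deliberately left as a stub above, since the backend choice is open. If the backend were InfluxDB (an assumption), each buffered entry would be serialized to its line protocol, `measurement,tags field=value timestamp_ns`. A minimal formatter for the collector's entries (to_line_protocol is a hypothetical helper; real line protocol also needs escaping of spaces and commas):

```python
from datetime import datetime, timezone
from typing import Any, Dict

def to_line_protocol(entry: Dict[str, Any]) -> str:
    """Format one buffered entry as an InfluxDB line-protocol string:
    measurement,tag1=v1,tag2=v2 value=<v> <timestamp_ns>"""
    tags = ",".join(f"{k}={v}" for k, v in sorted(entry["tags"].items()))
    measurement = entry["metric"] + ("," + tags if tags else "")
    ts_ns = int(entry["timestamp"].timestamp() * 1_000_000_000)
    return f"{measurement} value={entry['value']} {ts_ns}"

entry = {
    "metric": "agent.duration",
    "value": 1.5,
    "tags": {"agent_type": "chat"},
    "timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc),
}
print(to_line_protocol(entry))
# agent.duration,agent_type=chat value=1.5 1704067200000000000
```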
4. Log Management

4.1 Structured Logging

# src/monitoring/logging_config.py
from loguru import logger
from typing import Any, Dict
import json
import sys

def setup_logging(
    log_level: str = "INFO",
    json_format: bool = True
):
    """Configure structured logging"""
    logger.remove()

    if json_format:
        # JSON format (easy for ELK to ingest)
        def json_sink(message):
            record = message.record
            log_entry = {
                "timestamp": record["time"].isoformat(),
                "level": record["level"].name,
                "message": record["message"],
                "module": record["module"],
                "function": record["function"],
                "line": record["line"],
                "extra": record["extra"]
            }
            print(json.dumps(log_entry, ensure_ascii=False))

        logger.add(
            json_sink,
            level=log_level,
            colorize=False
        )
    else:
        # Human-readable format
        logger.add(
            sys.stdout,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {module}:{function}:{line} | {message}",
            level=log_level,
            colorize=True
        )

    # File logs
    logger.add(
        "logs/agent_{time:YYYY-MM-DD}.log",
        rotation="00:00",
        retention="30 days",
        compression="zip",
        level="DEBUG",
        serialize=True  # JSON format
    )

    return logger

# Usage: logging with context
class ContextualLogger:
    """Contextual logger"""

    def __init__(self, context: Dict[str, Any] = None):
        self.context = context or {}

    def with_context(self, **kwargs) -> 'ContextualLogger':
        """Add context fields"""
        new_context = {**self.context, **kwargs}
        return ContextualLogger(new_context)

    def info(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).info(message)

    def error(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).error(message)

    def warning(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).warning(message)

# Usage example
ctx_logger = ContextualLogger({"user_id": "user123", "session_id": "sess456"})
ctx_logger.info("User started conversation")
ctx_logger.with_context(agent_type="chat").info("Agent processing message")
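Because json_sink emits exactly one JSON object per line, downstream collectors such as Filebeat or Fluent Bit can recover every field without regex parsing. A quick sketch with a hand-written sample line (all field values illustrative):

```python
import json

# One line as emitted by json_sink above (values are illustrative)
line = ('{"timestamp": "2024-01-01T00:00:00+00:00", "level": "INFO", '
        '"message": "User started conversation", "module": "chat_agent", '
        '"function": "run", "line": 42, "extra": {"user_id": "user123"}}')

record = json.loads(line)
print(record["level"], record["extra"]["user_id"])  # INFO user123
```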

4.2 Request Tracing

# src/monitoring/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from functools import wraps
import time
import uuid

# Configure tracing
tracer_provider = TracerProvider()
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831
)
tracer_provider.add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)
trace.set_tracer_provider(tracer_provider)

tracer = trace.get_tracer(__name__)

class RequestTracer:
    """Request tracer"""

    def __init__(self, request_id: str = None):
        self.request_id = request_id or str(uuid.uuid4())
        self.spans = []

    def start_span(self, name: str) -> trace.Span:
        """Start a span"""
        span = tracer.start_span(name)
        span.set_attribute("request_id", self.request_id)
        self.spans.append(span)
        return span

    def trace_agent(self, agent_type: str):
        """Agent tracing decorator"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(
                    f"agent.{agent_type}"
                ) as span:
                    span.set_attribute("agent_type", agent_type)
                    span.set_attribute("request_id", self.request_id)

                    try:
                        result = await func(*args, **kwargs)
                        span.set_attribute("status", "success")
                        return result
                    except Exception as e:
                        span.set_attribute("status", "error")
                        span.set_attribute("error_message", str(e))
                        raise
            return wrapper
        return decorator

    def trace_tool(self, tool_name: str):
        """Tool tracing decorator"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(
                    f"tool.{tool_name}"
                ) as span:
                    span.set_attribute("tool_name", tool_name)

                    start_time = time.time()
                    try:
                        result = await func(*args, **kwargs)
                        span.set_attribute("status", "success")
                        return result
                    except Exception:
                        span.set_attribute("status", "error")
                        raise
                    finally:
                        duration = time.time() - start_time
                        span.set_attribute("duration_ms", duration * 1000)
            return wrapper
        return decorator

# Usage (named request_tracer so it does not shadow the module-level tracer)
request_tracer = RequestTracer()

@request_tracer.trace_agent("chat")
async def run_chat_agent(message):
    # ...
    pass

@request_tracer.trace_tool("get_weather")
async def execute_weather_tool(city):
    # ...
    pass

5. Alerting

5.1 Alert Rules

# alerting_rules.yml
groups:
  - name: agent_alerts
    interval: 30s
    rules:
      # Error-rate alert
      - alert: HighErrorRate
        expr: |
          rate(agent_requests_total{status="error"}[5m])
          /
          rate(agent_requests_total[5m])
          > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Agent error rate too high"
          description: "Error rate above 5% over the last 5 minutes"

      # Latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            rate(agent_request_latency_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent response latency too high"
          description: "P95 latency above 30 seconds"

      # Token cost alert
      - alert: HighTokenUsage
        expr: |
          sum(increase(agent_llm_tokens_total[1h])) > 100000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Token usage too high"
          description: "More than 100k tokens used in the past hour"

      # Agent failure alert
      - alert: AgentFailure
        expr: |
          increase(agent_executions_total{status="failure"}[5m]) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Too many agent execution failures"
          description: "More than 10 failures in 5 minutes"

5.2 Alert Notifications

# src/monitoring/alerting.py
from typing import Dict, List, Any
from datetime import datetime, timedelta
from enum import Enum
from loguru import logger
import aiohttp

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

class Alerter:
    """Alerter"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.webhook_url = config.get("webhook_url")
        self.email_recipients = config.get("email_recipients", [])

    async def send_alert(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any] = None
    ):
        """Send an alert"""
        # Webhook
        if self.webhook_url:
            await self._send_webhook(title, message, severity, details)

        # Email
        if self.email_recipients and severity in [AlertSeverity.WARNING, AlertSeverity.CRITICAL]:
            await self._send_email(title, message, severity, details)

        # WeCom / DingTalk
        if severity == AlertSeverity.CRITICAL:
            await self._send_im(title, message, details)

    async def _send_webhook(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any]
    ):
        """Send a webhook notification"""
        payload = {
            "title": title,
            "message": message,
            "severity": severity.value,
            "details": details,
            "timestamp": datetime.now().isoformat()
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.webhook_url,
                json=payload
            ) as response:
                if response.status != 200:
                    logger.error(f"Webhook alert failed: {response.status}")

    async def _send_email(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any]
    ):
        """Send an email notification"""
        # Implement email delivery here
        pass

    async def _send_im(
        self,
        title: str,
        message: str,
        details: Dict[str, Any]
    ):
        """Send an instant message"""
        # WeCom / DingTalk bot webhook
        pass

# Alert manager
class AlertManager:
    """Alert manager"""

    def __init__(self, alerter: Alerter):
        self.alerter = alerter
        self.alert_counts: Dict[str, int] = {}
        self.cooldown: Dict[str, datetime] = {}

    async def check_and_alert(
        self,
        alert_name: str,
        condition: bool,
        title: str,
        message: str,
        severity: AlertSeverity,
        cooldown_seconds: int = 300
    ):
        """Evaluate a condition and alert if it holds"""
        if not condition:
            return

        # Check the cooldown window
        if alert_name in self.cooldown:
            if datetime.now() - self.cooldown[alert_name] < timedelta(seconds=cooldown_seconds):
                return  # Still cooling down; suppress the duplicate alert

        self.cooldown[alert_name] = datetime.now()
        self.alert_counts[alert_name] = self.alert_counts.get(alert_name, 0) + 1

        await self.alerter.send_alert(title, message, severity)

6. Dashboards

6.1 Dashboard API

# src/api/dashboard.py
from fastapi import APIRouter
from datetime import datetime, timedelta
from typing import Dict, List, Any

router = APIRouter()

@router.get("/dashboard/overview")
async def get_overview() -> Dict[str, Any]:
    """Get overview data"""
    return {
        "total_requests": await get_total_requests(),
        "active_users": await get_active_users(),
        "error_rate": await get_error_rate(),
        "avg_latency": await get_avg_latency(),
        "tokens_today": await get_tokens_today(),
        "cost_today": await get_cost_today()
    }

@router.get("/dashboard/trends")
async def get_trends(
    metric: str,
    period: str = "24h"
) -> List[Dict[str, Any]]:
    """Get trend data"""
    # Parse the time range
    end_time = datetime.now()
    if period == "24h":
        start_time = end_time - timedelta(hours=24)
    elif period == "7d":
        start_time = end_time - timedelta(days=7)
    else:
        start_time = end_time - timedelta(hours=24)

    # Query the data
    data = await query_metric(metric, start_time, end_time)

    return data

@router.get("/dashboard/top_users")
async def get_top_users(limit: int = 10) -> List[Dict]:
    """Get the most active users"""
    return await query_top_users(limit)

@router.get("/dashboard/top_tools")
async def get_top_tools(limit: int = 10) -> List[Dict]:
    """Get the most used tools"""
    return await query_top_tools(limit)

7. Cost Optimization

7.1 Cost Tracking

# src/monitoring/cost_tracking.py
from typing import Dict

from src.monitoring.metrics import API_COST

class CostTracker:
    """Cost tracker"""

    # API pricing (USD per 1K tokens)
    PRICING = {
        "openai": {
            "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        },
        "anthropic": {
            "claude-3-opus": {"input": 0.015, "output": 0.075},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        }
    }

    def __init__(self):
        self.costs: Dict[str, float] = {}

    def calculate_cost(
        self,
        provider: str,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Calculate the cost of one call"""
        if provider not in self.PRICING:
            return 0.0

        model_pricing = self.PRICING[provider].get(model, {})
        if not model_pricing:
            return 0.0

        input_cost = (input_tokens / 1000) * model_pricing.get("input", 0)
        output_cost = (output_tokens / 1000) * model_pricing.get("output", 0)

        return input_cost + output_cost

    def record_usage(
        self,
        provider: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_id: str = None
    ):
        """Record usage"""
        cost = self.calculate_cost(provider, model, input_tokens, output_tokens)

        # Update the cumulative cost
        key = f"{provider}:{model}"
        self.costs[key] = self.costs.get(key, 0) + cost

        # Record the Prometheus metric
        API_COST.labels(provider=provider, model=model).inc(cost)

        return cost

    def get_report(self, period: str = "today") -> Dict:
        """Build a cost report"""
        total = sum(self.costs.values())

        by_model = {}
        for key, cost in self.costs.items():
            provider, model = key.split(":")
            if model not in by_model:
                by_model[model] = {"cost": 0, "provider": provider}
            by_model[model]["cost"] += cost

        return {
            "total_cost": total,
            "by_model": by_model,
            "period": period
        }
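As a quick sanity check on calculate_cost, here is the arithmetic for a single hypothetical gpt-3.5-turbo call, with prices copied from the PRICING table above:

```python
# Prices from the PRICING table: gpt-3.5-turbo, USD per 1K tokens
input_price, output_price = 0.0005, 0.0015

# A call with 2,000 input tokens and 1,000 output tokens
input_tokens, output_tokens = 2000, 1000

cost = (input_tokens / 1000) * input_price + (output_tokens / 1000) * output_price
print(f"${cost:.4f}")  # $0.0025
```

At roughly a quarter of a cent per call, a thousand such calls a day already costs about $2.50/day, which is why per-model cost counters are worth tracking from day one.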

8. Best Practices

8.1 Ops Checklist

Check                  Frequency   Target / Notes
Error rate monitoring  Real-time   < 5%
Latency monitoring     Real-time   P95 < 30s
Cost monitoring        Daily       Stay within budget
Log review             Daily       Look for anomaly patterns
Performance tuning     Weekly      Bottleneck analysis
Backup verification    Weekly      Data safety
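The two real-time rows of the checklist are easy to codify into a health check; a tiny sketch with thresholds taken from the table (the function name is mine):

```python
def realtime_checks(error_rate: float, p95_latency_s: float) -> list[str]:
    """Return the real-time checklist items currently violated."""
    issues = []
    if error_rate >= 0.05:        # target: error rate < 5%
        issues.append("error rate >= 5%")
    if p95_latency_s >= 30:       # target: P95 latency < 30s
        issues.append("P95 latency >= 30s")
    return issues

print(realtime_checks(0.02, 12.0))  # []
print(realtime_checks(0.08, 45.0))  # ['error rate >= 5%', 'P95 latency >= 30s']
```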

8.2 Incident Response Flow

Detect → Triage → Assess impact → Temporary mitigation → Root cause → Permanent fix → Postmortem

9. Summary

Monitoring essentials

  1. Metrics: performance, errors, cost
  2. Logging: structured and traceable
  3. Alerting: timely and accurate
  4. Visualization: everything at a glance
  5. Cost: track it continuously

Series wrap-up

This wraps up the 10-part AI Agent Development in Practice series!

Starting from environment setup, we covered:

  1. Environment setup
  2. Your first Agent
  3. Tool calling
  4. Memory systems
  5. Multi-Agent collaboration
  6. Workflow orchestration
  7. Testing and evaluation
  8. Deployment
  9. Security hardening
  10. Ops monitoring

I hope this series helps you build a production-grade AI Agent from scratch!


Copyright: original article by 技术老金, published 2026-04-01 (13,368 characters).
Reprinting: unless otherwise noted, articles on this site are released under the CC 4.0 license; please credit the source when reprinting.