AI Agent开发实战(十):运维监控让Agent稳定运行
一、开场:上线只是开始
大家好,我是老金。
Agent上线后不是结束,而是开始。
需要持续监控:
- 性能指标
- 错误率
- 用户满意度
- 成本控制
今天讲Agent运维监控。
二、监控体系
2.1 监控架构
┌─────────────────────────────────────────────────────────┐
│ Agent运维监控架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 数据采集层 │ │
│ │ • 应用日志 │ │
│ │ • 指标数据 │ │
│ │ • 链路追踪 │ │
│ │ • 用户行为 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 数据存储层 │ │
│ │ • Prometheus (指标) │ │
│ │ • Elasticsearch (日志) │ │
│ │ • Jaeger (追踪) │ │
│ │ • ClickHouse (分析) │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 可视化层 │ │
│ │ • Grafana │ │
│ │ • Kibana │ │
│ │ • 自定义Dashboard │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 告警层 │ │
│ │ • AlertManager │ │
│ │ • PagerDuty │ │
│ │ • 企业微信/钉钉 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
三、指标监控
3.1 核心指标
# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info
import time
from functools import wraps
from typing import Callable, Any
# ============ Application metrics ============

# Request counter, labelled by method, endpoint, response status and user type.
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total request count',
    ['method', 'endpoint', 'status', 'user_type'],
)

# Request latency histogram; buckets cover fast API hits up to minute-long agent runs.
REQUEST_LATENCY = Histogram(
    'agent_request_latency_seconds',
    'Request latency in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
)

# Currently active sessions per agent type.
ACTIVE_SESSIONS = Gauge(
    'agent_active_sessions',
    'Number of active sessions',
    ['agent_type'],
)

# ============ LLM metrics ============

# Token usage; the `type` label distinguishes input/output/total tokens.
LLM_TOKENS = Counter(
    'agent_llm_tokens_total',
    'Total LLM tokens used',
    ['provider', 'model', 'type'],
)

# LLM API call latency.
LLM_LATENCY = Histogram(
    'agent_llm_latency_seconds',
    'LLM API call latency',
    ['provider', 'model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

# LLM error counter, labelled by error type.
LLM_ERRORS = Counter(
    'agent_llm_errors_total',
    'Total LLM errors',
    ['provider', 'model', 'error_type'],
)

# ============ Agent metrics ============

# Agent execution counter; `status` is one of success/failure/timeout.
AGENT_EXECUTIONS = Counter(
    'agent_executions_total',
    'Total agent executions',
    ['agent_type', 'status'],
)

# End-to-end agent execution duration.
AGENT_DURATION = Histogram(
    'agent_duration_seconds',
    'Agent execution duration',
    ['agent_type'],
    buckets=[1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0],
)

# Tool invocation counter.
TOOL_CALLS = Counter(
    'agent_tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status'],
)

# ============ Cost metrics ============

# Cumulative API spend in US dollars.
API_COST = Counter(
    'agent_api_cost_dollars',
    'Total API cost in dollars',
    ['provider', 'model'],
)
# Decorator: automatically record execution metrics
def monitor_agent(agent_type: str):
    """Decorator that records execution metrics for an async agent entry point.

    Increments AGENT_EXECUTIONS (labelled success/failure) and observes
    AGENT_DURATION for every call, whether the wrapped coroutine returns
    normally or raises.
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            status = "success"
            try:
                return await func(*args, **kwargs)
            except Exception:
                # Record the failure, then let the exception propagate.
                status = "failure"
                raise
            finally:
                duration = time.time() - start_time
                AGENT_EXECUTIONS.labels(
                    agent_type=agent_type,
                    status=status
                ).inc()
                AGENT_DURATION.labels(
                    agent_type=agent_type
                ).observe(duration)
        return wrapper
    return decorator

# Usage example
@monitor_agent(agent_type="chat")
async def run_chat_agent(message: str):
    # Agent logic goes here
    pass
3.2 自定义指标收集器
# src/monitoring/collector.py
from typing import Dict, Any, List
from datetime import datetime
import asyncio
class MetricsCollector:
    """Buffered metrics collector.

    Data points are appended to an in-memory buffer via record() and
    flushed to the backing store in batches by a background task started
    with start(). stop() cancels the task and drains whatever is still
    buffered, so no data is lost on shutdown (the original dropped any
    entries recorded since the last flush).
    """

    def __init__(self):
        self.buffer: List[Dict[str, Any]] = []
        self.flush_interval = 60  # seconds between automatic flushes
        self._task = None

    async def start(self):
        """Start the background flush loop."""
        self._task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop the collector, draining anything still buffered."""
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        # Final drain: the loop may have been cancelled mid-interval.
        await self._flush()

    def record(
        self,
        metric_name: str,
        value: Any,
        tags: Dict[str, str] = None,
        timestamp: datetime = None
    ):
        """Buffer a single metric data point (timestamp defaults to now)."""
        self.buffer.append({
            "metric": metric_name,
            "value": value,
            "tags": tags or {},
            "timestamp": timestamp or datetime.now()
        })

    async def _flush_loop(self):
        """Flush the buffer every flush_interval seconds until cancelled."""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def _flush(self):
        """Write buffered entries to storage as one batch."""
        if not self.buffer:
            return
        # Snapshot-and-clear so entries recorded during the write go to
        # the next flush instead of being lost.
        entries = self.buffer.copy()
        self.buffer.clear()
        await self._write_to_storage(entries)

    async def _write_to_storage(self, entries: List[Dict]):
        """Persist a batch of entries (e.g. to InfluxDB or ClickHouse)."""
        # Storage-specific implementation goes here.
        pass
# Agent statistics collection
class AgentStatsCollector(MetricsCollector):
    """Collector specialised for per-conversation agent statistics."""

    def record_conversation(
        self,
        agent_type: str,
        user_id: str,
        duration: float,
        tokens: int,
        tool_calls: int,
        success: bool
    ):
        """Record one finished conversation as four buffered metrics."""
        # One counter-style event carrying the full identifying tag set.
        self.record(
            metric_name="agent.conversation",
            value=1,
            tags={
                "agent_type": agent_type,
                "user_id": user_id,
                "success": str(success)
            }
        )
        # Three value metrics tagged only by agent type.
        for name, value in (
            ("agent.duration", duration),
            ("agent.tokens", tokens),
            ("agent.tool_calls", tool_calls),
        ):
            self.record(
                metric_name=name,
                value=value,
                tags={"agent_type": agent_type}
            )
四、日志管理
4.1 结构化日志
# src/monitoring/logging_config.py
from loguru import logger
import json
import sys
def setup_logging(
    log_level: str = "INFO",
    json_format: bool = True
):
    """Configure structured logging via loguru.

    Args:
        log_level: minimum level for the console sink.
        json_format: emit JSON lines (for ELK ingestion) instead of a
            human-readable console format.

    Returns:
        The configured loguru logger.
    """
    logger.remove()

    if json_format:
        # JSON lines, one object per record (easy for ELK to collect).
        def json_sink(message):
            record = message.record
            log_entry = {
                "timestamp": record["time"].isoformat(),
                "level": record["level"].name,
                "message": record["message"],
                "module": record["module"],
                "function": record["function"],
                "line": record["line"],
                "extra": record["extra"]
            }
            print(json.dumps(log_entry, ensure_ascii=False))

        logger.add(
            json_sink,
            level=log_level,
            colorize=False
        )
    else:
        # Human-readable console format.
        logger.add(
            sys.stdout,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {module}:{function}:{line} | {message}",
            level=log_level,
            colorize=True
        )

    # File sink: daily rotation at midnight, 30-day retention, zipped.
    # NOTE(review): placed after the if/else on the assumption the file
    # sink applies in both modes — source indentation was lost, confirm.
    logger.add(
        "logs/agent_{time:YYYY-MM-DD}.log",
        rotation="00:00",
        retention="30 days",
        compression="zip",
        level="DEBUG",
        serialize=True  # JSON format
    )

    return logger
# 使用:带上下文的日志
# Logging with bound context
class ContextualLogger:
    """Logger wrapper that attaches a fixed context dict to every record.

    with_context() returns a NEW ContextualLogger carrying the merged
    context, leaving this instance untouched, so derived loggers can be
    created per request/session without interference.
    """

    def __init__(self, context: Dict[str, Any] = None):
        self.context = context or {}

    def with_context(self, **kwargs) -> 'ContextualLogger':
        """Return a new logger whose context is this one plus **kwargs."""
        return ContextualLogger({**self.context, **kwargs})

    def info(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).info(message)

    def error(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).error(message)

    def warning(self, message: str, **kwargs):
        logger.bind(**self.context, **kwargs).warning(message)
# Usage example: bind per-session context once, then derive as needed
ctx_logger = ContextualLogger({"user_id": "user123", "session_id": "sess456"})
ctx_logger.info("User started conversation")
ctx_logger.with_context(agent_type="chat").info("Agent processing message")
4.2 请求追踪
# src/monitoring/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger import JaegerExporter
from functools import wraps
import uuid
# Configure tracing: export spans in batches to a local Jaeger agent.
# NOTE(review): opentelemetry.exporter.jaeger is deprecated upstream in
# favour of OTLP exporters — confirm against the target OTel version.
tracer_provider = TracerProvider()
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831
)
tracer_provider.add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
class RequestTracer:
    """Correlates all spans of one request under a shared request_id."""

    def __init__(self, request_id: str = None):
        self.request_id = request_id or str(uuid.uuid4())
        # NOTE(review): spans appended here are never ended or cleared —
        # a long-lived tracer grows unboundedly; callers must end spans.
        self.spans = []

    def start_span(self, name: str) -> trace.Span:
        """Start a span tagged with this tracer's request_id."""
        span = tracer.start_span(name)
        span.set_attribute("request_id", self.request_id)
        self.spans.append(span)
        return span

    def trace_agent(self, agent_type: str):
        """Decorator: wrap an async agent entry point in a span."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(
                    f"agent.{agent_type}"
                ) as span:
                    span.set_attribute("agent_type", agent_type)
                    span.set_attribute("request_id", self.request_id)
                    try:
                        result = await func(*args, **kwargs)
                        span.set_attribute("status", "success")
                        return result
                    except Exception as e:
                        span.set_attribute("status", "error")
                        span.set_attribute("error_message", str(e))
                        raise
            return wrapper
        return decorator

    def trace_tool(self, tool_name: str):
        """Decorator: wrap an async tool call in a span with duration_ms."""
        import time  # fix: this module's header never imported time
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(
                    f"tool.{tool_name}"
                ) as span:
                    span.set_attribute("tool_name", tool_name)
                    start_time = time.time()
                    try:
                        result = await func(*args, **kwargs)
                        span.set_attribute("status", "success")
                        return result
                    except Exception:
                        span.set_attribute("status", "error")
                        raise
                    finally:
                        duration = time.time() - start_time
                        span.set_attribute("duration_ms", duration * 1000)
            return wrapper
        return decorator
# Usage
# Fix: do NOT name this variable "tracer" — that shadowed the module-level
# OpenTelemetry tracer the decorators call internally (tracer.start_as_current_span),
# which would fail at runtime once the decorated functions were invoked.
request_tracer = RequestTracer()

@request_tracer.trace_agent("chat")
async def run_chat_agent(message):
    # ...
    pass

@request_tracer.trace_tool("get_weather")
async def execute_weather_tool(city):
    # ...
    pass
五、告警系统
5.1 告警规则
# alerting_rules.yml
groups:
  - name: agent_alerts
    interval: 30s
    rules:
      # Error-rate alert: >5% of requests failing over the last 5 minutes
      - alert: HighErrorRate
        expr: |
          rate(agent_requests_total{status="error"}[5m])
          /
          rate(agent_requests_total[5m])
          > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Agent错误率过高"
          description: "5分钟内错误率超过5%"

      # Latency alert: P95 above 30 seconds
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            rate(agent_request_latency_seconds_bucket[5m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent响应延迟过高"
          description: "P95延迟超过30秒"

      # Token cost alert: more than 100k tokens consumed in the last hour.
      # Fix: the original used rate(...[1h]) > 100000, but rate() is
      # per-second — that threshold meant 100k tokens/second, not /hour.
      # increase() matches the documented "100k per hour" intent.
      - alert: HighTokenUsage
        expr: |
          increase(agent_llm_tokens_total[1h]) > 100000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Token使用量过高"
          description: "每小时Token使用超过10万"

      # Agent failure alert: more than 10 failures within 5 minutes
      - alert: AgentFailure
        expr: |
          increase(agent_executions_total{status="failure"}[5m]) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Agent执行失败次数过多"
          description: "5分钟内超过10次失败"
5.2 告警通知
# src/monitoring/alerting.py
from typing import Dict, List, Any
from enum import Enum
import aiohttp
import asyncio
class AlertSeverity(Enum):
    """Alert severity levels, in increasing order of urgency."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
class Alerter:
    """Fans an alert out to the channels configured for its severity.

    Routing policy:
      - webhook: every alert, when a webhook_url is configured
      - email:   WARNING and CRITICAL, when recipients are configured
      - IM (WeCom/DingTalk): CRITICAL only
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.webhook_url = config.get("webhook_url")
        self.email_recipients = config.get("email_recipients", [])

    async def send_alert(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any] = None
    ):
        """Send one alert through all channels matching its severity."""
        if self.webhook_url:
            await self._send_webhook(title, message, severity, details)
        if self.email_recipients and severity in (AlertSeverity.WARNING, AlertSeverity.CRITICAL):
            await self._send_email(title, message, severity, details)
        if severity == AlertSeverity.CRITICAL:
            await self._send_im(title, message, details)

    async def _send_webhook(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any]
    ):
        """POST the alert as JSON to the configured webhook URL."""
        # Local imports fix two names this module's header never provided:
        # datetime (used for the timestamp) and a logger for failures.
        from datetime import datetime
        import logging
        payload = {
            "title": title,
            "message": message,
            "severity": severity.value,
            "details": details,
            "timestamp": datetime.now().isoformat()
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.webhook_url,
                json=payload
            ) as response:
                if response.status != 200:
                    # Best-effort channel: log the failure, do not raise.
                    logging.getLogger(__name__).error(
                        "Webhook alert failed: %s", response.status
                    )

    async def _send_email(
        self,
        title: str,
        message: str,
        severity: AlertSeverity,
        details: Dict[str, Any]
    ):
        """Send the alert by email (implementation TBD)."""
        pass

    async def _send_im(
        self,
        title: str,
        message: str,
        details: Dict[str, Any]
    ):
        """Send the alert via a WeCom/DingTalk bot (implementation TBD)."""
        pass
# 告警管理器
class AlertManager:
"""告警管理器"""
def __init__(self, alerter: Alerter):
self.alerter = alerter
self.alert_counts: Dict[str, int] = {}
self.cooldown: Dict[str, datetime] = {}
async def check_and_alert(
self,
alert_name: str,
condition: bool,
title: str,
message: str,
severity: AlertSeverity,
cooldown_seconds: int = 300
):
"""检查并告警"""
if not condition:
return
# 检查冷却时间
if alert_name in self.cooldown:
if datetime.now() - self.cooldown[alert_name] Dict[str, Any]:
"""获取概览数据"""
return {
"total_requests": await get_total_requests(),
"active_users": await get_active_users(),
"error_rate": await get_error_rate(),
"avg_latency": await get_avg_latency(),
"tokens_today": await get_tokens_today(),
"cost_today": await get_cost_today()
}
@router.get("/dashboard/trends")
async def get_trends(
    metric: str,
    period: str = "24h"
) -> List[Dict[str, Any]]:
    """Return time-series data for one metric over a named period.

    Supported periods: "24h", "7d"; anything else falls back to the
    last 24 hours (same behaviour as the original if/elif chain,
    without the duplicated default branch).
    """
    period_map = {
        "24h": timedelta(hours=24),
        "7d": timedelta(days=7),
    }
    end_time = datetime.now()
    start_time = end_time - period_map.get(period, timedelta(hours=24))
    return await query_metric(metric, start_time, end_time)
@router.get("/dashboard/top_users")
async def get_top_users(limit: int = 10) -> List[Dict]:
    """Return the `limit` most active users."""
    return await query_top_users(limit)
@router.get("/dashboard/top_tools")
async def get_top_tools(limit: int = 10) -> List[Dict]:
    """Return the `limit` most frequently used tools."""
    return await query_top_tools(limit)
七、成本优化
7.1 成本追踪
# src/monitoring/cost_tracking.py
from typing import Dict
from datetime import datetime
class CostTracker:
    """Tracks cumulative LLM API spend per provider/model pair."""

    # API prices in USD per 1K tokens.
    PRICING = {
        "openai": {
            "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        },
        "anthropic": {
            "claude-3-opus": {"input": 0.015, "output": 0.075},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        }
    }

    def __init__(self):
        # Cumulative cost in USD, keyed by "provider:model".
        self.costs: Dict[str, float] = {}

    def calculate_cost(
        self,
        provider: str,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Return the USD cost of one call; 0.0 for unknown provider/model."""
        if provider not in self.PRICING:
            return 0.0
        model_pricing = self.PRICING[provider].get(model, {})
        if not model_pricing:
            return 0.0
        input_cost = (input_tokens / 1000) * model_pricing.get("input", 0)
        output_cost = (output_tokens / 1000) * model_pricing.get("output", 0)
        return input_cost + output_cost

    def record_usage(
        self,
        provider: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_id: str = None
    ):
        """Accumulate the cost of one call and export it as a metric.

        Returns the computed cost in USD.
        """
        cost = self.calculate_cost(provider, model, input_tokens, output_tokens)
        key = f"{provider}:{model}"
        self.costs[key] = self.costs.get(key, 0) + cost
        API_COST.labels(provider=provider, model=model).inc(cost)
        return cost

    def get_report(self, period: str = "today") -> Dict:
        """Summarise accumulated costs, grouped by model.

        NOTE(review): `period` is only echoed back in the result — the
        totals always cover the tracker's whole lifetime. Confirm whether
        period-based filtering was intended.
        """
        total = sum(self.costs.values())
        by_model = {}
        for key, cost in self.costs.items():
            # maxsplit=1 keeps model names containing ':' intact
            provider, model = key.split(":", 1)
            if model not in by_model:
                by_model[model] = {"cost": 0, "provider": provider}
            by_model[model]["cost"] += cost
        return {
            "total_cost": total,
            "by_model": by_model,
            "period": period
        }
八、最佳实践
8.1 运维检查清单
| 检查项 | 频率 | 说明 |
|---|---|---|
| 错误率监控 | 实时 | <5% |
| 延迟监控 | 实时 | P95 < 30s |
| 成本监控 | 每日 | 预算控制 |
| 日志检查 | 每日 | 异常模式 |
| 性能优化 | 每周 | 瓶颈分析 |
| 备份检查 | 每周 | 数据安全 |
8.2 故障处理流程
故障发现 → 初步评估 → 影响范围 → 临时修复 → 根本原因 → 永久修复 → 复盘总结
九、总结
监控要点
- 指标监控:性能、错误、成本
- 日志管理:结构化、可追踪
- 告警系统:及时、准确
- 可视化:一目了然
- 成本优化:持续追踪
系列总结
AI Agent开发实战系列(10篇) 到此完结!
我们从环境搭建开始,经历了:
- 环境搭建
- 第一个Agent
- 工具调用
- 记忆系统
- Multi-Agent协作
- 工作流编排
- 测试与评估
- 部署上线
- 安全防护
- 运维监控
希望这个系列能帮你从零开始构建生产级AI Agent!
往期回顾
正文完