AI Agent可观测性:日志、追踪、指标三位一体
一、开场:AI出问题时,你知道发生了什么吗?
大家好,我是老金。
上周用户反馈:"AI有时候不回消息。"
我问:"什么时候?什么情况?"
用户:"不知道,就是有时候会卡住。"
我打开日志一看——全是INFO级别的流水账,根本找不到问题在哪。
这让我意识到一个严重问题:AI Agent系统的可观测性太差了。
传统软件出问题可以看日志、看堆栈,AI Agent呢?推理过程在LLM那边,工具调用散落各处,根本不知道发生了什么。
今天聊聊如何给AI Agent装上"眼睛",实现完整的可观测性。
二、可观测性三支柱
┌─────────────────────────────────────────────────────────┐
│ 可观测性三支柱 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 日志 │ │ 追踪 │ │ 指标 │ │
│ │ Logs │ │ Traces │ │ Metrics │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────────┴───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ 统一视图 │ │
│ │ (Grafana/Datadog) │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
• 日志:发生了什么?(事件记录)
• 追踪:怎么发生的?(调用链路)
• 指标:发生得怎么样?(量化数据)
三、结构化日志
为什么传统日志不够?
# ❌ 传统日志 - 难以查询和分析
print("用户查询订单ORD12345")
print("调用query_order工具")
print("返回结果:已发货")
# 搜索"ORD12345"可能找到几百条
# 想知道这个请求耗时多久?不知道
# 想知道工具调用参数?没有记录
结构化日志设计
import json
import random
import time
import traceback
from datetime import datetime

import structlog
# 配置structlog
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class AgentLogger:
    """Structured logger scoped to a single agent.

    Binds ``agent_id`` onto every event so log lines from different agents
    can be filtered apart, and offers one helper per lifecycle event:
    request start/end, LLM call, tool call, and error.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # Bind once; the agent id is attached to every subsequent event.
        self.logger = logger.bind(agent_id=agent_id)

    def log_request_start(self, request_id: str, user_input: str, user_id: str):
        """Emit a request-started event; only a short input preview is kept."""
        self.logger.info(
            "request_started",
            request_id=request_id,
            user_id=user_id,
            user_input_length=len(user_input),
            user_input_preview=user_input[:50]  # preview only, never the full input
        )

    def log_llm_call(self, request_id: str, model: str,
                     prompt_tokens: int, response_tokens: int, latency: float):
        """Emit one LLM-call event with token counts, latency and estimated cost."""
        self.logger.info(
            "llm_call",
            request_id=request_id,
            model=model,
            prompt_tokens=prompt_tokens,
            response_tokens=response_tokens,
            total_tokens=prompt_tokens + response_tokens,
            latency_seconds=latency,
            estimated_cost=self._estimate_cost(model, prompt_tokens, response_tokens)
        )

    def log_tool_call(self, request_id: str, tool_name: str,
                      params: dict, result_summary: str, latency: float, success: bool):
        """Emit one tool-call event. Callers must redact sensitive params first."""
        self.logger.info(
            "tool_call",
            request_id=request_id,
            tool_name=tool_name,
            params=params,  # NOTE: sanitize sensitive fields before passing in
            result_summary=result_summary,
            latency_seconds=latency,
            success=success
        )

    def log_request_end(self, request_id: str, total_latency: float,
                        success: bool, error: str = None):
        """Emit a request-completed event (used for both success and failure)."""
        self.logger.info(
            "request_completed",
            request_id=request_id,
            total_latency_seconds=total_latency,
            success=success,
            error=error
        )

    def log_error(self, request_id: str, error_type: str,
                  error_message: str, stack_trace: str = None):
        """Emit an ERROR-level event, optionally carrying a stack trace."""
        self.logger.error(
            "error_occurred",
            request_id=request_id,
            error_type=error_type,
            error_message=error_message,
            stack_trace=stack_trace
        )

    def _estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Rough USD cost of one call; unknown models are priced at zero.

        Prices are per 1K tokens as (input, output) pairs.
        """
        per_1k_in, per_1k_out = {
            "gpt-4": (0.03, 0.06),
            "gpt-3.5-turbo": (0.0005, 0.0015),
        }.get(model, (0.0, 0.0))
        return (input_tokens * per_1k_in + output_tokens * per_1k_out) / 1000
日志使用示例
# Usage example: one fully-logged request lifecycle.
agent_logger = AgentLogger(agent_id="customer_service_001")

async def handle_request(user_input: str, user_id: str):
    """Handle one user request, logging start, LLM call, tool calls and end.

    NOTE(review): relies on module-level ``generate_request_id``, ``llm`` and
    ``execute_tool`` defined elsewhere in the project — confirm availability.
    """
    request_id = generate_request_id()
    start_time = time.time()
    try:
        agent_logger.log_request_start(request_id, user_input, user_id)

        # --- LLM call: time it and log token usage -------------------------
        llm_start = time.time()
        response = await llm.generate(user_input)
        llm_latency = time.time() - llm_start
        agent_logger.log_llm_call(
            request_id=request_id,
            model="gpt-4",
            prompt_tokens=response.usage.prompt_tokens,
            response_tokens=response.usage.completion_tokens,
            latency=llm_latency
        )

        # --- Tool calls: one log event per invocation ----------------------
        if response.tool_calls:
            for tool_call in response.tool_calls:
                tool_start = time.time()
                result = await execute_tool(tool_call)
                tool_latency = time.time() - tool_start
                agent_logger.log_tool_call(
                    request_id=request_id,
                    tool_name=tool_call.name,
                    params=tool_call.params,
                    # FIX: the original sliced ``result[:100]`` (treating it
                    # as a string) yet also read ``result.success`` (treating
                    # it as an object). Stringify for the summary so both
                    # uses are consistent.
                    result_summary=str(result)[:100],
                    latency=tool_latency,
                    success=result.success
                )

        total_latency = time.time() - start_time
        agent_logger.log_request_end(request_id, total_latency, success=True)
        return response
    except Exception as e:
        agent_logger.log_error(
            request_id=request_id,
            error_type=type(e).__name__,
            error_message=str(e),
            stack_trace=traceback.format_exc()
        )
        agent_logger.log_request_end(request_id, time.time() - start_time,
                                     success=False, error=str(e))
        raise
四、分布式追踪
为什么需要追踪?
AI Agent一次请求可能涉及:
- 多次LLM调用
- 多个工具调用
- 数据库查询
- 外部API调用
没有追踪,就像瞎子摸象。
使用OpenTelemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# 配置Tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# 配置导出(到Jaeger/Tempo等)
otlp_exporter = OTLPSpanExporter(endpoint="localhost:4317")
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(otlp_exporter)
)
class AgentTracer:
    """Thin wrapper around an OpenTelemetry tracer for agent workloads.

    Each ``trace_*`` helper returns a context manager that opens a span
    named after the operation, pre-populated with useful attributes.
    """

    def __init__(self, tracer_name: str = "ai_agent"):
        self.tracer = trace.get_tracer(tracer_name)

    def trace_request(self, request_id: str, user_input: str):
        """Root span covering one whole agent request."""
        attrs = {
            "request.id": request_id,
            "user.input.length": len(user_input),
        }
        return self.tracer.start_as_current_span("agent.request", attributes=attrs)

    def trace_llm_call(self, model: str, prompt_tokens: int):
        """Child span covering a single LLM generation."""
        attrs = {
            "llm.model": model,
            "llm.prompt_tokens": prompt_tokens,
        }
        return self.tracer.start_as_current_span("llm.generate", attributes=attrs)

    def trace_tool_call(self, tool_name: str, params: dict):
        """Child span covering one tool invocation; params serialized as JSON."""
        attrs = {
            "tool.name": tool_name,
            "tool.params": json.dumps(params),
        }
        return self.tracer.start_as_current_span(f"tool.{tool_name}", attributes=attrs)
追踪使用示例
agent_tracer = AgentTracer()
async def handle_request_traced(user_input: str, user_id: str):
request_id = generate_request_id()
with agent_tracer.trace_request(request_id, user_input) as request_span:
# 设置用户信息
request_span.set_attribute("user.id", user_id)
# LLM调用
with agent_tracer.trace_llm_call("gpt-4", count_tokens(user_input)) as llm_span:
response = await llm.generate(user_input)
llm_span.set_attribute("llm.response_tokens", response.usage.completion_tokens)
llm_span.set_attribute("llm.latency_ms", response.latency * 1000)
# 工具调用
if response.tool_calls:
for tool_call in response.tool_calls:
with agent_tracer.trace_tool_call(tool_call.name, tool_call.params) as tool_span:
result = await execute_tool(tool_call)
tool_span.set_attribute("tool.success", result.success)
tool_span.set_attribute("tool.latency_ms", result.latency * 1000)
# 设置整体结果
request_span.set_attribute("request.success", True)
return response
追踪可视化
Jaeger追踪视图示例:
agent.request [12.5s]
├── llm.generate [2.3s]
│ ├── gpt-4
│ └── tokens: 1500 → 500
├── tool.query_order [0.5s]
│ ├── db.query [0.3s]
│ └── cache.check [0.2s]
├── llm.generate [3.1s] ← 第二次LLM调用
│ └── tokens: 2000 → 300
└── notification.send [0.3s]
五、指标收集
关键指标定义
from prometheus_client import Counter, Histogram, Gauge, Info
# 请求计数
REQUEST_COUNT = Counter(
'agent_requests_total',
'Total number of agent requests',
['agent_id', 'status']
)
# 请求延迟
REQUEST_LATENCY = Histogram(
'agent_request_latency_seconds',
'Request latency in seconds',
['agent_id'],
buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)
# LLM调用统计
LLM_CALLS = Counter(
'agent_llm_calls_total',
'Total LLM calls',
['agent_id', 'model', 'status']
)
LLM_TOKENS = Counter(
'agent_llm_tokens_total',
'Total LLM tokens',
['agent_id', 'model', 'type'] # type: input/output
)
LLM_COST = Counter(
'agent_llm_cost_dollars',
'Total LLM cost in dollars',
['agent_id', 'model']
)
# 工具调用统计
TOOL_CALLS = Counter(
'agent_tool_calls_total',
'Total tool calls',
['agent_id', 'tool_name', 'status']
)
TOOL_LATENCY = Histogram(
'agent_tool_latency_seconds',
'Tool call latency',
['agent_id', 'tool_name'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 5]
)
# 活跃请求
ACTIVE_REQUESTS = Gauge(
'agent_active_requests',
'Number of active requests',
['agent_id']
)
# Agent信息
AGENT_INFO = Info(
'agent_build_info',
'Agent build information'
)
AGENT_INFO.info({
'version': '1.0.0',
'model': 'gpt-4',
'tools': 'query_order,cancel_order'
})
指标收集中间件
class MetricsMiddleware:
    """Middleware that records request count, latency and in-flight gauge.

    Wraps an async request handler; recording happens in ``finally`` so
    both successful and failing calls are counted.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id

    async def __call__(self, request, handler):
        """Invoke ``handler(request)`` while keeping Prometheus metrics updated."""
        ACTIVE_REQUESTS.labels(agent_id=self.agent_id).inc()
        start_time = time.time()
        status = "success"
        try:
            # FIX: dropped the redundant local (``result = ...; return result``).
            return await handler(request)
        except Exception:
            # FIX: the exception was bound to an unused name; we only need
            # to flip the status label and re-raise.
            status = "error"
            raise
        finally:
            latency = time.time() - start_time
            ACTIVE_REQUESTS.labels(agent_id=self.agent_id).dec()
            REQUEST_COUNT.labels(agent_id=self.agent_id, status=status).inc()
            REQUEST_LATENCY.labels(agent_id=self.agent_id).observe(latency)
自定义业务指标
# 业务相关指标
INTENT_RECOGNITION_ACCURACY = Gauge(
'agent_intent_accuracy',
'Intent recognition accuracy',
['agent_id']
)
KNOWLEDGE_BASE_HIT_RATE = Gauge(
'agent_kb_hit_rate',
'Knowledge base hit rate',
['agent_id']
)
USER_SATISFACTION = Histogram(
'agent_user_satisfaction',
'User satisfaction score',
['agent_id'],
buckets=[1, 2, 3, 4, 5]
)
# 成本相关指标
COST_PER_REQUEST = Histogram(
'agent_cost_per_request',
'Cost per request in dollars',
['agent_id'],
buckets=[0.01, 0.05, 0.1, 0.5, 1, 5]
)
六、统一仪表盘
Grafana仪表盘配置
{
"dashboard": {
"title": "AI Agent Observability",
"panels": [
{
"title": "请求概览",
"type": "stat",
"targets": [
{"expr": "rate(agent_requests_total[5m])", "legendFormat": "QPS"},
{"expr": "avg(agent_request_latency_seconds)", "legendFormat": "Avg Latency"}
]
},
{
"title": "LLM调用统计",
"type": "graph",
"targets": [
{"expr": "rate(agent_llm_calls_total[5m])", "legendFormat": "{{model}}"},
{"expr": "rate(agent_llm_tokens_total[5m])", "legendFormat": "{{type}} tokens"}
]
},
{
"title": "Token消耗趋势",
"type": "graph",
"targets": [
{"expr": "sum(rate(agent_llm_tokens_total[1h])) by (model, type)"}
]
},
{
"title": "成本统计",
"type": "stat",
"targets": [
{"expr": "sum(agent_llm_cost_dollars)", "legendFormat": "Total Cost"},
{"expr": "avg(agent_cost_per_request)", "legendFormat": "Avg Cost/Request"}
]
},
{
"title": "工具调用分布",
"type": "piechart",
"targets": [
{"expr": "sum by (tool_name) (agent_tool_calls_total)"}
]
},
{
"title": "错误率",
"type": "gauge",
"targets": [
{
"expr": "sum(rate(agent_requests_total{status='error'}[5m])) / sum(rate(agent_requests_total[5m])) * 100",
"legendFormat": "Error Rate %"
}
],
"thresholds": [
{"value": 1, "color": "green"},
{"value": 5, "color": "yellow"},
{"value": 10, "color": "red"}
]
}
]
}
}
告警规则
# alert_rules.yml
groups:
- name: agent_alerts
rules:
- alert: HighErrorRate
expr: sum(rate(agent_requests_total{status="error"}[5m])) / sum(rate(agent_requests_total[5m])) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "AI Agent错误率过高"
description: "错误率超过10%,当前: {{ $value }}"
- alert: HighLatency
expr: histogram_quantile(0.99, rate(agent_request_latency_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "AI Agent响应延迟过高"
description: "P99延迟超过30秒"
- alert: HighCost
expr: rate(agent_llm_cost_dollars[1h]) > 10
for: 10m
labels:
severity: warning
annotations:
summary: "LLM成本增长过快"
description: "每小时成本超过$10"
- alert: ToolCallFailure
expr: sum(rate(agent_tool_calls_total{status="error"}[5m])) by (tool_name) > 0.5
for: 5m
labels:
severity: warning
annotations:
summary: "工具调用失败率高"
description: "工具 {{ $labels.tool_name }} 失败率过高"
七、完整示例:可观测Agent
from dataclasses import dataclass, field
from typing import Any, Optional
import time

@dataclass
class ObservabilityContext:
    """Per-request observability state across all three pillars.

    One instance is created per request and threaded through the agent so
    every LLM call and tool call is logged, traced and counted at once.
    """
    request_id: str
    user_id: str
    start_time: float
    trace_span: Any            # FIX: was ``any`` (the builtin function), not a type
    # String annotations: these classes are defined elsewhere in this module.
    logger: "AgentLogger"
    tracer: "AgentTracer"
    metrics: "MetricsMiddleware"
    # FIX: ``agent_id`` was read by the record_* methods but never declared;
    # callers had to patch it on after construction. Declare it as a field
    # (default "" keeps the old construction calls working).
    agent_id: str = ""
    total_tokens: int = 0
    total_cost: float = 0.0
    # FIX: was ``tool_calls: list = None`` plus an unconditional reset in
    # __post_init__, which silently discarded any caller-supplied list and
    # is the classic mutable-default pitfall.
    tool_calls: list = field(default_factory=list)

    def record_llm_call(self, model: str, input_tokens: int, output_tokens: int, latency: float):
        """Record one LLM call in logs and metrics, accumulating tokens and cost."""
        self.total_tokens += input_tokens + output_tokens
        cost = self._estimate_cost(model, input_tokens, output_tokens)
        self.total_cost += cost
        self.logger.log_llm_call(
            request_id=self.request_id,
            model=model,
            prompt_tokens=input_tokens,
            response_tokens=output_tokens,
            latency=latency
        )
        LLM_CALLS.labels(agent_id=self.agent_id, model=model, status="success").inc()
        LLM_TOKENS.labels(agent_id=self.agent_id, model=model, type="input").inc(input_tokens)
        LLM_TOKENS.labels(agent_id=self.agent_id, model=model, type="output").inc(output_tokens)
        LLM_COST.labels(agent_id=self.agent_id, model=model).inc(cost)

    def record_tool_call(self, tool_name: str, params: dict, success: bool, latency: float):
        """Record one tool call in logs and metrics."""
        self.tool_calls.append({
            "tool": tool_name,
            "success": success,
            "latency": latency
        })
        self.logger.log_tool_call(
            request_id=self.request_id,
            tool_name=tool_name,
            params=params,
            result_summary="success" if success else "failed",
            latency=latency,
            success=success
        )
        status = "success" if success else "error"
        TOOL_CALLS.labels(agent_id=self.agent_id, tool_name=tool_name, status=status).inc()
        TOOL_LATENCY.labels(agent_id=self.agent_id, tool_name=tool_name).observe(latency)

    def _estimate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Rough USD cost per call (prices per 1K tokens; unknown models cost 0).

        FIX: the original called ``self._estimate_cost`` in record_llm_call
        without ever defining it, so every LLM call raised AttributeError.
        Mirrors AgentLogger's pricing table.
        """
        prices = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015}
        }
        price = prices.get(model, {"input": 0, "output": 0})
        return (input_tokens * price["input"] + output_tokens * price["output"]) / 1000
class ObservableAgent:
    """Agent wrapper wiring logs, traces and metrics around every request."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.logger = AgentLogger(agent_id)
        self.tracer = AgentTracer(agent_id)
        self.metrics = MetricsMiddleware(agent_id)

    async def run(self, user_input: str, user_id: str) -> str:
        """Execute one request with full observability.

        Logs start/end, keeps the Prometheus counters, histograms and the
        in-flight gauge updated, and annotates the request span with total
        tokens, cost and tool-call count.
        """
        request_id = generate_request_id()
        start_time = time.time()
        # FIX: the original assigned ``status``/``error`` only inside the
        # try/except bodies, so an exception raised before those lines
        # (e.g. in log_request_start) made ``finally`` crash with NameError.
        # Initialize them up front; ``status`` flips to "success" only once
        # the request actually completes.
        status = "error"
        error = None
        with self.tracer.trace_request(request_id, user_input) as span:
            ctx = ObservabilityContext(
                request_id=request_id,
                user_id=user_id,
                start_time=start_time,
                trace_span=span,
                logger=self.logger,
                tracer=self.tracer,
                metrics=self.metrics
            )
            ctx.agent_id = self.agent_id
            # FIX: increment the in-flight gauge before entering the try so
            # the unconditional dec() in ``finally`` is always paired.
            ACTIVE_REQUESTS.labels(agent_id=self.agent_id).inc()
            try:
                self.logger.log_request_start(request_id, user_input, user_id)
                result = await self._execute_with_observability(user_input, ctx)
                status = "success"
                return result
            except Exception as e:
                error = str(e)
                self.logger.log_error(request_id, type(e).__name__, str(e))
                span.set_attribute("error", True)
                span.set_attribute("error.message", str(e))
                raise
            finally:
                total_latency = time.time() - start_time
                self.logger.log_request_end(request_id, total_latency, status == "success", error)
                ACTIVE_REQUESTS.labels(agent_id=self.agent_id).dec()
                REQUEST_COUNT.labels(agent_id=self.agent_id, status=status).inc()
                REQUEST_LATENCY.labels(agent_id=self.agent_id).observe(total_latency)
                COST_PER_REQUEST.labels(agent_id=self.agent_id).observe(ctx.total_cost)
                # Annotate the root span with request-level aggregates.
                span.set_attribute("request.total_tokens", ctx.total_tokens)
                span.set_attribute("request.total_cost", ctx.total_cost)
                span.set_attribute("request.tool_calls", len(ctx.tool_calls))
八、最佳实践
日志分级
| 级别 | 使用场景 | 示例 |
|---|---|---|
| DEBUG | 开发调试 | "工具参数: {…}" |
| INFO | 正常事件 | "请求开始"、"LLM调用" |
| WARN | 潜在问题 | "Token使用量较高"、"响应时间长" |
| ERROR | 错误事件 | "工具调用失败"、"LLM超时" |
敏感信息处理
# Field names whose values must never reach the logs.
SENSITIVE_FIELDS = ["password", "token", "api_key", "credit_card"]

def sanitize_for_log(data: dict) -> dict:
    """Return a copy of ``data`` with sensitive values masked.

    Generalized over the original: recurses into nested dicts, so a secret
    buried one level down (e.g. ``{"auth": {"token": ...}}``) is also
    redacted. Top-level behavior for flat dicts is unchanged.
    """
    sanitized = {}
    for key, value in data.items():
        if key in SENSITIVE_FIELDS:
            sanitized[key] = "***REDACTED***"
        elif isinstance(value, dict):
            sanitized[key] = sanitize_for_log(value)
        else:
            sanitized[key] = value
    return sanitized
采样策略
# Under high traffic, log only a sampled fraction of requests;
# metrics still count every request.
class SamplingLogger:
    """Probabilistic log sampler: logs roughly ``sample_rate`` of requests."""

    def __init__(self, sample_rate: float = 0.1):
        self.sample_rate = sample_rate

    def should_log(self) -> bool:
        """Decide, independently per call, whether this request is sampled."""
        return random.random() < self.sample_rate

    def log_request(self, request_id: str, data: dict):
        """Log the request if sampled; otherwise only bump the counter."""
        if not self.should_log():
            # Unsampled: keep the metric, skip the log line.
            REQUEST_COUNT.inc()
            return
        logger.info("request_sampled", request_id=request_id, **data)
九、总结与检查清单
可观测性检查清单
| 维度 | 检查项 | 状态 |
|---|---|---|
| 日志 | 结构化格式 | ☐ |
| 日志 | 请求ID贯穿 | ☐ |
| 日志 | 敏感信息脱敏 | ☐ |
| 追踪 | 分布式追踪配置 | ☐ |
| 追踪 | 关键操作追踪 | ☐ |
| 指标 | 核心指标定义 | ☐ |
| 指标 | 成本指标 | ☐ |
| 指标 | 延迟指标 | ☐ |
| 告警 | 关键告警规则 | ☐ |
| 仪表盘 | 统一视图 | ☐ |
下期预告
明天聊聊AI Agent版本管理——从Prompt版本到模型版本!
往期回顾
正文完