AI Agent并发控制:高并发场景下的稳定性保障
一、开场:当100个用户同时问问题
大家好,我是老金。
上周我们系统做活动,流量突然翻了10倍。
然后——崩了。
客服群里炸锅:
- “AI怎么不回消息了?”
- “页面一直在转圈!”
- “显示超时错误!”
看了一下监控:
- API请求QPS从10涨到150
- LLM调用延迟从2秒变成20秒
- 数据库连接池耗尽
- 内存占用飙到95%
一顿排查后发现:并发没控制好。
今天分享AI Agent在并发场景下的稳定性保障经验。
二、为什么AI Agent并发控制难?
LLM调用的特殊性
| 特点 | 影响 |
|---|---|
| 响应时间长(2-30秒) | 长时间占用资源 |
| Token限制 | 并发多了可能超限 |
| 成本敏感 | 每次调用都要花钱 |
| 不确定性 | 相同输入可能不同输出 |
| 无状态 | 每次请求独立 |
并发问题表现
# 问题1:资源耗尽
# Anti-pattern: unbounded concurrency.
async def handle_request(user_input):
    # Every request opens its own LLM call — what if 100 arrive at once?
    # Nothing caps concurrency: no semaphore, no pool, no queue.
    response = await llm.generate(user_input)
    return response
# 问题2:超时雪崩
# 当请求排队积压,超时越来越多,用户重试,请求更多...
# 问题3:成本失控
# 并发越高,Token消耗越快,账单越吓人
三、并发控制策略概览
┌─────────────────────────────────────────────────────────┐
│ 并发控制分层策略 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 第一层:流量入口控制 │ │
│ │ - 限流(Rate Limiting) │ │
│ │ - 熔断(Circuit Breaker) │ │
│ │ - 降级(Degradation) │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 第二层:资源池管理 │ │
│ │ - 连接池(Connection Pool) │ │
│ │ - 信号量(Semaphore) │ │
│ │ - 队列(Queue) │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 第三层:请求调度 │ │
│ │ - 优先级队列 │ │
│ │ - 超时控制 │ │
│ │ - 重试策略 │ │
│ └─────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 第四层:监控告警 │ │
│ │ - 实时监控 │ │
│ │ - 异常检测 │ │
│ │ - 自动扩缩容 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
四、限流策略
固定窗口限流
from collections import defaultdict
from time import time
class FixedWindowRateLimiter:
    """Fixed-window rate limiter.

    Counts requests per key within the current time window; once a key has
    made ``max_requests`` requests in a window, further requests are
    rejected until the next window starts.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key -> timestamps of requests recorded in the current window
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        """Return True and record the request if `key` is under its quota."""
        now = time()
        # Start of the fixed window that `now` falls into.
        window_start = now - (now % self.window_seconds)
        # Reconstructed from a garbled line: if the oldest recorded
        # timestamp predates the current window, all entries are stale —
        # reset the counter for this key.
        if not self.requests[key] or self.requests[key][0] < window_start:
            self.requests[key] = []
        if len(self.requests[key]) >= self.max_requests:
            return False
        self.requests[key].append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """Remaining quota for `key` in the current window (never negative)."""
        return max(0, self.max_requests - len(self.requests[key]))
# Usage example
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)

def handle_request(user_id: str, user_input: str):
    # Reject early when the caller is over quota; the error string is
    # returned to the user as-is.
    if not limiter.is_allowed(user_id):
        return {"error": "请求太频繁,请稍后再试"}
    return process_request(user_input)
滑动窗口限流
class SlidingWindowRateLimiter:
    """Sliding-window rate limiter — more precise than a fixed window.

    Keeps per-key timestamps from the last `window_seconds`; a request is
    allowed only while fewer than `max_requests` fall inside the window
    ending now, so there is no burst at window boundaries.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key -> timestamps of requests still inside the sliding window
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        """Return True and record the request if `key` is under its quota."""
        now = time()
        # Reconstructed from a garbled line: evict timestamps that have
        # slid out of the window, then check the remaining count.
        self.requests[key] = [
            t for t in self.requests[key]
            if now - t < self.window_seconds
        ]
        if len(self.requests[key]) >= self.max_requests:
            return False
        self.requests[key].append(now)
        return True
令牌桶限流
import asyncio
from asyncio import Lock
class TokenBucket:
    """Token-bucket limiter — sustains `rate` req/s, allows bursts up to `capacity`.

    Tokens refill continuously; each request removes one (or more) tokens
    and is rejected when the bucket runs dry.
    """

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: token refill rate (tokens per second)
            capacity: maximum number of tokens the bucket can hold
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity      # start full so early bursts pass
        self.last_time = time()
        self.lock = Lock()          # serialize refill + consume

    async def consume(self, tokens: int = 1) -> bool:
        """Try to take `tokens` from the bucket; True on success."""
        async with self.lock:
            now = time()
            # Refill proportionally to elapsed wall time, capped at capacity.
            refill = (now - self.last_time) * self.rate
            self.tokens = min(self.capacity, self.tokens + refill)
            self.last_time = now
            if self.tokens < tokens:
                return False
            self.tokens -= tokens
            return True

    async def wait_for_token(self, tokens: int = 1):
        """Block (polling every 100 ms) until `tokens` can be consumed."""
        while not await self.consume(tokens):
            await asyncio.sleep(0.1)
# Usage example
bucket = TokenBucket(rate=10, capacity=50)  # 10 tokens/sec, bursts up to 50

async def handle_request(user_input: str):
    # Back-pressure: wait for a token before doing any work.
    await bucket.wait_for_token()
    return await process_request(user_input)
不同级别限流
class MultiLevelRateLimiter:
    """Layered rate limiting: global, per-user, and per-endpoint buckets."""

    def __init__(self):
        # Whole-system ceiling.
        self.global_limiter = TokenBucket(rate=100, capacity=200)
        # One bucket per user, created lazily on first use.
        self.user_limiters = defaultdict(
            lambda: TokenBucket(rate=10, capacity=20)
        )
        # Per-endpoint ceilings; endpoints not listed here are unlimited.
        self.endpoint_limiters = {
            "chat": TokenBucket(rate=50, capacity=100),
            "query": TokenBucket(rate=200, capacity=500),
        }

    async def check(self, user_id: str, endpoint: str) -> bool:
        """Consume one token at every applicable level.

        Raises RateLimitError at the first exhausted level, otherwise
        returns True.

        NOTE(review): tokens taken at an earlier level are not refunded
        when a later level rejects — presumably acceptable; confirm.
        """
        if not await self.global_limiter.consume():
            raise RateLimitError("系统繁忙,请稍后再试")
        if not await self.user_limiters[user_id].consume():
            raise RateLimitError("您的请求太频繁,请稍后再试")
        endpoint_limiter = self.endpoint_limiters.get(endpoint)
        if endpoint_limiter is not None and not await endpoint_limiter.consume():
            raise RateLimitError("该功能繁忙,请稍后再试")
        return True
五、熔断机制
熔断器模式
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"        # healthy — requests flow through
    OPEN = "open"            # tripped — requests rejected immediately
    HALF_OPEN = "half_open"  # probing — a few requests test recovery


class CircuitBreaker:
    """Circuit breaker around an async callable.

    Opens after `failure_threshold` consecutive failures; after
    `recovery_timeout` seconds it half-opens, and closes again once
    `half_open_requests` consecutive probe calls succeed.
    """

    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 30,
                 half_open_requests: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None   # datetime of the most recent failure
        self.half_open_count = 0        # successful probes while HALF_OPEN

    async def call(self, func, *args, **kwargs):
        """Invoke `func` through the breaker.

        Raises CircuitOpenError while the circuit is open; otherwise
        forwards the call and records its outcome.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_recovery():
                raise CircuitOpenError("服务暂时不可用,请稍后再试")
            self.state = CircuitState.HALF_OPEN
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_recovery(self) -> bool:
        """True once `recovery_timeout` seconds passed since the last failure."""
        if self.last_failure_time is None:
            return False
        elapsed = datetime.now() - self.last_failure_time
        return elapsed > timedelta(seconds=self.recovery_timeout)

    def _on_success(self):
        """Reset failure tracking; close the circuit after enough probes."""
        self.failure_count = 0
        if self.state != CircuitState.HALF_OPEN:
            return
        self.half_open_count += 1
        if self.half_open_count >= self.half_open_requests:
            self.state = CircuitState.CLOSED
            self.half_open_count = 0

    def _on_failure(self):
        """Count the failure and trip the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.state == CircuitState.HALF_OPEN:
            # A probe failed — back to fully open.
            self.state = CircuitState.OPEN
            self.half_open_count = 0
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
# Usage example
llm_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def call_llm_safe(prompt: str):
    try:
        return await llm_circuit.call(llm.generate, prompt)
    except CircuitOpenError:
        # Circuit is open — return a canned degraded reply instead of failing.
        return "服务暂时繁忙,请稍后再试"
六、连接池管理
LLM客户端连接池
import aiohttp
from asyncio import Semaphore
class LLMConnectionPool:
    """Bounded aiohttp client for LLM calls.

    A semaphore caps in-flight requests and the TCP connector caps open
    connections, so a traffic spike cannot exhaust sockets.
    """

    def __init__(self,
                 max_connections: int = 10,
                 max_per_host: int = 5):
        self.max_connections = max_connections
        self.max_per_host = max_per_host
        self.semaphore = Semaphore(max_connections)
        self.session = None  # created by initialize()

    async def initialize(self):
        """Create the shared aiohttp session; must be called before `call`."""
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            # Bug fix: honor the constructor argument — this was
            # hard-coded to 5, silently ignoring `max_per_host`.
            limit_per_host=self.max_per_host,
            keepalive_timeout=30
        )
        self.session = aiohttp.ClientSession(connector=connector)

    async def call(self, prompt: str) -> str:
        """POST `prompt` to the chat completions API and return the reply text.

        Raises:
            RuntimeError: if `initialize()` was not called first (clearer
                than the AttributeError on a None session it used to raise).
        """
        if self.session is None:
            raise RuntimeError("LLMConnectionPool.initialize() must be called first")
        async with self.semaphore:
            async with self.session.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4",
                    "messages": [{"role": "user", "content": prompt}]
                },
                headers={"Authorization": f"Bearer {API_KEY}"}
            ) as response:
                data = await response.json()
                return data["choices"][0]["message"]["content"]

    async def close(self):
        """Close the session; safe to call more than once."""
        if self.session:
            await self.session.close()
            self.session = None
数据库连接池
import asyncpg
class DatabasePool:
    """Thin async wrapper over an asyncpg connection pool.

    Connections are bounded between `min_size` and `max_size`, so a burst
    of queries queues on the pool instead of opening unbounded connections.
    """

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # created by initialize()

    async def initialize(self):
        """Open the pool; call once at startup before execute/fetch."""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=30
        )

    async def execute(self, query: str, *args):
        """Run a statement on a pooled connection and return its status."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Run a query on a pooled connection and return all rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)
七、请求队列与调度
优先级队列
import asyncio
from dataclasses import dataclass, field
from typing import Any
import heapq
@dataclass(order=True)
class PrioritizedRequest:
    """A queued request ordered by `priority` alone (lower value = served sooner).

    All other fields are excluded from comparison so heap ordering never
    depends on ids or payload contents.
    """
    priority: int
    request_id: str = field(compare=False)
    user_id: str = field(compare=False)
    content: Any = field(compare=False)
class PriorityQueue:
    """Priority request queue drained by cooperating worker coroutines.

    Requests are kept in a min-heap (lowest `priority` value first); at most
    `max_workers` requests are processed concurrently.
    """

    def __init__(self, max_workers: int = 10):
        # Min-heap of PrioritizedRequest.
        self.queue = []
        self.max_workers = max_workers
        # Number of requests currently being processed.
        self.active_workers = 0
        # Guards `queue`/`active_workers` and wakes waiting workers.
        self.condition = asyncio.Condition()

    async def enqueue(self, request: PrioritizedRequest):
        """Add a request and wake one waiting worker."""
        async with self.condition:
            heapq.heappush(self.queue, request)
            self.condition.notify()

    async def worker(self, processor):
        """Run forever: pop the highest-priority request and process it.

        Waits while the queue is empty or `max_workers` requests are already
        in flight. Processing happens OUTSIDE the condition lock (it is not
        reentrant); the finally-block re-acquires it to release the worker
        slot and notify a peer, even when `processor` raises.
        """
        while True:
            async with self.condition:
                while not self.queue or self.active_workers >= self.max_workers:
                    await self.condition.wait()
                request = heapq.heappop(self.queue)
                self.active_workers += 1
            try:
                await processor(request)
            finally:
                async with self.condition:
                    self.active_workers -= 1
                    self.condition.notify()
# Usage example (must run inside a coroutine — top-level `await` is only
# valid in an async context or an async REPL)
queue = PriorityQueue(max_workers=10)

# VIP users are served first
await queue.enqueue(PrioritizedRequest(
    priority=1,  # lower number = higher priority
    request_id="req_001",
    user_id="vip_user",
    content="帮我查订单"
))

# Regular users
await queue.enqueue(PrioritizedRequest(
    priority=10,
    request_id="req_002",
    user_id="normal_user",
    content="帮我查订单"
))
超时控制
import asyncio
async def call_with_timeout(coro, timeout: float, default=None):
    """Await `coro` for at most `timeout` seconds.

    Returns the coroutine's result, or `default` when the deadline expires
    (the underlying task is cancelled by `wait_for`).
    """
    try:
        result = await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return default
    return result
# Usage example
async def process_request(user_input: str):
    try:
        # Hard cap on the LLM call; wait_for cancels it when the deadline hits.
        response = await asyncio.wait_for(
            llm.generate(user_input),
            timeout=30.0  # 30-second timeout
        )
        return response
    except asyncio.TimeoutError:
        return "抱歉,处理超时,请稍后再试"
八、降级策略
多级降级
class DegradationHandler:
    """Serve requests at progressively lower fidelity as system health worsens."""

    def __init__(self):
        # 0 = normal, 1 = light, 2 = moderate, 3 = severe degradation
        self.degradation_level = 0

    def check_and_adjust(self, metrics: dict):
        """Derive the degradation level from error-rate / p99-latency metrics."""
        error_rate = metrics["error_rate"]
        if error_rate > 0.5:
            level = 3
        elif error_rate > 0.3:
            level = 2
        elif error_rate > 0.1 or metrics["latency_p99"] > 10:
            level = 1
        else:
            level = 0
        self.degradation_level = level

    async def process(self, user_input: str):
        """Dispatch the request to the handler for the current level."""
        level = self.degradation_level
        if level == 0:
            return await self.full_process(user_input)
        if level == 1:
            return await self.fast_model_process(user_input)
        if level == 2:
            return await self.template_process(user_input)
        # Severe degradation: no model call at all.
        return self.static_response()

    async def full_process(self, user_input: str):
        """Level 0: full-quality model."""
        return await llm.generate(user_input, model="gpt-4")

    async def fast_model_process(self, user_input: str):
        """Level 1: cheaper, faster model."""
        return await llm.generate(user_input, model="gpt-3.5-turbo")

    async def template_process(self, user_input: str):
        """Level 2: canned template when one matches, else the fast model.

        NOTE(review): `match_template` is not defined in this snippet —
        presumably implemented elsewhere; verify before relying on level 2.
        """
        template = self.match_template(user_input)
        if template:
            return template
        return await self.fast_model_process(user_input)

    def static_response(self):
        """Level 3: fixed reply."""
        return "系统繁忙,请稍后再试"
九、监控与告警
并发监控指标
from contextlib import contextmanager  # bug fix: used below but never imported anywhere in the file


class ConcurrencyMonitor:
    """Track per-request concurrency metrics: counts, outcomes, latency."""

    def __init__(self):
        self.metrics = {
            "active_requests": 0,
            "queued_requests": 0,
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "timeout_requests": 0,
            "total_latency": 0,
        }
        self.start_time = time()

    @contextmanager
    def track_request(self):
        """Context manager that classifies the wrapped request's outcome.

        Increments success/failure/timeout counters, re-raises any
        exception, and always accumulates latency and releases the
        active-request slot.
        """
        self.metrics["active_requests"] += 1
        self.metrics["total_requests"] += 1
        start = time()
        try:
            yield
            self.metrics["successful_requests"] += 1
        except asyncio.TimeoutError:
            # Timeouts count as failures too, but are also tracked separately.
            self.metrics["timeout_requests"] += 1
            self.metrics["failed_requests"] += 1
            raise
        except Exception:
            self.metrics["failed_requests"] += 1
            raise
        finally:
            self.metrics["active_requests"] -= 1
            self.metrics["total_latency"] += time() - start

    def get_stats(self) -> dict:
        """Return derived rates; denominators are clamped to avoid div-by-zero."""
        # Bug fix: elapsed can be ~0 right after construction; clamp it.
        elapsed = max(time() - self.start_time, 1e-9)
        total = max(1, self.metrics["total_requests"])
        return {
            "active_requests": self.metrics["active_requests"],
            "qps": self.metrics["total_requests"] / elapsed,
            "success_rate": self.metrics["successful_requests"] / total,
            "error_rate": self.metrics["failed_requests"] / total,
            "timeout_rate": self.metrics["timeout_requests"] / total,
            "avg_latency": self.metrics["total_latency"] / total
        }
告警规则
class AlertManager:
    """Evaluate metric snapshots against a fixed list of alert rules."""

    def __init__(self):
        # Each rule: display name, a predicate over the metrics dict,
        # a severity level, and the message emitted when it fires.
        self.alert_rules = [
            {
                "name": "高错误率",
                "condition": lambda m: m["error_rate"] > 0.1,
                "level": "warning",
                "message": "错误率超过10%"
            },
            {
                "name": "超时激增",
                "condition": lambda m: m["timeout_rate"] > 0.05,
                "level": "warning",
                "message": "超时率超过5%"
            },
            {
                "name": "并发过高",
                "condition": lambda m: m["active_requests"] > 80,
                "level": "critical",
                "message": "活跃请求数超过80"
            },
        ]

    def check_alerts(self, metrics: dict) -> list:
        """Return one alert dict per rule whose condition holds for `metrics`."""
        return [
            {
                "name": rule["name"],
                "level": rule["level"],
                "message": rule["message"],
                "metrics": metrics
            }
            for rule in self.alert_rules
            if rule["condition"](metrics)
        ]
十、实战配置示例
生产环境配置
# config.py — production concurrency settings
CONCURRENCY_CONFIG = {
    # Rate limiting
    "rate_limit": {
        "global_qps": 100,
        "user_qps": 10,
        "burst_capacity": 50
    },
    # Connection pools
    "connection_pool": {
        "llm_max_connections": 20,
        "db_min_connections": 5,
        "db_max_connections": 30
    },
    # Timeouts (seconds)
    "timeout": {
        "llm_request": 30,
        "tool_call": 10,
        "total_request": 60
    },
    # Circuit breaker
    "circuit_breaker": {
        "failure_threshold": 5,
        "recovery_timeout": 60
    },
    # Degradation
    "degradation": {
        "enabled": True,
        "fallback_model": "gpt-3.5-turbo",
        "static_response": "系统繁忙"
    }
}
十一、总结与检查清单
并发控制检查清单
| 层级 | 检查项 | 状态 |
|---|---|---|
| 入口 | 全局限流配置 | ☐ |
| 入口 | 用户级限流配置 | ☐ |
| 入口 | 熔断器配置 | ☐ |
| 资源 | 连接池大小合理 | ☐ |
| 资源 | 信号量限制 | ☐ |
| 调度 | 超时时间设置 | ☐ |
| 调度 | 重试策略配置 | ☐ |
| 降级 | 降级策略定义 | ☐ |
| 监控 | 关键指标监控 | ☐ |
| 监控 | 告警规则配置 | ☐ |
下期预告
明天聊聊AI Agent测试策略——从单元测试到端到端测试!
往期回顾
正文完