AI Agent Concurrency Control: Ensuring Stability Under High Concurrency

1. Opening: When 100 Users Ask Questions at the Same Time

Hi everyone, I'm Lao Jin.

Last week we ran a promotion and traffic suddenly jumped 10x.

And then it crashed.

The customer-support group chat blew up:

  • "Why isn't the AI replying?"
  • "The page just keeps spinning!"
  • "It says timeout error!"

A look at the monitoring dashboard showed:

  • API QPS jumped from 10 to 150
  • LLM call latency went from 2 seconds to 20 seconds
  • The database connection pool was exhausted
  • Memory usage spiked to 95%

After a round of digging, the root cause: concurrency was not under control.

Today I'll share what we learned about keeping an AI Agent stable under concurrent load.

2. Why Is Concurrency Control Hard for AI Agents?

What makes LLM calls special

Characteristic               | Impact
Long response time (2-30s)   | Resources are held for a long time
Token limits                 | High concurrency can exceed quotas
Cost-sensitive               | Every call costs real money
Non-deterministic            | The same input may produce different output
Stateless                    | Each request is independent

How the concurrency problems show up

# Problem 1: resource exhaustion
async def handle_request(user_input):
    # Every request opens its own LLM call
    response = await llm.generate(user_input)  # what happens with 100 in flight?
    return response

# Problem 2: timeout avalanche
# Requests queue up, timeouts multiply, users retry, even more requests arrive...

# Problem 3: runaway cost
# The higher the concurrency, the faster tokens burn and the scarier the bill
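
Before we go layer by layer, the cheapest fix for problem 1 is simply bounding the number of in-flight LLM calls with a semaphore. A minimal sketch, assuming the same llm.generate client as above:

import asyncio

llm_semaphore = asyncio.Semaphore(10)  # at most 10 LLM calls in flight

async def handle_request_bounded(user_input: str):
    # Excess requests wait here instead of exhausting connections and memory
    async with llm_semaphore:
        return await llm.generate(user_input)

Everything below is, in one way or another, a more controlled version of this idea.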

3. Concurrency Control Strategies at a Glance

┌───────────────────────────────────────────────────────┐
│              Layered Concurrency Control              │
├───────────────────────────────────────────────────────┤
│                                                       │
│  ┌───────────────────────────────────────────────┐   │
│  │ Layer 1: Traffic ingress control              │   │
│  │ - Rate limiting                               │   │
│  │ - Circuit breaking                            │   │
│  │ - Degradation                                 │   │
│  └───────────────────────────────────────────────┘   │
│                          ↓                            │
│  ┌───────────────────────────────────────────────┐   │
│  │ Layer 2: Resource pool management             │   │
│  │ - Connection pool                             │   │
│  │ - Semaphore                                   │   │
│  │ - Queue                                       │   │
│  └───────────────────────────────────────────────┘   │
│                          ↓                            │
│  ┌───────────────────────────────────────────────┐   │
│  │ Layer 3: Request scheduling                   │   │
│  │ - Priority queue                              │   │
│  │ - Timeout control                             │   │
│  │ - Retry policy                                │   │
│  └───────────────────────────────────────────────┘   │
│                          ↓                            │
│  ┌───────────────────────────────────────────────┐   │
│  │ Layer 4: Monitoring and alerting              │   │
│  │ - Real-time monitoring                        │   │
│  │ - Anomaly detection                           │   │
│  │ - Auto scaling                                │   │
│  └───────────────────────────────────────────────┘   │
│                                                       │
└───────────────────────────────────────────────────────┘
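
Read top to bottom, this is also the order a request travels. Here's a rough sketch of how the layers compose in a single handler, assuming the components built in the rest of this post: rate_limiter is the MultiLevelRateLimiter from section 4, llm_circuit the CircuitBreaker from section 5, llm_pool the LLMConnectionPool from section 6, and degradation the DegradationHandler from section 8.

import asyncio

async def handle_request(user_id: str, endpoint: str, user_input: str):
    try:
        # Layer 1: ingress control (raises RateLimitError when over quota)
        await rate_limiter.check(user_id, endpoint)
        # Layer 3 timeout, wrapped around the Layer 1 breaker and Layer 2 pool
        return await asyncio.wait_for(
            llm_circuit.call(llm_pool.call, user_input),
            timeout=30.0,
        )
    except (RateLimitError, CircuitOpenError, asyncio.TimeoutError):
        # Degrade instead of failing hard (section 8)
        return degradation.static_response()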

4. Rate-Limiting Strategies

Fixed-window rate limiting

from collections import defaultdict
from time import time

class FixedWindowRateLimiter:
    """Fixed-window rate limiter"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        """Check whether this request is allowed"""
        now = time()
        window_start = now - (now % self.window_seconds)

        # Reset the bucket when the previous window has expired
        if not self.requests[key] or self.requests[key][0] < window_start:
            self.requests[key] = []

        if len(self.requests[key]) >= self.max_requests:
            return False

        self.requests[key].append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """Remaining quota in the current window"""
        return max(0, self.max_requests - len(self.requests[key]))

# Usage example
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=60)

def handle_request(user_id: str, user_input: str):
    if not limiter.is_allowed(user_id):
        return {"error": "Too many requests, please try again later"}

    return process_request(user_input)

Sliding-window rate limiting

class SlidingWindowRateLimiter:
    """Sliding-window rate limiter - more precise"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        now = time()
        # Drop requests that have fallen out of the window
        self.requests[key] = [
            t for t in self.requests[key]
            if now - t < self.window_seconds
        ]

        if len(self.requests[key]) >= self.max_requests:
            return False

        self.requests[key].append(now)
        return True
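
It's a drop-in replacement for the fixed-window version, and it closes the classic loophole: with fixed windows a client can fire max_requests at the very end of one window and max_requests more at the start of the next. A usage sketch:

limiter = SlidingWindowRateLimiter(max_requests=100, window_seconds=60)

def handle_request(user_id: str, user_input: str):
    # Same call shape as the fixed-window limiter
    if not limiter.is_allowed(user_id):
        return {"error": "Too many requests, please try again later"}
    return process_request(user_input)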

Token-bucket rate limiting

import asyncio
from asyncio import Lock
from time import time

class TokenBucket:
    """Token-bucket rate limiter - tolerates bursts"""

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: token refill rate (tokens/second)
            capacity: bucket capacity
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_time = time()
        self.lock = Lock()

    async def consume(self, tokens: int = 1) -> bool:
        """Try to consume tokens; returns False if the bucket is short"""
        async with self.lock:
            now = time()
            # Refill based on time elapsed since the last call
            elapsed = now - self.last_time
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_time = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    async def wait_for_token(self, tokens: int = 1):
        """Block until tokens become available"""
        while not await self.consume(tokens):
            await asyncio.sleep(0.1)

# Usage example
bucket = TokenBucket(rate=10, capacity=50)  # 10 tokens/sec, bursts up to 50

async def handle_request(user_input: str):
    await bucket.wait_for_token()
    return await process_request(user_input)

Rate limiting at multiple levels

class RateLimitError(Exception):
    """Raised when a rate limit is exceeded"""
    pass

class MultiLevelRateLimiter:
    """Multi-level rate limiter"""

    def __init__(self):
        # Global limit
        self.global_limiter = TokenBucket(rate=100, capacity=200)

        # Per-user limits
        self.user_limiters = defaultdict(
            lambda: TokenBucket(rate=10, capacity=20)
        )

        # Per-endpoint limits
        self.endpoint_limiters = {
            "chat": TokenBucket(rate=50, capacity=100),
            "query": TokenBucket(rate=200, capacity=500),
        }

    async def check(self, user_id: str, endpoint: str) -> bool:
        """Check every level; the first limit to trip wins"""
        # 1. Global
        if not await self.global_limiter.consume():
            raise RateLimitError("The system is busy, please try again later")

        # 2. Per-user
        if not await self.user_limiters[user_id].consume():
            raise RateLimitError("You are sending requests too fast, please slow down")

        # 3. Per-endpoint
        if endpoint in self.endpoint_limiters:
            if not await self.endpoint_limiters[endpoint].consume():
                raise RateLimitError("This feature is busy, please try again later")

        return True
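
Wiring it into a handler looks like this; a short sketch where process_request stands in for the real pipeline:

ml_limiter = MultiLevelRateLimiter()

async def handle_chat(user_id: str, user_input: str):
    try:
        await ml_limiter.check(user_id, endpoint="chat")
    except RateLimitError as e:
        return {"error": str(e)}
    return await process_request(user_input)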

5. Circuit Breaking

The circuit breaker pattern

from enum import Enum
from datetime import datetime, timedelta

class CircuitOpenError(Exception):
    """Raised while the breaker is open"""
    pass

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped: calls are rejected
    HALF_OPEN = "half_open"  # probing whether the service has recovered

class CircuitBreaker:
    """Circuit breaker"""

    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 30,
                 half_open_requests: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_count = 0

    async def call(self, func, *args, **kwargs):
        """Invoke a function through the breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("The service is temporarily unavailable, please try again later")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_recovery(self) -> bool:
        if self.last_failure_time is None:
            return False
        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.recovery_timeout
        )

    def _on_success(self):
        """Reset on success"""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_count += 1
            if self.half_open_count >= self.half_open_requests:
                self.state = CircuitState.CLOSED
                self.half_open_count = 0

    def _on_failure(self):
        """Count failures"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.half_open_count = 0
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage example
llm_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def call_llm_safe(prompt: str):
    try:
        return await llm_circuit.call(llm.generate, prompt)
    except CircuitOpenError:
        # Breaker is open: return a degraded response
        return "The service is temporarily busy, please try again later"

6. Connection Pool Management

An LLM client connection pool

import aiohttp
from asyncio import Semaphore

class LLMConnectionPool:
    """Connection pool for LLM calls"""

    def __init__(self,
                 max_connections: int = 10,
                 max_per_host: int = 5):
        self.max_connections = max_connections
        self.max_per_host = max_per_host
        self.semaphore = Semaphore(max_connections)
        self.session = None

    async def initialize(self):
        """Create the shared session and its connector"""
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_per_host,
            keepalive_timeout=30
        )
        self.session = aiohttp.ClientSession(connector=connector)

    async def call(self, prompt: str) -> str:
        """Call the LLM through the pool"""
        async with self.semaphore:
            async with self.session.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4",
                    "messages": [{"role": "user", "content": prompt}]
                },
                headers={"Authorization": f"Bearer {API_KEY}"}  # API_KEY defined elsewhere
            ) as response:
                data = await response.json()
                return data["choices"][0]["message"]["content"]

    async def close(self):
        """Close the session"""
        if self.session:
            await self.session.close()
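
The pool wants an explicit startup/shutdown around your app's lifetime; a minimal lifecycle sketch:

import asyncio

pool = LLMConnectionPool(max_connections=20)

async def main():
    await pool.initialize()
    try:
        print(await pool.call("Help me check my order"))
    finally:
        await pool.close()

asyncio.run(main())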

A database connection pool

import asyncpg

class DatabasePool:
    """Database connection pool"""

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def initialize(self):
        """Create the pool"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=30
        )

    async def execute(self, query: str, *args):
        """Run a statement"""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch rows"""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)
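
Usage follows the same initialize-then-query shape; a sketch with a made-up DSN, run inside an async function:

db = DatabasePool(dsn="postgresql://agent:secret@localhost/agentdb")

# Inside an async function:
await db.initialize()
rows = await db.fetch("SELECT * FROM orders WHERE user_id = $1", user_id)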

7. Request Queues and Scheduling

Priority queue

import asyncio
from dataclasses import dataclass, field
from typing import Any
import heapq

@dataclass(order=True)
class PrioritizedRequest:
    """A request with a priority"""
    priority: int
    request_id: str = field(compare=False)
    user_id: str = field(compare=False)
    content: Any = field(compare=False)

class PriorityQueue:
    """Priority request queue"""

    def __init__(self, max_workers: int = 10):
        self.queue = []
        self.max_workers = max_workers
        self.active_workers = 0
        self.condition = asyncio.Condition()

    async def enqueue(self, request: PrioritizedRequest):
        """Add a request to the queue"""
        async with self.condition:
            heapq.heappush(self.queue, request)
            self.condition.notify()

    async def worker(self, processor):
        """Worker coroutine: pop and process forever"""
        while True:
            async with self.condition:
                while not self.queue or self.active_workers >= self.max_workers:
                    await self.condition.wait()

                request = heapq.heappop(self.queue)
                self.active_workers += 1

            try:
                await processor(request)
            finally:
                async with self.condition:
                    self.active_workers -= 1
                    self.condition.notify()

# Usage example (inside an async function)
queue = PriorityQueue(max_workers=10)

# VIP users are processed first
await queue.enqueue(PrioritizedRequest(
    priority=1,  # lower number = higher priority
    request_id="req_001",
    user_id="vip_user",
    content="Help me check my order"
))

# Regular users
await queue.enqueue(PrioritizedRequest(
    priority=10,
    request_id="req_002",
    user_id="normal_user",
    content="Help me check my order"
))
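
Nothing gets processed until the workers are actually launched, since worker() loops forever. A sketch of spinning them up, again inside an async function:

async def process(request: PrioritizedRequest):
    # Stand-in processor; plug in your real pipeline here
    print(f"processing {request.request_id} for {request.user_id}")

workers = [
    asyncio.create_task(queue.worker(process))
    for _ in range(queue.max_workers)
]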

Timeout control

import asyncio

async def call_with_timeout(coro, timeout: float, default=None):
    """Await a coroutine with a deadline; return default on timeout"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return default

# Usage example
async def process_request(user_input: str):
    try:
        response = await asyncio.wait_for(
            llm.generate(user_input),
            timeout=30.0  # 30-second deadline
        )
        return response
    except asyncio.TimeoutError:
        return "Sorry, the request timed out, please try again later"

8. Degradation Strategies

Multi-level degradation

class DegradationHandler:
    """Degradation handler"""

    def __init__(self):
        self.degradation_level = 0  # 0=normal, 1=light, 2=moderate, 3=severe

    def check_and_adjust(self, metrics: dict):
        """Adjust the degradation level from live metrics"""
        if metrics["error_rate"] > 0.5:
            self.degradation_level = 3
        elif metrics["error_rate"] > 0.3:
            self.degradation_level = 2
        elif metrics["error_rate"] > 0.1 or metrics["latency_p99"] > 10:
            self.degradation_level = 1
        else:
            self.degradation_level = 0

    async def process(self, user_input: str):
        """Handle the request according to the current level"""
        if self.degradation_level == 0:
            # Normal processing
            return await self.full_process(user_input)

        elif self.degradation_level == 1:
            # Light degradation: switch to a faster model
            return await self.fast_model_process(user_input)

        elif self.degradation_level == 2:
            # Moderate degradation: serve from cache or templates
            return await self.template_process(user_input)

        else:
            # Severe degradation: static response only
            return self.static_response()

    async def full_process(self, user_input: str):
        """Full processing"""
        return await llm.generate(user_input, model="gpt-4")

    async def fast_model_process(self, user_input: str):
        """Fast-model processing"""
        return await llm.generate(user_input, model="gpt-3.5-turbo")

    async def template_process(self, user_input: str):
        """Template processing"""
        # Try to match a canned template (match_template: implemented elsewhere)
        template = self.match_template(user_input)
        if template:
            return template
        return await self.fast_model_process(user_input)

    def static_response(self):
        """Static response"""
        return "The system is busy, please try again later"

9. Monitoring and Alerting

Concurrency metrics

import asyncio
from contextlib import contextmanager
from time import time

class ConcurrencyMonitor:
    """Concurrency monitor"""

    def __init__(self):
        self.metrics = {
            "active_requests": 0,
            "queued_requests": 0,
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "timeout_requests": 0,
            "total_latency": 0,
        }
        self.start_time = time()

    @contextmanager
    def track_request(self):
        """Track one request's outcome and latency"""
        self.metrics["active_requests"] += 1
        self.metrics["total_requests"] += 1
        start = time()

        try:
            yield
            self.metrics["successful_requests"] += 1
        except asyncio.TimeoutError:
            self.metrics["timeout_requests"] += 1
            self.metrics["failed_requests"] += 1
            raise
        except Exception:
            self.metrics["failed_requests"] += 1
            raise
        finally:
            self.metrics["active_requests"] -= 1
            self.metrics["total_latency"] += time() - start

    def get_stats(self) -> dict:
        """Aggregate statistics"""
        elapsed = time() - self.start_time

        return {
            "active_requests": self.metrics["active_requests"],
            "qps": self.metrics["total_requests"] / elapsed,
            "success_rate": self.metrics["successful_requests"] / max(1, self.metrics["total_requests"]),
            "error_rate": self.metrics["failed_requests"] / max(1, self.metrics["total_requests"]),
            "timeout_rate": self.metrics["timeout_requests"] / max(1, self.metrics["total_requests"]),
            "avg_latency": self.metrics["total_latency"] / max(1, self.metrics["total_requests"])
        }
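
track_request is an ordinary synchronous context manager, so it wraps an awaited call directly; a usage sketch:

monitor = ConcurrencyMonitor()

async def handle_request(user_input: str):
    with monitor.track_request():
        return await asyncio.wait_for(llm.generate(user_input), timeout=30.0)

# Later, e.g. from a /metrics endpoint:
print(monitor.get_stats())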

Alert rules

class AlertManager:
    """Alert manager"""

    def __init__(self):
        self.alert_rules = [
            {
                "name": "High error rate",
                "condition": lambda m: m["error_rate"] > 0.1,
                "level": "warning",
                "message": "Error rate above 10%"
            },
            {
                "name": "Timeout spike",
                "condition": lambda m: m["timeout_rate"] > 0.05,
                "level": "warning",
                "message": "Timeout rate above 5%"
            },
            {
                "name": "Concurrency too high",
                "condition": lambda m: m["active_requests"] > 80,
                "level": "critical",
                "message": "More than 80 active requests"
            },
        ]

    def check_alerts(self, metrics: dict) -> list:
        """Evaluate all rules against the current metrics"""
        alerts = []
        for rule in self.alert_rules:
            if rule["condition"](metrics):
                alerts.append({
                    "name": rule["name"],
                    "level": rule["level"],
                    "message": rule["message"],
                    "metrics": metrics
                })

        return alerts
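
Tying the last three pieces together: a background task can sample the monitor every few seconds, feed the stats to the degradation handler, and fire any alerts. A sketch; send_alert is a hypothetical hook for your IM/pager channel, and since the monitor only tracks average latency, it stands in crudely for the p99 the degradation handler expects:

degradation = DegradationHandler()
alert_manager = AlertManager()

async def watchdog(interval: float = 5.0):
    while True:
        stats = monitor.get_stats()
        stats["latency_p99"] = stats["avg_latency"]  # crude stand-in for a real p99
        degradation.check_and_adjust(stats)
        for alert in alert_manager.check_alerts(stats):
            await send_alert(alert)  # hypothetical: push to IM / pager
        await asyncio.sleep(interval)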

10. A Real-World Configuration Example

Production configuration

# config.py

CONCURRENCY_CONFIG = {
    # Rate limiting
    "rate_limit": {
        "global_qps": 100,
        "user_qps": 10,
        "burst_capacity": 50
    },

    # Connection pools
    "connection_pool": {
        "llm_max_connections": 20,
        "db_min_connections": 5,
        "db_max_connections": 30
    },

    # Timeouts (seconds)
    "timeout": {
        "llm_request": 30,
        "tool_call": 10,
        "total_request": 60
    },

    # Circuit breaker
    "circuit_breaker": {
        "failure_threshold": 5,
        "recovery_timeout": 60
    },

    # Degradation
    "degradation": {
        "enabled": True,
        "fallback_model": "gpt-3.5-turbo",
        "static_response": "The system is busy"
    }
}
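
One way to feed this config into the components from earlier sections, as a sketch:

cfg = CONCURRENCY_CONFIG

global_limiter = TokenBucket(
    rate=cfg["rate_limit"]["global_qps"],
    capacity=cfg["rate_limit"]["burst_capacity"],
)
llm_pool = LLMConnectionPool(
    max_connections=cfg["connection_pool"]["llm_max_connections"],
)
llm_circuit = CircuitBreaker(
    failure_threshold=cfg["circuit_breaker"]["failure_threshold"],
    recovery_timeout=cfg["circuit_breaker"]["recovery_timeout"],
)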

11. Summary and Checklist

Concurrency control checklist

Layer       | Check item                        | Status
Ingress     | Global rate limit configured      | ☐
Ingress     | Per-user rate limit configured    | ☐
Ingress     | Circuit breaker configured        | ☐
Resources   | Connection pools sized sensibly   | ☐
Resources   | Semaphore limits in place         | ☐
Scheduling  | Timeouts set                      | ☐
Scheduling  | Retry policy configured           | ☐
Degradation | Degradation strategy defined      | ☐
Monitoring  | Key metrics monitored             | ☐
Monitoring  | Alert rules configured            | ☐

Next up

Tomorrow we'll talk about AI Agent testing strategy: from unit tests to end-to-end tests!

