AI Agent开发实战(八):部署上线从开发到生产

9次阅读
没有评论

AI Agent开发实战(八):部署上线从开发到生产

一、开场:开发完了,然后呢?

大家好,我是老金。

前面我们讲了:

  • 怎么开发Agent
  • 怎么测试Agent

今天讲最后一步:部署上线

一个Agent从开发环境到生产环境,需要考虑:

  • API服务化
  • 容器化部署
  • 负载均衡
  • 监控告警
  • 日志管理

二、部署架构

2.1 生产架构

┌─────────────────────────────────────────────────────────┐
│                  Agent生产部署架构                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  用户请求                                               │
│      ↓                                                  │
│  ┌─────────────────────────────────────────────────┐   │
│  │              负载均衡 (Nginx/ALB)               │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              API Gateway                        │   │
│  │  • 认证授权                                      │   │
│  │  • 限流控制                                      │   │
│  │  • 请求路由                                      │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │           Agent服务集群 (FastAPI)               │   │
│  │  ┌───────┐  ┌───────┐  ┌───────┐              │   │
│  │  │ Pod 1 │  │ Pod 2 │  │ Pod 3 │              │   │
│  │  └───────┘  └───────┘  └───────┘              │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              基础设施                           │   │
│  │  • Redis (缓存/会话)                            │   │
│  │  • PostgreSQL (持久化)                          │   │
│  │  • ChromaDB (向量存储)                          │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

三、API服务化

3.1 FastAPI服务

# src/api/main.py
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import asyncio
import uuid

from src.agents.tool_agent import ToolAgent
from src.agents.memory_agent import MemoryAgent
from src.utils.llm_client import LLMClient
from src.memory.session import SessionManager

# 创建应用
app = FastAPI(
    title="AI Agent API",
    description="AI Agent服务API",
    version="1.0.0"
)

# CORS配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 请求模型
class ChatRequest(BaseModel):
    """对话请求"""
    message: str
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    stream: bool = False

class ChatResponse(BaseModel):
    """对话响应"""
    response: str
    session_id: str
    message_id: str

# 依赖注入
def get_llm_client():
    """获取LLM客户端"""
    return LLMClient(provider="openai")

def get_session_manager():
    """获取会话管理器"""
    return SessionManager()

# API端点
@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

@app.post("/api/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    llm: LLMClient = Depends(get_llm_client),
    session_manager: SessionManager = Depends(get_session_manager)
):
    """对话API"""
    try:
        # 创建或获取会话
        session_id = request.session_id or str(uuid.uuid4())

        # 创建Agent
        agent = MemoryAgent(
            llm_client=llm,
            user_id=request.user_id or "anonymous"
        )

        # 执行对话
        response = await agent.chat(request.message)

        return ChatResponse(
            response=response,
            session_id=session_id,
            message_id=str(uuid.uuid4())
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/chat/stream")
async def chat_stream(
    request: ChatRequest,
    llm: LLMClient = Depends(get_llm_client)
):
    """流式对话API"""
    async def generate():
        agent = ToolAgent(llm_client=llm)
        async for chunk in agent.chat_stream(request.message):
            yield f"data: {chunk}nn"
        yield "data: [DONE]nn"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

@app.post("/api/tools/{tool_name}")
async def execute_tool(
    tool_name: str,
    params: Dict[str, Any],
    llm: LLMClient = Depends(get_llm_client)
):
    """工具执行API"""
    from src.tools.registry import tool_registry

    tool = tool_registry.get(tool_name)
    if not tool:
        raise HTTPException(status_code=404, detail=f"Tool not found: {tool_name}")

    result = await tool.execute(**params)
    return {"success": result.success, "result": result.result, "error": result.error}

3.2 启动配置

# src/api/config.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    """应用配置"""
    # 应用
    app_name: str = "AI Agent API"
    debug: bool = False
    version: str = "1.0.0"

    # API
    api_prefix: str = "/api"

    # LLM
    openai_api_key: str
    openai_base_url: str = "https://api.openai.com/v1"
    default_model: str = "gpt-4-turbo-preview"

    # Redis
    redis_url: str = "redis://localhost:6379"

    # Database
    database_url: str = "postgresql://localhost/agent_db"

    # 限流
    rate_limit: int = 100  # 每分钟请求数

    class Config:
        env_file = ".env"

settings = Settings()

四、容器化部署

4.1 Dockerfile

# Dockerfile
FROM python:3.11-slim

# 设置工作目录
WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY src/ ./src/
COPY config/ ./config/

# 环境变量
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]

4.2 Docker Compose

# docker-compose.yml
version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@db:5432/agent_db
    depends_on:
      - redis
      - db
    volumes:
      - ./data:/app/data
    restart: always

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: always

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=agent_db
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: always

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - agent-api
    restart: always

volumes:
  redis_data:
  postgres_data:

4.3 Kubernetes部署

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
    spec:
      containers:
      - name: agent-api
        image: your-registry/agent-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: openai-api-key
        - name: REDIS_URL
          value: "redis://redis-service:6379"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: agent-api-service
spec:
  selector:
    app: agent-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

五、负载均衡与限流

5.1 Nginx配置

# nginx.conf
upstream agent_backend {
    least_conn;
    server agent-api-1:8000 weight=1;
    server agent-api-2:8000 weight=1;
    server agent-api-3:8000 weight=1;
}

server {
    listen 80;
    server_name api.your-domain.com;

    # 重定向到HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.your-domain.com;

    # SSL配置
    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

    # 限流
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    location / {
        limit_req zone=api_limit burst=20 nodelay;

        proxy_pass http://agent_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # 超时设置
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # 流式响应特殊处理
    location /api/chat/stream {
        proxy_pass http://agent_backend;
        proxy_buffering off;
        proxy_cache off;
        proxy_set_header Connection '';
        chunked_transfer_encoding off;
    }
}

5.2 API限流

# src/api/middleware.py
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
import time
from collections import defaultdict
import asyncio

class RateLimiter:
    """限流器"""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)
        self._lock = asyncio.Lock()

    async def is_allowed(self, client_id: str) -> bool:
        """检查是否允许请求"""
        async with self._lock:
            now = time.time()

            # 清理过期记录
            self.requests[client_id] = [
                t for t in self.requests[client_id]
                if now - t = self.requests_per_minute:
                return False

            # 记录请求
            self.requests[client_id].append(now)
            return True

# 中间件
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """限流中间件"""
    client_id = request.client.host  # 或使用API Key

    if not await rate_limiter.is_allowed(client_id):
        raise HTTPException(
            status_code=429,
            detail="Too many requests"
        )

    return await call_next(request)

六、监控与日志

6.1 日志配置

# src/utils/logging_config.py
from loguru import logger
import sys

def setup_logging(log_level: str = "INFO", log_file: str = "logs/agent.log"):
    """配置日志"""
    # 移除默认处理器
    logger.remove()

    # 控制台输出(JSON格式,便于ELK收集)
    logger.add(
        sys.stdout,
        format='{{"time": "{time:YYYY-MM-DD HH:mm:ss}", "level": "{level}", "message": "{message}"}}',
        level=log_level,
        serialize=True
    )

    # 文件输出
    logger.add(
        log_file,
        rotation="100 MB",
        retention="30 days",
        compression="zip",
        level="DEBUG"
    )

    # 错误日志单独存储
    logger.add(
        "logs/error.log",
        rotation="10 MB",
        level="ERROR",
        backtrace=True,
        diagnose=True
    )

    return logger

# 在请求中记录
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """请求日志"""
    start_time = time.time()

    # 请求信息
    logger.info(f"Request: {request.method} {request.url}")

    response = await call_next(request)

    # 响应信息
    duration = time.time() - start_time
    logger.info(f"Response: {response.status_code} in {duration:.2f}s")

    # 添加响应头
    response.headers["X-Process-Time"] = str(duration)

    return response

6.2 Prometheus监控

# src/api/metrics.py
from prometheus_client import Counter, Histogram, Gauge
from prometheus_fastapi_instrumentator import Instrumentator

# 指标定义
REQUEST_COUNT = Counter(
    'agent_requests_total',
    'Total request count',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'agent_request_latency_seconds',
    'Request latency',
    ['method', 'endpoint']
)

ACTIVE_SESSIONS = Gauge(
    'agent_active_sessions',
    'Number of active sessions'
)

LLM_TOKENS = Counter(
    'agent_llm_tokens_total',
    'Total LLM tokens used',
    ['model', 'type']  # type: input/output
)

# 集成到FastAPI
@app.on_event("startup")
async def startup():
    # 启动Prometheus监控
    Instrumentator().instrument(app).expose(app)

6.3 健康检查

# src/api/health.py
from fastapi import APIRouter
from typing import Dict

router = APIRouter()

@router.get("/health")
async def health_check() -> Dict:
    """健康检查"""
    checks = {
        "api": "healthy",
        "redis": await check_redis(),
        "database": await check_database(),
        "llm": await check_llm()
    }

    all_healthy = all(v == "healthy" for v in checks.values())

    return {
        "status": "healthy" if all_healthy else "unhealthy",
        "checks": checks,
        "timestamp": datetime.now().isoformat()
    }

@router.get("/ready")
async def readiness_check():
    """就绪检查"""
    return {"status": "ready"}

@router.get("/live")
async def liveness_check():
    """存活检查"""
    return {"status": "alive"}

async def check_redis() -> str:
    """检查Redis"""
    try:
        # 实际检查代码
        return "healthy"
    except:
        return "unhealthy"

async def check_database() -> str:
    """检查数据库"""
    try:
        return "healthy"
    except:
        return "unhealthy"

async def check_llm() -> str:
    """检查LLM"""
    try:
        return "healthy"
    except:
        return "unhealthy"

七、安全配置

7.1 认证授权

# src/api/auth.py
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security)
) -> dict:
    """获取当前用户"""
    token = credentials.credentials

    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# 需要认证的端点
@app.post("/api/chat")
async def chat(
    request: ChatRequest,
    user: dict = Depends(get_current_user),
    llm: LLMClient = Depends(get_llm_client)
):
    """需要认证的对话API"""
    # ...

7.2 输入验证

# src/api/validation.py
from pydantic import BaseModel, validator, constr

class SafeChatRequest(BaseModel):
    """安全的对话请求"""
    message: constr(min_length=1, max_length=4000)  # 限制长度

    @validator('message')
    def sanitize_message(cls, v):
        """清理输入"""
        # 移除潜在的恶意内容
        # ...
        return v

八、最佳实践

8.1 部署检查清单

检查项 状态
环境变量配置 [ ]
SSL证书 [ ]
限流配置 [ ]
日志收集 [ ]
监控告警 [ ]
备份策略 [ ]
回滚方案 [ ]

8.2 性能优化

# 性能优化建议
PERFORMANCE_TIPS = [
    "使用异步IO",
    "启用响应缓存",
    "数据库连接池",
    "批量请求处理",
    "流式响应减少延迟"
]

九、总结

部署要点

  1. API服务化:FastAPI提供RESTful接口
  2. 容器化:Docker + Kubernetes
  3. 负载均衡:Nginx/云负载均衡
  4. 监控告警:Prometheus + Grafana
  5. 日志管理:ELK/云日志服务

下期预告

下一篇:Agent安全防护——保护你的Agent不被攻击!


往期回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-04-01发表,共计11381字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)