AI应用开发进阶(七):企业级AI应用架构设计从单体到微服务

98次阅读
没有评论

AI应用开发进阶(七):企业级AI应用架构设计从单体到微服务

一、开场:架构决定上限

大家好,我是老金。

AI应用从小作坊到企业级,架构是关键。

今天聊聊企业级AI应用的架构设计。

二、架构演进

2.1 单体架构

# 单体架构(适合MVP)
"""
┌─────────────────────────────────────┐
│           单体应用                   │
│  ┌─────────┐ ┌─────────┐           │
│  │   API   │ │  LLM    │           │
│  │  Layer  │ │ Client  │           │
│  └────┬────┘ └────┬────┘           │
│       └───────────┘                 │
│              │                      │
│       ┌──────┴──────┐              │
│       │  Database   │              │
│       └─────────────┘              │
└─────────────────────────────────────┘
"""

# 优点:简单、快速
# 缺点:扩展性差、耦合高

2.2 分层架构

# 分层架构
"""
┌─────────────────────────────────────┐
│           API Gateway               │
├─────────────────────────────────────┤
│         Business Layer              │
│  ┌─────────┐ ┌─────────┐           │
│  │  Chat   │ │  Agent  │           │
│  │ Service │ │ Service │           │
│  └────┬────┘ └────┬────┘           │
├───────┼───────────┼────────────────┤
│       │   Core    │                 │
│  ┌────┴────┐ ┌────┴────┐           │
│  │  LLM    │ │ Memory  │           │
│  │ Service │ │ Service │           │
│  └────┬────┘ └────┬────┘           │
├───────┼───────────┼────────────────┤
│       │ Infrastructure │            │
│  ┌────┴────┐ ┌────┴────┐           │
│  │  Cache  │ │   DB    │           │
│  └─────────┘ └─────────┘           │
└─────────────────────────────────────┘
"""

2.3 微服务架构

# Microservices architecture (recommended)
# Service catalog: maps each service name to its role and what it depends on.
# Dict keys and values are data consumed by the article/examples, so they are
# left exactly as written (including the Chinese labels).
MICROSERVICES_ARCH = {
    "api-gateway": {
        "职责": "路由、认证、限流",
        "技术": "Kong / Nginx / Envoy"
    },
    "chat-service": {
        "职责": "对话管理",
        "依赖": ["llm-service", "memory-service"]
    },
    "agent-service": {
        "职责": "Agent执行",
        "依赖": ["llm-service", "tool-service"]
    },
    "llm-service": {
        "职责": "LLM调用",
        "支持": ["openai", "anthropic", "local"]
    },
    "memory-service": {
        "职责": "记忆管理",
        "存储": ["redis", "vectordb"]
    },
    "tool-service": {
        "职责": "工具执行",
        "类型": ["search", "code", "api"]
    },
    "embedding-service": {
        "职责": "文本向量化",
        "模型": ["openai", "local"]
    }
}

三、核心服务设计

3.1 LLM服务

# LLM服务(统一抽象)
import json
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import AsyncIterator

class LLMProvider(ABC):
    """Abstract interface for an LLM provider backend.

    Concrete providers must supply both a one-shot chat call that returns
    the full reply and a streaming variant that yields it incrementally.
    """

    @abstractmethod
    async def chat(self, messages: list, **kwargs) -> str:
        """Send chat messages and return the complete reply text."""
        ...

    @abstractmethod
    async def chat_stream(self, messages: list, **kwargs) -> AsyncIterator[str]:
        """Send chat messages and yield the reply text chunk by chunk."""
        ...

class OpenAIProvider(LLMProvider):
    """OpenAI-backed provider.

    NOTE(review): this uses the pre-1.0 `openai.ChatCompletion` interface,
    and the `openai` module is assumed to be imported elsewhere in the file.
    """

    async def chat(self, messages, **kwargs):
        """Return the full completion text for `messages`.

        `kwargs['model']` selects the model (defaults to 'gpt-4').
        """
        response = await openai.ChatCompletion.acreate(
            model=kwargs.get('model', 'gpt-4'),
            messages=messages
        )
        return response.choices[0].message.content

    async def chat_stream(self, messages, **kwargs):
        """Yield completion text chunks as they arrive.

        Bug fix: `LLMProvider.chat_stream` is abstract, so without this
        override `OpenAIProvider()` raised TypeError at instantiation
        (and `LLMService.__init__` constructs one).
        """
        response = await openai.ChatCompletion.acreate(
            model=kwargs.get('model', 'gpt-4'),
            messages=messages,
            stream=True
        )
        async for chunk in response:
            # Pre-1.0 stream chunks expose a dict-like delta; only some
            # chunks carry text (role/finish chunks do not).
            content = chunk.choices[0].delta.get("content")
            if content:
                yield content

class LocalProvider(LLMProvider):
    """Provider backed by a local vLLM server exposing an OpenAI-style API.

    NOTE(review): `httpx` is assumed to be imported elsewhere in the file.
    """

    async def chat(self, messages, **kwargs):
        """POST to the local server and return the full reply text."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://localhost:8000/v1/chat/completions",
                json={"messages": messages}
            )
            # Fail fast on HTTP errors instead of a confusing KeyError when
            # the error body lacks "choices".
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]

    async def chat_stream(self, messages, **kwargs):
        """Yield reply chunks from the server's SSE stream.

        Bug fix: the base class declares `chat_stream` abstract, so without
        this override `LocalProvider()` raised TypeError at instantiation.
        """
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                "http://localhost:8000/v1/chat/completions",
                json={"messages": messages, "stream": True}
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    # OpenAI-style SSE: each event is "data: <json>",
                    # terminated by "data: [DONE]".
                    if not line.startswith("data: "):
                        continue
                    payload = line[len("data: "):]
                    if payload == "[DONE]":
                        break
                    delta = json.loads(payload)["choices"][0].get("delta", {})
                    content = delta.get("content")
                    if content:
                        yield content

class LLMService:
    """LLM routing service with provider fallback.

    Tries the explicitly requested provider first (if given); otherwise
    walks `fallback_order` and returns the first successful reply.
    """

    def __init__(self):
        # Provider registry; keys are the names accepted by chat(provider=...).
        self.providers = {
            "openai": OpenAIProvider(),
            "local": LocalProvider()
        }
        # Order in which providers are attempted when none is requested.
        self.fallback_order = ["openai", "local"]

    async def chat(self, messages, provider=None, **kwargs):
        """Route a chat request across providers with fallback.

        Args:
            messages: chat messages forwarded to the provider.
            provider: optional provider name; when given, only that one is tried.
            **kwargs: forwarded to the provider (e.g. `model`).

        Returns:
            The reply text from the first provider that succeeds.

        Raises:
            RuntimeError: if every candidate fails; the last failure is
                chained as `__cause__`. (RuntimeError is an Exception, so
                callers catching the old bare Exception still work.)
        """
        providers_to_try = [provider] if provider else self.fallback_order

        last_error = None
        for name in providers_to_try:
            backend = self.providers.get(name)
            if backend is None:
                # Unknown name: record it explicitly instead of letting a
                # KeyError masquerade as a provider failure.
                last_error = KeyError(name)
                logger.warning("Unknown provider %r", name)
                continue
            try:
                return await backend.chat(messages, **kwargs)
            except Exception as e:
                last_error = e
                logger.warning(f"Provider {name} failed: {e}")

        raise RuntimeError("All providers failed") from last_error

3.2 记忆服务

# 记忆服务
class MemoryService:
    """分层记忆服务"""

    def __init__(self):
        self.working_memory = RedisCache()  # 工作记忆
        self.short_term = PostgreSQL()       # 短期记忆
        self.long_term = ChromaDB()          # 长期记忆(向量)

    async def add(self, user_id: str, message: dict, level: str = "short"):
        """添加记忆"""
        if level == "working":
            await self.working_memory.set(
                f"working:{user_id}",
                message,
                ttl=3600
            )
        elif level == "short":
            await self.short_term.insert({
                "user_id": user_id,
                "content": message,
                "timestamp": datetime.now()
            })
        elif level == "long":
            embedding = await self._embed(message["content"])
            await self.long_term.add(
                ids=f"{user_id}:{uuid.uuid4()}",
                embeddings=embedding,
                documents=message["content"],
                metadatas={"user_id": user_id}
            )

    async def retrieve(self, user_id: str, query: str, top_k: int = 5):
        """检索记忆"""
        # 1. 工作记忆
        working = await self.working_memory.get(f"working:{user_id}")

        # 2. 短期记忆
        short = await self.short_term.query(
            f"SELECT * FROM memories WHERE user_id = '{user_id}' ORDER BY timestamp DESC LIMIT 10"
        )

        # 3. 长期记忆(语义检索)
        query_embedding = await self._embed(query)
        long = await self.long_term.query(
            query_embeddings=query_embedding,
            n_results=top_k,
            where={"user_id": user_id}
        )

        return {
            "working": working,
            "short_term": short,
            "long_term": long
        }

四、数据流设计

4.1 事件驱动架构

# 使用消息队列解耦
import asyncio
from typing import Callable

class EventBus:
    """事件总线"""

    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}
        self.queue = asyncio.Queue()

    def subscribe(self, event_type: str, handler: Callable):
        """订阅事件"""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    async def publish(self, event_type: str, data: dict):
        """发布事件"""
        await self.queue.put({"type": event_type, "data": data})

    async def start(self):
        """启动事件循环"""
        while True:
            event = await self.queue.get()
            handlers = self.handlers.get(event["type"], [])

            for handler in handlers:
                asyncio.create_task(handler(event["data"]))

# Usage example
# NOTE(review): the bare `await` below is only valid inside an async
# function (or a REPL with top-level await) — this snippet is illustrative,
# not runnable as-is at module level.
event_bus = EventBus()

# Subscribe
async def on_message_created(data):
    # Async side effects: record analytics and check workflow triggers.
    # (`analytics` and `workflow` are assumed to be defined elsewhere.)
    await analytics.record_message(data)
    await workflow.check_triggers(data)

event_bus.subscribe("message.created", on_message_created)

# Publish (`user_id` / `content` come from the surrounding request context)
await event_bus.publish("message.created", {
    "user_id": user_id,
    "content": content
})

五、部署架构

5.1 Kubernetes部署

# k8s/llm-service.yaml
# Deployment (3 replicas) plus a ClusterIP Service fronting the LLM microservice.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-service
spec:
  replicas: 3                      # run three pods for availability
  selector:
    matchLabels:
      app: llm-service
  template:
    metadata:
      labels:
        app: llm-service
    spec:
      containers:
      - name: llm
        image: ai-platform/llm-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY     # injected from the "api-keys" Secret, never inlined
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai
        resources:
          requests:                # scheduling baseline
            memory: "512Mi"
            cpu: "500m"
          limits:                  # hard cap per pod
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:             # restart the container if /health stops answering
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
        readinessProbe:            # gate traffic until /ready answers
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: llm-service
spec:
  selector:
    app: llm-service
  ports:
  - port: 80                       # cluster-facing port
    targetPort: 8000               # containerPort declared above

5.2 服务网格

# Istio configuration: routes /api/chat and /api/agent to their services.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ai-platform
spec:
  hosts:
  - ai-platform.example.com
  http:
  - match:
    - uri:
        prefix: /api/chat
    route:
    - destination:
        host: chat-service
      weight: 100
    timeout: 30s                 # overall request deadline
    retries:
      attempts: 3
      perTryTimeout: 10s         # bounds each attempt; 3 x 10s fits the 30s timeout

  - match:
    - uri:
        prefix: /api/agent
    route:
    - destination:
        host: agent-service
    fault:                       # fault injection for resilience testing
      delay:
        percentage:
          value: 0.1             # NOTE(review): 0.1 means 0.1% of requests...
        fixedDelay: 5s           # ...delayed by 5s — remove for production if unintended

六、总结

架构选择

| 阶段 | 推荐架构 | 说明 |
| --- | --- | --- |
| MVP | 单体 | 快速验证 |
| 成长期 | 分层 | 模块分离 |
| 成熟期 | 微服务 | 独立扩展 |

关键原则

  1. 服务拆分:按业务边界
  2. 数据隔离:每个服务独立数据
  3. 异步通信:事件驱动解耦
  4. 容错设计:熔断、降级、重试

相关阅读

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-04-04发表,共计5794字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)