Advanced AI Application Development (7): Enterprise AI Application Architecture, from Monolith to Microservices
1. Opening: Architecture Sets the Ceiling
Hello everyone, I'm Lao Jin.
Taking an AI application from a small side project to an enterprise-grade system is, above all, an architecture problem: the architecture sets the ceiling on how far the product can grow.
Today let's walk through architecture design for enterprise AI applications, from monolith to layered design to microservices.
2. Architecture Evolution
2.1 Monolithic Architecture
# Monolithic architecture (a good fit for an MVP)
"""
┌─────────────────────────────────────┐
│ 单体应用 │
│ ┌─────────┐ ┌─────────┐ │
│ │ API │ │ LLM │ │
│ │ Layer │ │ Client │ │
│ └────┬────┘ └────┬────┘ │
│ └───────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ Database │ │
│ └─────────────┘ │
└─────────────────────────────────────┘
"""
# Pros: simple, fast to build and ship
# Cons: poor scalability, tight coupling
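To make the trade-off concrete, here is a minimal sketch of the monolithic shape: one process holding the API layer, the LLM client, and the database access. It assumes FastAPI and the openai>=1.0 SDK; the route, table, and model names are illustrative only.

# Minimal monolith sketch: API layer, LLM client, and storage in a single process.
# Route, table, and model names are illustrative assumptions.
import sqlite3

from fastapi import FastAPI
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
llm = AsyncOpenAI()                    # reads OPENAI_API_KEY from the environment
db = sqlite3.connect("app.db")         # one shared database for everything
db.execute("CREATE TABLE IF NOT EXISTS messages (user_id TEXT, content TEXT)")

class ChatRequest(BaseModel):
    user_id: str
    content: str

@app.post("/chat")
async def chat(req: ChatRequest):
    # API handling, LLM call, and persistence are coupled in one place.
    response = await llm.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": req.content}],
    )
    answer = response.choices[0].message.content
    db.execute("INSERT INTO messages VALUES (?, ?)", (req.user_id, answer))
    db.commit()
    return {"answer": answer}

Everything ships and scales as a single unit, which is exactly why it is fast for an MVP and painful later.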
2.2 Layered Architecture
# Layered architecture
"""
┌─────────────────────────────────────┐
│ API Gateway │
├─────────────────────────────────────┤
│ Business Layer │
│ ┌─────────┐ ┌─────────┐ │
│ │ Chat │ │ Agent │ │
│ │ Service │ │ Service │ │
│ └────┬────┘ └────┬────┘ │
├───────┼───────────┼────────────────┤
│ │ Core │ │
│ ┌────┴────┐ ┌────┴────┐ │
│ │ LLM │ │ Memory │ │
│ │ Service │ │ Service │ │
│ └────┬────┘ └────┬────┘ │
├───────┼───────────┼────────────────┤
│ │ Infrastructure │ │
│ ┌────┴────┐ ┌────┴────┐ │
│ │ Cache │ │ DB │ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────┘
"""
2.3 Microservices Architecture
# Microservices architecture (recommended)
MICROSERVICES_ARCH = {
    "api-gateway": {
        "responsibility": "routing, authentication, rate limiting",
        "tech": "Kong / Nginx / Envoy",
    },
    "chat-service": {
        "responsibility": "conversation management",
        "depends_on": ["llm-service", "memory-service"],
    },
    "agent-service": {
        "responsibility": "agent execution",
        "depends_on": ["llm-service", "tool-service"],
    },
    "llm-service": {
        "responsibility": "LLM invocation",
        "supports": ["openai", "anthropic", "local"],
    },
    "memory-service": {
        "responsibility": "memory management",
        "storage": ["redis", "vectordb"],
    },
    "tool-service": {
        "responsibility": "tool execution",
        "types": ["search", "code", "api"],
    },
    "embedding-service": {
        "responsibility": "text embedding",
        "models": ["openai", "local"],
    },
}
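With this split, chat-service no longer imports the LLM code; it calls llm-service and memory-service over the network. A hedged sketch of that boundary (the service URLs, endpoint paths, and response shapes are assumptions; in Kubernetes the hostnames would resolve via cluster DNS):

# chat-service sketch: depends on llm-service and memory-service over HTTP.
# Service URLs, endpoint paths, and response shapes are illustrative assumptions.
import httpx

LLM_SERVICE_URL = "http://llm-service"
MEMORY_SERVICE_URL = "http://memory-service"

async def handle_chat(user_id: str, content: str) -> str:
    async with httpx.AsyncClient(timeout=30) as client:
        # 1. Fetch relevant memory for this user
        memory = await client.get(
            f"{MEMORY_SERVICE_URL}/memories",
            params={"user_id": user_id, "query": content},
        )
        memory.raise_for_status()
        # 2. Ask llm-service for a completion, passing the memory as context
        completion = await client.post(
            f"{LLM_SERVICE_URL}/chat",
            json={
                "messages": [{"role": "user", "content": content}],
                "context": memory.json(),
            },
        )
        completion.raise_for_status()
        return completion.json()["content"]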
3. Core Service Design
3.1 LLM Service
# LLM service (unified abstraction over providers, with routing and failover)
import logging
from abc import ABC, abstractmethod
from typing import AsyncIterator

import httpx
from openai import AsyncOpenAI  # openai>=1.0 SDK

logger = logging.getLogger(__name__)

class LLMProvider(ABC):
    """Abstract LLM provider."""

    @abstractmethod
    async def chat(self, messages: list, **kwargs) -> str: ...

    @abstractmethod
    async def chat_stream(self, messages: list, **kwargs) -> AsyncIterator[str]: ...

class OpenAIProvider(LLMProvider):
    """OpenAI implementation."""

    def __init__(self):
        self.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def chat(self, messages, **kwargs):
        response = await self.client.chat.completions.create(
            model=kwargs.get("model", "gpt-4"),
            messages=messages,
        )
        return response.choices[0].message.content

    async def chat_stream(self, messages, **kwargs):
        stream = await self.client.chat.completions.create(
            model=kwargs.get("model", "gpt-4"),
            messages=messages,
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

class LocalProvider(LLMProvider):
    """Local model served by vLLM through its OpenAI-compatible API."""

    async def chat(self, messages, **kwargs):
        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.post(
                "http://localhost:8000/v1/chat/completions",
                json={"model": kwargs.get("model", "local"), "messages": messages},
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]

    async def chat_stream(self, messages, **kwargs):
        # Streaming from the local backend is omitted in this sketch.
        raise NotImplementedError
        yield  # unreachable; marks this method as an async generator

class LLMService:
    """LLM service: routing plus failover across providers."""

    def __init__(self):
        self.providers = {
            "openai": OpenAIProvider(),
            "local": LocalProvider(),
        }
        self.fallback_order = ["openai", "local"]

    async def chat(self, messages, provider=None, **kwargs):
        """Use the requested provider, or walk the fallback chain."""
        providers_to_try = [provider] if provider else self.fallback_order
        for p in providers_to_try:
            try:
                return await self.providers[p].chat(messages, **kwargs)
            except Exception as e:
                logger.warning(f"Provider {p} failed: {e}")
                continue
        raise RuntimeError("All providers failed")
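Callers see only LLMService, never the individual providers; a request can pin a provider explicitly or rely on the fallback chain. A short usage sketch (run inside an async context):

# Pin a provider, or let the fallback order (openai, then local) decide.
llm = LLMService()
answer = await llm.chat([{"role": "user", "content": "Hello"}])
local_answer = await llm.chat([{"role": "user", "content": "Hello"}], provider="local")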
3.2 Memory Service
# Memory service (tiered memory across cache, relational store, and vector store)
import uuid
from datetime import datetime

class MemoryService:
    """Tiered memory service."""

    def __init__(self):
        # RedisCache / PostgreSQL / ChromaDB are assumed to be thin async
        # wrappers around the respective clients, defined elsewhere.
        self.working_memory = RedisCache()   # working memory
        self.short_term = PostgreSQL()       # short-term memory
        self.long_term = ChromaDB()          # long-term memory (vectors)

    async def add(self, user_id: str, message: dict, level: str = "short"):
        """Write a memory at the given tier."""
        if level == "working":
            await self.working_memory.set(
                f"working:{user_id}",
                message,
                ttl=3600,
            )
        elif level == "short":
            await self.short_term.insert({
                "user_id": user_id,
                "content": message,
                "timestamp": datetime.now(),
            })
        elif level == "long":
            embedding = await self._embed(message["content"])
            await self.long_term.add(
                ids=f"{user_id}:{uuid.uuid4()}",
                embeddings=embedding,
                documents=message["content"],
                metadatas={"user_id": user_id},
            )

    async def retrieve(self, user_id: str, query: str, top_k: int = 5):
        """Read back memories from all three tiers."""
        # 1. Working memory
        working = await self.working_memory.get(f"working:{user_id}")
        # 2. Short-term memory (parameterized query; never interpolate user input)
        short = await self.short_term.query(
            "SELECT * FROM memories WHERE user_id = %s "
            "ORDER BY timestamp DESC LIMIT 10",
            (user_id,),
        )
        # 3. Long-term memory (semantic retrieval)
        query_embedding = await self._embed(query)
        long = await self.long_term.query(
            query_embeddings=query_embedding,
            n_results=top_k,
            where={"user_id": user_id},
        )
        return {
            "working": working,
            "short_term": short,
            "long_term": long,
        }
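The _embed helper is referenced above but not defined; in this architecture it would typically delegate to embedding-service rather than embed locally. A minimal standalone sketch of that call, where the function name, URL, and response shape are all assumptions:

# Sketch of the call MemoryService._embed would make to embedding-service.
# Function name, URL, and response shape are illustrative assumptions.
import httpx

async def embed_text(text: str) -> list[float]:
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(
            "http://embedding-service/embed",
            json={"input": text},
        )
        response.raise_for_status()
        return response.json()["embedding"]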
4. Data Flow Design
4.1 Event-Driven Architecture
# Decouple services with a message queue / event bus
import asyncio
from typing import Callable

class EventBus:
    """In-process event bus."""

    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}
        self.queue = asyncio.Queue()

    def subscribe(self, event_type: str, handler: Callable):
        """Register a handler for an event type."""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    async def publish(self, event_type: str, data: dict):
        """Publish an event onto the queue."""
        await self.queue.put({"type": event_type, "data": data})

    async def start(self):
        """Run the consume loop."""
        while True:
            event = await self.queue.get()
            handlers = self.handlers.get(event["type"], [])
            for handler in handlers:
                asyncio.create_task(handler(event["data"]))

# Usage example (analytics / workflow are placeholder services)
event_bus = EventBus()

# Subscribe
async def on_message_created(data):
    # Async side effects: logging, statistics, workflow triggers
    await analytics.record_message(data)
    await workflow.check_triggers(data)

event_bus.subscribe("message.created", on_message_created)

# Publish (from inside an async request handler)
await event_bus.publish("message.created", {
    "user_id": user_id,
    "content": content,
})
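Because start() is an infinite loop, it has to run as a background task next to the web server. A minimal wiring sketch using a FastAPI lifespan hook (the hook itself is an assumption about how the service is assembled):

# Run the EventBus consumer as a background task for the app's lifetime.
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

event_bus = EventBus()  # the EventBus class defined above

@asynccontextmanager
async def lifespan(app: FastAPI):
    consumer = asyncio.create_task(event_bus.start())  # start draining the queue
    yield                                              # application serves traffic
    consumer.cancel()                                  # stop the consumer on shutdown

app = FastAPI(lifespan=lifespan)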
5. Deployment Architecture
5.1 Kubernetes Deployment
# k8s/llm-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-service
  template:
    metadata:
      labels:
        app: llm-service
    spec:
      containers:
        - name: llm
          image: ai-platform/llm-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: llm-service
spec:
  selector:
    app: llm-service
  ports:
    - port: 80
      targetPort: 8000
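The livenessProbe and readinessProbe above assume the container exposes /health and /ready. A minimal sketch of those endpoints inside the service (what counts as "ready" is service-specific, so the dependency check here is an assumption):

# Probe endpoints matching the /health and /ready paths referenced in the Deployment.
from fastapi import FastAPI, Response

app = FastAPI()

async def downstream_ok() -> bool:
    # Placeholder dependency check (e.g. ping the LLM provider or the cache).
    return True

@app.get("/health")
async def health():
    # Liveness: the process is up and the event loop responds.
    return {"status": "ok"}

@app.get("/ready")
async def ready():
    # Readiness: only accept traffic once downstream dependencies are reachable.
    if not await downstream_ok():
        return Response(status_code=503)
    return {"status": "ready"}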
5.2 Service Mesh
# Istio VirtualService: per-route timeouts, retries, and fault injection
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ai-platform
spec:
  hosts:
    - ai-platform.example.com
  http:
    - match:
        - uri:
            prefix: /api/chat
      route:
        - destination:
            host: chat-service
          weight: 100
      timeout: 30s
      retries:
        attempts: 3
        perTryTimeout: 10s
    - match:
        - uri:
            prefix: /api/agent
      route:
        - destination:
            host: agent-service
      fault:
        delay:
          percentage:
            value: 0.1
          fixedDelay: 5s   # chaos-testing delay injected into 0.1% of requests
6. Summary
Architecture selection
| Stage | Recommended architecture | Notes |
|---|---|---|
| MVP | Monolith | Validate quickly |
| Growth | Layered | Separate the modules |
| Maturity | Microservices | Scale each service independently |
Key principles
- Service boundaries: split along business domains
- Data isolation: each service owns its own data
- Async communication: decouple with event-driven messaging
- Fault tolerance: circuit breaking, graceful degradation, retries (see the sketch below)
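As a closing illustration of that last principle, here is a minimal client-side circuit breaker sketch. The thresholds and the wrapped call are assumptions; in production this logic usually lives in the service mesh or a resilience library rather than hand-rolled code.

# Minimal circuit breaker: trip after consecutive failures, probe again after a cooldown.
# Thresholds, cooldown, and the degraded fallback message are illustrative assumptions.
import time

class CircuitBreaker:
    def __init__(self, max_failures: int = 5, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        # Closed, or open long enough that we probe again (half-open).
        if self.failures < self.max_failures:
            return True
        return time.monotonic() - self.opened_at > self.reset_after

    def record_success(self):
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker()

async def chat_with_breaker(llm_service, messages):
    if not breaker.allow():
        return "Service is temporarily degraded, please retry shortly."  # graceful degradation
    try:
        result = await llm_service.chat(messages)  # retries happen inside LLMService / the mesh
        breaker.record_success()
        return result
    except Exception:
        breaker.record_failure()
        raise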