一、 开场:从”单打独斗”到”团队协作”
大家好,我是老金。
过去一年,我见证了AI Agent从”单Agent”向”多Agent协作”的演进。
最初,我们做一个客服Agent,就是一个LLM加一堆工具调用。
后来发现:让一个Agent干所有事,效果并不好。
- 任务太复杂,容易”精神分裂”
- 专业领域不精通
- 难以并行处理
于是,Multi-Agent架构应运而生。
就像人类社会一样:术业有专攻,协作出奇迹。
今天这篇文章,我想聊聊Multi-Agent系统的设计模式、协作机制,以及实战经验。
二、 Multi-Agent的核心设计模式
2.1 主管-工人模式(Supervisor-Worker)
最经典的模式:一个主管Agent负责任务分发,多个工人Agent负责执行。
┌─────────────┐
│ Supervisor │ ← 负责任务分解和分发
│ Agent │
└──────┬──────┘
│
┌───────────┼───────────┐
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Worker │ │Worker │ │Worker │
│ A │ │ B │ │ C │
└───────┘ └───────┘ └───────┘
适用场景:
- 任务可以明确分解
- 子任务之间相对独立
- 需要并行处理
代码示例:
class SupervisorAgent: def __init__(self): self.workers = { "researcher": ResearcherAgent(), "writer": WriterAgent(), "reviewer": ReviewerAgent() }async def process(self, task): # 1. 分析任务 subtasks = await self.decompose_task(task) # 2. 分发任务 results = {} for subtask in subtasks: worker = self.select_worker(subtask) results[subtask.id] = await worker.execute(subtask) # 3. 整合结果 final_result = await self.integrate(results) return final_result def select_worker(self, subtask): """根据子任务类型选择Worker""" if subtask.type == "research": return self.workers["researcher"] elif subtask.type == "writing": return self.workers["writer"] # ...2.2 对话辩论模式(Debate)
核心思想:让多个Agent从不同角度”辩论”,最终得出更全面的结论。
┌─────────────┐ ┌─────────────┐ │ Agent A │ ←─→ │ Agent B │ │ (正方) │ │ (反方) │ └──────┬──────┘ └──────┬──────┘ │ │ └─────────┬─────────┘ │ ┌──────▼──────┐ │ Judge Agent│ ← 裁判Agent │ (决策者) │ └─────────────┘适用场景:
- 需要多角度思考的问题
- 创意生成、方案评估
- 减少单Agent的偏见
class DebateSystem: async def debate(self, question, rounds=3): pro_argument = "" con_argument = ""for round_num in range(rounds): # Agent A 提出支持观点 pro_argument = await self.pro_agent.argue( question, previous_con=con_argument ) # Agent B 提出反对观点 con_argument = await self.con_agent.argue( question, previous_pro=pro_argument ) # Judge Agent 综合判断 final_decision = await self.judge_agent.decide( question, pro_arguments=pro_argument, con_arguments=con_argument ) return final_decision2.3 层级协作模式(Hierarchical)
核心思想:模拟企业组织架构,多层Agent协作。
┌─────────────┐ │ CEO Agent │ ← 战略决策 └──────┬──────┘ │ ┌───────────┼───────────┐ │ │ │ ┌─────▼─────┐ ┌───▼───┐ ┌─────▼─────┐ │ Manager A │ │Manager│ │ Manager C │ ← 战术执行 └─────┬─────┘ │ B │ └─────┬─────┘ │ └───┬───┘ │ ┌────┴────┐ │ ┌────┴────┐ │ │ │ │ │ ┌───▼───┐ ┌───▼───┐ │ ┌───▼───┐ ┌───▼───┐ │Worker │ │Worker │ │ │Worker │ │Worker │ ← 具体任务 │ 1 │ │ 2 │ │ │ 3 │ │ 4 │ └───────┘ └───────┘ │ └───────┘ └───────┘ │ ┌────▼────┐ │ Worker │ │ 5 │ └─────────┘适用场景:
- 复杂的企业级应用
- 需要权限分级
- 大规模任务调度
2.4 流水线模式(Pipeline)
核心思想:任务按照固定流程流转,每个Agent负责一个环节。
Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
│ │ │
数据清洗 分析处理 生成报告
class Pipeline: def __init__(self, agents): self.agents = agentsasync def run(self, input_data): data = input_data for agent in self.agents: data = await agent.process(data) # 可选:中间结果验证 if not self.validate(data): raise Exception(f"{agent.name} 输出验证失败") return data使用示例
pipeline = Pipeline([
DataCleanerAgent(),
AnalyzerAgent(),
ReporterAgent()
])result = await pipeline.run(raw_data)
2.5 黑板模式(Blackboard)
核心思想:所有Agent共享一个”黑板”,在上面读写信息。
┌─────────────────────┐ │ Blackboard │ │ (共享状态存储) │ └──────────┬──────────┘ │ ┌──────────────────────┼──────────────────────┐ │ │ │ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ │Agent A│ │Agent B│ │Agent C│ │(专家1)│ │(专家2)│ │(专家3)│ └───────┘ └───────┘ └───────┘class Blackboard: def __init__(self): self.data = {} self.subscribers = []def write(self, key, value, agent_id): self.data[key] = { "value": value, "author": agent_id, "timestamp": time.time() } self.notify_subscribers(key, value) def read(self, key): return self.data.get(key, {}).get("value") def subscribe(self, agent, keys): self.subscribers.append({ "agent": agent, "keys": keys })class ExpertAgent:
def init(self, blackboard, expertise):
self.blackboard = blackboard
self.expertise = expertise
self.id = str(uuid.uuid4())async def contribute(self): # 从黑板读取相关信息 context = self.blackboard.read("problem_context") # 根据专业知识做出贡献 result = await self.analyze(context) # 写入黑板 self.blackboard.write( f"{self.expertise}_analysis", result, self.id )三、 Agent间通信机制
3.1 直接通信
class Agent: def __init__(self): self.peers = {} # 其他Agent的引用def register_peer(self, name, agent): self.peers[name] = agent async def send_message(self, peer_name, message): return await self.peers[peer_name].receive(message) async def receive(self, message): # 处理收到的消息 return await self.process(message)3.2 消息队列
import asyncio from collections import defaultdictclass MessageBus: def init(self): self.queues = defaultdict(asyncio.Queue)
async def publish(self, topic, message): await self.queues[topic].put(message) async def subscribe(self, topic, handler): while True: message = await self.queues[topic].get() await handler(message)使用示例
bus = MessageBus()
Agent A 发布消息
await bus.publish("task_completed", {
"task_id": "123",
"result": "success"
})Agent B 订阅消息
await bus.subscribe("task_completed", handle_task_result)
3.3 共享状态
class SharedState: def __init__(self): self.state = {} self.locks = defaultdict(asyncio.Lock)async def update(self, key, value): async with self.locks[key]: self.state[key] = value async def get(self, key): async with self.locks[key]: return self.state.get(key)四、 实战案例:智能客服系统
以我之前做的智能客服系统为例:
4.1 系统架构
用户咨询 ↓ ┌─────────────────┐ │ Router Agent │ ← 意图识别、任务分发 └────────┬────────┘ │ ┌────┴────┬─────────┬─────────┐ │ │ │ │ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐ │Order │ │Product│ │Complaint│ │FAQ │ │Agent │ │Agent │ │ Agent │ │Agent │ └───────┘ └───────┘ └─────────┘ └───────┘ │ │ │ │ └─────────┴─────────┴─────────┘ │ ┌───────▼───────┐ │ Summary Agent │ ← 整合回复 └───────────────┘4.2 核心代码
class CustomerServiceSystem: def __init__(self): self.router = RouterAgent() self.agents = { "order": OrderAgent(), "product": ProductAgent(), "complaint": ComplaintAgent(), "faq": FAQAgent() } self.summarizer = SummaryAgent()async def handle_query(self, user_input, context): # 1. 路由判断 route_result = await self.router.analyze(user_input) # 2. 分发给专业Agent responses = [] for agent_type in route_result["agents"]: agent = self.agents[agent_type] response = await agent.handle(user_input, context) responses.append(response) # 3. 整合回复 final_response = await self.summarizer.summarize( user_input, responses ) return final_responseclass RouterAgent:
async def analyze(self, query):
prompt = f"""
分析用户查询,判断应该由哪些专业团队处理:用户查询:{query} 可选团队: - order: 订单查询、物流跟踪 - product: 产品咨询、规格参数 - complaint: 投诉建议、问题反馈 - faq: 常见问题解答 返回JSON格式:{{"agents": ["team1", "team2"]}} """ result = await llm.call(prompt) return json.loads(result)4.3 效果对比
| 指标 | 单Agent | Multi-Agent |
|---|---|---|
| 问题解决率 | 72% | 91% |
| 平均响应时间 | 8秒 | 5秒 |
| 用户满意度 | 3.8/5 | 4.6/5 |
五、 最佳实践
- 职责单一:每个Agent只做一件事,做好一件事
- 明确边界:定义清晰的输入输出规范
- 容错设计:单个Agent失败不影响整体
- 监控可观测:每个Agent的行为都要可追踪
- 成本控制:不是所有Agent都需要顶级模型
六、 写在最后
Multi-Agent不是银弹,但它确实是解决复杂问题的有效手段。
关键原则:让专业的Agent做专业的事,协作胜过全能。
如果你正在设计Multi-Agent系统,希望这篇文章对你有帮助!
我是技术老金,我们下期见!
📌 往期精彩回顾