AI Agent开发实战(五):Multi-Agent协作让团队工作
一、开场:一个Agent不够用
大家好,我是老金。
复杂任务一个Agent搞不定:
- 研究任务:需要搜索、分析、总结
- 软件开发:需求分析、编码、测试、部署
- 客服系统:分类、处理、升级、反馈
这时候需要多个Agent协作:
- 每个Agent专注一个任务
- Agent之间可以通信和协作
- 有协调者统一调度
今天我们实现Multi-Agent协作系统。
二、Multi-Agent架构
2.1 协作模式
┌─────────────────────────────────────────────────────────┐
│ Multi-Agent协作模式 │
├─────────────────────────────────────────────────────────┤
│ │
│ 模式1:顺序执行 │
│ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │ Agent1│ → │ Agent2│ → │ Agent3│ → 输出 │
│ └───────┘ └───────┘ └───────┘ │
│ │
│ 模式2:层级协调 │
│ ┌──────────────┐ │
│ │ Orchestrator │ │
│ └──────┬───────┘ │
│ ┌─────────┼─────────┐ │
│ ↓ ↓ ↓ │
│ ┌────────┐┌────────┐┌────────┐ │
│ │ Agent1 ││ Agent2 ││ Agent3 │ │
│ └────────┘└────────┘└────────┘ │
│ │
│ 模式3:对等协作 │
│ ┌────────┐ 消息 ┌────────┐ │
│ │ Agent1 │ ←───→ │ Agent2 │ │
│ └────────┘ └────────┘ │
│ ↑ ↑ │
│ └──── 消息 ────┘ │
│ │
└─────────────────────────────────────────────────────────┘
2.2 消息定义
# src/agents/message.py
from pydantic import BaseModel
from typing import Any, Optional
from datetime import datetime
import uuid
class AgentMessage(BaseModel):
"""Agent间消息"""
id: str = str(uuid.uuid4())
sender: str # 发送者Agent ID
receiver: str # 接收者Agent ID(可以是"all")
content: str # 消息内容
type: str = "task" # task | result | query | control
metadata: dict = {} # 额外元数据
timestamp: datetime = datetime.now()
# 任务相关
task_id: Optional[str] = None
parent_task_id: Optional[str] = None
def to_dict(self) -> dict:
return {
"id": self.id,
"sender": self.sender,
"receiver": self.receiver,
"content": self.content,
"type": self.type,
"metadata": self.metadata,
"timestamp": self.timestamp.isoformat()
}
三、Agent通信系统
3.1 消息总线
# src/agents/message_bus.py
from typing import Dict, List, Callable, Optional
from .message import AgentMessage
import asyncio
from collections import defaultdict
class MessageBus:
"""消息总线"""
def __init__(self):
# 订阅者:agent_id -> callback
self._subscribers: Dict[str, Callable] = {}
# 消息队列:agent_id -> messages
self._queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
# 广播历史
self._history: List[AgentMessage] = []
def subscribe(self, agent_id: str, callback: Callable):
"""订阅消息"""
self._subscribers[agent_id] = callback
def unsubscribe(self, agent_id: str):
"""取消订阅"""
self._subscribers.pop(agent_id, None)
async def send(self, message: AgentMessage):
"""发送消息"""
# 记录历史
self._history.append(message)
# 发送到指定接收者
if message.receiver != "all":
if message.receiver in self._subscribers:
await self._queues[message.receiver].put(message)
else:
# 广播给所有订阅者(除了发送者)
for agent_id in self._subscribers:
if agent_id != message.sender:
await self._queues[agent_id].put(message)
async def receive(self, agent_id: str, timeout: float = None) -> Optional[AgentMessage]:
"""接收消息"""
try:
if timeout:
message = await asyncio.wait_for(
self._queues[agent_id].get(),
timeout=timeout
)
else:
message = await self._queues[agent_id].get()
return message
except asyncio.TimeoutError:
return None
def get_history(self, agent_id: str = None) -> List[AgentMessage]:
"""获取消息历史"""
if agent_id:
return [m for m in self._history if m.sender == agent_id or m.receiver == agent_id]
return self._history
# 全局消息总线
message_bus = MessageBus()
3.2 通信Agent
# src/agents/communicating_agent.py
from .base import BaseAgent
from .message import AgentMessage
from .message_bus import message_bus
from typing import Optional, List
import asyncio
class CommunicatingAgent(BaseAgent):
"""可通信的Agent"""
def __init__(self, agent_id: str, **kwargs):
super().__init__(**kwargs)
self.agent_id = agent_id
# 订阅消息
message_bus.subscribe(agent_id, self._on_message)
# 启动消息监听
self._listening = True
self._listen_task = asyncio.create_task(self._listen_messages())
async def _listen_messages(self):
"""监听消息"""
while self._listening:
message = await message_bus.receive(self.agent_id, timeout=1.0)
if message:
await self._handle_message(message)
async def _on_message(self, message: AgentMessage):
"""消息回调"""
pass
async def _handle_message(self, message: AgentMessage):
"""处理消息"""
if message.type == "task":
# 执行任务
result = await self.run(message.content)
# 发送结果
await self.send_message(
receiver=message.sender,
content=result,
type="result",
task_id=message.task_id
)
elif message.type == "query":
# 回答查询
answer = await self.think()
await self.send_message(
receiver=message.sender,
content=answer,
type="result"
)
async def send_message(
self,
receiver: str,
content: str,
type: str = "task",
**kwargs
):
"""发送消息"""
message = AgentMessage(
sender=self.agent_id,
receiver=receiver,
content=content,
type=type,
**kwargs
)
await message_bus.send(message)
async def broadcast(self, content: str, type: str = "task"):
"""广播消息"""
await self.send_message(
receiver="all",
content=content,
type=type
)
def stop(self):
"""停止监听"""
self._listening = False
message_bus.unsubscribe(self.agent_id)
四、Orchestrator协调器
4.1 任务分解器
# src/agents/orchestrator.py
from .communicating_agent import CommunicatingAgent
from .message import AgentMessage
from ..utils.llm_client import LLMClient
from typing import List, Dict, Any
import json
class Orchestrator(CommunicatingAgent):
"""协调器"""
def __init__(
self,
agent_id: str = "orchestrator",
workers: Dict[str, str] = None, # worker_id -> role
**kwargs
):
super().__init__(agent_id=agent_id, **kwargs)
self.workers = workers or {}
self.tasks: Dict[str, dict] = {}
self.results: Dict[str, List[AgentMessage]] = {}
async def decompose_task(self, task: str) -> List[dict]:
"""分解任务"""
prompt = f"""作为一个任务协调者,将以下任务分解为子任务:
任务:{task}
可用的工作者角色:
{json.dumps(self.workers, ensure_ascii=False, indent=2)}
请输出JSON格式的子任务列表:
[
{{
"subtask": "子任务描述",
"assigned_to": "工作者ID",
"dependencies": ["依赖的子任务ID"]
}}
]
注意:
1. 子任务应该清晰具体
2. 分配给合适角色的工作者
3. 标明依赖关系
"""
response = await self.llm.chat([
{"role": "system", "content": "你是一个任务分解专家。"},
{"role": "user", "content": prompt}
])
# 解析JSON
try:
subtasks = json.loads(response)
return subtasks
except:
return [{"subtask": task, "assigned_to": list(self.workers.keys())[0], "dependencies": []}]
async def assign_task(self, subtask: dict) -> str:
"""分配任务"""
worker_id = subtask["assigned_to"]
message = AgentMessage(
sender=self.agent_id,
receiver=worker_id,
content=subtask["subtask"],
type="task",
metadata={"dependencies": subtask.get("dependencies", [])}
)
await message_bus.send(message)
return message.id
async def collect_results(self, task_id: str, timeout: float = 60.0) -> List[str]:
"""收集结果"""
results = []
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time = len(self.workers):
break
await asyncio.sleep(0.1)
return results
async def synthesize(self, results: List[str], original_task: str) -> str:
"""综合结果"""
prompt = f"""将以下子任务的结果综合为最终答案:
原始任务:{original_task}
子任务结果:
{chr(10).join([f'{i+1}. {r}' for i, r in enumerate(results)])}
请输出综合后的完整答案:"""
return await self.llm.chat([{"role": "user", "content": prompt}])
async def execute_task(self, task: str) -> str:
"""执行任务"""
# 1. 分解任务
subtasks = await self.decompose_task(task)
# 2. 分配任务
task_ids = []
for subtask in subtasks:
task_id = await self.assign_task(subtask)
task_ids.append(task_id)
# 3. 等待结果
await asyncio.sleep(2) # 等待工作者处理
# 4. 收集结果
results = []
for task_id in task_ids:
# 简化:直接获取历史中的结果
history = message_bus.get_history()
for msg in history:
if msg.type == "result" and msg.receiver == self.agent_id:
results.append(msg.content)
# 5. 综合
if results:
return await self.synthesize(results, task)
else:
return "任务执行失败:没有收到结果"
async def _handle_message(self, message: AgentMessage):
"""处理消息"""
if message.type == "result":
# 保存结果
if message.task_id not in self.results:
self.results[message.task_id] = []
self.results[message.task_id].append(message)
elif message.type == "task":
# 新任务
result = await self.execute_task(message.content)
await self.send_message(
receiver=message.sender,
content=result,
type="result",
task_id=message.task_id
)
五、Worker工作者
5.1 专业Worker
# src/agents/worker.py
from .communicating_agent import CommunicatingAgent
from typing import Optional
class WorkerAgent(CommunicatingAgent):
"""工作者Agent"""
def __init__(
self,
agent_id: str,
role: str,
capabilities: List[str] = None,
**kwargs
):
super().__init__(agent_id=agent_id, **kwargs)
self.role = role
self.capabilities = capabilities or []
async def process_task(self, task: str) -> str:
"""处理任务"""
# 基于角色处理
system_prompt = f"""你是一个{self.role}。
能力:{', '.join(self.capabilities)}
请专注于你的专业领域处理任务。
"""
response = await self.llm.chat([
{"role": "system", "content": system_prompt},
{"role": "user", "content": task}
])
return response
async def _handle_message(self, message: AgentMessage):
"""处理消息"""
if message.type == "task":
# 处理任务
result = await self.process_task(message.content)
# 返回结果
await self.send_message(
receiver=message.sender,
content=result,
type="result",
task_id=message.task_id
)
# 预定义的工作者
class ResearcherWorker(WorkerAgent):
"""研究工作者"""
def __init__(self, **kwargs):
super().__init__(
role="研究员",
capabilities=["信息搜索", "资料整理", "数据分析"],
**kwargs
)
class WriterWorker(WorkerAgent):
"""写作工作者"""
def __init__(self, **kwargs):
super().__init__(
role="作家",
capabilities=["文章写作", "内容创作", "文案编辑"],
**kwargs
)
class CoderWorker(WorkerAgent):
"""编程工作者"""
def __init__(self, **kwargs):
super().__init__(
role="程序员",
capabilities=["代码编写", "Bug修复", "代码审查"],
**kwargs
)
六、完整Multi-Agent系统
6.1 Agent团队
# src/agents/team.py
from .orchestrator import Orchestrator
from .worker import WorkerAgent, ResearcherWorker, WriterWorker, CoderWorker
from ..utils.llm_client import LLMClient
from typing import Dict, List, Optional
import asyncio
class AgentTeam:
"""Agent团队"""
def __init__(self, llm_client: LLMClient):
self.llm = llm_client
self.agents: Dict[str, CommunicatingAgent] = {}
self.orchestrator: Optional[Orchestrator] = None
def add_agent(self, agent: CommunicatingAgent):
"""添加Agent"""
self.agents[agent.agent_id] = agent
def setup_default_team(self):
"""设置默认团队"""
# 创建工作者
researcher = ResearcherWorker(
agent_id="researcher",
llm_client=self.llm
)
writer = WriterWorker(
agent_id="writer",
llm_client=self.llm
)
coder = CoderWorker(
agent_id="coder",
llm_client=self.llm
)
# 创建协调器
self.orchestrator = Orchestrator(
agent_id="orchestrator",
llm_client=self.llm,
workers={
"researcher": "研究员",
"writer": "作家",
"coder": "程序员"
}
)
# 注册
self.add_agent(researcher)
self.add_agent(writer)
self.add_agent(coder)
self.add_agent(self.orchestrator)
async def execute(self, task: str) -> str:
"""执行任务"""
if not self.orchestrator:
raise ValueError("Orchestrator not initialized")
return await self.orchestrator.execute_task(task)
def stop_all(self):
"""停止所有Agent"""
for agent in self.agents.values():
agent.stop()
6.2 使用示例
# examples/multi_agent_demo.py
import asyncio
from src.utils.llm_client import LLMClient
from src.agents.team import AgentTeam
async def main():
llm = LLMClient(provider="openai", model="gpt-4-turbo-preview")
# 创建团队
team = AgentTeam(llm_client=llm)
team.setup_default_team()
# 执行任务
task = "研究AI Agent的发展现状,然后写一篇技术博客"
print(f"任务:{task}n")
result = await team.execute(task)
print(f"结果:{result}")
# 停止
team.stop_all()
if __name__ == "__main__":
asyncio.run(main())
七、协作模式实现
7.1 顺序协作
# src/agents/pipeline.py
from typing import List
from .base import BaseAgent
class Pipeline:
"""顺序执行管道"""
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
async def execute(self, input_text: str) -> str:
"""顺序执行"""
current = input_text
for i, agent in enumerate(self.agents):
print(f"执行第{i+1}个Agent...")
current = await agent.run(current)
return current
# 使用示例
async def pipeline_demo():
llm = LLMClient(provider="openai")
# 创建管道:研究 -> 写作 -> 编辑
researcher = ChatAgent(llm, system_prompt="你是研究员...")
writer = ChatAgent(llm, system_prompt="你是作家...")
editor = ChatAgent(llm, system_prompt="你是编辑...")
pipeline = Pipeline([researcher, writer, editor])
result = await pipeline.execute("研究AI Agent并写一篇文章")
7.2 并行协作
# src/agents/parallel.py
from typing import List, Dict
from .base import BaseAgent
import asyncio
class ParallelExecutor:
"""并行执行器"""
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
async def execute(self, input_text: str) -> List[str]:
"""并行执行"""
tasks = [agent.run(input_text) for agent in self.agents]
results = await asyncio.gather(*tasks)
return results
# 使用示例
async def parallel_demo():
llm = LLMClient(provider="openai")
# 创建多个专家
experts = [
ChatAgent(llm, system_prompt=f"你是{role}专家...")
for role in ["技术", "市场", "产品"]
]
executor = ParallelExecutor(experts)
# 并行获取不同视角的分析
results = await executor.execute("分析AI Agent的商业前景")
for i, result in enumerate(results):
print(f"专家{i+1}观点:{result}n")
八、最佳实践
8.1 Agent分工原则
| 原则 | 说明 |
|---|---|
| 单一职责 | 每个Agent专注一个领域 |
| 能力互补 | 不同Agent能力不同 |
| 清晰边界 | 任务分配明确 |
| 简单通信 | 消息格式统一 |
8.2 协作模式选择
| 场景 | 推荐模式 |
|---|---|
| 有先后依赖 | 顺序执行 |
| 独立子任务 | 并行执行 |
| 复杂任务 | 层级协调 |
| 需要协商 | 对等协作 |
九、总结
Multi-Agent要点
- 消息通信:Agent之间通过消息协作
- 任务分解:复杂任务拆分为子任务
- 协调调度:Orchestrator统一管理
- 结果综合:合并多个Agent的输出
下期预告
下一篇:Agent工作流编排——用DSL定义复杂工作流!
往期回顾
正文完