AI Agent开发实战(五):Multi-Agent协作让团队工作

9次阅读
没有评论

AI Agent开发实战(五):Multi-Agent协作让团队工作

一、开场:一个Agent不够用

大家好,我是老金。

复杂任务一个Agent搞不定:

  • 研究任务:需要搜索、分析、总结
  • 软件开发:需求分析、编码、测试、部署
  • 客服系统:分类、处理、升级、反馈

这时候需要多个Agent协作

  • 每个Agent专注一个任务
  • Agent之间可以通信和协作
  • 有协调者统一调度

今天我们实现Multi-Agent协作系统。

二、Multi-Agent架构

2.1 协作模式

┌─────────────────────────────────────────────────────────┐
│                  Multi-Agent协作模式                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  模式1:顺序执行                                        │
│  ┌───────┐    ┌───────┐    ┌───────┐                  │
│  │ Agent1│ → │ Agent2│ → │ Agent3│ → 输出            │
│  └───────┘    └───────┘    └───────┘                  │
│                                                         │
│  模式2:层级协调                                        │
│           ┌──────────────┐                             │
│           │ Orchestrator │                             │
│           └──────┬───────┘                             │
│        ┌─────────┼─────────┐                          │
│        ↓         ↓         ↓                          │
│   ┌────────┐┌────────┐┌────────┐                     │
│   │ Agent1 ││ Agent2 ││ Agent3 │                     │
│   └────────┘└────────┘└────────┘                     │
│                                                         │
│  模式3:对等协作                                        │
│   ┌────────┐  消息  ┌────────┐                        │
│   │ Agent1 │ ←───→ │ Agent2 │                        │
│   └────────┘        └────────┘                        │
│        ↑                ↑                              │
│        └──── 消息 ────┘                               │
│                                                         │
└─────────────────────────────────────────────────────────┘

2.2 消息定义

# src/agents/message.py
from pydantic import BaseModel
from typing import Any, Optional
from datetime import datetime
import uuid

class AgentMessage(BaseModel):
    """Agent间消息"""
    id: str = str(uuid.uuid4())
    sender: str           # 发送者Agent ID
    receiver: str         # 接收者Agent ID(可以是"all")
    content: str          # 消息内容
    type: str = "task"    # task | result | query | control
    metadata: dict = {}   # 额外元数据
    timestamp: datetime = datetime.now()

    # 任务相关
    task_id: Optional[str] = None
    parent_task_id: Optional[str] = None

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "type": self.type,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat()
        }

三、Agent通信系统

3.1 消息总线

# src/agents/message_bus.py
from typing import Dict, List, Callable, Optional
from .message import AgentMessage
import asyncio
from collections import defaultdict

class MessageBus:
    """消息总线"""

    def __init__(self):
        # 订阅者:agent_id -> callback
        self._subscribers: Dict[str, Callable] = {}
        # 消息队列:agent_id -> messages
        self._queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
        # 广播历史
        self._history: List[AgentMessage] = []

    def subscribe(self, agent_id: str, callback: Callable):
        """订阅消息"""
        self._subscribers[agent_id] = callback

    def unsubscribe(self, agent_id: str):
        """取消订阅"""
        self._subscribers.pop(agent_id, None)

    async def send(self, message: AgentMessage):
        """发送消息"""
        # 记录历史
        self._history.append(message)

        # 发送到指定接收者
        if message.receiver != "all":
            if message.receiver in self._subscribers:
                await self._queues[message.receiver].put(message)
        else:
            # 广播给所有订阅者(除了发送者)
            for agent_id in self._subscribers:
                if agent_id != message.sender:
                    await self._queues[agent_id].put(message)

    async def receive(self, agent_id: str, timeout: float = None) -> Optional[AgentMessage]:
        """接收消息"""
        try:
            if timeout:
                message = await asyncio.wait_for(
                    self._queues[agent_id].get(),
                    timeout=timeout
                )
            else:
                message = await self._queues[agent_id].get()
            return message
        except asyncio.TimeoutError:
            return None

    def get_history(self, agent_id: str = None) -> List[AgentMessage]:
        """获取消息历史"""
        if agent_id:
            return [m for m in self._history if m.sender == agent_id or m.receiver == agent_id]
        return self._history

# 全局消息总线
message_bus = MessageBus()

3.2 通信Agent

# src/agents/communicating_agent.py
from .base import BaseAgent
from .message import AgentMessage
from .message_bus import message_bus
from typing import Optional, List
import asyncio

class CommunicatingAgent(BaseAgent):
    """可通信的Agent"""

    def __init__(self, agent_id: str, **kwargs):
        super().__init__(**kwargs)
        self.agent_id = agent_id

        # 订阅消息
        message_bus.subscribe(agent_id, self._on_message)

        # 启动消息监听
        self._listening = True
        self._listen_task = asyncio.create_task(self._listen_messages())

    async def _listen_messages(self):
        """监听消息"""
        while self._listening:
            message = await message_bus.receive(self.agent_id, timeout=1.0)
            if message:
                await self._handle_message(message)

    async def _on_message(self, message: AgentMessage):
        """消息回调"""
        pass

    async def _handle_message(self, message: AgentMessage):
        """处理消息"""
        if message.type == "task":
            # 执行任务
            result = await self.run(message.content)

            # 发送结果
            await self.send_message(
                receiver=message.sender,
                content=result,
                type="result",
                task_id=message.task_id
            )
        elif message.type == "query":
            # 回答查询
            answer = await self.think()
            await self.send_message(
                receiver=message.sender,
                content=answer,
                type="result"
            )

    async def send_message(
        self,
        receiver: str,
        content: str,
        type: str = "task",
        **kwargs
    ):
        """发送消息"""
        message = AgentMessage(
            sender=self.agent_id,
            receiver=receiver,
            content=content,
            type=type,
            **kwargs
        )
        await message_bus.send(message)

    async def broadcast(self, content: str, type: str = "task"):
        """广播消息"""
        await self.send_message(
            receiver="all",
            content=content,
            type=type
        )

    def stop(self):
        """停止监听"""
        self._listening = False
        message_bus.unsubscribe(self.agent_id)

四、Orchestrator协调器

4.1 任务分解器

# src/agents/orchestrator.py
from .communicating_agent import CommunicatingAgent
from .message import AgentMessage
from ..utils.llm_client import LLMClient
from typing import List, Dict, Any
import json

class Orchestrator(CommunicatingAgent):
    """协调器"""

    def __init__(
        self,
        agent_id: str = "orchestrator",
        workers: Dict[str, str] = None,  # worker_id -> role
        **kwargs
    ):
        super().__init__(agent_id=agent_id, **kwargs)
        self.workers = workers or {}
        self.tasks: Dict[str, dict] = {}
        self.results: Dict[str, List[AgentMessage]] = {}

    async def decompose_task(self, task: str) -> List[dict]:
        """分解任务"""
        prompt = f"""作为一个任务协调者,将以下任务分解为子任务:

任务:{task}

可用的工作者角色:
{json.dumps(self.workers, ensure_ascii=False, indent=2)}

请输出JSON格式的子任务列表:
[
  {{
    "subtask": "子任务描述",
    "assigned_to": "工作者ID",
    "dependencies": ["依赖的子任务ID"]
  }}
]

注意:
1. 子任务应该清晰具体
2. 分配给合适角色的工作者
3. 标明依赖关系
        """

        response = await self.llm.chat([
            {"role": "system", "content": "你是一个任务分解专家。"},
            {"role": "user", "content": prompt}
        ])

        # 解析JSON
        try:
            subtasks = json.loads(response)
            return subtasks
        except:
            return [{"subtask": task, "assigned_to": list(self.workers.keys())[0], "dependencies": []}]

    async def assign_task(self, subtask: dict) -> str:
        """分配任务"""
        worker_id = subtask["assigned_to"]

        message = AgentMessage(
            sender=self.agent_id,
            receiver=worker_id,
            content=subtask["subtask"],
            type="task",
            metadata={"dependencies": subtask.get("dependencies", [])}
        )

        await message_bus.send(message)
        return message.id

    async def collect_results(self, task_id: str, timeout: float = 60.0) -> List[str]:
        """收集结果"""
        results = []
        start_time = asyncio.get_event_loop().time()

        while asyncio.get_event_loop().time() - start_time = len(self.workers):
                break

            await asyncio.sleep(0.1)

        return results

    async def synthesize(self, results: List[str], original_task: str) -> str:
        """综合结果"""
        prompt = f"""将以下子任务的结果综合为最终答案:

原始任务:{original_task}

子任务结果:
{chr(10).join([f'{i+1}. {r}' for i, r in enumerate(results)])}

请输出综合后的完整答案:"""

        return await self.llm.chat([{"role": "user", "content": prompt}])

    async def execute_task(self, task: str) -> str:
        """执行任务"""
        # 1. 分解任务
        subtasks = await self.decompose_task(task)

        # 2. 分配任务
        task_ids = []
        for subtask in subtasks:
            task_id = await self.assign_task(subtask)
            task_ids.append(task_id)

        # 3. 等待结果
        await asyncio.sleep(2)  # 等待工作者处理

        # 4. 收集结果
        results = []
        for task_id in task_ids:
            # 简化:直接获取历史中的结果
            history = message_bus.get_history()
            for msg in history:
                if msg.type == "result" and msg.receiver == self.agent_id:
                    results.append(msg.content)

        # 5. 综合
        if results:
            return await self.synthesize(results, task)
        else:
            return "任务执行失败:没有收到结果"

    async def _handle_message(self, message: AgentMessage):
        """处理消息"""
        if message.type == "result":
            # 保存结果
            if message.task_id not in self.results:
                self.results[message.task_id] = []
            self.results[message.task_id].append(message)
        elif message.type == "task":
            # 新任务
            result = await self.execute_task(message.content)
            await self.send_message(
                receiver=message.sender,
                content=result,
                type="result",
                task_id=message.task_id
            )

五、Worker工作者

5.1 专业Worker

# src/agents/worker.py
from .communicating_agent import CommunicatingAgent
from typing import Optional

class WorkerAgent(CommunicatingAgent):
    """工作者Agent"""

    def __init__(
        self,
        agent_id: str,
        role: str,
        capabilities: List[str] = None,
        **kwargs
    ):
        super().__init__(agent_id=agent_id, **kwargs)
        self.role = role
        self.capabilities = capabilities or []

    async def process_task(self, task: str) -> str:
        """处理任务"""
        # 基于角色处理
        system_prompt = f"""你是一个{self.role}。

能力:{', '.join(self.capabilities)}

请专注于你的专业领域处理任务。
"""

        response = await self.llm.chat([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task}
        ])

        return response

    async def _handle_message(self, message: AgentMessage):
        """处理消息"""
        if message.type == "task":
            # 处理任务
            result = await self.process_task(message.content)

            # 返回结果
            await self.send_message(
                receiver=message.sender,
                content=result,
                type="result",
                task_id=message.task_id
            )

# 预定义的工作者
class ResearcherWorker(WorkerAgent):
    """研究工作者"""

    def __init__(self, **kwargs):
        super().__init__(
            role="研究员",
            capabilities=["信息搜索", "资料整理", "数据分析"],
            **kwargs
        )

class WriterWorker(WorkerAgent):
    """写作工作者"""

    def __init__(self, **kwargs):
        super().__init__(
            role="作家",
            capabilities=["文章写作", "内容创作", "文案编辑"],
            **kwargs
        )

class CoderWorker(WorkerAgent):
    """编程工作者"""

    def __init__(self, **kwargs):
        super().__init__(
            role="程序员",
            capabilities=["代码编写", "Bug修复", "代码审查"],
            **kwargs
        )

六、完整Multi-Agent系统

6.1 Agent团队

# src/agents/team.py
from .orchestrator import Orchestrator
from .worker import WorkerAgent, ResearcherWorker, WriterWorker, CoderWorker
from ..utils.llm_client import LLMClient
from typing import Dict, List, Optional
import asyncio

class AgentTeam:
    """Agent团队"""

    def __init__(self, llm_client: LLMClient):
        self.llm = llm_client
        self.agents: Dict[str, CommunicatingAgent] = {}
        self.orchestrator: Optional[Orchestrator] = None

    def add_agent(self, agent: CommunicatingAgent):
        """添加Agent"""
        self.agents[agent.agent_id] = agent

    def setup_default_team(self):
        """设置默认团队"""
        # 创建工作者
        researcher = ResearcherWorker(
            agent_id="researcher",
            llm_client=self.llm
        )

        writer = WriterWorker(
            agent_id="writer",
            llm_client=self.llm
        )

        coder = CoderWorker(
            agent_id="coder",
            llm_client=self.llm
        )

        # 创建协调器
        self.orchestrator = Orchestrator(
            agent_id="orchestrator",
            llm_client=self.llm,
            workers={
                "researcher": "研究员",
                "writer": "作家",
                "coder": "程序员"
            }
        )

        # 注册
        self.add_agent(researcher)
        self.add_agent(writer)
        self.add_agent(coder)
        self.add_agent(self.orchestrator)

    async def execute(self, task: str) -> str:
        """执行任务"""
        if not self.orchestrator:
            raise ValueError("Orchestrator not initialized")

        return await self.orchestrator.execute_task(task)

    def stop_all(self):
        """停止所有Agent"""
        for agent in self.agents.values():
            agent.stop()

6.2 使用示例

# examples/multi_agent_demo.py
import asyncio
from src.utils.llm_client import LLMClient
from src.agents.team import AgentTeam

async def main():
    llm = LLMClient(provider="openai", model="gpt-4-turbo-preview")

    # 创建团队
    team = AgentTeam(llm_client=llm)
    team.setup_default_team()

    # 执行任务
    task = "研究AI Agent的发展现状,然后写一篇技术博客"

    print(f"任务:{task}n")
    result = await team.execute(task)

    print(f"结果:{result}")

    # 停止
    team.stop_all()

if __name__ == "__main__":
    asyncio.run(main())

七、协作模式实现

7.1 顺序协作

# src/agents/pipeline.py
from typing import List
from .base import BaseAgent

class Pipeline:
    """顺序执行管道"""

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    async def execute(self, input_text: str) -> str:
        """顺序执行"""
        current = input_text

        for i, agent in enumerate(self.agents):
            print(f"执行第{i+1}个Agent...")
            current = await agent.run(current)

        return current

# 使用示例
async def pipeline_demo():
    llm = LLMClient(provider="openai")

    # 创建管道:研究 -> 写作 -> 编辑
    researcher = ChatAgent(llm, system_prompt="你是研究员...")
    writer = ChatAgent(llm, system_prompt="你是作家...")
    editor = ChatAgent(llm, system_prompt="你是编辑...")

    pipeline = Pipeline([researcher, writer, editor])

    result = await pipeline.execute("研究AI Agent并写一篇文章")

7.2 并行协作

# src/agents/parallel.py
from typing import List, Dict
from .base import BaseAgent
import asyncio

class ParallelExecutor:
    """并行执行器"""

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    async def execute(self, input_text: str) -> List[str]:
        """并行执行"""
        tasks = [agent.run(input_text) for agent in self.agents]
        results = await asyncio.gather(*tasks)
        return results

# 使用示例
async def parallel_demo():
    llm = LLMClient(provider="openai")

    # 创建多个专家
    experts = [
        ChatAgent(llm, system_prompt=f"你是{role}专家...")
        for role in ["技术", "市场", "产品"]
    ]

    executor = ParallelExecutor(experts)

    # 并行获取不同视角的分析
    results = await executor.execute("分析AI Agent的商业前景")

    for i, result in enumerate(results):
        print(f"专家{i+1}观点:{result}n")

八、最佳实践

8.1 Agent分工原则

原则 说明
单一职责 每个Agent专注一个领域
能力互补 不同Agent能力不同
清晰边界 任务分配明确
简单通信 消息格式统一

8.2 协作模式选择

场景 推荐模式
有先后依赖 顺序执行
独立子任务 并行执行
复杂任务 层级协调
需要协商 对等协作

九、总结

Multi-Agent要点

  1. 消息通信:Agent之间通过消息协作
  2. 任务分解:复杂任务拆分为子任务
  3. 协调调度:Orchestrator统一管理
  4. 结果综合:合并多个Agent的输出

下期预告

下一篇:Agent工作流编排——用DSL定义复杂工作流!


往期回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-04-01发表,共计11511字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)