Multi-Agent系统设计:5种协作模式让AI团队”分工明确”

4次阅读
没有评论

一、 开场:从”单打独斗”到”团队协作”

大家好,我是老金。

过去一年,我见证了AI Agent从”单Agent”向”多Agent协作”的演进。

最初,我们做一个客服Agent,就是一个LLM加一堆工具调用。

后来发现:让一个Agent干所有事,效果并不好。

  • 任务太复杂,容易”精神分裂”
  • 专业领域不精通
  • 难以并行处理

于是,Multi-Agent架构应运而生。

就像人类社会一样:术业有专攻,协作出奇迹

今天这篇文章,我想聊聊Multi-Agent系统的设计模式、协作机制,以及实战经验

二、 Multi-Agent的核心设计模式

2.1 主管-工人模式(Supervisor-Worker)

最经典的模式:一个主管Agent负责任务分发,多个工人Agent负责执行。

         ┌─────────────┐
         │ Supervisor  │ ← 负责任务分解和分发
         │   Agent     │
         └──────┬──────┘
                │
    ┌───────────┼───────────┐
    │           │           │
┌───▼───┐   ┌───▼───┐   ┌───▼───┐
│Worker │   │Worker │   │Worker │
│  A    │   │  B    │   │  C    │
└───────┘   └───────┘   └───────┘

适用场景

  • 任务可以明确分解
  • 子任务之间相对独立
  • 需要并行处理

代码示例

class SupervisorAgent:
    def __init__(self):
        self.workers = {
            "researcher": ResearcherAgent(),
            "writer": WriterAgent(),
            "reviewer": ReviewerAgent()
        }
async def process(self, task):
    # 1. 分析任务
    subtasks = await self.decompose_task(task)

    # 2. 分发任务
    results = {}
    for subtask in subtasks:
        worker = self.select_worker(subtask)
        results[subtask.id] = await worker.execute(subtask)

    # 3. 整合结果
    final_result = await self.integrate(results)

    return final_result

def select_worker(self, subtask):
    """根据子任务类型选择Worker"""
    if subtask.type == "research":
        return self.workers["researcher"]
    elif subtask.type == "writing":
        return self.workers["writer"]
    # ...

2.2 对话辩论模式(Debate)

核心思想:让多个Agent从不同角度”辩论”,最终得出更全面的结论。

┌─────────────┐     ┌─────────────┐
│  Agent A    │ ←─→ │  Agent B    │
│  (正方)     │     │  (反方)     │
└──────┬──────┘     └──────┬──────┘
       │                   │
       └─────────┬─────────┘
                 │
          ┌──────▼──────┐
          │  Judge Agent│ ← 裁判Agent
          │  (决策者)   │
          └─────────────┘

适用场景

  • 需要多角度思考的问题
  • 创意生成、方案评估
  • 减少单Agent的偏见
class DebateSystem:
    async def debate(self, question, rounds=3):
        pro_argument = ""
        con_argument = ""
    for round_num in range(rounds):
        # Agent A 提出支持观点
        pro_argument = await self.pro_agent.argue(
            question, 
            previous_con=con_argument
        )

        # Agent B 提出反对观点
        con_argument = await self.con_agent.argue(
            question,
            previous_pro=pro_argument
        )

    # Judge Agent 综合判断
    final_decision = await self.judge_agent.decide(
        question,
        pro_arguments=pro_argument,
        con_arguments=con_argument
    )

    return final_decision

2.3 层级协作模式(Hierarchical)

核心思想:模拟企业组织架构,多层Agent协作。

              ┌─────────────┐
              │   CEO Agent │ ← 战略决策
              └──────┬──────┘
                     │
         ┌───────────┼───────────┐
         │           │           │
   ┌─────▼─────┐ ┌───▼───┐ ┌─────▼─────┐
   │ Manager A │ │Manager│ │ Manager C │ ← 战术执行
   └─────┬─────┘ │   B   │ └─────┬─────┘
         │       └───┬───┘       │
    ┌────┴────┐     │       ┌────┴────┐
    │         │     │       │         │
┌───▼───┐ ┌───▼───┐ │   ┌───▼───┐ ┌───▼───┐
│Worker │ │Worker │ │   │Worker │ │Worker │ ← 具体任务
│   1   │ │   2   │ │   │   3   │ │   4   │
└───────┘ └───────┘ │   └───────┘ └───────┘
                    │
               ┌────▼────┐
               │ Worker  │
               │    5    │
               └─────────┘

适用场景

  • 复杂的企业级应用
  • 需要权限分级
  • 大规模任务调度

2.4 流水线模式(Pipeline)

核心思想:任务按照固定流程流转,每个Agent负责一个环节。


Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
          │            │            │
       数据清洗      分析处理      生成报告
class Pipeline:
    def __init__(self, agents):
        self.agents = agents
async def run(self, input_data):
    data = input_data

    for agent in self.agents:
        data = await agent.process(data)

        # 可选:中间结果验证
        if not self.validate(data):
            raise Exception(f"{agent.name} 输出验证失败")

    return data

使用示例

pipeline = Pipeline([
DataCleanerAgent(),
AnalyzerAgent(),
ReporterAgent()
])

result = await pipeline.run(raw_data)

2.5 黑板模式(Blackboard)

核心思想:所有Agent共享一个”黑板”,在上面读写信息。

                ┌─────────────────────┐
                │    Blackboard       │
                │  (共享状态存储)      │
                └──────────┬──────────┘
                           │
    ┌──────────────────────┼──────────────────────┐
    │                      │                      │
┌───▼───┐              ┌───▼───┐              ┌───▼───┐
│Agent A│              │Agent B│              │Agent C│
│(专家1)│              │(专家2)│              │(专家3)│
└───────┘              └───────┘              └───────┘
class Blackboard:
    def __init__(self):
        self.data = {}
        self.subscribers = []
def write(self, key, value, agent_id):
    self.data[key] = {
        "value": value,
        "author": agent_id,
        "timestamp": time.time()
    }
    self.notify_subscribers(key, value)

def read(self, key):
    return self.data.get(key, {}).get("value")

def subscribe(self, agent, keys):
    self.subscribers.append({
        "agent": agent,
        "keys": keys
    })

class ExpertAgent:
def init(self, blackboard, expertise):
self.blackboard = blackboard
self.expertise = expertise
self.id = str(uuid.uuid4())

async def contribute(self):
    # 从黑板读取相关信息
    context = self.blackboard.read("problem_context")

    # 根据专业知识做出贡献
    result = await self.analyze(context)

    # 写入黑板
    self.blackboard.write(
        f"{self.expertise}_analysis", 
        result, 
        self.id
    )

三、 Agent间通信机制

3.1 直接通信

class Agent:
    def __init__(self):
        self.peers = {}  # 其他Agent的引用
def register_peer(self, name, agent):
    self.peers[name] = agent

async def send_message(self, peer_name, message):
    return await self.peers[peer_name].receive(message)

async def receive(self, message):
    # 处理收到的消息
    return await self.process(message)

3.2 消息队列

import asyncio
from collections import defaultdict

class MessageBus: def init(self): self.queues = defaultdict(asyncio.Queue)

async def publish(self, topic, message):
    await self.queues[topic].put(message)

async def subscribe(self, topic, handler):
    while True:
        message = await self.queues[topic].get()
        await handler(message)

使用示例

bus = MessageBus()

Agent A 发布消息

await bus.publish("task_completed", {
"task_id": "123",
"result": "success"
})

Agent B 订阅消息

await bus.subscribe("task_completed", handle_task_result)

3.3 共享状态

class SharedState:
    def __init__(self):
        self.state = {}
        self.locks = defaultdict(asyncio.Lock)
async def update(self, key, value):
    async with self.locks[key]:
        self.state[key] = value

async def get(self, key):
    async with self.locks[key]:
        return self.state.get(key)

四、 实战案例:智能客服系统

以我之前做的智能客服系统为例:

4.1 系统架构

用户咨询
    ↓
┌─────────────────┐
│  Router Agent   │ ← 意图识别、任务分发
└────────┬────────┘
         │
    ┌────┴────┬─────────┬─────────┐
    │         │         │         │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Order  │ │Product│ │Complaint│ │FAQ   │
│Agent  │ │Agent  │ │ Agent   │ │Agent │
└───────┘ └───────┘ └─────────┘ └───────┘
    │         │         │         │
    └─────────┴─────────┴─────────┘
                      │
              ┌───────▼───────┐
              │ Summary Agent │ ← 整合回复
              └───────────────┘

4.2 核心代码

class CustomerServiceSystem:
    def __init__(self):
        self.router = RouterAgent()
        self.agents = {
            "order": OrderAgent(),
            "product": ProductAgent(),
            "complaint": ComplaintAgent(),
            "faq": FAQAgent()
        }
        self.summarizer = SummaryAgent()
async def handle_query(self, user_input, context):
    # 1. 路由判断
    route_result = await self.router.analyze(user_input)

    # 2. 分发给专业Agent
    responses = []
    for agent_type in route_result["agents"]:
        agent = self.agents[agent_type]
        response = await agent.handle(user_input, context)
        responses.append(response)

    # 3. 整合回复
    final_response = await self.summarizer.summarize(
        user_input, 
        responses
    )

    return final_response

class RouterAgent:
async def analyze(self, query):
prompt = f"""
分析用户查询,判断应该由哪些专业团队处理:

    用户查询:{query}

    可选团队:
    - order: 订单查询、物流跟踪
    - product: 产品咨询、规格参数
    - complaint: 投诉建议、问题反馈
    - faq: 常见问题解答

    返回JSON格式:{{"agents": ["team1", "team2"]}}
    """
    result = await llm.call(prompt)
    return json.loads(result)

4.3 效果对比

指标 单Agent Multi-Agent
问题解决率 72% 91%
平均响应时间 8秒 5秒
用户满意度 3.8/5 4.6/5

五、 最佳实践

  1. 职责单一:每个Agent只做一件事,做好一件事
  2. 明确边界:定义清晰的输入输出规范
  3. 容错设计:单个Agent失败不影响整体
  4. 监控可观测:每个Agent的行为都要可追踪
  5. 成本控制:不是所有Agent都需要顶级模型

六、 写在最后

Multi-Agent不是银弹,但它确实是解决复杂问题的有效手段。

关键原则:让专业的Agent做专业的事,协作胜过全能

如果你正在设计Multi-Agent系统,希望这篇文章对你有帮助!

我是技术老金,我们下期见!


📌 往期精彩回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-03-29发表,共计6137字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)