AI Agent开发实战(六):工作流编排——用DSL定义流程
一、开场:固定代码不够灵活
大家好,我是老金。
之前我们用代码实现Agent协作,但有个问题:
- 改流程要改代码
- 非技术人员无法修改
- 复杂流程难以维护
解决方案是工作流编排:
- 用DSL定义流程
- 可视化编辑
- 动态调整
今天我们实现Agent工作流系统。
二、工作流架构
2.1 核心概念
┌─────────────────────────────────────────────────────────┐
│ 工作流核心概念 │
├─────────────────────────────────────────────────────────┤
│ │
│ Workflow(工作流) │
│ └── Step(步骤) │
│ ├── Agent执行 │
│ ├── 条件判断 │
│ ├── 循环 │
│ └── 并行 │
│ │
│ 数据流: │
│ Input → Step1 → Step2 → ... → Output │
│ │
│ 控制流: │
│ → 条件分支 → A/B路径 │
│ → 循环 → 重复执行 │
│ → 并行 → 同时执行多个 │
│ │
└─────────────────────────────────────────────────────────┘
2.2 工作流定义
# src/workflow/models.py
from pydantic import BaseModel
from typing import Any, Dict, List, Optional, Union
from enum import Enum
class StepType(str, Enum):
    """Kinds of step a workflow can contain.

    The enum also subclasses ``str``, so a member compares equal to its raw
    DSL value (``StepType.AGENT == "agent"``) and can be built from it with
    ``StepType("agent")``.
    """

    AGENT = "agent"  # run an agent
    TOOL = "tool"  # call a tool
    CONDITION = "condition"  # branch on a condition
    PARALLEL = "parallel"  # parallel execution
    LOOP = "loop"  # loop
    TRANSFORM = "transform"  # data transformation
    HUMAN = "human"  # human intervention
class Step(BaseModel):
    """One step of a workflow.

    ``id``, ``name`` and ``type`` are required. Every other field is
    optional and defaults to ``None``; the fields are grouped below by the
    kind of step they configure.
    """

    id: str
    name: str
    type: StepType

    # Agent execution settings
    agent_id: Optional[str] = None
    prompt_template: Optional[str] = None

    # Condition settings
    condition: Optional[str] = None  # condition expression
    true_step: Optional[str] = None
    false_step: Optional[str] = None

    # Parallel settings
    parallel_steps: Optional[List[str]] = None

    # Loop settings
    loop_steps: Optional[List[str]] = None
    loop_condition: Optional[str] = None

    # Data transformation
    transform_expr: Optional[str] = None

    # Tool call
    tool_name: Optional[str] = None
    tool_params: Optional[Dict[str, Any]] = None

    # Id of the step to run next
    next_step: Optional[str] = None
class Workflow(BaseModel):
    """Static definition of a workflow: metadata plus its steps."""

    id: str
    name: str
    description: str = ""
    version: str = "1.0"

    # Input definition
    input_schema: Dict[str, Any] = {}

    # Step definitions, keyed by step id
    steps: Dict[str, Step] = {}

    # Id of the step where execution starts (required)
    entry_step: str

    # Output definition
    output_key: str = "result"
class WorkflowState(BaseModel):
    """Mutable state of one workflow run."""

    workflow_id: str
    execution_id: str

    # Step currently being executed (None when there is nothing left to run)
    current_step: Optional[str] = None

    # Data store shared by all steps
    data: Dict[str, Any] = {}

    # Execution history, one entry per executed step
    history: List[Dict[str, Any]] = []

    # Run status: pending | running | completed | failed
    status: str = "pending"
    error: Optional[str] = None
三、DSL定义工作流
3.1 YAML DSL
# workflows/research_and_write.yaml
id: research_and_write
name: 研究并写作
description: 研究主题然后写文章
version: "1.0"

input_schema:
  topic:
    type: string
    description: 研究主题

steps:
  # Step 1: research
  research:
    id: research
    name: 研究主题
    type: agent
    agent_id: researcher
    # Workflow inputs are stored under data.inputs by the engine.
    prompt_template: |
      请深入研究以下主题:
      {{ data.inputs.topic }}
      输出:研究发现摘要
    next_step: outline

  # Step 2: build the outline
  outline:
    id: outline
    name: 生成大纲
    type: agent
    agent_id: writer
    prompt_template: |
      基于以下研究发现,生成文章大纲:
      {{ steps.research.output }}
    next_step: write

  # Step 3: write the article
  write:
    id: write
    name: 撰写文章
    type: agent
    agent_id: writer
    prompt_template: |
      根据大纲撰写完整文章:
      大纲:
      {{ steps.outline.output }}
      研究资料:
      {{ steps.research.output }}
    next_step: review

  # Step 4: review
  # NOTE: no step in this workflow writes data.quality_score; a scoring step
  # has to store it before this check, otherwise the condition is never true.
  review:
    id: review
    name: 审核文章
    type: condition
    condition: "{{ data.quality_score > 0.8 }}"
    true_step: publish
    false_step: improve

  # Step 5a: publish
  publish:
    id: publish
    name: 发布文章
    type: tool
    tool_name: publish_article
    tool_params:
      title: "{{ data.inputs.topic }}"
      content: "{{ steps.write.output }}"
    next_step: end

  # Step 5b: improve
  improve:
    id: improve
    name: 改进文章
    type: agent
    agent_id: editor
    prompt_template: |
      请改进以下文章:
      {{ steps.write.output }}
    next_step: review

entry_step: research
output_key: steps.publish.output
3.2 DSL解析器
# src/workflow/parser.py
import yaml
import json
from typing import Dict, Any
from .models import Workflow, Step, StepType
class WorkflowParser:
    """Builds :class:`Workflow` objects from YAML files, JSON strings or dicts."""

    # Optional ``Step`` fields copied as-is from the DSL; a key that is
    # absent from the step definition becomes ``None``.
    _OPTIONAL_STEP_FIELDS = (
        "agent_id",
        "prompt_template",
        "condition",
        "true_step",
        "false_step",
        "parallel_steps",
        "loop_steps",
        "loop_condition",
        "transform_expr",
        "tool_name",
        "tool_params",
        "next_step",
    )

    @staticmethod
    def from_yaml(yaml_path: str) -> Workflow:
        """Parse the workflow stored in the YAML file at ``yaml_path``."""
        with open(yaml_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        return WorkflowParser.from_dict(data)

    @staticmethod
    def from_json(json_str: str) -> Workflow:
        """Parse a workflow from a JSON string."""
        data = json.loads(json_str)
        return WorkflowParser.from_dict(data)

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> Workflow:
        """Build a workflow from an already-loaded mapping.

        The key of each entry under ``steps`` is used as the step id; an
        ``id`` inside the step body is ignored. A step without ``name``
        is named after its id and a step without ``type`` is an agent step.

        Raises:
            KeyError: if ``id``, ``name`` or ``entry_step`` is missing.
            ValueError: if a step ``type`` is not a ``StepType`` value.
        """
        steps = {}
        # ``steps:`` with nothing under it loads as None in YAML; treat it
        # (and an empty step body) as an empty mapping instead of crashing.
        for step_id, step_data in (data.get("steps") or {}).items():
            step_data = step_data or {}
            optional = {
                field: step_data.get(field)
                for field in WorkflowParser._OPTIONAL_STEP_FIELDS
            }
            steps[step_id] = Step(
                id=step_id,
                name=step_data.get("name", step_id),
                type=StepType(step_data.get("type", "agent")),
                **optional,
            )

        return Workflow(
            id=data["id"],
            name=data["name"],
            description=data.get("description", ""),
            version=data.get("version", "1.0"),
            input_schema=data.get("input_schema", {}),
            steps=steps,
            entry_step=data["entry_step"],
            output_key=data.get("output_key", "result"),
        )
四、工作流引擎
4.1 执行引擎
# src/workflow/engine.py
import asyncio
import re
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

from ..agents.base import BaseAgent
from ..tools.base import BaseTool
from .models import Workflow, WorkflowState, Step, StepType
from .parser import WorkflowParser
class _AttrDict(dict):
    """A dict whose keys can also be read as attributes (``d.key``).

    Nested plain dicts are wrapped when they are read, so dotted paths such
    as ``data.inputs.topic`` work inside DSL expressions. Real ``dict``
    attributes (``items``, ``get`` ...) win over keys of the same name; use
    subscript syntax (``d["items"]``) for those keys.
    """

    def __getitem__(self, key):
        value = super().__getitem__(key)
        if isinstance(value, dict) and not isinstance(value, _AttrDict):
            return _AttrDict(value)
        return value

    def __getattr__(self, name):
        # Only reached when normal attribute lookup has already failed.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None


class WorkflowEngine:
    """Executes a ``Workflow`` step by step against a shared ``WorkflowState``.

    Step results are stored in ``state.data`` under the flat key
    ``"steps.<step id>.output"``; workflow inputs are stored under
    ``"inputs"``. Templates and conditions read them back through
    ``{{ ... }}`` expressions such as ``{{ data.inputs.topic }}`` or
    ``{{ steps.research.output }}``.
    """

    # ``next_step: end`` is the DSL's explicit terminator; it only ends the
    # run when the workflow has no real step with that id.
    END_STEP = "end"

    # One ``{{ expression }}`` placeholder; group 1 is the expression.
    _PLACEHOLDER = re.compile(r"\{\{\s*([^}]+?)\s*\}\}")

    def __init__(
        self,
        agents: Dict[str, BaseAgent],
        tools: Dict[str, BaseTool],
        llm_client=None,
        max_steps: Optional[int] = 1000,
    ):
        """Create an engine.

        Args:
            agents: agents available to agent steps, keyed by ``agent_id``.
            tools: tools available to tool steps, keyed by ``tool_name``.
            llm_client: stored as ``self.llm``; not used by the engine itself.
            max_steps: upper bound on executed steps per run, so a cyclic
                workflow cannot loop forever. ``None`` disables the limit.
        """
        self.agents = agents
        self.tools = tools
        self.llm = llm_client
        self.max_steps = max_steps

    async def execute(
        self,
        workflow: Workflow,
        inputs: Dict[str, Any]
    ) -> WorkflowState:
        """Run ``workflow`` from its entry step and return the final state.

        Never raises for step failures: any exception is caught, the state
        is marked ``"failed"`` and the message is stored in ``state.error``.
        """
        state = WorkflowState(
            workflow_id=workflow.id,
            execution_id=str(uuid.uuid4()),
            data={"inputs": inputs},
            status="running"
        )
        state.current_step = workflow.entry_step

        try:
            executed = 0
            while state.current_step:
                step = workflow.steps.get(state.current_step)
                if step is None:
                    if state.current_step == self.END_STEP:
                        # Explicit terminator, not a missing step.
                        state.current_step = None
                        break
                    raise ValueError(f"Step not found: {state.current_step}")

                executed += 1
                if self.max_steps is not None and executed > self.max_steps:
                    raise RuntimeError(
                        f"Workflow exceeded max_steps={self.max_steps}; "
                        "the step graph probably contains a cycle"
                    )

                await self._execute_step(step, state, workflow)

                state.history.append({
                    "step_id": step.id,
                    "step_name": step.name,
                    "timestamp": datetime.now().isoformat()
                })
            state.status = "completed"
        except Exception as e:
            state.status = "failed"
            state.error = str(e)
        return state

    async def _execute_step(
        self,
        step: Step,
        state: WorkflowState,
        workflow: Workflow
    ):
        """Dispatch ``step`` to the handler for its type.

        Raises:
            ValueError: for a step type without a handler (currently
                ``loop``). Silently ignoring it would leave
                ``state.current_step`` unchanged and spin forever.
        """
        if step.type == StepType.AGENT:
            await self._execute_agent_step(step, state)
        elif step.type == StepType.TOOL:
            await self._execute_tool_step(step, state)
        elif step.type == StepType.CONDITION:
            await self._execute_condition_step(step, state)
        elif step.type == StepType.PARALLEL:
            await self._execute_parallel_step(step, state, workflow)
        elif step.type == StepType.TRANSFORM:
            await self._execute_transform_step(step, state)
        elif step.type == StepType.HUMAN:
            await self._execute_human_step(step, state)
        else:
            raise ValueError(f"Unsupported step type: {step.type.value}")

    async def _execute_agent_step(self, step: Step, state: WorkflowState):
        """Render the prompt, run the agent and store its result."""
        agent = self.agents.get(step.agent_id)
        if not agent:
            raise ValueError(f"Agent not found: {step.agent_id}")
        if step.prompt_template is None:
            raise ValueError(f"Agent step has no prompt_template: {step.id}")

        prompt = self._render_template(step.prompt_template, state)
        result = await agent.run(prompt)

        state.data[f"steps.{step.id}.output"] = result
        state.current_step = step.next_step

    async def _execute_tool_step(self, step: Step, state: WorkflowState):
        """Render the tool parameters, call the tool and store its result."""
        tool = self.tools.get(step.tool_name)
        if not tool:
            raise ValueError(f"Tool not found: {step.tool_name}")

        # Every parameter value is stringified before placeholders are
        # rendered, so the tool receives strings only.
        params = {
            key: self._render_template(str(value), state)
            for key, value in (step.tool_params or {}).items()
        }
        result = await tool.execute(**params)

        state.data[f"steps.{step.id}.output"] = result.to_string()
        state.current_step = step.next_step

    async def _execute_condition_step(self, step: Step, state: WorkflowState):
        """Evaluate the condition and jump to ``true_step`` or ``false_step``."""
        if self._evaluate_condition(step.condition, state):
            state.current_step = step.true_step
        else:
            state.current_step = step.false_step

    async def _execute_parallel_step(
        self,
        step: Step,
        state: WorkflowState,
        workflow: Workflow
    ):
        """Run all sub-steps concurrently, then continue with ``next_step``.

        Sub-steps share ``state``. Ids that do not name a step of the
        workflow are skipped.
        """
        sub_steps = (
            workflow.steps.get(sub_id) for sub_id in (step.parallel_steps or [])
        )
        tasks = [
            self._execute_step(sub_step, state, workflow)
            for sub_step in sub_steps
            if sub_step
        ]
        if tasks:
            await asyncio.gather(*tasks)

        # Always advance, even with no sub-steps; otherwise the main loop
        # would execute this same step again forever. This also overrides
        # whatever the sub-steps wrote to ``current_step``.
        state.current_step = step.next_step

    async def _execute_transform_step(self, step: Step, state: WorkflowState):
        """Evaluate ``transform_expr`` and store the value as the step output."""
        result = self._evaluate_expression(step.transform_expr, state)
        state.data[f"steps.{step.id}.output"] = result
        state.current_step = step.next_step

    async def _execute_human_step(self, step: Step, state: WorkflowState):
        """Flag the step as waiting for a human and move on.

        Simplified: nothing actually pauses here. A real implementation
        needs persistence and an external trigger to resume.
        """
        state.data[f"steps.{step.id}.waiting"] = True
        state.current_step = step.next_step

    def _render_template(self, template: str, state: WorkflowState) -> str:
        """Replace every ``{{ expression }}`` in ``template`` with its value.

        An expression that cannot be evaluated renders as ``"None"``.
        """
        def substitute(match):
            expr = match.group(1).strip()
            return str(self._evaluate_expression(expr, state))

        return self._PLACEHOLDER.sub(substitute, template)

    def _evaluate_expression(self, expr: str, state: WorkflowState) -> Any:
        """Evaluate a DSL expression against the run state.

        Two names are available: ``data`` (all of ``state.data``) and
        ``steps`` (step results regrouped as ``steps.<id>.<field>``). Both
        support dotted and subscript access.

        Returns:
            The value of the expression, or ``None`` if it cannot be
            evaluated (syntax error, unknown name, missing key ...).
        """
        # Regroup flat "steps.<id>.<field>" keys into a nested mapping.
        steps: Dict[str, Dict[str, Any]] = {}
        for key, value in state.data.items():
            if not key.startswith("steps."):
                continue
            parts = key.split(".")
            if len(parts) >= 3:
                steps.setdefault(parts[1], {})[parts[2]] = value

        context = {"data": _AttrDict(state.data), "steps": _AttrDict(steps)}
        try:
            # SECURITY: eval() with empty __builtins__ is NOT a sandbox;
            # attribute access can still reach arbitrary objects. Only load
            # workflow definitions from trusted sources, or replace this
            # with a restricted expression evaluator.
            return eval(expr, {"__builtins__": {}}, context)
        except Exception:
            # Best effort by design: an unevaluable expression yields None.
            return None

    def _evaluate_condition(self, condition: str, state: WorkflowState) -> bool:
        """Evaluate a condition expression as a boolean.

        The DSL writes conditions as ``"{{ expr }}"``. The braces are
        turned into parentheses first, because ``{{ ... }}`` is a set
        literal in Python and would make every condition fail.
        """
        if not condition:
            return False
        expr = self._PLACEHOLDER.sub(lambda m: f"({m.group(1)})", condition)
        try:
            return bool(self._evaluate_expression(expr, state))
        except Exception:
            return False
五、工作流模板库
5.1 常用工作流模板
# src/workflow/templates.py
def _agent_step(step_id, name, agent_id, prompt_template, next_step=None):
    """Return the DSL dict of an agent step.

    ``next_step`` is left out of the dict entirely when it is ``None``.
    """
    step = {
        "id": step_id,
        "name": name,
        "type": "agent",
        "agent_id": agent_id,
        "prompt_template": prompt_template,
    }
    if next_step is not None:
        step["next_step"] = next_step
    return step


# Customer-service workflow: classify the query, then route it to the
# technical or the general support agent.
CUSTOMER_SERVICE_WORKFLOW = {
    "id": "customer_service",
    "name": "智能客服工作流",
    "description": "处理客户咨询",
    "steps": {
        "classify": _agent_step(
            "classify",
            "意图分类",
            "classifier",
            "分类以下用户问题:{{ data.inputs.query }}",
            "route",
        ),
        "route": {
            "id": "route",
            "name": "路由",
            "type": "condition",
            "condition": "{{ steps.classify.output.category == 'technical' }}",
            "true_step": "technical_support",
            "false_step": "general_support",
        },
        "technical_support": _agent_step(
            "technical_support",
            "技术支持",
            "tech_agent",
            "回答技术问题:{{ data.inputs.query }}",
            "end",
        ),
        "general_support": _agent_step(
            "general_support",
            "常规支持",
            "general_agent",
            "回答一般问题:{{ data.inputs.query }}",
            "end",
        ),
    },
    "entry_step": "classify",
}

# Content-generation workflow: research once, then produce the article,
# the social-media copy and the video script in parallel.
CONTENT_GENERATION_WORKFLOW = {
    "id": "content_generation",
    "name": "内容生成工作流",
    "description": "生成多平台内容",
    "steps": {
        "research": _agent_step(
            "research",
            "资料研究",
            "researcher",
            "研究主题:{{ data.inputs.topic }}",
            "parallel_create",
        ),
        "parallel_create": {
            "id": "parallel_create",
            "name": "并行生成",
            "type": "parallel",
            "parallel_steps": [
                "create_article",
                "create_social",
                "create_video_script",
            ],
            "next_step": "end",
        },
        "create_article": _agent_step(
            "create_article",
            "生成文章",
            "writer",
            "基于研究写文章:{{ steps.research.output }}",
        ),
        "create_social": _agent_step(
            "create_social",
            "生成社媒内容",
            "social_writer",
            "生成社媒内容:{{ steps.research.output }}",
        ),
        "create_video_script": _agent_step(
            "create_video_script",
            "生成视频脚本",
            "script_writer",
            "生成视频脚本:{{ steps.research.output }}",
        ),
    },
    "entry_step": "research",
}
六、可视化编辑器
6.1 流程图生成
# src/workflow/visualize.py
from .models import Workflow, Step, StepType
from typing import List
class WorkflowVisualizer:
    """Renders a workflow as Mermaid flowchart source."""

    @staticmethod
    def to_mermaid(workflow: Workflow) -> str:
        """Return a ``graph TD`` Mermaid diagram of ``workflow``.

        One node is emitted per step, followed by the edges for
        ``next_step``, ``true_step`` (labelled Yes), ``false_step``
        (labelled No) and every entry of ``parallel_steps``.
        """
        node_id = WorkflowVisualizer._node_id
        lines = ["graph TD"]

        # Nodes
        for step_id, step in workflow.steps.items():
            opening, closing = WorkflowVisualizer._get_node_shape(step.type)
            lines.append(f" {node_id(step_id)}{opening}{step.name}{closing}")

        # Edges
        for step_id, step in workflow.steps.items():
            source = node_id(step_id)
            if step.next_step:
                lines.append(f" {source} --> {node_id(step.next_step)}")
            if step.true_step:
                lines.append(f" {source} -->|Yes| {node_id(step.true_step)}")
            if step.false_step:
                lines.append(f" {source} -->|No| {node_id(step.false_step)}")
            if step.parallel_steps:
                for parallel_id in step.parallel_steps:
                    lines.append(f" {source} --> {node_id(parallel_id)}")

        # One statement per line; Mermaid needs real newlines between them.
        return "\n".join(lines)

    @staticmethod
    def _node_id(step_id: str) -> str:
        """Return a Mermaid-safe node id for ``step_id``.

        Lowercase ``end`` is a reserved word in Mermaid flowcharts and
        breaks the diagram, so it is emitted as ``End``.
        """
        return "End" if step_id == "end" else step_id

    @staticmethod
    def _get_node_shape(step_type: StepType) -> tuple:
        """Return the ``(opening, closing)`` Mermaid delimiters for a step type.

        Unknown types fall back to a plain rectangle.
        """
        # Every value must be an (opening, closing) pair: the caller
        # unpacks it into the text before and after the node label.
        shapes = {
            StepType.AGENT: ("[", "]"),
            StepType.TOOL: ("[[", "]]"),
            StepType.CONDITION: ("{", "}"),
            StepType.PARALLEL: ("{{", "}}"),
            StepType.LOOP: ("[", "]"),
            StepType.TRANSFORM: ("[/", "/]"),
            StepType.HUMAN: ("((", "))"),
        }
        return shapes.get(step_type, ("[", "]"))
# Usage example: parse the content-generation template and print its
# Mermaid source.
# NOTE: this snippet's imports do not include WorkflowParser or
# CONTENT_GENERATION_WORKFLOW; bring both into scope before running it.
workflow = WorkflowParser.from_dict(CONTENT_GENERATION_WORKFLOW)
mermaid_code = WorkflowVisualizer.to_mermaid(workflow)
print(mermaid_code)
七、最佳实践
7.1 工作流设计原则
| 原则 | 说明 |
|---|---|
| 单一职责 | 每个步骤做一件事 |
| 清晰输入输出 | 定义明确的数据接口 |
| 错误处理 | 设计fallback路径 |
| 可观测性 | 记录执行日志 |
7.2 常见模式
# Retry pattern
# NOTE: on_error / max_retries / retry_delay are not fields of the Step
# model shown in section 2.2 and the parser in section 3.2 does not read
# them; the model, parser and engine must be extended before these keys
# have any effect.
steps:
  api_call:
    type: tool
    tool_name: api
    on_error: retry
    max_retries: 3
    retry_delay: 1s
# Timeout pattern
# NOTE: timeout / on_timeout are not fields of the Step model shown in
# section 2.2 and the parser in section 3.2 does not read them; the model,
# parser and engine must be extended before these keys have any effect.
steps:
  long_task:
    type: agent
    timeout: 60s
    on_timeout: fallback_step
八、总结
工作流系统要点
- DSL定义:YAML/JSON定义流程
- 执行引擎:支持多种步骤类型
- 模板库:复用常见工作流
- 可视化:流程图生成
下期预告
下一篇:Agent测试与评估——如何测试你的Agent!
往期回顾
正文完