多 Agent 的指挥系统:Agent 编排引擎的设计与实现

📅 2026/6/16 23:24:16
多 Agent 的指挥系统:Agent 编排引擎的设计与实现
多 Agent 的指挥系统Agent 编排引擎的设计与实现一、单 Agent 的编排瓶颈为什么一个 Agent 干所有事走不通单个 LLM Agent 处理简单任务没问题但面对多步骤、多工具的复杂工作流时问题就来了Prompt 越来越长工具描述互相干扰Agent 在不同角色间反复跳转输出质量断崖式下降。更致命的是单 Agent 无法并行——一个需要同时查询数据库和调用外部 API 的任务只能串行执行延迟翻倍。Agent 编排引擎解决的就是这个问题把复杂工作流拆解为多个 Agent 的协作任务由编排引擎负责任务分发、依赖管理、并行调度和结果聚合。每个 Agent 只关心自己的领域上下文干净工具集精简输出质量稳定。编排引擎是指挥Agent 是乐手——指挥不需要会弹每件乐器但必须知道什么时候谁该进、谁该停。二、Agent 编排引擎的核心架构flowchart TD A[工作流定义: DAG] -- B[编排引擎: 解析 调度] B -- C[Agent A: 数据查询] B -- D[Agent B: API 调用] B -- E[Agent C: 文档生成] C --|结果| F[结果聚合器] D --|结果| F F -- G[Agent D: 质量检查] G --|通过| H[最终输出] G --|不通过| I[重试/降级] subgraph 编排核心 J[依赖解析: 拓扑排序] K[并行调度: 无依赖任务并发执行] L[超时控制: 单 Agent 超时熔断] M[重试策略: 指数退避 最大次数] end subgraph Agent 隔离 N[独立上下文: 各 Agent 只看到自己的工具和知识] O[独立资源: Token 预算独立分配] P[独立超时: 不因一个 Agent 卡住整体] end编排引擎的三个核心能力依赖解析工作流定义为有向无环图DAG引擎通过拓扑排序确定执行顺序。无依赖关系的节点并行执行有依赖关系的节点串行执行。并行调度并行度受限于 LLM API 的并发限制和 Token 预算。引擎需要实现信号量机制控制同时运行的 Agent 数量。容错与降级单个 Agent 失败不应导致整个工作流崩溃。引擎需要实现超时熔断、重试和降级策略——如果某个 Agent 超时用缓存结果或默认值替代而非阻塞整个流程。三、Agent 编排引擎的实现3.1 工作流定义与 DAG 解析from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Optional class AgentState(Enum): PENDING pending RUNNING running SUCCESS success FAILED failed TIMEOUT timeout SKIPPED skipped dataclass class AgentTask: 编排引擎中的单个 Agent 任务 id: str agent_name: str prompt_template: str dependencies: list[str] field(default_factorylist) timeout_seconds: int 60 max_retries: int 2 fallback_value: Any None state: AgentState AgentState.PENDING result: Any None error: Optional[str] None dataclass class Workflow: 工作流定义DAG 结构 name: str tasks: dict[str, AgentTask] max_parallelism: int 3 class DAGResolver: DAG 依赖解析与拓扑排序 def topological_sort(self, workflow: Workflow) - list[list[str]]: 返回分层执行计划同一层内的任务可并行执行 in_degree {tid: 0 for tid in workflow.tasks} for task in workflow.tasks.values(): for dep in task.dependencies: in_degree[task.id] 1 layers [] remaining set(workflow.tasks.keys()) while remaining: # 找到当前无依赖的任务 ready [tid for tid in remaining if in_degree[tid] 0] if not ready: raise ValueError(工作流存在循环依赖) layers.append(ready) for tid in ready: remaining.remove(tid) # 减少依赖此任务的后续任务的入度 for other_id in remaining: if tid in workflow.tasks[other_id].dependencies: in_degree[other_id] - 1 return layers3.2 编排引擎核心调度import asyncio from datetime import datetime class OrchestrationEngine: Agent 编排引擎 def __init__(self, agent_registry: dict, max_parallelism: int 3): self.agent_registry agent_registry # agent_name → agent_instance self.max_parallelism max_parallelism self.semaphore asyncio.Semaphore(max_parallelism) async def execute(self, workflow: Workflow, context: dict) - dict: 执行完整工作流 resolver DAGResolver() layers resolver.topological_sort(workflow) results {} start_time datetime.now() for layer_idx, layer in enumerate(layers): # 同一层内的任务并行执行 tasks [] for task_id in layer: task workflow.tasks[task_id] # 构建任务输入依赖任务的结果 task_input self._build_task_input(task, results, context) tasks.append(self._execute_task(task, task_input)) layer_results await asyncio.gather(*tasks, return_exceptionsTrue) # 收集结果 for task_id, result in zip(layer, layer_results): if isinstance(result, Exception): workflow.tasks[task_id].state AgentState.FAILED workflow.tasks[task_id].error str(result) results[task_id] workflow.tasks[task_id].fallback_value else: results[task_id] result total_time (datetime.now() - start_time).total_seconds() return { workflow: workflow.name, results: results, total_time_seconds: total_time, task_states: { tid: t.state.value for tid, t in workflow.tasks.items() }, } async def _execute_task(self, task: AgentTask, task_input: dict) - Any: 执行单个 Agent 任务带超时和重试 async with self.semaphore: agent self.agent_registry.get(task.agent_name) if not agent: raise ValueError(f未注册的 Agent: {task.agent_name}) for attempt in range(task.max_retries 1): try: task.state AgentState.RUNNING result await asyncio.wait_for( agent.run(task_input), timeouttask.timeout_seconds, ) task.state AgentState.SUCCESS task.result result return result except asyncio.TimeoutError: task.state AgentState.TIMEOUT if attempt task.max_retries: await asyncio.sleep(2 ** attempt) # 指数退避 continue return task.fallback_value except Exception as e: task.state AgentState.FAILED task.error str(e) if attempt task.max_retries: await asyncio.sleep(2 ** attempt) continue return task.fallback_value def _build_task_input( self, task: AgentTask, results: dict, context: dict ) - dict: 构建任务输入合并上下文和依赖结果 task_input {context: context} for dep_id in task.dependencies: if dep_id in results and results[dep_id] is not None: task_input[fdep_{dep_id}] results[dep_id] return task_input3.3 工作流定义示例# 定义一个竞品分析报告生成工作流 competitor_analysis_workflow Workflow( namecompetitor_analysis, max_parallelism3, tasks{ fetch_data: AgentTask( idfetch_data, agent_nameweb_search_agent, prompt_template搜索 {company_name} 的最新产品动态和用户评价, timeout_seconds30, max_retries2, fallback_value{search_results: 数据获取失败使用缓存数据}, ), analyze_sentiment: AgentTask( idanalyze_sentiment, agent_namesentiment_agent, prompt_template分析以下用户评价的情感倾向, dependencies[fetch_data], timeout_seconds45, ), fetch_financials: AgentTask( idfetch_financials, agent_nameapi_agent, prompt_template获取 {company_name} 的财务数据, timeout_seconds20, ), generate_report: AgentTask( idgenerate_report, agent_namewriting_agent, prompt_template基于情感分析和财务数据生成竞品分析报告, dependencies[analyze_sentiment, fetch_financials], timeout_seconds60, ), quality_check: AgentTask( idquality_check, agent_namereview_agent, prompt_template检查报告的完整性和准确性, dependencies[generate_report], timeout_seconds30, ), }, )四、编排引擎的工程权衡并行度 vs Token 预算并行执行多个 Agent 时每个 Agent 的上下文独立总 Token 消耗是各 Agent 之和。如果工作流有 5 个并行 Agent每个消耗 2000 Token总消耗就是 10000 Token。必须设置全局 Token 预算上限超限时降级为串行执行。降级策略的精度损失使用 fallback_value 替代失败 Agent 的结果意味着最终输出可能基于不完整数据。编排引擎必须在输出中标注哪些数据是降级结果让下游消费者知道数据的可靠性。工作流定义的灵活性静态 DAG 无法处理条件分支如如果情感分析结果为负面增加危机分析步骤。更高级的编排引擎需要支持动态 DAG——根据中间结果调整后续任务的执行计划。但这增加了引擎的复杂度也增加了调试难度。可观测性多 Agent 工作流的调试比单 Agent 困难得多。必须为每个任务记录输入、输出、耗时和状态建立类似分布式链路追踪的 Agent Trace 机制。五、总结Agent 编排引擎是多 Agent 系统的指挥系统核心能力是依赖解析、并行调度和容错降级。DAG 定义工作流结构拓扑排序确定执行层次信号量控制并行度超时和重试保障可靠性。落地要点工作流用 DAG 定义无依赖任务并行执行每个 Agent 独立上下文和超时互不干扰失败任务用 fallback 降级不阻塞整体流程全链路 Trace 是调试的基础。编排引擎的目标不是让 Agent 做更多事而是让多个 Agent 做各自最擅长的事——专业分工高效协作。