告别大模型连环网络等待用 DAG 异步调度实现 Agent 多工具并发执行配套代码../v3_rewoo/代码地址撕开黑盒学大模型-从白盒状态机演进到工业级Agent框架本文目标用一个克制版 Plan-then-Execute Demo 说明如何把串行工具调用改造成 DAG 异步调度。文章目录告别大模型连环网络等待用 DAG 异步调度实现 Agent 多工具并发执行1. 问题与背景2. 计划文本把隐式步骤显式化3. 依赖解析找出哪些节点可以并发4. 异步执行与超时 Observation5. 运行与验证6. 时间线可视化7. 风险边界与生产化迁移8. 总结9. 环境与复现范围参考资料1. 问题与背景ReAct 的优势是简单模型每一步决定下一步做什么程序执行工具然后把 Observation 写回上下文。但这种模式在多工具任务里有明显问题。假设任务需要查询 GPU 价格查询杭州天气查询库存状态汇总出门采购建议。传统串行模式可能是模型判断 - 查价格 - 模型判断 - 查天气 - 模型判断 - 查库存 - 模型汇总如果每一步都涉及网络等待总耗时会不断累加。很多工具之间其实没有依赖关系却被迫串行。v3_rewoo/演示的是一种简化版 ReWOO 思路先规划再执行。Planner 一次性生成包含依赖关系的计划Scheduler 再根据依赖图并发执行无依赖节点。2. 计划文本把隐式步骤显式化planner.py生成的计划文本如下E1: fetch_price[GPU] E2: fetch_weather[杭州] E3: unstable_inventory[GPU] E4: summarize[结合 #E1、#E2、#E3 给出出门采购建议]这里有两个关键点每个步骤都有唯一 ID例如E1、E2后续步骤可以用#E1引用前置步骤结果。parser.py负责把计划文本解析成结构化PlanStepPLAN_LINE_REre.compile(r^(?Pstep_idE\d):\s*(?Ptool\w)\[(?Pargument.*)\]\s*$)这一步看起来简单但很重要。只要计划文本被解析成结构化数据后续就可以做依赖分析、并发调度、超时控制和可视化。3. 依赖解析找出哪些节点可以并发scheduler.py中用正则查找参数里的依赖占位符DEPENDENCY_REre.compile(r#(?Pstep_idE\d))defdependencies(step:PlanStep)-set[str]:returnset(DEPENDENCY_RE.findall(step.argument))对于上面的计划E1没有依赖E2没有依赖E3没有依赖E4依赖E1、E2、E3。所以 Scheduler 可以先并发执行E1、E2、E3。等它们都有结果后再执行E4。参数替换由resolve_argument()完成defresolve_argument(argument:str,results:dict[str,str])-str:forstep_id,valueinresults.items():argumentargument.replace(f#{step_id},value)returnargument这就是 DAG 调度的核心不是让模型一步一步等而是把依赖关系显式化。4. 异步执行与超时 Observation并行执行用的是asyncio.gathercompletedawaitasyncio.gather(*(run_step(step)forstepinready))每个工具调用都包了一层超时控制asyncdefcall_tool(step:PlanStep,argument:str,timeout:float)-str:returnawaitasyncio.wait_for(TOOLS[step.tool](argument),timeouttimeout)如果工具超时不直接让整个任务崩溃而是返回错误 Observationexceptasyncio.TimeoutError:valuefERROR:{step.tool}timed out after{timeout}sreturnstep.step_id,value这体现了一个重要工程原则Agent 的工具失败应该变成可见事实而不是吞掉或直接中断。后续节点可以看到这个失败并在最终回答中说明限制。5. 运行与验证运行python v3_rewoo\benchmark.py一次验证输出如下serial ReAct-like execution: 1.619s E1: GPU 预算价格区间已获取 E2: 杭州 小雨建议带伞 E3: ERROR: unstable_inventory timed out after 0.6s E4: 综合结论结合 GPU 预算价格区间已获取、杭州 小雨建议带伞、ERROR: unstable_inventory timed out after 0.6s 给出出门采购建议 parallel ReWOO-like execution: 0.820s E1: GPU 预算价格区间已获取 E2: 杭州 小雨建议带伞 E3: ERROR: unstable_inventory timed out after 0.6s E4: 综合结论结合 GPU 预算价格区间已获取、杭州 小雨建议带伞、ERROR: unstable_inventory timed out after 0.6s 给出出门采购建议这个结果说明两件事并发调度确实降低了端到端等待超时工具没有阻塞最终汇总而是作为错误 Observation 进入结果。注意具体耗时会随机器负载略有变化文章中不应该把某一次数字写成绝对性能结论。更稳妥的表述是在本 demo 的模拟延迟条件下并行版本明显短于串行版本。这组耗时来自worker.py中的确定性模拟延迟工具模拟耗时调度含义fetch_price0.4s可独立执行fetch_weather0.4s可独立执行unstable_inventory0.8s但被 0.6s 超时截断失败应回写 Observationsummarize0.2s必须等待E1、E2、E3因此串行版本接近0.4 0.4 0.6 0.2 1.6s并行版本接近max(0.4, 0.4, 0.6) 0.2 0.8s。这不是泛化性能承诺而是用可控延迟验证“无依赖节点可以重叠执行”这个调度事实。6. 时间线可视化运行后会生成v3_rewoo/trace.json打开v3_rewoo/visualization.html可以看到串行和并行两组事件startsuccesstimeout每个事件的 timestamp每个节点的工具名和参数。时间线对理解调度非常重要。没有时间线时读者只能看到总耗时有时间线后读者能看到哪些节点重叠执行哪些节点等待依赖。运行后可以直接检查trace.json确认并行分支不是口头描述{runs:[{label:serial ReAct-like execution,elapsed:1.632},{label:parallel ReWOO-like execution,elapsed:0.809}],timeline:[{runner:parallel,step_id:E1,event:start},{runner:parallel,step_id:E2,event:start},{runner:parallel,step_id:E3,event:start},{runner:parallel,step_id:E3,event:timeout},{runner:parallel,step_id:E4,event:start}]}这里最值得关注的不是加速倍率而是事件顺序并行版本里E1、E2、E3同时 startE4在依赖节点结束后才 start。这说明调度器确实按依赖图执行而不是简单把所有工具一起扔进gather。7. 风险边界与生产化迁移当前实现是克制版 Plan-then-Execute不包含复杂生产能力。它刻意只保留计划解析、依赖分析、并发执行、超时 Observation 和时间线避免把调度核心淹没在框架细节里。生产环境至少还要补齐这些边界边界当前 Demo生产系统需要补齐循环依赖发现无 ready 节点后抛错在入队前校验 DAG给 Planner 反馈可修复错误工具失败转成ERROR: ...Observation区分可重试、不可重试、可降级、需人工介入超时控制每个工具统一0.6s按工具 SLA 配置超时、重试次数和熔断策略取消传播未实现用户取消或上游失败时取消未完成分支并清理资源幂等控制未实现为写操作增加 request id、去重键和补偿逻辑持久化只写trace.jsoncheckpoint、任务状态机、恢复执行、审计日志可观测性本地时间线trace id、span、指标、错误分类、耗时分布迁移到工业级 Agent 框架时不要只问“能不能并发”。更关键的问题是并发分支失败后怎么恢复工具是否幂等超时结果是否应该参与最终总结状态是否能从 checkpoint 恢复这些问题决定了调度器能不能进生产。一个更稳的落地路径是先把计划解析成结构化 DAG不允许隐式字符串依赖到处散落。在调度前做 DAG 校验阻断循环依赖、未知工具和未定义变量。为每个工具声明超时、重试、幂等、是否可降级。把每次执行写入 trace用于复盘“哪个节点等待了谁”。再接入持久化 checkpoint支持失败恢复和人工审计。本文 demo 只完成第 1 到第 4 步的最小闭环。它适合作为理解 ReWOO / DAG 调度的白盒模型不应该直接当成生产编排引擎。8. 总结ReAct 适合建立 Agent 工具调用认知但它天然偏串行。面对多工具任务可以把“模型逐步决定”改造成“模型先规划调度器再执行”。DAG 异步调度的关键不是asyncio.gather这一行代码而是四个工程动作把计划结构化解析依赖并发执行无依赖节点把失败转成 Observation。第 3 期的重点不是宣称 Plan-then-Execute 一定优于 ReAct而是给读者一套可观察的判断方法看计划是否结构化看依赖是否显式看无依赖节点是否重叠执行看失败是否进入最终上下文。理解这套机制后再看 LangGraph 的并行分支、持久化执行或更复杂 Agent 编排就会更容易判断框架能力边界。9. 环境与复现范围本文配套代码是一个无第三方服务依赖的教学 Demo用于复现 DAG 调度和超时 Observation不依赖真实网络 API。项目说明操作系统Windows 10/11、macOS、Linux 均可Python建议 Python 3.10第三方依赖无使用 Python 标准库asyncio、json、re、time运行目录仓库根目录或v3_rewoo/目录输出文件v3_rewoo/trace.json可视化页面v3_rewoo/visualization.html如果从仓库根目录运行python v3_rewoo\benchmark.py如果已经进入v3_rewoo/目录运行python benchmark.py运行后会刷新trace.json。再打开visualization.html可以看到串行和并行两组时间线。参考资料ReWOO: Decoupling Reasoning from Observations for Efficient Augmented Language ModelsReWOO 将推理过程和工具 Observation 解耦本文借用的是“先规划、再执行”的核心思想。LangGraph Workflows and Agents包含 parallelization、routing、orchestrator-worker 等编排模式可用于理解并行分支和聚合。LangGraph Persistence说明 checkpoint、thread、short-term memory 等持久化概念。PythonasyncioTasksgather()与wait_for()是本文 demo 并发和超时控制的标准库基础。下一篇小z疯狂码字ing…感谢阅读记得点赞、关注、收藏欢迎各位评论区交流