LLM 工作流编排:从硬编码调用链到声明式 DAG 的工程实践

📅 2026/6/22 22:21:54
LLM 工作流编排:从硬编码调用链到声明式 DAG 的工程实践
LLM 工作流编排从硬编码调用链到声明式 DAG 的工程实践一、硬编码调用链——当 Prompt 工程变成面条代码在 LLM 应用的早期阶段开发者通常将模型调用直接写在业务代码中先调用一次模型做意图识别再根据结果调用不同的 Prompt 模板最后调用模型生成最终回复。这种硬编码的调用链在 Demo 阶段运行良好但进入生产环境后迅速失控。一个典型的客服机器人项目其调用链可能在三个月内从 3 步膨胀到 15 步意图分类、实体抽取、知识库检索、多轮对话管理、安全审核、情感分析、回复生成、格式校验……每一步都是一个 LLM 调用或一个传统 API 调用。这些步骤之间的依赖关系错综复杂实体抽取依赖意图分类的结果知识库检索依赖实体抽取的输出安全审核需要同时检查用户输入和模型输出。当这些逻辑全部用 if-else 嵌套实现时代码的可读性和可维护性急剧下降。更严重的问题是硬编码调用链无法应对步骤的动态调整。当需要将情感分析从第 5 步移到第 3 步时开发者必须仔细梳理所有受影响的变量引用和错误处理逻辑稍有遗漏就会引入 Bug。这种脆弱性使得团队不敢轻易调整工作流导致架构僵化。核心矛盾在于LLM 工作流本质上是一个有向无环图DAG而硬编码调用链将其强行压扁为线性序列丢失了步骤间的并行机会和灵活的依赖表达。二、声明式 DAG 编排——工作流的正确抽象层级将 LLM 工作流建模为 DAG是解决硬编码调用链问题的根本方法。DAG 中的每个节点代表一个执行步骤LLM 调用、API 调用、条件判断边代表数据依赖关系。没有依赖关系的节点可以并行执行依赖关系由 DAG 的拓扑结构自然表达。graph TD A[意图分类] -- B[实体抽取] A -- C[情感分析] B -- D[知识库检索] C -- D D -- E[回复生成] A -- F[安全审核br/用户输入] E -- G[安全审核br/模型输出] F -- H[结果合并与输出] G -- H style A fill:#e3f2fd,stroke:#1565c0 style B fill:#fff3e0,stroke:#e65100 style C fill:#fff3e0,stroke:#e65100 style D fill:#e8f5e9,stroke:#2e7d32 style E fill:#fce4ec,stroke:#c62828 style F fill:#f3e5f5,stroke:#6a1b9a style G fill:#f3e5f5,stroke:#6a1b9a style H fill:#efebe9,stroke:#4e342e上图展示了一个客服机器人的工作流 DAG。意图分类完成后实体抽取和情感分析可以并行执行两者都只依赖意图分类的输出。知识库检索需要同时依赖实体抽取和情感分析的结果因此等待两者都完成后才执行。安全审核分为两步用户输入的安全审核与意图分类并行模型输出的安全审核在回复生成后执行。DAG 编排的核心机制是拓扑排序与并行调度。调度器根据 DAG 的拓扑结构计算每个节点的入度即前置依赖数量。入度为 0 的节点可以立即执行节点完成后将其后继节点的入度减 1后继节点入度降为 0 时进入就绪队列。这种调度方式天然支持并行执行无需开发者手动管理并发。声明式定义意味着工作流的结构与执行逻辑分离。工作流定义是一个纯数据结构JSON 或 YAML不包含任何可执行代码。调度器读取定义后根据节点类型调用对应的执行器LLM 执行器、API 执行器、条件执行器。这种分离使得工作流可以被可视化编辑、版本管理、A/B 测试而无需修改业务代码。三、生产级代码实现——Python DAG 编排引擎以下代码展示了一个轻量级的 DAG 编排引擎核心实现适用于 LLM 工作流场景。from __future__ import annotations import asyncio from dataclasses import dataclass, field from typing import Any, Callable, Coroutine import logging logger logging.getLogger(__name__) # ---- DAG 数据结构定义 ---- dataclass class Node: 工作流节点封装一个执行步骤 name: str # 节点执行函数接收上游输出字典返回本节点输出 executor: Callable[[dict[str, Any]], Coroutine[Any, Any, dict[str, Any]]] # 前置依赖节点名称列表 depends_on: list[str] field(default_factorylist) # 节点超时时间秒防止 LLM 调用无限挂起 timeout: float 30.0 dataclass class WorkflowDAG: 声明式工作流定义纯数据结构不包含执行逻辑 nodes: dict[str, Node] # 入口节点名称列表无前置依赖的节点 entry_nodes: list[str] field(default_factorylist) # ---- DAG 调度器 ---- class DAGScheduler: 基于拓扑排序的 DAG 调度器。 设计决策使用 asyncio 实现并行调度每个就绪节点 在独立的 Task 中执行通过 asyncio.gather 等待同一批 就绪节点全部完成后再调度下一批。这种批量调度策略 避免了逐节点调度时的频繁上下文切换。 def __init__(self, dag: WorkflowDAG): self.dag dag # 构建邻接表节点 - 后继节点列表 self.successors: dict[str, list[str]] {name: [] for name in dag.nodes} # 入度表节点 - 未完成的前置依赖数量 self.in_degree: dict[str, int] {} for name, node in dag.nodes.items(): self.in_degree[name] len(node.depends_on) for dep in node.depends_on: self.successors[dep].append(name) async def execute(self, initial_input: dict[str, Any]) - dict[str, Any]: 执行工作流返回所有节点的输出汇总。 初始输入作为虚拟入口节点的输出供入口节点消费。 # 节点输出存储节点名 - 输出字典 outputs: dict[str, dict[str, Any]] {__input__: initial_input} # 当前入度副本调度过程中动态修改 in_deg dict(self.in_degree) # 就绪队列入度为 0 的节点 ready [n for n, d in in_deg.items() if d 0] while ready: # 并行执行所有就绪节点 tasks [] for node_name in ready: node self.dag.nodes[node_name] # 合并所有前置依赖的输出作为当前节点输入 node_input {} for dep in node.depends_on: node_input.update(outputs.get(dep, {})) # 也注入初始输入供入口节点使用 if not node.depends_on: node_input.update(initial_input) tasks.append(self._run_node(node_name, node, node_input)) # 等待当前批次所有节点完成 results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果失败节点记录错误但不中断整个工作流 new_ready [] for node_name, result in zip(ready, results): if isinstance(result, Exception): logger.error(节点执行失败: %s, 错误: %s, node_name, result) outputs[node_name] {error: str(result)} else: outputs[node_name] result # 更新后继节点入度 for succ in self.successors[node_name]: in_deg[succ] - 1 if in_deg[succ] 0: new_ready.append(succ) ready new_ready return outputs async def _run_node( self, name: str, node: Node, input_data: dict[str, Any] ) - dict[str, Any]: 执行单个节点带超时控制 try: result await asyncio.wait_for( node.executor(input_data), timeoutnode.timeout, ) return result except asyncio.TimeoutError: raise RuntimeError(f节点 {name} 执行超时 ({node.timeout}s)) # ---- LLM 节点执行器示例 ---- async def intent_classifier(input_data: dict[str, Any]) - dict[str, Any]: 意图分类节点调用 LLM 对用户输入进行意图识别。 设计决策将 LLM 调用封装为独立函数而非类方法 保持节点执行器的无状态性便于测试和替换。 user_input input_data[user_input] # 实际项目中此处调用 LLM API此处简化为模拟 prompt f请判断以下用户输入的意图类别咨询/投诉/闲聊\n{user_input} intent await call_llm(prompt) # 假设的 LLM 调用函数 return {intent: intent, user_input: user_input} async def entity_extractor(input_data: dict[str, Any]) - dict[str, Any]: 实体抽取节点依赖意图分类结果 user_input input_data[user_input] intent input_data[intent] prompt f意图为{intent}请从以下输入中抽取关键实体\n{user_input} entities await call_llm(prompt) return {entities: entities} # ---- 工作流组装 ---- def build_customer_service_workflow() - WorkflowDAG: 构建客服机器人工作流 DAG nodes { intent: Node(nameintent, executorintent_classifier, depends_on[]), entity: Node(nameentity, executorentity_extractor, depends_on[intent]), sentiment: Node(namesentiment, executorsentiment_analyzer, depends_on[intent]), retrieval: Node(nameretrieval, executorknowledge_retriever, depends_on[entity, sentiment]), generate: Node(namegenerate, executorresponse_generator, depends_on[retrieval]), } return WorkflowDAG( nodesnodes, entry_nodes[intent], )上述代码的关键设计决策第一节点执行器是无状态的纯函数接收输入字典、返回输出字典。这使得每个节点可以独立测试不依赖全局状态。第二调度器采用批量并行策略同一批次的所有就绪节点并行执行批次间串行等待。这比逐节点调度更高效同时避免了过细粒度的并发控制。第三节点失败不中断整个工作流而是将错误信息写入输出由下游节点决定如何处理。这种容错策略适合 LLM 调用这种天然存在不确定性的场景。四、DAG 编排的局限——当工作流需要记忆和循环声明式 DAG 并非万能的。在实际 LLM 工作流中有两个常见需求超出了 DAG 的表达能力。第一个是动态分支。某些场景下需要根据 LLM 的输出决定后续执行哪些节点。例如意图分类结果为投诉时需要额外执行升级审核节点结果为闲聊时直接跳过检索步骤。纯 DAG 无法表达条件分支因为边的存在是静态的。解决方案是在节点执行器内部实现条件逻辑升级审核节点检查意图如果不需要则直接透传输入。这增加了节点的职责但保持了 DAG 结构的简洁性。第二个是多轮循环。Agent 场景中模型可能需要多次调用工具、观察结果、再决定下一步行动。这种循环结构违反了 DAG 的无环约束。解决方案是在 DAG 外层包装一个循环控制器每轮执行一次 DAG根据输出决定是否继续下一轮。循环逻辑与 DAG 编排解耦各司其职。性能方面DAG 调度器在每轮执行中需要计算拓扑排序和入度更新时间复杂度为 O(VE)。对于典型的 LLM 工作流10-20 个节点这部分开销可忽略。但当节点数量超过 100 时调度开销和内存占用需要关注。实践中超过 50 个节点的工作流通常意味着拆分粒度过细应考虑合并部分节点。可观测性方面DAG 编排天然适合集成分布式追踪。每个节点的执行可以映射为一个 Span节点间的数据传递映射为 Span 间的 Link。这使得工作流的执行过程可以在 Jaeger 或 Zipkin 中完整可视化。五、总结LLM 工作流从硬编码调用链演进到声明式 DAG核心是将步骤间的依赖关系从代码逻辑中提取为数据结构。DAG 编排通过拓扑排序实现自动并行调度通过声明式定义实现结构与执行的分离。Python 实现的轻量级调度器在 50 个节点以内的工作流中性能充足且代码量远低于 LangGraph 等框架。需要警惕的是DAG 无法原生表达条件分支和循环这些需求应通过节点内部的条件逻辑和外层循环控制器解决而非将控制流嵌入 DAG 定义。落地路线建议第一步将现有硬编码调用链中的步骤抽取为独立的节点执行器函数第二步梳理步骤间的依赖关系构建 DAG 定义第三步用调度器替换原有的顺序执行逻辑验证并行化带来的延迟改善。每一步都应保留回退能力确保线上服务稳定。