用 Python 写个轻量级工作流引擎

📅 2026/6/27 0:53:45
用 Python 写个轻量级工作流引擎
用 Python 写个轻量级工作流引擎做企业级应用时工作流是绕不开的。很多团队一上来就搞 Camunda 或 Activity结果发现太重了维护起来心累。其实大部分场景一个基于 DAG有向无环图的轻量级引擎就够了。为什么传统引擎不好用以前做工作流最头疼三件事配置太复杂。XML 定义流程改个逻辑得翻半天文档业务一变流程就得重构。状态难保。分布式环境下节点挂了任务状态丢了重试机制又得重新写。没法动态调整。现在业务里经常要接 LLM模型输出不确定传统引擎的静态分支根本处理不了。与其堆功能不如把核心逻辑剥离出来用 JSON 定义流程用 Python 跑调度数据在节点间传。核心逻辑DAG 调度器架构其实就三层定义层JSON 写节点和依赖。调度层拓扑排序按顺序跑任务。数据层全局上下文节点间传参。流程大概是这样的解析 JSON生成拓扑结构。入度为 0 的节点进队列。线程池拉取任务执行完更新上下文释放后续节点依赖。没任务了流程结束。代码实现不用第三方库原生 Python 就能写。核心是SimpleWorkflowEngine支持并发和条件分支。import threading import queue from typing import Dict, List, Callable, Any class WorkflowNode: def __init__(self, name: str, action: Callable, condition: Callable None): self.name name self.action action self.condition condition class SimpleWorkflowEngine: def __init__(self): self.nodes {} self.dependencies {} self.adjacency {} self.context {} self.lock threading.Lock() def add_node(self, node: WorkflowNode): self.nodes[node.name] node self.dependencies[node.name] [] self.adjacency[node.name] [] def add_dependency(self, parent: str, child: str): if parent not in self.dependencies[child]: self.dependencies[child].append(parent) if child not in self.adjacency[parent]: self.adjacency[parent].append(child) def execute(self, initial_context: Dict[str, Any]) - Dict[str, Any]: self.context initial_context.copy() in_degree {n: len(d) for n, d in self.dependencies.items()} completed set() q queue.Queue() # 初始节点入队 for n, d in in_degree.items(): if d 0: q.put(n) def worker(): while True: try: name q.get(timeout1) except queue.Empty: with self.lock: if len(completed) len(self.nodes): break continue node self.nodes[name] run True if node.condition: with self.lock: run node.condition(self.context) if run: try: res node.action(self.context) with self.lock: self.context.update(res) except Exception as e: print(fNode {name} failed: {e}) with self.lock: completed.add(name) for nxt in self.adjacency[name]: in_degree[nxt] - 1 if in_degree[nxt] 0: q.put(nxt) q.task_done() threads [threading.Thread(targetworker) for _ in range(4)] for t in threads: t.start() for t in threads: t.join() return self.context测试一下if __name__ __main__: engine SimpleWorkflowEngine() engine.add_node(WorkflowNode(Start, lambda ctx: {loaded: True})) engine.add_node(WorkflowNode(Process, lambda ctx: {res: 42}, lambda ctx: ctx.get(loaded))) engine.add_node(WorkflowNode(Skip, lambda ctx: {skipped: True}, lambda ctx: not ctx.get(loaded))) engine.add_dependency(Start, Process) engine.add_dependency(Start, Skip) print(engine.execute({}))生产环境要注意什么代码跑通只是第一步上线还得考虑这几块1. 记录日志别只打印 stdout。每个实例给个 ID节点开始时间、结束时间、输入输出、报错信息全存数据库。不然出了问题根本没法查。2. 心跳检测任务跑久了容易僵死。执行节点得定时发心跳调度器发现超时就触发重试或降级。3. 断点续跑网络抖动或重启是常事。引擎得支持从最后成功的节点接着跑别每次都从头来。前提是节点操作得是幂等的不然数据就乱了。总结工作流引擎不用追求大而全。DAG 模型够用了重点是把状态管理、并发调度和异常处理做好。代码越轻维护越容易接业务也越快。