LangGraph实战:构建有状态AI工作流引擎

📅 2026/7/2 4:40:21
LangGraph实战:构建有状态AI工作流引擎
一、LangGraph vs 其他AI框架LangGraph是LangChain团队推出的新一代AI应用开发框架专门用于构建有状态、多步骤的AI工作流。与LangChain的DAG模型不同LangGraph采用Cyclic Graph循环图结构支持无限循环和条件分支可以实现真正的工作流编排。回到顶部二、核心概念StateGraph状态图工作流的核心 Node节点代表一个步骤或任务 Edge边定义节点间的流转关系 State状态整个工作流的共享状态 Checkpoint检查点支持断点续跑和回溯 ConditionalEdge条件边支持动态路由回到顶部三、环境搭建pip install langgraph langchain langchain-openai import os os.environ[OPENAI_API_KEY] your-api-key回到顶部四、最简单的工作流from langgraph.graph import StateGraph, END from typing import TypedDict # 定义状态 class GraphState(TypedDict): messages: list result: str # 定义节点 def node1(state): return {result: Step 1 完成 - state.get(result, )} def node2(state): return {result: state.get(result, ) Step 2 完成} # 创建图 workflow StateGraph(GraphState) workflow.add_node(step1, node1) workflow.add_node(step2, node2) # 定义边 workflow.set_entry_point(step1) workflow.add_edge(step1, step2) workflow.add_edge(step2, END) # 编译 app workflow.compile() # 执行 result app.invoke({messages: [], result: }) print(result)回到顶部五、有状态工作流客服机器人from langchain_openai import ChatOpenAI llm ChatOpenAI(modelgpt-4o) class客服State(TypedDict): query: str intent: str response: str needs_human: bool # 意图识别节点 def识别意图(state): query state[query] prompt f判断用户意图{query}只返回咨询/投诉/查询/办理 intent llm.invoke(prompt).content return {intent: intent} # 分类处理 def处理咨询(state): response llm.invoke(f专业回答用户问题{state[query]}).content return {response: response} def处理投诉(state): response 非常抱歉给您带来不便我们会尽快处理您的问题。 return {response: response, needs_human: True} # 创建工作流 workflow StateGraph(客服State) workflow.add_node(intent_classifier, 识别意图) workflow.add_node(handle_consult, 处理咨询) workflow.add_node(handle_complaint, 处理投诉) workflow.set_entry_point(intent_classifier) # 条件边根据意图路由 def路由(state): intent state.get(intent, ) if 投诉 in intent: return handle_complaint return handle_consult workflow.add_conditional_edges( intent_classifier, 路由, {handle_consult: handle_consult, handle_complaint: handle_complaint} ) workflow.add_edge(handle_consult, END) workflow.add_edge(handle_complaint, END) app workflow.compile() result app.invoke({query: 我要投诉快递太慢, intent: , response: , needs_human: False}) print(result)回到顶部六、循环工作流智能迭代# 循环工作流代码审查 自动修复 class CodeState(TypedDict): code: str issues: list iterations: int def代码审查(state): issues [] if TODO in state[code]: issues.append(发现TODO待办) if len(state[code]) 500: issues.append(代码过长) return {issues: issues} def代码修复(state): code state[code] for issue in state.get(issues, []): if TODO in issue: code code.replace(TODO, DONE) return {code: code, iterations: state.get(iterations, 0) 1} def检查是否继续(state): if state.get(issues) and state.get(iterations, 0) 3: return fix return end workflow StateGraph(CodeState) workflow.add_node(review, 代码审查) workflow.add_node(fix, 代码修复) workflow.set_entry_point(review) workflow.add_conditional_edges( review, 检查是否继续, {fix: fix, end: END} ) workflow.add_edge(fix, review) app workflow.compile() result app.invoke({code: def test(): TODO..., issues: [], iterations: 0}) print(result)回到顶部七、检查点与断点续跑# 检查点配置 from langgraph.checkpoint.memory import MemorySaver checkpointer MemorySaver() workflow StateGraph(GraphState) workflow.add_node(step1, node1) workflow.add_node(step2, node2) workflow.set_entry_point(step1) workflow.add_edge(step1, step2) workflow.add_edge(step2, END) # 编译时启用检查点 app workflow.compile(checkpointercheckpointer) # 首次执行会保存检查点 config {configurable: {thread_id: user-123}} result app.invoke({messages: [], result: Step 1 - }, config) print(首次执行:, result) # 断点续跑从检查点恢复 result2 app.invoke({messages: [], result: }, config) print(续跑结果:, result2)回到顶部八、Human-in-the-Loop# 人类审核节点 from langgraph.prebuilt import ToolNode def审核节点(state): # 返回特殊信号等待人类审核 return {__interrupt__: True} def人类决策(state, decision): # 接收人类决策后继续执行 return {human_decision: decision} workflow StateGraph(GraphState) workflow.add_node(ai_process, lambda s: {result: AI处理完成}) workflow.add_node(human_review, 审核节点) workflow.add_node(human_decide, lambda s: {result: 人类批准: s.get(human_decision, )}) workflow.set_entry_point(ai_process) workflow.add_edge(ai_process, human_review) workflow.add_edge(human_review, human_decide) workflow.add_edge(human_decide, END) app workflow.compile(interrupt_before[human_decide]) # 执行到人类审核节点后暂停 result app.invoke({messages: [], result: }) print(暂停等待审核:, result) # 人类审核后继续 app.invoke({human_decision: 批准}, config)回到顶部九、与Spring Boot集成Service public class LangGraphService { Value(${langgraph.service.url}) private String serviceUrl; public String runWorkflow(String workflowId, MapString, Object input) { HttpHeaders headers new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); MapString, Object body Map.of( workflow_id, workflowId, input, input, thread_id, UUID.randomUUID().toString() ); HttpEntityMapString, Object entity new HttpEntity(body, headers); try { ResponseEntity response restTemplate.exchange( serviceUrl /execute, HttpMethod.POST, entity, Map.class ); return (String) response.getBody().get(result); } catch (Exception e) { return Error: e.getMessage(); } }