06 持久化(Persistence)

📅 2026/6/30 1:52:27
06 持久化(Persistence)
06 持久化Persistence一、什么是持久化持久化是 LangGraph 的状态保存机制允许在图执行过程中保存和恢复状态实现故障恢复、长时间运行和会话记忆。核心理念通过检查点Checkpoint机制将图的执行状态序列化存储随时可以恢复到任意历史节点。概念说明类比检查点Checkpoint图在某个节点执行后的状态快照游戏存档线程Thread一次完整的图执行过程一次游戏流程检查点存储Checkpointer保存检查点的后端内存/SQLite/PostgreSQL存档文件二、为什么需要持久化2.1 典型场景# ❌ 无持久化 - 故障后必须从头开始graphStateGraph(State)# ... 添加节点和边resultgraph.invoke(input)# 如果中途失败只能重来2.2 持久化的价值# ✅ 有持久化 - 故障后从上次存档继续graphStateGraph(State)# ... 添加节点和边graph.compile(checkpointerMemorySaver())# 启用持久化# 第一次运行 - 执行到 node_3 时中断config{configurable:{thread_id:thread_1}}graph.invoke(input,config)# 自动保存检查点# 第二次运行 - 从 node_3 继续graph.invoke(None,config)# 恢复状态继续执行场景说明持久化作用故障恢复网络中断、服务崩溃从上次检查点继续长时间任务文档解析、模型训练分阶段保存进度会话记忆多轮对话、用户状态跨请求保持上下文调试回溯查看历史执行状态回到任意历史节点人机协作人工审批、人工干预暂停等待人工操作三、内置检查点存储LangGraph 提供多种内置存储后端存储类型适用场景持久性安装MemorySaver开发测试、单次会话内存重启丢失内置SqliteSentry本地开发、小型应用文件持久化pip install langgraph-checkpoint-sqlitePostgresSaver生产环境、多实例数据库持久化pip install langgraph-checkpoint-postgres3.1 MemorySaver内存存储 MemorySaver 演示 运行方式python 06_持久化_MemorySaver.py fromtypingimportTypedDictfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassState(TypedDict):count:intmessage:strdefincrement(state:State):return{count:state[count]1,message:f计数:{state[count]1}}defcheck_continue(state:State):ifstate[count]3:returnincrementreturnEND builderStateGraph(State)builder.add_node(increment,increment)builder.add_edge(START,increment)builder.add_conditional_edges(increment,check_continue)# 创建检查点存储memory_saverMemorySaver()graphbuilder.compile(checkpointermemory_saver)# 使用 thread_id 标识会话config{configurable:{thread_id:thread_1}}# 第一次调用result1graph.invoke({count:0,message:},config)print(f第一次:{result1})# 查看检查点历史historylist(graph.get_state_history(config))print(f检查点数量:{len(history)})# 从历史恢复forstate_snapshotinhistory:print(f检查点:{state_snapshot.values})3.2 SqliteSentrySQLite 持久化 SqliteSentry 演示 运行方式python 06_持久化_SqliteSentry.py 安装依赖 pip install langgraph-checkpoint-sqlite fromlanggraph.checkpoint.sqliteimportSqliteSentry# 创建 SQLite 检查点存储sqlite_saverSqliteSentry.from_conn_string(checkpoints.db)graphbuilder.compile(checkpointersqlite_saver)# 使用方式与 MemorySaver 相同config{configurable:{thread_id:thread_1}}resultgraph.invoke({count:0,message:},config)四、检查点详解4.1 检查点包含什么每个检查点保存以下信息{values:{# 状态的当前值count:2,message:计数: 2},next:[increment],# 下一个要执行的节点config:{configurable:{thread_id:thread_1,checkpoint_id:abc123# 检查点唯一标识}},metadata:{source:loop,# 来源input/loop/updatestep:2,# 执行步骤parents:{}# 父图检查点用于子图},tasks:[{id:task_1,name:increment,interrupts:[]# 中断点用于人机协作}]}4.2 查看检查点历史 查看检查点历史演示 运行方式python 06_检查点历史.py fromlanggraph.checkpoint.memoryimportMemorySaver# 假设 graph 已编译graphbuilder.compile(checkpointerMemorySaver())config{configurable:{thread_id:thread_1}}# 执行几次graph.invoke({count:0,message:},config)# 获取状态历史historylist(graph.get_state_history(config))fori,state_snapshotinenumerate(history):print(f--- 检查点{i}---)print(f状态值:{state_snapshot.values})print(f下一个节点:{state_snapshot.next})print(f元数据:{state_snapshot.metadata})print()4.3 回到历史检查点 回到历史检查点演示 运行方式python 06_回退检查点.py # 获取历史检查点historylist(graph.get_state_history(config))target_checkpointhistory[2]# 选择第3个检查点# 更新状态到历史检查点graph.update_state(config,target_checkpoint.values)# 继续执行resultgraph.invoke(None,config)print(f从历史恢复后:{result})五、会话记忆实现5.1 多轮对话示例 多轮对话记忆示例 运行方式python 06_会话记忆.py fromtypingimportTypedDict,Listfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassChatState(TypedDict):messages:List[dict]context:strdefprocess_message(state:ChatState):messagesstate[messages]last_msgmessages[-1][content]ifmessageselse# 模拟 LLM 处理responsef你说的是:{last_msg}messages.append({role:assistant,content:response})return{messages:messages,context:last_msg}builderStateGraph(ChatState)builder.add_node(process,process_message)builder.add_edge(START,process)builder.add_edge(process,END)memoryMemorySaver()chat_graphbuilder.compile(checkpointermemory)# 会话1config_1{configurable:{thread_id:user_123}}result1chat_graph.invoke({messages:[{role:user,content:你好}],context:},config_1)print(f会话1:{result1[messages][-1][content]})# 继续会话1result2chat_graph.invoke({messages:result1[messages][{role:user,content:LangGraph是什么?}],context:},config_1)print(f会话1继续:{result2[messages][-1][content]})# 会话2独立会话config_2{configurable:{thread_id:user_456}}result3chat_graph.invoke({messages:[{role:user,content:新会话}],context:},config_2)print(f会话2:{result3[messages][-1][content]})5.2 掌柜智库中的应用在 RAG 系统中持久化可用于保存用户的对话历史和检索上下文 RAG 会话记忆示例 图结构 START ──→ retrieve ──→ generate ──→ END 持久化保存 - 用户对话历史 - 检索到的文档片段 - 生成的答案上下文 fromtypingimportTypedDict,Listfromlanggraph.graphimportStateGraph,START,ENDfromlanggraph.checkpoint.memoryimportMemorySaverclassRAGState(TypedDict):query:strchat_history:List[dict]retrieved_docs:List[str]answer:strdefretrieve(state:RAGState):querystate[query]# 模拟检索docs[f文档片段:{query}相关内容]return{retrieved_docs:docs}defgenerate(state:RAGState):querystate[query]docsstate[retrieved_docs]historystate[chat_history]# 基于历史和检索结果生成答案answerf基于{len(docs)}个文档和{len(history)}轮对话的回答:{query}# 更新对话历史history.append({role:user,content:query})history.append({role:assistant,content:answer})return{answer:answer,chat_history:history}builderStateGraph(RAGState)builder.add_node(retrieve,retrieve)builder.add_node(generate,generate)builder.add_edge(START,retrieve)builder.add_edge(retrieve,generate)builder.add_edge(generate,END)memoryMemorySaver()rag_graphbuilder.compile(checkpointermemory)# 第一轮对话config{configurable:{thread_id:rag_session_1}}result1rag_graph.invoke({query:什么是RAG?,chat_history:[],retrieved_docs:[],answer:},config)print(f第一轮:{result1[answer]})print(f对话历史长度:{len(result1[chat_history])})# 第二轮对话 - 自动携带历史result2rag_graph.invoke({query:它有什么优势?,chat_history:result1[chat_history],# 传入上轮历史retrieved_docs:[],answer:},config)print(f第二轮:{result2[answer]})print(f对话历史长度:{len(result2[chat_history])})六、生产环境配置6.1 PostgreSQL 持久化 PostgreSQL 持久化配置 安装依赖 pip install langgraph-checkpoint-postgres 连接字符串格式 postgresql://user:passwordlocalhost:5432/dbname fromlanggraph.checkpoint.postgresimportPostgresSaver# 方式1: 直接连接字符串postgres_saverPostgresSaver.from_conn_string(postgresql://user:passwordlocalhost:5432/langgraph_db)# 方式2: 使用连接池frompsycopgimportConnection connConnection.connect(postgresql://user:passwordlocalhost:5432/langgraph_db)postgres_saverPostgresSaver(conn)graphbuilder.compile(checkpointerpostgres_saver)6.2 配置最佳实践配置项开发环境生产环境存储后端MemorySaver或SqliteSentryPostgresSaverthread_id策略固定值测试用户会话 ID检查点清理无需管理定期清理过期检查点并发控制无需考虑使用连接池七、核心要点总结要点说明检查点图执行到某个节点时的状态快照线程通过thread_id隔离不同的执行会话存储选择开发用 MemorySaver生产用 PostgreSQL状态恢复可从任意历史检查点继续执行会话记忆跨请求保持对话历史和上下文生产配置使用 PostgreSQL 连接池相关笔记[[05-子图Subgraph|子图]] · [[07-人机协作Human-in-the-loop|人机协作]]