在 langgraph 里,检查点(checkpoint)是一项重要的功能,它能够记录工作流在执行过程中的中间状态。当工作流因某些原因中断时,可以从检查点恢复继续执行,避免从头开始,提升效率。
示例:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
import json# 定义状态类型
class WorkflowState(TypedDict):input_data: strintermediate_result: strfinal_result: str# 定义步骤函数
def step1(state: WorkflowState):print("执行步骤 1")# 模拟一些处理state["intermediate_result"] = f"处理“ {state['input_data']} ”的中间结果"# 模拟中断,在步骤 1 之后随机抛出异常,50%的概率出现中断import randomif random.random() < 0.5:raise Exception("模拟工作流中断")return statedef step2(state: WorkflowState):print("执行步骤 2")# 模拟一些处理state["final_result"] = f"最终结果基于 {state['intermediate_result']}"return state# 创建工作流
def create_workflow():workflow = StateGraph(WorkflowState)# 添加节点workflow.add_node("step1", step1)workflow.add_node("step2", step2)# 添加边workflow.add_edge(START, "step1")workflow.add_edge("step1", "step2")workflow.add_edge("step2", END)return workflow# 保存检查点
def save_checkpoint(state, checkpoint_path):# 正常代码不用加这一段,这里为了演示效果state = {"input_data": f"{state['input_data']},我是检查点数据",}with open(checkpoint_path, 'w') as f:json.dump(state, f)# 加载检查点
def load_checkpoint(checkpoint_path):try:with open(checkpoint_path, 'r') as f:temp = json.load(f)print(f"加载检查点成功 : {temp}")return tempexcept FileNotFoundError:print(f"加载检查点 失败,因为不存在。")return None# 主函数
def main():input_data = "示例输入数据"checkpoint_path = "checkpoint.json"# 尝试加载检查点initial_state = load_checkpoint(checkpoint_path)if initial_state is None:initial_state = {"input_data": input_data}print(f"初始数据: {initial_state['input_data']}")workflow = create_workflow()app = workflow.compile()try:# 执行工作流final_state = app.invoke(initial_state)print("最终结果:", final_state["final_result"])except Exception as e:print(f"工作流中断: {e}")# 保存当前状态作为检查点save_checkpoint(initial_state, checkpoint_path)print("已保存当前状态到检查点。")if __name__ == "__main__":main()
当线程中断时:
重新执行后: