LangChain - 流式传输(Streaming)

📅 2026/7/2 12:54:44
LangChain - 流式传输(Streaming)
一、先搞懂一件事流式 vs 普通调用invoke 是等模型把整段话说完一次性返回给你。问题是大模型生成慢你要干等好几秒才看到结果体验差。**流式传输stream**就是改成模型一边生成、一边把字往你这儿送像打字机一样一个字一个字蹦出来你立刻就能看到。▎ 比喻invoke 等整桌菜做好才端上stream 火锅边涮边吃。用法就是把 invoke 换成 stream然后在循环里一段段接收forchunkinagent.stream({messages:[...]},...):# chunk 就是模型刚吐出的一小段处理(chunk)二、核心难点stream_mode 是什么stream_mode 不是开不开流式而是你想看哪种类型的实时信息。一个智能体运行时同时有好几种信息在产生进度信息现在跑到第几步了调工具了吗工具返回啥了文字信息模型正在一个字一个字地说什么自定义信息工具干活时想主动汇报点进度“已查到 10 条”你不可能一次全看所以让你选要看哪种。这就是 stream_mode。页面给了三种模式stream_mode通俗名字你能看到什么updates看进度每一步结束后的“小结”模型要调工具了 / 工具返回了 / 模型最终回答了messages看吐字模型正在一个字一个字生成的过程连工具参数都是一点点拼出来的custom看自定义你自己在代码里用 writer 主动发的任意消息三、三种模式逐个用大白话讲模式 1“updates” —— 看步骤进度就像看一个任务清单智能体每完成一步就给你发一条这步干完了结果是xxx。比如问旧金山天气智能体调了一次天气工具你会收到三条进度step: model → 模型说我要调 get_weather 工具step: tools → 工具说旧金山天气是晴的step: model → 模型说最终回答——旧金山天气晴朗代码forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modeupdates,versionv2,# 推荐用 v2输出格式统一):ifchunk[type]updates:forstep,datainchunk[data].items():print(fstep:{step})▎ 特点粒度粗每步一条不是逐字。适合我想知道现在进行到哪了。模式 2“messages” —— 看逐字吐出这就是真正的打字机效果。模型生成的每一个字token都立刻送给你连工具调用的参数都是一块块拼出来的先 ‘{’再 ‘city’再 ‘“:”’再 ‘San’…。代码forchunkinagent.stream({messages:[{role:user,content:What is the weather in SF?}]},stream_modemessages,versionv2,):ifchunk[type]messages:token,metadatachunk[data]# token 是那一小段metadata 是来源信息print(token.content_blocks)▎ 特点粒度细逐字逐块。适合我想让用户看到字一个个蹦出来。模式 3“custom” —— 看你自己发的消息前两种是系统自动产生的信息custom 是你自己想发什么就发什么。在工具里用 get_stream_writer() 主动喊话fromlanggraph.configimportget_stream_writerdefget_weather(city:str)-str:Get weather for a given city.writerget_stream_writer()writer(f开始查{city}的天气...)# 你主动发的进度writer(f查到了{city}的数据)returnfIts always sunny in{city}!外面接收forchunkinagent.stream(...,stream_modecustom,versionv2):ifchunk[type]custom:print(chunk[data])# 就会打印你 writer() 发的那些话▎ 特点完全自定义。适合工具干耗时的活想给用户报进度。▎ 注意用了 get_stream_writer 的工具不能脱离 LangGraph 执行环境单独调用。四、能同时开多个模式吗能。stream_mode[“updates”, “custom”] 这种传列表就行。但记住不是每种模式各把全部结果吐一遍而是几种模式的事件按时间顺序交织着来每个事件只出现一次。就像一条河里混着三种颜色的鱼updates、custom、messages它们按游过来的先后顺序一条条到你面前你用鱼的颜色chunk[“type”]区分是哪种forchunkinagent.stream(...,stream_mode[updates,custom],versionv2):ifchunk[type]updates:...# 处理进度elifchunk[type]custom:...# 处理自定义实际输出顺序页面示例就是交替的updates模型要调工具→ custom开始查→ custom查到了→ updates工具完成→ updates最终回答五、每个块长啥样看 type 区分不管哪种模式v2 格式下每个块都是这种结构{type:updates或messages或custom,# 这块是哪种模式来的data:...# 具体内容}所以循环里永远先看 chunk[“type”]判断这是哪类信息再去取 chunk[“data”]。这是看懂流式代码的万能套路。六、几个进阶用法知道有这些就行看模型的思考过程有些模型如 Claude回答前会先思考。用 stream_mode“messages”然后从内容块里挑出 type“reasoning” 的就是思考过程type“text” 的才是正式回答fortoken,metadatainagent.stream(...,stream_modemessages):reasoning[bforbintoken.content_blocksifb[type]reasoning]text[bforbintoken.content_blocksifb[type]text]ifreasoning:print(f[思考]{reasoning[0][reasoning]})iftext:print(text[0][text])▎ 不管 Anthropic 的 thinking 还是 OpenAI 的 reasoningLangChain 都统一成 “reasoning” 块换厂商代码不用改。既看逐字、又看完整的工具调用messages 给你的是半个半个的 JSON 片段你想看拼好的完整工具调用就再配上 updatesstream_mode[“messages”, “updates”]messages 看逐字片段updates 看拼好的完整结果多智能体时区分谁在说话给每个智能体起 name流式时开 subgraphsTrue用 metadata[“lc_agent_name”] 就知道当前是哪个智能体在吐字weather_agentcreate_agent(...,nameweather_agent)supervisorcreate_agent(...,namesupervisor,tools[call_weather_agent])forchunkinsupervisor.stream(...,stream_mode[messages,updates],subgraphsTrue):ifagent_name:metadata.get(lc_agent_name):人工审批中断某些工具调用前先暂停问人approve/edit用 HumanInTheLoopMiddleware checkpointer。流式时收集中断 → 给决策 → 用 Command(resumedecisions) 恢复agentcreate_agent(...,middleware[HumanInTheLoopMiddleware(interrupt_on{get_weather:True})],checkpointer...)# 第一次流式会中断收集 interrupts# 给出决策后用 Command 恢复agent.stream(Command(resumedecisions),configconfig,...)七、两个小开关version“v2”强烈推荐。让所有模式的输出格式统一成 {type, data}不用再分情况解包。需要 LangGraph ≥ 1.1。禁用流式某模型不想流式设 streamingFalse或 disable_streamingTrue。多智能体里可用来控制只让某个智能体流式。八、一句话总结▎ 流式传输 把 invoke 换成 stream让结果逐段实时送出。stream_mode 决定看哪种信息updates 看步骤进度、messages 看逐字吐出、custom 看自己发的消息。▎可多选事件交织着来每个块靠 chunk[“type”] 区分。九、看懂流式代码的万能三步不管页面里哪段代码都套这个套路看 stream_mode他开了哪些模式循环里看 chunk[“type”]这块是哪个模式来的取 chunk[“data”]这个模式的实际内容是什么记住这三步所有例子都能看懂。十、和前面学的串起来流式拿到的字就是 AIMessageChunk消息页讲过可以累加拼成完整 AIMessage。content_blocks 把各厂商的思考/推理统一成 “reasoning” 块消息页讲过。get_stream_writer() 就是工具页讲的 runtime.stream_writer 的全局版本。人工审批用的 checkpointer thread_id 是短期记忆页讲的那套。