概述前面第 5 篇我们讲过 LCELchainprompt|model|parser resultchain.invoke({topic:Runnable})这行代码看起来很轻但背后压着 LangChain 最核心的一层设计Runnable。如果只从使用者视角看它像是一套链式调用 API。但从源码视角看Runnable至少解决了五个问题不同组件如何拥有统一调用接口|管道语法如何把多个组件组合成一个新组件为什么组合后的chain仍然可以继续invoke()、stream()、batch()普通 Python 函数为什么可以接进 LCELtracing、metadata、并发控制、异常处理这些运行时信息如何贯穿整个链路本文会围绕langchain-core的 Runnable 源码展开重点看Runnable抽象类的职责。__or__运算符重载如何生成RunnableSequence。invoke()、stream()、batch()的执行差异。RunnableSequence如何把多个步骤串起来。RunnableLambda如何把普通函数包装成 Runnable。RunnableConfig如何把 callbacks、tags、metadata、max_concurrency 传下去。LCEL 的本质不是“用竖线少写几行代码”而是用Runnable把 LLM 应用中的组件统一成可组合、可批处理、可流式、可追踪的执行单元。源码入口先找到 Runnable 在哪里读这篇源码时先记住几个路径。以 LangChain 官方仓库为准核心代码主要在libs/core/langchain_core/runnables/base.py这个文件里包含几类关键对象对象作用Runnable所有可执行组件的抽象基类RunnableSerializable支持序列化和配置化的 Runnable 基类RunnableSequence顺序组合多个 RunnableRunnableParallel并行执行多个 RunnableRunnableLambda把普通函数包装成 RunnableRunnableGenerator包装生成器函数适合流式转换coerce_to_runnable()把函数、dict、Runnable-like 对象转换成 Runnable相关辅助逻辑主要在libs/core/langchain_core/runnables/config.py libs/core/langchain_core/runnables/utils.py libs/core/langchain_core/runnables/graph.py其中config.py: 负责RunnableConfig、callback manager、executor、config 合并。utils.py: 负责函数签名判断、schema 推断、并发工具等。graph.py: 负责把 Runnable 链路转换成图结构供可视化、调试和 tracing 使用。源码阅读顺序建议是Runnable - __or__ / pipe - coerce_to_runnable - RunnableSequence - RunnableLambda - batch / stream / transform - RunnableConfig不要一上来就读RunnableSequence.batch()它里面有 callback、异常、并发、config 分发会显得很绕。先读Runnable的公共协议再读组合类和包装类最后读 config、callback、streaming 这些横切逻辑。Runnable 的心智模型一个可执行、可组合、可观测的工作单元先看一个最小 Runnablefromlangchain_core.runnablesimportRunnableLambdadefadd_one(x:int)-int:returnx1rRunnableLambda(add_one)print(r.invoke(1))# 2print(r.batch([1,2,3]))# [2, 3, 4]从源码角度看Runnable不是“链”的意思而是“可执行单元”。这个可执行单元至少有三层能力能力方法含义单次调用invoke()/ainvoke()一个输入变一个输出批量调用batch()/abatch()多个输入变多个输出流式调用stream()/astream()一个输入逐步产出多个 chunk事件流astream_events()观察中间步骤和运行事件日志流astream_log()观察运行日志和 patch组合__or__()/pipe()拼成新的 Runnable配置with_config()/bind()注入配置或绑定参数增强with_retry()/with_fallbacks()增加重试和降级工具化as_tool()转成 tool这就是为什么prompt、model、parser看起来类型完全不同却都能被|拼起来。它们都遵守一个最小协议Input - Runnable - Output一旦遵守这个协议就可以进入 LCEL 组合系统。Runnable 抽象类真正必须实现的是 invoke从源码结构看Runnable是一个泛型抽象类Runnable[Input,Output]你可以把它理解成接收 Input 类型 输出 Output 类型它的核心抽象方法是invoke()。简化理解如下classRunnable(Generic[Input,Output]):definvoke(self,input:Input,config:RunnableConfig|NoneNone,**kwargs)-Output:...源码里的真实实现会更复杂因为它要处理类型推断。Pydantic schema。config schema。callback manager。tracing。sync / async 适配。streaming。batch 并发。序列化。但从阅读入口看先抓住一个重点对一个最小 Runnable 来说invoke()是必须回答的问题给我一个输入我如何产出一个输出其他能力很多都有默认实现。例如默认ainvoke()可以把同步invoke()放进 executor 里跑。默认batch()可以对多个输入并发调用invoke()。默认stream()可以退化成只产出一次invoke()的结果。这套默认实现非常关键。它意味着只要一个组件实现了最基础的invoke()就立刻获得了异步、批处理、流式协议的基本形态。invoke()是 Runnable 的最小执行语义ainvoke()、batch()、stream()则是在这个语义上的通用运行模式。__or__竖线运算符如何生成 RunnableSequence我们最熟悉的写法是chainprompt|model|parserPython 里|是运算符。LangChain 让它能用于 Runnable是因为Runnable重载了__or__()。源码可以简化成这样的逻辑def__or__(self,other):returnRunnableSequence(self,coerce_to_runnable(other),)也就是说prompt|model会变成RunnableSequence(prompt,model)再继续prompt|model|parser可以理解成RunnableSequence(RunnableSequence(prompt,model),parser,)实际源码会对嵌套 sequence 做整理保证最终步骤列表更平整但心智模型就是这样。__ror__为什么 dict 也能放到管道里你可能见过这种写法chain{question:RunnablePassthrough(),context:retriever,}|prompt|model这里左边是一个dict它本身不是 Runnable。能工作是因为Runnable还实现了反向运算符__ror__()。当 Python 发现左操作数处理不了|就会尝试右操作数的__ror__()。简化理解def__ror__(self,other):returnRunnableSequence(coerce_to_runnable(other),self,)所以some_dict|prompt会先把some_dict转成 Runnable再和prompt组成RunnableSequence。coerce_to_runnable()Runnable-like 对象的入口coerce_to_runnable()是 LCEL 很重要的“入口转换器”。它负责把这些对象转换成真正的 Runnable传入对象转换结果已经是Runnable原样返回普通函数RunnableLambda生成器函数RunnableGeneratordictRunnableParallel不支持的对象抛出类型错误这就是为什么 LCEL 里可以混用fromlangchain_core.runnablesimportRunnableLambda chain(RunnableLambda(lambdax:x1)|(lambdax:x*2)|{value:lambdax:x,text:str})上面写法中后两个对象不是显式 Runnable但都会被转换。|的核心不是“把两个对象连起来”而是“先把右侧对象规范化成 Runnable再生成一个新的RunnableSequence”。RunnableSequence链式组合的内部结构RunnableSequence是 LangChain 里最重要的组合类。它代表step1 - step2 - step3 - ... - stepN每一步都是 Runnable。例如chainprompt|model|parser可以理解成RunnableSequence first: prompt middle: [model] last: parser或者更直观地理解成steps [prompt, model, parser]invoke()顺序执行每一步RunnableSequence.invoke()的核心逻辑非常直接。伪代码如下definvoke(input,configNone):valueinputforstepinsteps:valuestep.invoke(value,child_config)returnvalue所以这条链chainprompt|model|parser执行时就是原始输入 dict | v prompt.invoke(...) | v PromptValue / messages | v model.invoke(...) | v AIMessage | v parser.invoke(...) | v 字符串 / 结构化对象你可以把RunnableSequence当成一个“for 循环封装器”但它比普通 for 循环多做了很多事情给每个 step 分配子 callback。合并和传递 config。维护 run name。捕获错误并上报 tracing。让整个 sequence 仍然是一个 Runnable。让 sequence 自动继承batch()、stream()、ainvoke()等能力。这就是 LCEL 的价值组合之后仍然符合同一个协议。子 run为什么 trace 里有seq:step:1读源码时会看到类似这样的概念seq:step:1 seq:step:2 seq:step:3这不是业务逻辑而是 tracing 逻辑。一个RunnableSequence本身是一个 root run它里面的每个 step 都是 child run。结构类似RunnableSequence run | |-- seq:step:1 prompt |-- seq:step:2 model |-- seq:step:3 parser这也是为什么 LangSmith 或 ConsoleCallbackHandler 可以看到中间步骤。如果没有这层 callback 分发chain.invoke()在外部看起来就只是一个黑盒。RunnableSequence.invoke()的本质是顺序调用每个 step但源码里额外处理了 config、callback、trace、异常和子步骤命名。stream()流式输出不是所有步骤都天然支持很多人以为只要调用forchunkinchain.stream(input):print(chunk)就一定会从第一步开始一路流式输出。源码层面并不是这么简单。stream()背后更核心的方法是transform()。可以这样理解stream(input) - 把单个 input 包装成 iterator - 调用 transform(iterator) - 逐个 yield output chunk对RunnableSequence来说流式数据会沿着每个 step 的transform()往后传input iterator - step1.transform(...) - step2.transform(...) - step3.transform(...) - output iterator关键点在这里如果中间某个 step 不支持真正的流式 transform它可能会先把输入缓存起来等输入完整后再产出结果。所以流式链路是否真的“边生成边输出”取决于链上每一步是否支持流式。例如chainprompt|streaming_model|parser如果prompt很快产出完整 messages。streaming_model支持 token/chunk 流式输出。parser支持逐 chunk 解析。那么整个链路就能比较自然地流式输出。但如果中间插入一个普通RunnableLambdachainprompt|model|RunnableLambda(lambdax:expensive_parse(x))它可能需要等模型输出完整后才执行解析这会打断前面的流式体验。RunnableLambda 的流式局限普通函数包装成RunnableLambda时通常是收完整输入 - 调函数 - 产一个输出它并不天然适合逐 chunk 处理。如果你需要真正的流式转换更应该考虑生成器函数或专门实现支持transform()的 Runnable。stream()是协议层能力但真实流式效果取决于链上每个步骤是否支持transform()普通同步函数经常会让流式链路变成“先缓存、后输出”。batch()默认并发调用 invoke而不是神秘批处理batch()的使用方式很简单outputschain.batch([{topic:Runnable},{topic:RunnableSequence},{topic:RunnableLambda},])很多人会误以为batch()一定会调用模型供应商的批量 API。源码默认逻辑不是这样。官方 reference 和源码都表明默认batch()会并发执行多次invoke()适合 IO 密集型 Runnable。简化伪代码defbatch(inputs,configNone,return_exceptionsFalse):configsget_config_list(config,len(inputs))defone(input,config):returnself.invoke(input,config)returnexecutor.map(one,inputs,configs)它的关键点有三个一个 input 对应一个 config。多个 input 默认并发执行。如果底层 API 有真正批量接口子类应该重写batch()。比如对普通 HTTP 调用来说默认并发已经有价值。但如果某个 embeddings provider 提供了原生 batch endpoint那么最佳实现应该直接覆盖batch()一次请求处理多个文本而不是并发发很多小请求。max_concurrency并发不是无限开RunnableConfig里有一个关键字段config{max_concurrency:5,}它用于控制并发数量。例如resultschain.batch([{topic:t}fortintopics],config{max_concurrency:5},)这在调用外部模型、搜索 API、数据库、爬虫工具时很重要。否则你很容易遇到provider rate limit。数据库连接池耗尽。本地 CPU 线程过多。请求排队导致延迟抖动。return_exceptions批处理中如何处理单条失败batch()还有一个常见参数return_exceptionsTrue如果它是False某个输入失败通常会抛出异常。如果它是True失败项会作为异常对象放回结果列表中。例如outputschain.batch(inputs,return_exceptionsTrue)foriteminoutputs:ifisinstance(item,Exception):print(failed:,item)else:print(ok:,item)这在批量处理文档、批量抽取结构化信息、批量生成摘要时很实用。默认batch()不是供应商原生批量 API而是并发执行多个invoke()真正高效的批量接口需要具体 Runnable 自己重写。RunnableSequence.batch批处理链路如何逐步推进单个 Runnable 的batch()是并发执行多个invoke()。但RunnableSequence的batch()更有意思。它不是简单地对每个输入完整执行一遍全链路而是按 step 推进inputs [a, b, c] step1.batch(inputs) - [a1, b1, c1] step2.batch([a1, b1, c1]) - [a2, b2, c2] step3.batch([a2, b2, c2]) - [a3, b3, c3]这样做有两个好处每个 step 都有机会使用自己的批处理优化。callback 和 tracing 可以按步骤组织。例如chainprompt|model|parser执行chain.batch(inputs)时大致是prompt.batch(inputs) - messages_list model.batch(messages_list) - ai_messages parser.batch(ai_messages) - parsed_outputs如果model.batch()有供应商侧优化它就可以在这一层发挥作用。如果没有它也会退回到并发model.invoke()。批处理中的异常传播当return_exceptionsTrue时RunnableSequence.batch()还需要处理一个问题某个输入在 step1 已经失败了还要不要进入 step2合理做法是失败项不再进入后续 step但最终结果列表仍然保持输入顺序。所以源码里会维护失败输入的位置再把成功结果和异常结果重新组装回原来的顺序。这也是为什么RunnableSequence.batch()的源码比单个 Runnable 的batch()更长。它不仅要“批量执行”还要保证顺序不乱。异常位置不丢。成功项继续往后跑。每个 step 都有自己的 child callback。RunnableSequence.batch()是按步骤批量推进而不是简单地把整条链复制 N 份并发跑。RunnableLambda普通函数如何变成 RunnableRunnableLambda是 LCEL 非常实用的一层适配器。它让普通 Python 函数可以进入 Runnable 世界fromlangchain_core.runnablesimportRunnableLambdadefnormalize_topic(topic:str)-dict:return{topic:topic.strip()}chainRunnableLambda(normalize_topic)|prompt|model|parser源码上它主要做几件事保存原始函数。判断函数是同步还是异步。推断输入输出类型和 schema。调用时把input、config、run_manager等按函数签名注入。如果函数返回的还是 Runnable则继续执行这个 Runnable。如果函数是 generator则支持按 chunk 产出。函数签名注入为什么有些函数能接收 config你可以写fromlangchain_core.runnablesimportRunnableConfig,RunnableLambdadefadd_trace_prefix(x:str,config:RunnableConfig)-str:run_nameconfig.get(run_name,unnamed)returnf[{run_name}]{x}rRunnableLambda(add_trace_prefix)print(r.invoke(hello,config{run_name:demo}))不是所有函数都必须接收config。RunnableLambda会根据函数签名判断是否传入这些额外参数。常见可注入参数包括configrun_manager这让普通函数在需要时可以参与 tracing、读取配置、派生 callback。返回 Runnable动态路由的基础RunnableLambda还有一个很重要的能力如果函数返回一个 Runnable它会继续调用返回的 Runnable。这可以用来做动态路由fromlangchain_core.runnablesimportRunnableLambdadefroute(question:str):ifSQLinquestion:returnsql_chainreturnrag_chain routerRunnableLambda(route)resultrouter.invoke(请用 SQL 查询订单数量)心智模型是route(input) - 返回某条 chain - 执行这条 chain源码里会用recursion_limit防止无限返回 Runnable 导致递归失控。RunnableLambda是普通 Python 世界和 Runnable 世界之间的桥它让函数、动态路由、小型转换逻辑都能进入 LCEL。*RunnableParalleldict 为什么代表并行分支虽然第这篇重点是RunnableSequence但理解dict自动变成RunnableParallel很重要。比如 RAG 里常见写法fromoperatorimportitemgetterfromlangchain_core.runnablesimportRunnablePassthrough chain{context:itemgetter(question)|retriever,question:itemgetter(question),}|prompt|model|parser左侧 dict 会被coerce_to_runnable()转成RunnableParallel。它的语义是同一份输入 | -- context 分支 | -- question 分支 | v 合并成 dict 输出也就是input - { context: context_runnable.invoke(input), question: question_runnable.invoke(input), }这解释了为什么 LCEL 里经常用 dict 做字段组装。它不是普通字典赋值而是并行执行分支并把多个分支结果合成一个字典。在 LCEL 里dict 通常不是静态数据而是会被转换成RunnableParallel的并行分支声明。RunnableConfig为什么 config 能一路传到底RunnableConfig是理解生产级 LCEL 的关键。它不是业务输入而是运行时配置。常见字段包括字段作用tags给 run 打标签方便筛选 tracemetadata附加运行元数据callbacks注入 callback handlerrun_name指定当前 run 名称max_concurrency控制 batch 并发数量recursion_limit控制 Runnable 动态递归深度configurable给可配置字段传值run_id指定运行 ID调用时可以这样传resultchain.invoke({topic:Runnable},config{run_name:runnable_article_demo,tags:[csdn,source-reading],metadata:{article:22},},)在RunnableSequence内部config 不会简单原样塞给每一步。它会被 patch、merge、派生 child callbacks。大致结构是root config | -- step1 child config | -- step2 child config | -- step3 child config这使得 trace 可以形成树chain run | |-- prompt run |-- model run |-- parser run如果你以后读create_agent()、middleware、ToolNode、LangSmith tracing都会看到这套 config/callback 机制。业务数据走input运行控制走configRunnable 源码大量复杂性都来自 config、callback 和 tracing 的传递。用一个例子串起 invoke、stream、batch下面用一个不依赖模型的例子把三种调用方式串起来。fromlangchain_core.runnablesimportRunnableLambdadefclean_text(text:str)-str:returntext.strip()defto_words(text:str)-list[str]:returntext.split()defcount_words(words:list[str])-int:returnlen(words)chain(RunnableLambda(clean_text)|RunnableLambda(to_words)|RunnableLambda(count_words))print(chain.invoke( hello langchain runnable ))print(chain.batch([hello langchain,runnable sequence,invoke stream batch,]))执行invoke() hello langchain runnable - clean_text - hello langchain runnable - to_words - [hello, langchain, runnable] - count_words - 3执行batch()step1.batch([hello langchain, runnable sequence, ...]) - cleaned_texts step2.batch(cleaned_texts) - word_lists step3.batch(word_lists) - counts执行stream()时由于这里都是普通函数实际不会有模型 token 那样的细粒度流式效果。通常会等某一步拿到完整输入后再产出。如果想观察stream()更明显的效果需要使用支持流式的模型或 generator Runnable。源码阅读时要抓的关键问题读base.py时不建议逐行读完。更高效的方式是带着问题读问题一这个类到底是不是 Runnable判断标准能不能 invoke 能不能 batch 能不能 stream 能不能被 | 组合问题二这个对象是原生 Runnable还是被转换出来的例如ChatPromptTemplate通常本身就是 Runnable。ChatModel通常本身就是 Runnable。普通函数会被转成RunnableLambda。dict 会被转成RunnableParallel。问题三这个链路的输入输出类型是什么源码中有InputType、OutputType、input_schema、output_schema。这些信息不仅用于文档也用于校验、可视化、工具化、部署。问题四流式输出是在哪一步断掉的如果你发现chain.stream(input)没有逐 token 输出优先检查模型是否开启 streaming。中间 parser 是否支持流式。是否插入了普通RunnableLambda。是否某一步只实现了invoke()没有真正实现transform()。问题五批处理为什么没有变快优先检查是否被max_concurrency限制。是否底层 provider 本身限流。是否某个 step 是 CPU 密集型。是否具体 Runnable 没有重写真正的原生 batch。是否 callback/tracing 开销过大。常见误区最后整理几个常见误区。误区一Runnable 等于 Chain不准确。Runnable是统一执行接口RunnableSequence才是顺序链。也就是说Runnable 是协议 RunnableSequence 是一种组合实现误区二|只是 Python 语法糖不准确。|会触发__or__()把右侧对象转换成 Runnable并返回新的RunnableSequence。它改变了对象结构不只是少写几行代码。误区三batch()一定等于供应商批量 API不准确。默认batch()是并发调用invoke()。只有具体 Runnable 重写了batch()才可能使用供应商原生批量能力。误区四stream()一定逐 token 输出不准确。stream()是协议真实输出粒度取决于链上每个 step 是否支持流式转换。误区五普通函数放进 LCEL 没有成本不准确。普通函数很方便但它可能打断流式输出。隐藏类型信息。让异常位置不直观。把复杂业务逻辑塞进匿名 lambda降低可读性。如果函数逻辑变复杂建议显式命名并用RunnableLambda包装。总结本文从源码视角拆了Runnable的核心设计。你需要记住这几条主线Runnable是统一执行协议核心是invoke(input, config)。__or__()会把两个 Runnable-like 对象组合成RunnableSequence。coerce_to_runnable()负责把函数、dict、generator 等对象转换成 Runnable。RunnableSequence.invoke()是顺序执行每个 step。RunnableSequence.batch()是按 step 批量推进并保持结果顺序和异常位置。stream()依赖transform()真实流式效果取决于每个 step 是否支持流式。RunnableLambda让普通 Python 函数进入 LCEL但普通函数不天然等于高质量流式组件。RunnableConfig负责 tags、metadata、callbacks、run_name、max_concurrency 等运行时控制。最后给一张源码阅读地图Runnable 抽象 | |-- invoke / ainvoke |-- batch / abatch |-- stream / astream |-- transform / atransform | -- __or__ / __ror__ / pipe | v RunnableSequence | -- step1.invoke -- step2.invoke -- step3.invoke coerce_to_runnable | |-- function - RunnableLambda |-- generator - RunnableGenerator |-- dict - RunnableParallel |-- Runnable - 原样返回 RunnableConfig | |-- callbacks |-- tags / metadata |-- run_name |-- max_concurrency |-- recursion_limit看懂Runnable就看懂了 LangChain 如何把 Prompt、Model、Parser、Retriever、Tool 和普通函数统一成一套可组合的执行系统。