conversation-pipeline

📅 2026/7/1 19:18:34
conversation-pipeline
Claude Code 完整对话链路深度解析本文梳理了 Claude Code 从用户输入到模型回复的完整链路涵盖 REPL 交互、QueryEngine 状态管理、四级压缩、API 请求组装、流式处理、工具执行循环等全流程。一、整体架构概览┌─────────────────────────────────────────────────────────┐ │ 入口层 (Entry Points) │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ REPL 交互 │ │ SDK 调用 │ │ CLI 管道 │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ └──────────────┼─────────────┘ │ │ ▼ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ QueryEngine.submitMessage() │ │ │ │ (统一入口管理消息生命周期) │ │ │ └──────────────────────┬───────────────────────────┘ │ │ ▼ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ query() — 核心查询循环 │ │ │ │ ┌──────────────────────────────────────┐ │ │ │ │ │ 四级压缩 → API组装 → 流式接收 → 工具执行 │ │ │ │ │ │ ↑ │ │ │ │ │ │ └─── 循环(直到无tool_use) ──┘ │ │ │ │ └──────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────┘二、阶段一用户输入到达2.1 REPL 交互模式用户在终端输入文本 → REPL 组件处理// REPL.tsx → processUserInput.tsx// 1. 检查是否以 / 开头 → 斜杠命令 (/compact, /help, /commit 等)// 2. 否则 → 作为普通对话消息处理// 创建 user messageconstuserMessagecreateUserMessage({content:修复 foo.ts 的 login 函数,uuid:u51,// 唯一标识timestamp:Date.now(),// 时间戳})2.2 QueryEngine — 消息存储// QueryEngine.tsclassQueryEngine{privatemutableMessages:Message[]// ← 内存中的全部对话历史asyncsubmitMessage(prompt,options){// 1. 处理用户输入可能触发 slash 命令// 2. 将用户消息推入消息数组this.mutableMessages.push(userMessage)// 3. 持久化到磁盘 JSONLawaitrecordTranscript([userMessage])// 4. 打快照作为本轮 query 的输入constmessages[...this.mutableMessages]// 5. 进入 query loopforawait(constmessageofquery(messages,options)){yieldmessage// 流式 yield 给 REPL / SDK}}}2.3 消息持久化到磁盘// sessionStorage.ts: recordTranscript()awaitgetProject().insertMessageChain(messages)// → 逐条追加到 session-xxx.jsonl每行一条 JSON// → parentUuid 链维护消息间的父子关系三、阶段二Query Loop — 压缩管线3.1 进入循环// query.tsasyncfunction*query(messages,options){letmessagesForQuerymessages// 本轮使用的消息letneedsFollowUpfalse// 是否需要继续有 tool_usedo{// 压缩管线开始 3.2 第一级Snip Compact — 删除整条消息// query.ts:401-409letsnipTokensFreed0if(feature(HISTORY_SNIP)){constsnipResultsnipModule.snipCompactIfNeeded(messagesForQuery)messagesForQuerysnipResult.messages// 消息减少snipTokensFreedsnipResult.tokensFreed// 传给后续阈值检查}作用模型通过 SnipTool 标记了要删除的消息 UUID → 这些消息被过滤掉。为什么在 Microcompact 之前减少后续步骤需要处理的 token 量。3.3 第二级Microcompact — 清除工具结果内容// query.ts:413-426constmicrocompactResultawaitdeps.microcompact(messagesForQuery,toolUseContext,querySource)messagesForQuerymicrocompactResult.messages// 消息本身不变// compactionInfo.pendingCacheEdits → 缓存删除指令 // 额外产出作用旧的 tool_result 内容被标记为可以删掉缓存。消息数组不变但额外生成了cache_edits删除指令。3.4 第三级Context Collapse — 异步折叠// query.ts:440-447if(contextCollapse){constcollapseResultawaitcontextCollapse.applyCollapsesIfNeeded(messagesForQuery,toolUseContext,querySource)messagesForQuerycollapseResult.messages// 折叠区间被替换为 summary}作用后台 ctx-agent 分析完成的对话区间被折叠为摘要占位符。projectView() 做只读投影不修改 REPL 消息数组。3.5 第四级AutoCompact — 兜底总结// query.ts:453-467const{compactionResult,consecutiveFailures}awaitdeps.autocompact(messagesForQuery,toolUseContext,cacheSafeParams,querySource,tracking,snipTokensFreed)if(compactionResult){// 用压缩后的消息替换当前消息messagesForQuerybuildPostCompactMessages(compactionResult)// 如果是 task_budget 模式 → 扣减已消耗的 token 预算taskBudgetRemaining-preCompactContext}作用若前三层不够、token 仍超阈值 → LLM 总结前缀对话。四、阶段三API 请求组装4.1 normalizeMessagesForAPI — 消息整形// utils/messages.ts:1989functionnormalizeMessagesForAPI(messages:Message[],tools:Tools){// ① 重排附件消息 过滤虚拟消息messagesreorderAttachmentsForAPI(messages).filter(m!m.isVirtual)// ② 剥离超大图片/PDF 块避免反复 400messagesstripMediaBlocksForKnownErrors(messages)// ③ 合并同 message.id 的 assistant 消息// (流式返回时 thinking/tool_use 分散在不同消息中)// ④ 合并相邻 user 消息// ⑤ 注入 [id:xxx] 标签SnipTool 引用if(snipEnabled){for(msgofuserMessages){appendMessageTag(msg,[id:${shortId(msg.uuid)}])}}// ⑥ 清理末尾思考块、空白 assistant 等边界情况// ⑦ 验证所有图片不超 API 限制returnsanitized}4.2 addCacheBreakpoints — 缓存标记注入// claude.ts:3063functionaddCacheBreakpoints(messages,enableCaching,...,newCacheEdits,pinnedEdits){// ① 在倒数第二条消息放 cache_control唯一入口constmarkerIndexmessages.length-1messages[markerIndex].content.push({cache_control:{type:ephemeral}})// ② 给缓存前缀中的 tool_result 加 cache_reference 名字for(msgofmessagesBefore(markerIndex)){for(blockofmsg.content){if(block.typetool_result){block.cache_referenceblock.tool_use_id}}}// ③ 重插之前 pin 的 cache_edits在原始位置for(pinnedofpinnedEdits){insertBlockAfterToolResults(messages[pinned.userMessageIndex].content,pinned.block)}// ④ 注入本轮新的 cache_editsMicrocompact 产生的删除指令if(newCacheEdits){insertBlockAfterToolResults(messages[lastUserMsg].content,newCacheEdits)pinCacheEdits(lastUserMsgIndex,newCacheEdits)}returnmessages}4.3 paramsFromContext — 拼装完整请求// claude.ts:1699{model:claude-sonnet-4-6,system:[// system prompt 拆成多个 text block{type:text,text:你是 Claude Code...,cache_control:{type:ephemeral}},{type:text,text:...工具定义...},{type:text,text:...安全规则...,cache_control:{type:ephemeral}},],messages:addCacheBreakpoints(...),// 经过 normalized cache 标记tools:[// 完整的工具列表{name:Read,input_schema:{...}},{name:Write,input_schema:{...}},{name:Bash,input_schema:{...}},// ...共 30 个工具],thinking:{type:adaptive},// 自适应思考模式max_tokens:24000,// 最大输出 token 数betas:[// 实验性功能 headercache-editing-2025-07-29,context-1m-2025-12-01,],}五、阶段四发送请求与流式接收5.1 发送 HTTP 请求// claude.tsconststreamawaitclient.beta.messages.stream({...paramsFromContext(retryContext)})5.2 流式接收事件循环// query.tsforawait(constmessageofdeps.callModel({messages,systemPrompt,tools,...})){switch(message.type){casestream_event:// content_block_start / content_block_delta// content_block_delta.delta.text → 逐字显示在 UIyieldmessagebreakcaseassistant:// 一条完整的 assistant 回复到达// 可能包含: text tool_use blocksmutableMessages.push(message)recordTranscript(message)// 检查是否有 tool_useconsttoolUseBlocksmessage.content.filter(bb.typetool_use)if(toolUseBlocks.length0){needsFollowUptrue// 注册到流式工具执行器for(consttooloftoolUseBlocks){streamingToolExecutor.addTool(tool,message)}}yieldmessagebreakcaseuser:// 工具执行结果mutableMessages.push(message)recordTranscript(message)yieldmessagebreak}}六、阶段五工具执行循环6.1 工具执行// streamingToolExecutor.getCompletedResults()for(constresultofexecutor.getCompletedResults()){if(result.message){// result.message 是包含 tool_result 的 user messagemutableMessages.push(result.message)yieldresult.message toolResults.push(result.message)}}6.2 判断是否继续// query.tsif(needsFollowUp!aborted){// 检查 token 是否超限if(isAtBlockingLimit){// → 发送 Prompt too long 错误break}// 回到循环顶部带着新的 tool_result 再做一轮// Snip → Micro → Collapse → (AutoCompact) → APImessagesForQuery[...previousMessages,...newToolResults]continue// ← 回到 do-while 循环}6.3 可能出现的问题Tool 执行循环: 用户输入 → assistant(text tool_use) → tool_result → assistant(text tool_use) → tool_result → ... (循环直到 stop_reason end_turn) → assistant(text only) ✓ 退出在这个过程中如果遇到 API 413 (prompt too long)Collapse 恢复→ drain 所有 staged fold → 重试Reactive Compact→ 紧急 LLM 压缩 → 重试Max Output Tokens 恢复→ 增加输出 token 上限 → 重试七、阶段六完成后台活动query loop 结束后 │ ├─ recordTranscript() — 确保所有消息写入 JSONL ├─ ctx-agent (Collapse 启用时) — 分析本轮对话找出可折叠区间 ├─ extractMemories() (Session Memory 启用时) — 更新会话记忆文件 ├─ 遥测事件上报 — 所有 tengu_* 事件发送到分析平台 └─ 清理 — clear 各种临时状态八、完整时序图时间线 ──────────────────────────────────────────────────────────► 用户输入 修复 foo.ts 的 login │ ├─ QueryEngine.submitMessage() │ ├─ mutableMessages.push(userMsg51) │ └─ recordTranscript([userMsg51]) │ ├─ query() 循环入口 │ │ │ ├─ Round 1 ────────────────────────────────────── │ │ ├─ Snip Compact → 少 2 条消息 │ │ ├─ Microcompact → 生成 cache_edits │ │ ├─ Context Collapse → 1 段被折叠 │ │ ├─ AutoCompact → 跳过未超阈值 │ │ ├─ normalizeMessagesForAPI() │ │ ├─ addCacheBreakpoints() │ │ ├─ paramsFromContext() │ │ ├─ HTTP POST → API │ │ │ ↓ 流式接收 │ │ ├─ assistant(text tool_use: Read) │ │ ├─ 工具执行: Read foo.ts │ │ └─ tool_result(foo.ts) │ │ │ ├─ Round 2 (有 tool_use → 继续) ───────────────── │ │ ├─ Snip → Micro → Collapse → (AutoCompact) │ │ ├─ API 请求 (含前一轮 Read 结果 cache_edits) │ │ ├─ assistant(text tool_use: Edit) │ │ ├─ 工具执行: Edit foo.ts │ │ └─ tool_result(edit ok) │ │ │ ├─ Round 3 ────────────────────────────────────── │ │ ├─ API 请求 │ │ ├─ assistant(text: 修复完成login 函数...) │ │ └─ stop_reason: end_turn ✓ │ │ │ └─ needsFollowUp false → 退出 │ ├─ 用户看到最终回复 │ └─ 后台 ├─ ctx-agent 分析对话 → stage 新的折叠 ├─ extractMemories → 更新 session memory └─ 遥测上报九、关键设计决策9.1 mutableMessages — 真相源QueryEngine.mutableMessages是内存中的权威对话历史。所有操作push、snip、slash command都直接修改这个数组。API 层拿它做快照 ([...mutableMessages])压缩层在快照上做投影。REPL 同步读它来做 UI 渲染。9.2 双重视图分离mutableMessages全量REPL 可见 │ ├─ projectSnippedView() → Snip 投影 ├─ projectView() → Collapse 投影 │ ▼ messagesForQuery发给 API 的视图REPL 永远能看到完整历史API 看到的是压缩后的精简版。两者不互相干扰。9.3 压缩顺序不可变Snip 必须在 Microcompact 之前 → 删掉消息减少 MC 扫描量 Microcompact 必须在 Collapse 之前 → cache_edits 在 Collapse 之前注入 Collapse 必须在 AutoCompact 之前 → Collapse 可能解决问题避免 LLM 调用 AutoCompact 兜底 → 前三级都没解决时出手9.4 Query Loop 的 “多轮” 特性一次用户输入可能触发多次 API 调用模型说让我读文件→ Read → 结果回来后模型继续 → 可能 Edit → 再回来 → …。每一轮都完整走一遍压缩管线。但每轮只处理新增的内容前几轮的内容已经被之前的压缩处理过了。十、总结Claude Code 的对话链路是一条精心设计的流水线用户输入→ QueryEngine 统一收口四级压缩→ 逐级过滤从轻到重API 组装→ normalize cache_breakpoints param construction流式处理→ 逐字显示 工具自动执行多轮循环→ assistant → tool_use → tool_result → 继续后台收尾→ 持久化 预分析 遥测这条链路的每层都有明确的职责边界、优雅的降级策略、以及对 prompt cache 的极致尊重。本文全部来自博主学习 Claude Code 源码时的笔记和与 AI 的问答整理。