OpenHarness 智能体与大模型交互的数据流

📅 2026/7/2 10:37:23
OpenHarness 智能体与大模型交互的数据流
1. OpenHarness 源码解析。参考的文章：
《花一天读完 OpenHarness：一个 11,733 行的 Agent Harness 到底长什么样》 https://www.joyehuang.me/blog/20260410---openharnessphase1/post
《OpenHarness 架构说明文档》 https://juejin.cn/post/7627340015571959850
Harness 将大模型从一个纯推理引擎转变为一个“感知-推理-行动”的智能体（agent）。它填补的是行动能力和环境反馈——让模型能够验证自己的猜想、修正自己的错误、在真实状态上迭代。大模型 + Harness = 推理 + 行动 = 智能体。
2. 数据流是通过在源码中加 log 来获取的。通过数据流可以看到每次交互的真实数据，也可以看到大模型缓存命中的原因：agent 每次都要发送前面的会话内容。
3. log 加在两处：一处在 run_query 中，另一处在 HTTPS 的加密/解密处。
/home/claw/OpenHarness/src/openharness/engine/query.py
_run_query_entry_count 0 async def run_query( context: QueryContext, messages: list[ConversationMessage], ) - AsyncIterator[tuple[StreamEvent, UsageSnapshot | None]]: Run the conversation loop until the model stops requesting tools. Auto-compaction is checked at the start of each turn. When the estimated token count exceeds the models auto-compact threshold, the engine first tries a cheap microcompact (clearing old tool result content) and, if that is not enough, performs a full LLM-based summarization of older messages. global _run_query_entry_count _run_query_entry_count 1 with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(f\n{# * 80}\n) _f.write(f### run_query 入口 — 第 {_run_query_entry_count} 次进入 ###\n) _f.write(f{# * 80}\n) from openharness.services.compact import ( AutoCompactState, auto_compact_if_needed, ) compact_state AutoCompactState() reactive_compact_attempted False last_compaction_result: tuple[list[ConversationMessage], bool] (messages, False) effective_max_tokens _bounded_completion_tokens( context.max_tokens, context.context_window_tokens, ) reported_token_clamp False async def _stream_compaction( *, trigger: str, force: bool False, ) - AsyncIterator[tuple[StreamEvent, UsageSnapshot | None]]: nonlocal last_compaction_result progress_queue: asyncio.Queue[CompactProgressEvent] asyncio.Queue() async def _progress(event: CompactProgressEvent) - None: await progress_queue.put(event) task asyncio.create_task( auto_compact_if_needed( messages, api_clientcontext.api_client, 
modelcontext.model, system_promptcontext.system_prompt, statecompact_state, progress_callback_progress, forceforce, triggertrigger, hook_executorcontext.hook_executor, carryover_metadatacontext.tool_metadata, context_window_tokenscontext.context_window_tokens, auto_compact_threshold_tokenscontext.auto_compact_threshold_tokens, ) ) while True: try: event await asyncio.wait_for(progress_queue.get(), timeout0.05) yield event, None except asyncio.TimeoutError: if task.done(): break continue while not progress_queue.empty(): yield progress_queue.get_nowait(), None last_compaction_result await task return turn_count 0 while context.max_turns is None or turn_count context.max_turns: turn_count 1 with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(f\n TURN {turn_count} \n) if effective_max_tokens ! context.max_tokens and not reported_token_clamp: reported_token_clamp True yield StatusEvent( message( Requested max_tokens f{context.max_tokens} exceeds the safe per-request output cap; fusing {effective_max_tokens}. 
) ), None # --- auto-compact check before calling the model --------------- async for event, usage in _stream_compaction(triggerauto): yield event, usage compacted_messages, was_compacted last_compaction_result if compacted_messages is not messages: messages[:] compacted_messages # --------------------------------------------------------------- # --- image preprocessing: convert ImageBlocks to text for non-vision models --- async for event in _preprocess_images_in_messages(messages, context): yield event, None # ----------------------------------------------------------------------------- final_message: ConversationMessage | None None usage UsageSnapshot() # --- DEBUG: log API request to file (发送数据) --- if True: import json as _json _tools context.tool_registry.to_api_schema() _req ApiMessageRequest( modelcontext.model, messagesmessages, system_promptcontext.system_prompt, max_tokenseffective_max_tokens, tools_tools, effortcontext.effort, ) with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(\n * 80 \n) _f.write( 发送数据 - API请求 \n) _f.write(fTURN {turn_count}\n) _f.write(fMODEL: {_req.model}\n) _f.write(fMAX_TOKENS: {_req.max_tokens}\n) _f.write(fEFFORT: {_req.effort}\n) _f.write(fSYSTEM_PROMPT ({len(_req.system_prompt) if _req.system_prompt else 0} chars):\n) _f.write((_req.system_prompt or (none)) \n) _f.write(fTOOLS ({len(_tools)}):\n) for _t in _tools: _f.write(f - {_t.get(name, ?)}: {_t.get(description, )}\n) _f.write(fMESSAGES ({len(messages)}):\n) for _i, _m in enumerate(messages): _f.write(f [{_i}] role{_m.role!r} content_blocks{len(getattr(_m, content, []))}\n) for _j, _b in enumerate(getattr(_m, content, [])): _bt type(_b).__name__ if hasattr(_b, text): _txt getattr(_b, text, ) _f.write(f [{_j}] {_bt} text{_txt!r}\n) elif hasattr(_b, content): _cnt getattr(_b, content, ) _f.write(f [{_j}] {_bt} content{_cnt!r}\n) elif hasattr(_b, name) and hasattr(_b, input): _f.write(f [{_j}] {_bt} name{getattr(_b, name, ?)!r} 
input{_json.dumps(getattr(_b, input, {}), ensure_asciiFalse)!r}\n) else: _f.write(f [{_j}] {_bt}\n) _f.write( * 80 \n) # --- end debug --- try: async for event in context.api_client.stream_message( ApiMessageRequest( modelcontext.model, messagesmessages, system_promptcontext.system_prompt, max_tokenseffective_max_tokens, toolscontext.tool_registry.to_api_schema(), effortcontext.effort, ) ): # --- DEBUG: log stream events to file (接收数据) --- if True: import json as _json with open(/home/claw/testclaude/run_query_total.txt, a) as _f: if isinstance(event, ApiTextDeltaEvent): _f.write(f 接收数据 - TEXT_DELTA: {event.text!r}\n) elif isinstance(event, ApiRetryEvent): _f.write(f 接收数据 - RETRY: attempt{event.attempt}/{event.max_attempts} delay{event.delay_seconds:.1f}s msg{event.message!r}\n) elif isinstance(event, ApiMessageCompleteEvent): _f.write(f 接收数据 - MESSAGE_COMPLETE: stop_reason{event.stop_reason!r}\n) _f.write(f 接收数据 - USAGE: input{event.usage.input_tokens} output{event.usage.output_tokens}\n) _msg event.message _f.write(f 接收数据 - FINAL_MESSAGE role{_msg.role!r} content_blocks{len(getattr(_msg, content, []))}\n) for _j, _b in enumerate(getattr(_msg, content, [])): _bt type(_b).__name__ if hasattr(_b, text): _txt getattr(_b, text, ) _f.write(f [{_j}] {_bt} text{_txt!r}\n) elif hasattr(_b, name) and hasattr(_b, input): _f.write(f [{_j}] {_bt} name{getattr(_b, name, ?)!r} input{_json.dumps(getattr(_b, input, {}), ensure_asciiFalse)!r}\n) elif hasattr(_b, name): _f.write(f [{_j}] {_bt} name{getattr(_b, name, ?)!r}\n) elif hasattr(_b, content): _f.write(f [{_j}] {_bt} content{getattr(_b, content, )!r}\n) else: _f.write(f [{_j}] {_bt}\n) _tus getattr(_msg, tool_uses, []) or [] if _tus: _f.write(f 接收数据 - TOOL_USES ({len(_tus)}):\n) for _tu in _tus: _f.write(f - name{_tu.name!r} id{_tu.id!r} input{_json.dumps(getattr(_tu, input, {}), ensure_asciiFalse)!r}\n) _f.write(\n) # --- end debug --- if isinstance(event, ApiTextDeltaEvent): yield AssistantTextDelta(textevent.text), None 
continue if isinstance(event, ApiRetryEvent): yield StatusEvent( message( fRequest failed; retrying in {event.delay_seconds:.1f}s f(attempt {event.attempt 1} of {event.max_attempts}): {event.message} ) ), None continue if isinstance(event, ApiMessageCompleteEvent): final_message event.message usage event.usage except Exception as exc: error_msg str(exc) if _is_completion_token_limit_error(exc): supported_limit _extract_completion_token_limit(exc) if supported_limit is not None and effective_max_tokens supported_limit: previous_max_tokens effective_max_tokens effective_max_tokens supported_limit yield StatusEvent( message( fModel rejected max_tokens{previous_max_tokens}; fretrying with provider limit {effective_max_tokens}. ) ), None turn_count max(0, turn_count - 1) continue if not reactive_compact_attempted and _is_prompt_too_long_error(exc): reactive_compact_attempted True yield StatusEvent(messageREACTIVE_COMPACT_STATUS_MESSAGE), None async for event, usage in _stream_compaction(triggerreactive, forceTrue): yield event, usage compacted_messages, was_compacted last_compaction_result if compacted_messages is not messages: messages[:] compacted_messages if was_compacted: continue if connect in error_msg.lower() or timeout in error_msg.lower() or network in error_msg.lower(): yield ErrorEvent(messagefNetwork error: {error_msg}. 
Check your internet connection and try again.), None else: yield ErrorEvent(messagefAPI error: {error_msg}), None return if final_message is None: raise RuntimeError(Model stream finished without a final message) coordinator_context_message: ConversationMessage | None None if context.system_prompt.startswith(You are a **coordinator**.): if messages and messages[-1].role user and messages[-1].text.startswith(# Coordinator User Context): coordinator_context_message messages.pop() if final_message.role assistant and final_message.is_effectively_empty(): log.warning(dropping empty assistant message from provider response) yield ErrorEvent( message( Model returned an empty assistant message. The turn was ignored to keep the session healthy. ) ), usage return messages.append(final_message) yield AssistantTurnComplete(messagefinal_message, usageusage), usage if coordinator_context_message is not None: messages.append(coordinator_context_message) if not final_message.tool_uses: if context.hook_executor is not None: await context.hook_executor.execute( HookEvent.STOP, { event: HookEvent.STOP.value, stop_reason: tool_uses_empty, }, ) return tool_calls final_message.tool_uses # --- DEBUG: log tool calls to file (接收数据 - 工具调用) --- if True: import json as _json with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(f 接收数据 - TOOL_CALLS ({len(tool_calls)}):\n) for _tc in tool_calls: _f.write(f - name{_tc.name!r} id{_tc.id!r}\n) _f.write(f input{_json.dumps(getattr(_tc, input, {}), indent4, ensure_asciiFalse)}\n) # --- end debug --- if len(tool_calls) 1: # Single tool: sequential (stream events immediately) tc tool_calls[0] yield ToolExecutionStarted(tool_nametc.name, tool_inputtc.input), None try: result await _execute_tool_call(context, tc.name, tc.id, tc.input) except Exception as exc: log.exception(tool execution raised: name%s id%s, tc.name, tc.id) result ToolResultBlock( tool_use_idtc.id, contentfTool {tc.name} failed: {type(exc).__name__}: {exc}, 
is_errorTrue, ) # --- DEBUG: log single tool result (发送数据 - 工具执行结果) --- if True: with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(f 发送数据 - TOOL_RESULT: name{tc.name!r} is_error{result.is_error}\n) _f.write(f output{result.content!r}\n) # --- end debug --- yield ToolExecutionCompleted( tool_nametc.name, outputresult.content, is_errorresult.is_error, metadataresult.result_metadata, ), None tool_results [result] else: # Multiple tools: execute concurrently, emit events after for tc in tool_calls: yield ToolExecutionStarted(tool_nametc.name, tool_inputtc.input), None async def _run(tc): return await _execute_tool_call(context, tc.name, tc.id, tc.input) # Use return_exceptionsTrue so a single failing tool does not abandon # its siblings as cancelled coroutines and leave the conversation with # un-replied tool_use blocks (Anthropics API rejects the next request # on the session if any tool_use is missing a matching tool_result). raw_results await asyncio.gather( *[_run(tc) for tc in tool_calls], return_exceptionsTrue ) tool_results [] for tc, result in zip(tool_calls, raw_results): if isinstance(result, BaseException): log.exception( tool execution raised: name%s id%s, tc.name, tc.id, exc_inforesult, ) result ToolResultBlock( tool_use_idtc.id, contentfTool {tc.name} failed: {type(result).__name__}: {result}, is_errorTrue, ) tool_results.append(result) for tc, result in zip(tool_calls, tool_results): # --- DEBUG: log multi tool result (发送数据 - 工具执行结果) --- if True: with open(/home/claw/testclaude/run_query_total.txt, a) as _f: _f.write(f 发送数据 - TOOL_RESULT: name{tc.name!r} is_error{result.is_error}\n) _f.write(f output{result.content!r}\n) # --- end debug --- yield ToolExecutionCompleted( tool_nametc.name, outputresult.content, is_errorresult.is_error, metadataresult.result_metadata, ), None messages.append(ConversationMessage(roleuser, contenttool_results)) if context.max_turns is not None: raise MaxTurnsExceeded(context.max_turns) raise 
RuntimeError(Query loop exited without a max_turns limit or final response)/home/claw/OpenHarness/.venv/lib/python3.11/site-packages/anyio/streams/tls.pyasync def receive(self, max_bytes: int 65536) - bytes: data await self._call_sslobject_method(self._ssl_object.read, max_bytes) if not data: raise EndOfStream # --- DEBUG: log plaintext after TLS decryption --- import datetime as _dt_recv _ts _dt_recv.datetime.now().isoformat() _logpath /home/claw/testclaude/send_receive.txt with open(_logpath, a) as _f: _f.write(f\n{─ * 80}\n) _f.write(f[RECV PLAINTEXT] {_ts} len{len(data)}\n) _f.write(f{─ * 80}\n) _text data.decode(utf-8, errorsreplace) _f.write(_text) if not _text.endswith(\n): _f.write(\n) # --- end debug --- # # NEW LOG (commented out — smart file extraction pretty-print) # _log_tls_plaintext(RECV, data) # return data async def send(self, item: bytes) - None: # --- DEBUG: log plaintext before TLS encryption --- import datetime as _dt_send _ts _dt_send.datetime.now().isoformat() _logpath /home/claw/testclaude/send_receive.txt with open(_logpath, a) as _f: _f.write(f\n{ * 80}\n) _f.write(f[SEND PLAINTEXT] {_ts} len{len(item)}\n) _f.write(f{ * 80}\n) _text item.decode(utf-8, errorsreplace) _f.write(_text) if not _text.endswith(\n): _f.write(\n) # --- end debug --- # # NEW LOG (commented out — smart file extraction pretty-print) # _log_tls_plaintext(SEND, item) # await self._call_sslobject_method(self._ssl_object.write, item)4.这个是完整的openharness会话内容 stm32f407实现跑马灯功能然后review这个功能。 ⏺ ⏵ skill namebrainstorming → 114L ⏺ 我来帮你实现STM32F407的跑马灯功能并进行代码审查。首先让我了解一下当前项目的结构。 ⏵ glob **/* → 1L ⏺ ⏵ bash ls -la → 5L ⏺ ⏵ glob docs/**/* → 1L ⏺ ⏵ bash ls -la docs/ → 4L ⏺ ⏵ bash ls -la docs/superpowers/ → 5L ⏺ 当前工作目录是空的没有现有的STM32项目。让我为你创建一个完整的STM32F407跑马灯项目然后进行代码审查。 设计方案 1. 硬件选择使用STM32F407的GPIO端口如PD12-PD15对应开发板上的4个LED 2. 
实现方式 让我开始实现 ⏵ write_file pathmain.c → 1L ⏺ ⏵ write_file pathstm32f4xx_hal_conf.h → 1L ⏺ ⏵ write_file pathMakefile → 1L ⏺ ⏵ write_file pathSTM32F407VGTx_FLASH.ld → 1L ⏺ 现在让我使用review技能对代码进行审查 ⏵ skill namereview → 19L ⏺ ⏵ read_file pathmain.c → 193L ⏺ 代码审查报告 项目文件结构 ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ main.c - 主程序 │ │ stm32f4xx_hal_conf.h - HAL配置文件 │ │ Makefile - 构建脚本 │ │ STM32F407VGTx_FLASH.ld - 链接器脚本 │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ──────────────────────────────────────────────── ✅ 优点 ┌──────────────┬──────────────────────────────────────────────────┐ │ 项目 │ 说明 │ ├──────────────┼──────────────────────────────────────────────────┤ │ 代码结构清晰 │ 函数职责单一注释完整 │ │ 使用HAL库 │ 符合STM32现代开发规范可移植性好 │ │ 错误处理 │ Error_Handler 提供了错误指示机制 │ │ 宏定义 │ LED引脚和参数使用宏定义便于修改 │ │ 静态变量 │ current_led 使用 static 保持状态作用域控制得当 │ └──────────────┴──────────────────────────────────────────────────┘ ──────────────────────────────────────────────── ⚠️ 发现的问题 Critical - 潜在严重问题 1. main.c:180 - Error_Handler 中使用了 HAL_GPIO_TogglePin 和 HAL_Delay但如果错误发生在时钟配置阶段SystemClock_Config 失败SysTick可能尚未正确配置HAL_Delay 将无法正常工作。建议修复 Major - 重要改进建议 2. main.c:82-84 - 每次循环都熄灭所有LED再点亮一个会产生不必要的GPIO操作。可以优化为只操作变化的LED 3. main.c:25 - RUNNING_DELAY_MS 使用宏定义无法在运行时调整。建议添加按键控制速度的功能或至少提供可调接口。 Minor - minor改进 4. Makefile - 依赖 STM32CubeF4 环境变量缺少该变量时的友好提示 5. 
main.c:14-19 - LED颜色定义与Discovery开发板绑定，如果使用其他开发板需要修改。建议添加条件编译或配置注释说明。 ──────────────────────────────────────────────── 代码质量评分 ┌──────────┬────────────┬───────────────────────────────────┐ │ 维度 │ 评分 │ 说明 │ ├──────────┼────────────┼───────────────────────────────────┤ │ 功能性 │ ⭐⭐⭐⭐⭐ │ 跑马灯功能完整实现 │ │ 可读性 │ ⭐⭐⭐⭐⭐ │ 注释清晰命名规范 │ │ 可维护性 │ ⭐⭐⭐⭐☆ │ 宏定义便于修改但缺少配置灵活性 │ │ 健壮性 │ ⭐⭐⭐☆☆ │ Error_Handler在时钟错误时可能失效 │ │ 效率 │ ⭐⭐⭐⭐☆ │ GPIO操作可优化 │ └──────────┴────────────┴───────────────────────────────────┘ ──────────────────────────────────────────────── 总结 这是一个结构良好、功能完整的STM32F407跑马灯实现。主要问题是 Error_Handler 在时钟配置失败时可能无法正常工作，建议优先修复。其他改进建议可根据实际需求选择性采纳。 是否需要我修复上述问题？ ──────────────────────────────────────────────────────────── model: kimi-k2.5 │ mode: Default shift+enter newline enter send / commands tab mode ↑↓ history esc stop ctrl+c exit
5. 数据流 log 和 OpenHarness 生成的代码在资源里面：《OpenHarness 智能体与大模型交互的数据流中的 log 和生成的代码》 https://download.csdn.net/download/weixin_43864601/93054967?spm=1001.2014.3001.5501