【OpenClaw】通过 Nanobot 源码学习架构---(10)Heartbeat

📅 2026/6/25 14:17:09
【OpenClaw】通过 Nanobot 源码学习架构---(10)Heartbeat
0x00 概要OpenClaw 应该有40万行代码阅读理解起来难度过大因此本系列通过Nanobot来学习 OpenClaw 的特色。Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架定位为Ultra-Lightweight OpenClaw。非常适合学习Agent架构。HeartbeatService 组件是 Nanobot 实现 “周期性任务检测与执行” 的核心模块比如根据HEARTBEAT.md来周期性唤醒Nanobot执行操作监控在运行吗日志里有报错吗如果出问题了Agent 会主动给你发消息。通过_HEARTBEAT_TOOLLLM 工具调用的轻量化设计HeartbeatService 组件仅用不到 200 行代码就完成了 OpenClaw 同等核心的 “定时唤醒 Agent 检查任务” 能力。注因为最近看的文章太多所以如果有遗漏参考资料还请读者指出谢谢。0x01 基本功能1.1 整体作用HeartbeatService是 Nanobot 的周期性任务检测与执行服务其放弃传统的 “硬编码规则解析”改用 LLM 驱动的智能决策适配自然语言描述的任务场景基于 asyncio 实现轻量化的周期性调度无需依赖 Celery 等重型定时任务框架。HeartbeatService的核心职责/特色是按配置的时间间隔默认 30 分钟自动唤醒读取工作目录下的HEARTBEAT.md文件并执行 HEARTBEAT.md 中的周期性任务两阶段执行模式HeartbeatService采用 “两阶段执行” 架构将 “决策” 和 “执行” 解耦既保证决策的智能化又实现执行逻辑的解耦LLM 驱动的智能决策放弃传统的 “关键字匹配 / 正则解析” 方式通过 LLM 虚拟工具调用的方式分析HEARTBEAT.md内容判断是否有任务需要执行避免了 “HEARTBEAT_OK” 这类硬编码令牌的不稳定性适配自然语言描述的任务场景。灵活的回调执行扩展通过on_execute和on_notify回调函数解耦 “任务执行” 和 “结果推送” 逻辑无需修改心跳服务核心代码即可适配不同的执行 / 推送策略。若检测到任务触发预设的执行回调on_execute通过 Agent 完整执行任务执行完成后触发通知回调on_notify将结果推送至指定通道如 CLI / 第三方平台支持手动触发心跳检测兼顾 “自动周期性执行” 和 “手动应急触发” 需求。1.2 应用场景HeartbeatService 的应用场景如下持续监控定期检查某些条件是否满足例如监控文件变化、API 状态、外部事件等代理任务执行长时间运行的监控或检查任务无需用户持续交互即可主动采取行动主动维护定期整理文件、清理临时数据检查系统健康状况状态同步定期同步外部服务的状态保持本地数据与远程服务的同步1.3 Claw01.3.1 架构Claw0中一个定时器线程检查该不该运行, 然后将任务排入与用户消息相同的队列其架构如下Main Lane (user input): User Input -- lane_lock.acquire() ------- LLM -- Print (blocking: always wins) Heartbeat Lane (background thread, 1s poll): should_run()? |no -- sleep 1s |yes _execute(): lane_lock.acquire(blockingFalse) |fail -- yield (user has priority) |success build prompt from HEARTBEAT.md SOUL.md MEMORY.md | run_agent_single_turn() | parse: HEARTBEAT_OK? -- suppress meaningful text? -- duplicate? -- suppress |no output_queue.append() Cron Service (background thread, 1s tick): CRON.json -- load jobs -- tick() every 1s | for each job: enabled? -- due? -- _run_job() | error? -- consecutive_errors -- 5? -- auto-disable |ok consecutive_errors 0 -- log to cron-runs.jsonl其要点如下Lane 互斥:threading.Lock在用户和心跳之间共享. 用户总是赢 (阻塞获取); 心跳让步 (非阻塞获取).should_run(): 每次心跳尝试前的 4 个前置条件检查.HEARTBEAT_OK: agent 用来表示没有需要报告的内容的约定.CronService: 3 种调度类型 (at,every,cron), 连续错误 5 次后自动禁用.输出队列: 后台结果通过线程安全的列表输送到 REPL.1.3.2 核心架构Lane 互斥最重要的设计原则: 用户消息始终优先.lane_lock threading.Lock() # Main lane: 阻塞获取. 用户始终能进入. lane_lock.acquire() try: # 处理用户消息, 调用 LLM finally: lane_lock.release() # Heartbeat lane: 非阻塞获取. 用户活跃时让步. def _execute(self) - None: acquired self.lane_lock.acquire(blockingFalse) if not acquired: return # 用户持有锁, 跳过本次心跳 self.running True try: instructions, sys_prompt self._build_heartbeat_prompt() response run_agent_single_turn(instructions, sys_prompt) meaningful self._parse_response(response) if meaningful and meaningful.strip() ! self._last_output: self._last_output meaningful.strip() with self._queue_lock: self._output_queue.append(meaningful) finally: self.running False self.last_run_at time.time() self.lane_lock.release()前置条件链四个检查必须全部通过. 锁的检测在_execute()中单独进行,以避免 TOCTOU 竞态条件.def should_run(self) - tuple[bool, str]: if not self.heartbeat_path.exists(): return False, HEARTBEAT.md not found if not self.heartbeat_path.read_text(encodingutf-8).strip(): return False, HEARTBEAT.md is empty elapsed time.time() - self.last_run_at if elapsed self.interval: return False, finterval not elapsed ({self.interval - elapsed:.0f}s remaining) hour datetime.now().hour s, e self.active_hours in_hours (s hour e) if s e else not (e hour s) if not in_hours: return False, foutside active hours ({s}:00-{e}:00) if self.running: return False, already running return True, all checks passed1.4 ZeroClaw我们再看看 ZeroClaw。下图是来自其官方文档的“How the daemon keeps components alive”。从中看看 Cron 和 Heartbeat 的思路。根据 ZeroClaw 的架构设计这个流程图涵盖了以下核心逻辑组件并行启动Daemon 启动后会立即并行生成四个核心部分状态写入器每5秒刷新、网关、渠道、心跳和调度器。条件检查渠道、心跳和调度器会根据配置文件config.toml中的设置决定是否启动对应的 Worker。例如如果未配置 Cron则直接标记为 OK 并跳过。监督与循环每个核心组件Gateway, Channels, Heartbeat, Scheduler都拥有独立的Supervisor监督者和Loop循环。异常处理如果组件意外退出或报错系统会记录错误并进行退避等待Backoff随后尝试重新进入循环确保服务的稳定性。核心功能Gateway负责 HTTP/WebSocket 服务处理外部连接。Channels连接 Telegram、Discord 等聊天平台。Heartbeat定期执行后台感知任务赋予 AI “自主意识”。Scheduler基于 Cron 表达式触发定时任务。优雅退出当接收到CtrlC信号时Daemon 会中止所有任务并等待线程结束确保数据完整保存后停止。0x02 详细分析HeartbeatService 实现了一个周期性的自主唤醒系统定期检查是否有待处理的任务无需外部触发。这是一个任务驱动的唤醒机制通过读取 HEARTBEAT.md 文件了解待办任务使用 LLM 判断是否需要执行这些任务 / 并作相应执行概要流程如下等待 interval_s 秒 ↓ HeartbeatService.tick() ↓ HeartbeatService._decide() ↓ 输入HEARTBEAT.md 文件内容 ↓ 构建 LLM 提示 - 系统角色You are a heartbeat agent. - 用户输入HEARTBEAT.md 内容 - 工具_HEARTBEAT_TOOL ↓ LLM 处理请求 - 分析 HEARTBEAT.md 内容 - 决定是否需要执行任务 - 通过虚拟工具调用返回决策 ↓ 解析工具调用结果 - act ↓ 返回 (action, tasks) ↓ ├── Heartbeat: OK │ 无任务执行 │ ├── 执行任务并通知结果 │ 1. 调用 on_execute │ 2. 调用 on_notify │ └── 记录执行失败参见如下2.1 待办任务机制2.1.1 AGENTS.mdAGENTS.md 文件会用来指导 agent 如何管理 HEARTBEAT.md 文件中的周期性任务。## Heartbeat Tasks HEARTBEAT.md is checked every 30 minutes. Use file tools to manage periodic tasks: - **Add**: edit_file to append new tasks - **Remove**: edit_file to delete completed tasks - **Rewrite**: write_file to replace all tasks When the user asks for a recurring/periodic task, update HEARTBEAT.md instead of creating a one-time cron reminder.2.1.2 HEARTBEAT.mdHEARTBEAT.md是一个标记文件包含需要定期检查的任务列表文件位于工作空间根目录可由agent自主更新。agent 可以使用文件工具如 edit_file、write_file更新 HEARTBEAT.md支持添加、移除或重写周期性任务。agent 也可以通过技能自动管理心跳任务例如当用户请求周期性任务时agent 会更新 HEARTBEAT.md 而不是创建一次性提醒。# Heartbeat Tasks This file is checked every 30 minutes by your nanobot agent. Add tasks below that you want the agent to work on periodically. If this file has no tasks (only headers and comments), the agent will skip the heartbeat. ## Active Tasks !-- Add your periodic tasks below this line -- ## Completed !-- Move completed tasks here or delete them --2.1.3 MimiClaw我们也用MimiClaw 来进行对比验证心跳服务会定期读取 SPIFFS 上的HEARTBEAT.md检查是否有待办事项。如果发现未完成的条目非空行、非标题、非已勾选的- [x]就会向 Agent 循环发送提示让 AI 自主处理。这让 MimiClaw 变成一个主动型助理 — 把任务写入HEARTBEAT.md机器人会在下一次心跳周期自动拾取执行默认每 30 分钟。2.2 两阶段执行模式HeartbeatService 秉承两阶段执行机制Phase 1 (决策)读取 HEARTBEAT.md通过 LLM 虚拟工具调用判断是否有活跃任务返回 skip 或 run 决策。即HEARTBEAT.md 内容 → LLM → skip 或 run 决策Phase 2 (执行)只有当 Phase 1 返回 run 时才触发任务执行通过回调函数执行实际的 agent 操作。即任务摘要 → AgentLoop → 执行结果 → 通知 Periodic heartbeat service that wakes the agent to check for tasks. Phase 1 (decision): reads HEARTBEAT.md and asks the LLM — via a virtual tool call — whether there are active tasks. This avoids free-text parsing and the unreliable HEARTBEAT_OK token. Phase 2 (execution): only triggered when Phase 1 returns run. The on_execute callback runs the task through the full agent loop and returns the result to deliver. try: # Phase 1调用LLM做决策获取action和tasks action, tasks await self._decide(content) # 若决策为skip无任务记录日志并返回 if action ! run: return # 若决策为run有任务记录日志并执行Phase 2 # 若配置了执行回调触发回调执行任务 if self.on_execute: response await self.on_execute(tasks) # 若执行有结果且配置了通知回调推送结果 if response and self.on_notify: await self.on_notify(response)具体流程图如下2.3 Phase 1此部分是LLM 调用流程_decide 方法。2.3.1 方法入口async def _decide(self, content: str) - tuple[str, str]: Phase 1: ask LLM to decide skip/run via virtual tool call.2.3.2 步骤 1: 构建请求消息messages [ { role: system, content: You are a heartbeat agent. Call heartbeat tool to report your decision. }, { role: user, content: ( Review the following HEARTBEAT.md and decide whether there are active tasks.\n f{content} ) }, ]系统消息设定角色为 heartbeat agent明确告知任务用户消息包含 HEARTBEAT.md 文件内容和用户输入的任务描述2.3.3 步骤 2: 调用 LLM Provider 的 chat 方法response await self.provider.chat( messagesmessages, tools_HEARTBEAT_TOOL, # 传入工具定义 modelself.model, # 使用配置的模型 )调用与主 Agent 相同的 provider 实例传入工具列表只包含 heartbeat 工具传入模型参数model、temperature、max_tokens 使用默认值返回 LLMResponse 对象2.3.4 步骤 3: 解析工具调用响应if not response.has_tool_calls: return skip, # LLM 没有调用工具返回跳过 args response.tool_calls[0].arguments # 获取第一个工具调用的参数 return args.get(action, skip), args.get(tasks, )检查是否有工具调用response.has_tool_calls提取工具调用参数response.tool_calls[0].arguments解析 action 参数args.get(action, skip)解析 tasks 参数args.get(tasks, )2.3.5 _HEARTBEAT_TOOL上面步骤2使用了_HEARTBEAT_TOOL因此我们做特殊分析。LLM 需要分析 HEARTBEAT.md 中的任务是否需要执行。当时间到触发回调之后在 HeartbeatService._decide() 中会显式让LLM调用 _HEARTBEAT_TOOL。功能_HEARTBEAT_TOOL 是一个虚拟工具用于LLM决策跳过或者运行任务LLM 被要求调用这个工具并返回适当的参数避免了自由文本解析的不确定性。_HEARTBEAT_TOOL 定义了action参数skip或者run和 task 参数任务摘要。根据任务状态决定返回 skip无事可做或 run有活动任务。这个定义的核心价值是约束 LLM 的输出格式—— 让原本返回自然语言的 LLM必须按照固定结构返回 “决策结果”方便代码后续解析而非人工处理。内容_HEARTBEAT_TOOL 的内容如下Heartbeat service - periodic agent wake-up to check for tasks. # 定义心跳服务的虚拟工具SchemaOpenAI Function Call格式 # 核心作用让LLM通过标准化工具调用的方式返回决策结果避免自由文本解析的不稳定性 _HEARTBEAT_TOOL [ { type: function, function: { name: heartbeat, # 工具名称固定为heartbeatLLM调用时必须匹配 description: Report heartbeat decision after reviewing tasks., # 工具描述告知LLM该工具的用途 parameters: { # 工具参数Schema定义LLM返回的决策结果格式 type: object, properties: { action: { # 核心决策参数skip无任务/run有任务 type: string, enum: [skip, run], description: skip nothing to do, run has active tasks, }, tasks: { # 任务描述参数仅run时必填为自然语言的任务摘要 type: string, description: Natural-language summary of active tasks (required for run), }, }, required: [action], # 强制要求LLM返回action参数 }, }, } ]如何使用_HEARTBEAT_TOOL的设计逻辑是系统提示强制约束 LLM 的行为告诉它 “你是心跳代理必须调用 heartbeat 工具”避免 LLM 返回无关的自然语言用户提示传递决策依据把HEARTBEAT.md的内容作为输入让 LLM 有分析的素材。_HEARTBEAT_TOOL的调用逻辑如下它是 LLM 工具调用的 “契约定义”通过self.provider.chat的tools参数传入 LLMLLM 按其规范返回结构化决策结果代码再解析response.tool_calls获取最终决策await self.provider.chat 是_HEARTBEAT_TOOL被 “激活” 的核心LLM 提供商如 OpenAI的chat接口会解析tools参数理解 “heartbeat 工具” 的调用规范LLM 会基于HEARTBEAT.md内容分析然后按照_HEARTBEAT_TOOL的参数规范生成工具调用结果而非普通文本。LLM 被赋予的角色是 “heartbeat agent”心跳代理其唯一职责是读取HEARTBEAT.md内容判断是否有活跃任务按_HEARTBEAT_TOOL的规范调用heartbeat工具返回 “skip/run” 决策。这个角色定位是 Nanobot “超轻量级” 的体现 ——LLM 只做单一决策不处理复杂任务执行保证资源消耗最小。# 核心异步方法Phase 1 - LLM决策判断是否有任务需要执行 # 参数content - HEARTBEAT.md的内容 # 返回值(action, tasks) - action为skip/runtasks为任务摘要 async def _decide(self, content: str) - tuple[str, str]: Phase 1: ask LLM to decide skip/run via virtual tool call. Returns (action, tasks) where action is skip or run. # 调用LLM提供商的chat接口触发虚拟工具调用 response await self.provider.chat( messages[ # 系统提示告知LLM其角色为心跳代理必须调用heartbeat工具返回决策 {role: system, content: You are a heartbeat agent. Call the heartbeat tool to report your decision.}, # 用户提示传入HEARTBEAT.md内容让LLM分析并决策 {role: user, content: ( Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n f{content} )},