【Agent Harness】Gliding Horse 的 L2 作战地图:让多 Agent 协作从“摸黑”变成“透明”

📅 2026/6/30 17:32:34
【Agent Harness】Gliding Horse 的 L2 作战地图:让多 Agent 协作从“摸黑”变成“透明”
Gliding Horse 的 L2 作战地图让多 Agent 协作从“摸黑”变成“透明”摘要本文深入解析 Gliding Horse流马框架的 L2 共享黑板设计一套专为多 Agent 协作打造的“实时作战地图”。文章详细拆解了 AgentTrackerAgent 生命体征监控、三级资源锁并发控制、跨任务依赖管理DAG 任务树以及 SharedZoneAgent 间结构化通信四大核心组件。通过将操作系统进程管理思想适配到 AI Agent 协作场景L2 黑板让调度器SA能够实时掌握全局态势实现从“摸黑协作”到“透明调度”的跨越。适合对多 Agent 系统、AI 工程架构感兴趣的开发者阅读。关键词多 Agent 系统共享黑板Agent 协作任务调度资源锁Gliding Horse流马AI 工程架构在构建多 Agent 协作系统的过程中我们遇到了一个核心挑战当多个 Agent 同时执行不同的子任务时调度器如何实时掌握全局状态某个 Agent 是否还活着它正在操作什么资源两个并行任务之间是否存在依赖冲突这些信息如果不能实时可见整个系统就会陷入“摸黑协作”的尴尬境地。Gliding Horse流马的 L2 共享黑板正是为了解决这个问题而设计的。它不仅仅是 Agent 之间的共享内存更是一张实时作战地图——调度器SA可以在这里看到所有 Agent 的状态、资源的占用情况、任务之间的依赖关系以及跨 Agent 的协调消息。本文将深入拆解这套 L2 作战地图的设计细节展示它如何为多 Agent 协作提供坚实的工程保障。一、L2 黑板的定位不只是“共享内存”在 Gliding Horse 的四层记忆架构中L2 是承上启下的关键一层向下它缓存 L0 持久化层中的热数据提供毫秒级的读写性能。向上它为 L3 投影引擎提供实时数据源按需裁剪上下文注入 LLM。横向它是多个 Agent 实例唯一共享的工作区承载着任务状态、节点数据和协调消息。传统 Agent 框架往往通过消息队列或轮询来传递状态而流马的 L2 黑板则将这些能力内建在同一个图存储中所有 Agent 通过 SPARQL 直接读写无需额外的通信中间件。Agent 集群读写读写读写读写SPARQL 查询SA 态势面板实时仪表盘Agent 状态 / 任务进度 / 资源冲突L2 作战地图 (Blackboard)节点缓存 Oxigraph SPARQLAgentTracker实时状态跟踪任务树 DAG依赖管理资源锁并发控制SharedZone协调消息PA (计划)DA-1 (执行)DA-2 (执行)CA (检查)二、AgentTracker每个 Agent 的“生命体征”L2 作战地图的核心组件是AgentTracker——一个实时跟踪所有 Agent 状态的子系统。每当一个 Agent 实例启动、执行任务或结束时它都会在 L2 中更新自己的“生命体征”。pubstructAgentStatus{pubagent_id:String,// Agent 唯一标识pubagent_role:String,// PA / DA / CA / AA / SApubtask_iri:String,// 当前执行的任务 IRIpubstatus:AgentActivity,// Idle / Working / Blocked / Errorpubstarted_at:DateTimeUtc,publast_heartbeat:DateTimeUtc,// 最后心跳时间pubcurrent_operation:OptionString,// 当前操作描述pubresource_locks:VecResourceLock,// 持有的资源锁}pubenumAgentActivity{Idle,Working,Blocked,Error,}pub enum AgentActivity {Idle,Working,Blocked,Error,}下面是一个完整的 Python 示例展示如何创建 AgentStatus 对象、更新心跳、以及 SA 如何通过 SPARQL 查询超时 Agent python import time from datetime import datetime, timezone from typing import Optional, List from enum import Enum class AgentActivity(Enum): Agent 活动状态枚举 IDLE Idle WORKING Working BLOCKED Blocked ERROR Error class ResourceLock: 资源锁信息简化版 def __init__(self, resource_type: str, resource_id: str, lock_type: str): self.resource_type resource_type self.resource_id resource_id self.lock_type lock_type class AgentStatus: Agent 状态对象对应 Rust 中的 AgentStatus 结构体 def __init__(self, agent_id: str, agent_role: str): self.agent_id agent_id self.agent_role agent_role # PA / DA / CA / AA / SA self.task_iri: Optional[str] None # 当前执行的任务 IRI self.status: AgentActivity AgentActivity.IDLE self.started_at: datetime datetime.now(timezone.utc) self.last_heartbeat: datetime self.started_at self.current_operation: Optional[str] None # 当前操作描述 self.resource_locks: List[ResourceLock] [] # 持有的资源锁 def update_heartbeat(self): 更新心跳时间戳Agent 定期调用 self.last_heartbeat datetime.now(timezone.utc) print(f[{self.agent_id}] 心跳已更新: {self.last_heartbeat.isoformat()}) def start_task(self, task_iri: str, operation: str): 开始执行任务 self.task_iri task_iri self.status AgentActivity.WORKING self.current_operation operation self.update_heartbeat() print(f[{self.agent_id}] 开始任务: {task_iri} → {operation}) def mark_blocked(self, reason: str): 标记为阻塞状态 self.status AgentActivity.BLOCKED self.current_operation reason self.update_heartbeat() print(f[{self.agent_id}] 阻塞: {reason}) def mark_error(self, error_msg: str): 标记为错误状态 self.status AgentActivity.ERROR self.current_operation error_msg self.update_heartbeat() print(f[{self.agent_id}] 错误: {error_msg}) class SchedulerAgent: 调度器SA负责监控所有 Agent 状态 HEARTBEAT_TIMEOUT 30 # 心跳超时阈值秒 def __init__(self): self.agents: dict[str, AgentStatus] {} def register_agent(self, agent: AgentStatus): 注册 Agent 到 SA 的监控列表 self.agents[agent.agent_id] agent print(f[SA] 注册 Agent: {agent.agent_id} ({agent.agent_role})) def detect_stale_agents(self) - List[str]: 检测超时 Agent对应 Rust 中的 detect_stale_agents() 返回所有超过 30 秒未心跳的 Agent ID 列表 now datetime.now(timezone.utc) stale [] for agent_id, status in self.agents.items(): elapsed (now - status.last_heartbeat).total_seconds() if elapsed self.HEARTBEAT_TIMEOUT: stale.append(agent_id) print(f[SA] ⚠️ 检测到僵尸 Agent: {agent_id}已离线 {elapsed:.1f} 秒) return stale def query_working_agents(self) - List[AgentStatus]: 模拟 SPARQL 查询获取所有 Working 状态的 Agent 对应原文中的 SPARQL SELECT ?agent ?role ?status ?operation WHERE { GRAPH blackboard:shared { ?agent a http://agent-os.org/type/AgentStatus ; http://agent-os.org/prop/status Working ; http://agent-os.org/prop/current_operation ?operation . } } return [a for a in self.agents.values() if a.status AgentActivity.WORKING] # 使用示例 if __name__ __main__: # 1. 创建 SA 调度器 sa SchedulerAgent() # 2. 创建三个 Agent 并注册 da1 AgentStatus(agent_idda-001, agent_roleDA) da1.start_task(task:code-gen, 生成用户模块代码) da2 AgentStatus(agent_idda-002, agent_roleDA) da2.start_task(task:db-migrate, 执行数据库迁移) ca1 AgentStatus(agent_idca-001, agent_roleCA) ca1.mark_blocked(等待代码审查结果) for agent in [da1, da2, ca1]: sa.register_agent(agent) # 3. 模拟心跳更新 time.sleep(1) da1.update_heartbeat() # DA-001 正常心跳 da2.update_heartbeat() # DA-002 正常心跳 # CA-001 故意不更新心跳模拟超时 # 4. 模拟等待 32 秒后检测超时 print(\n--- 等待 32 秒后检测超时 ---) # 手动将 ca1 的心跳时间调早模拟超时 ca1.last_heartbeat datetime.now(timezone.utc).replace(year2020) stale_agents sa.detect_stale_agents() print(f超时 Agent 列表: {stale_agents}) # 5. 查询当前正在工作的 Agent print(\n--- 查询 Working 状态的 Agent ---) working sa.query_working_agents() for w in working: print(f {w.agent_id} ({w.agent_role}) → {w.current_operation})心跳超时检测是这个子系统的关键机制。每个 Agent 定期更新自己的心跳时间戳SA 则通过detect_stale_agents()方法扫描所有超过阈值默认 30 秒未心跳的 Agent。一旦发现“僵尸” AgentSA 可以立即回收其持有的资源锁并将它负责的子任务重新分配给其他 Agent。所有状态数据同步写入 Oxigraph 的blackboard:shared命名图这意味着 SA 可以通过 SPARQL 直接查询实时态势SELECT ?agent ?role ?status ?operation WHERE { GRAPH blackboard:shared { ?agent a http://agent-os.org/type/AgentStatus ; http://agent-os.org/prop/status Working ; http://agent-os.org/prop/current_operation ?operation . } }这种设计让 SA 的态势感知从“被动等待通知”变为“主动实时查询”调度决策不再依赖猜测。三、资源锁防止 Agent 互相踩脚并行 Agent 最常见的冲突场景是资源竞争——两个 DA 同时试图修改同一个文件或者一个 Agent 正在读数据另一个 Agent 却开始写。传统的做法是通过消息队列串行化但这会牺牲并行性。流马的方案是三级资源锁直接在 L2 中实现pubstructResourceLock{pubresource_type:String,// file, db, api, graphpubresource_id:String,// 如 file:///data/sales.csvpubacquired_at:DateTimeUtc,pubacquired_by:String,// agent_idpublock_type:LockType,// Read / Write / Exclusive}pubenumLockType{Read,// 多个 Agent 可同时持有Write,// 仅一个 Agent 可持有与其他 Write/Exclusive 互斥Exclusive,// 仅一个 Agent 可持有与其他所有锁互斥}锁冲突检测是实时的当一个 Agent 尝试获取资源锁时系统会检查该资源的当前锁状态。如果锁冲突例如两个 Agent 同时请求 Write 锁后来的请求会被拒绝Agent 需要等待或选择其他方案。所有锁信息同步到blackboard:shared图SA 可以随时查看“哪些资源正在被谁锁定”。四、跨任务依赖让任务树从“孤立”到“关联”在实际的软件工程流程中任务之间往往存在复杂的依赖关系——“设计文档”完成之后才能开始“编码”“数据库迁移”完成之后才能执行“API 测试”。如果 L2 只跟踪单个任务的子树SA 就无法判断跨任务的阻塞状态。我们在TaskTreeNode中新增了跨任务依赖边pubstructTaskTreeNode{pubtask_iri:String,pubparent:OptionString,// 父任务pubchildren:VecString,// 子任务pubdependencies:VecString,// 依赖的其他任务pubdependents:VecString,// 依赖此任务的其他任务反向索引pubstatus:String,}通过add_task_dependency()方法任意两个任务之间可以建立依赖关系。get_task_dag()方法则利用拓扑排序将任务树展开为层级化的有向无环图DAG。这使得 SA 可以回答关键调度问题“任务 B 为什么还没开始”——因为它依赖的任务 A 还在执行。“如果我取消任务 C哪些任务会受影响”——查询dependents列表即可。五、SharedZoneAgent 间的“群聊”除了结构化的状态数据和资源锁Agent 有时还需要松耦合的沟通——比如一个 DA 发现了某个潜在问题希望通知 CA 特别关注或者 SA 广播一条紧急指令让所有 Agent 暂停当前操作。SharedZone 提供了这样的能力pubstructCoordinationMessage{pubfrom_agent:String,// 发送者pubmsg_type:CoordinationMsgType,// 消息类型pubpayload:serde_json::Value,// 消息内容pubtimestamp:DateTimeUtc,// 时间戳}pubenumCoordinationMsgType{TaskAnnouncement,// 任务公告ProgressUpdate,// 进度更新ResourceRequest,// 资源请求ConflictWarning,// 冲突警告SyncRequest,// 同步请求}Agent 可以发布协调消息到blackboard:shared其他 Agent 则可以按时间戳或发送者过滤读取。这相当于给 Agent 们开了一个“群聊”但消息是有结构的、可查询的、持久化的——不是简单的文本广播。六、给平台带来的核心优势能力传统方案L2 作战地图Agent 状态可见性通过日志或外部监控间接推断SA 可实时 SPARQL 查询每个 Agent 的状态、心跳、当前操作资源冲突处理事后检测人工介入锁冲突实时拒绝所有锁状态可视跨任务依赖靠文档或约定容易遗漏结构化依赖关系拓扑排序自动化调度Agent 间通信消息队列额外基础设施同图存储内协调消息零延迟可查询故障恢复手动排查心跳超时自动检测僵尸 Agent自动回收资源和任务一句话总结L2 作战地图让多 Agent 协作从“摸黑干活”变成了“开着雷达飞行”。调度器可以实时掌握全局态势Agent 之间可以在同一个图空间里安全地共享数据和协调行动而所有这些信息都是可追溯、可审计的。七、结语Gliding Horse 的 L2 黑板设计本质上是一套多 Agent 操作系统的进程管理表。它借鉴了操作系统中进程控制块PCB、资源分配表、死锁检测等经典思想将它们适配到了 AI Agent 的协作场景中。当你的系统需要同时运行数十个 Agent并且要求它们安全、高效地协作时这样一张“作战地图”就不再是锦上添花而是必备的基础设施。Gliding Horse 已在 GitHub 开源https://github.com/doiito/gliding_horse