Gliding Horse 的 L2 作战地图:让多 Agent 协作从“摸黑”变成“透明”

📅 2026/6/30 3:04:48
Gliding Horse 的 L2 作战地图:让多 Agent 协作从“摸黑”变成“透明”
一、L2 黑板的定位不只是“共享内存”在 Gliding Horse 的四层记忆架构中L2 是承上启下的关键一层向下它缓存 L0 持久化层中的热数据提供毫秒级的读写性能。向上它为 L3 投影引擎提供实时数据源按需裁剪上下文注入 LLM。横向它是多个 Agent 实例唯一共享的工作区承载着任务状态、节点数据和协调消息。传统 Agent 框架往往通过消息队列或轮询来传递状态而流马的 L2 黑板则将这些能力内建在同一个图存储中所有 Agent 通过 SPARQL 直接读写无需额外的通信中间件。Agent 集群读写读写读写读写SPARQL 查询SA 态势面板实时仪表盘Agent 状态 / 任务进度 / 资源冲突L2 作战地图 (Blackboard)节点缓存 Oxigraph SPARQLAgentTracker实时状态跟踪任务树 DAG依赖管理资源锁并发控制SharedZone协调消息PA (计划)DA-1 (执行)DA-2 (执行)CA (检查)二、AgentTracker每个 Agent 的“生命体征”L2 作战地图的核心组件是AgentTracker——一个实时跟踪所有 Agent 状态的子系统。每当一个 Agent 实例启动、执行任务或结束时它都会在 L2 中更新自己的“生命体征”。pub struct AgentStatus { pub agent_id: String, // Agent 唯一标识 pub agent_role: String, // PA / DA / CA / AA / SA pub task_iri: String, // 当前执行的任务 IRI pub status: AgentActivity, // Idle / Working / Blocked / Error pub started_at: DateTimeUtc, pub last_heartbeat: DateTimeUtc, // 最后心跳时间 pub current_operation: OptionString, // 当前操作描述 pub resource_locks: VecResourceLock, // 持有的资源锁 } pub enum AgentActivity { Idle, Working, Blocked, Error, }pub enum AgentActivity {Idle,Working,Blocked,Error,}下面是一个完整的 Python 示例展示如何创建 AgentStatus 对象、更新心跳、以及 SA 如何通过 SPARQL 查询超时 Agent python import time from datetime import datetime, timezone from typing import Optional, List from enum import Enum class AgentActivity(Enum): Agent 活动状态枚举 IDLE Idle WORKING Working BLOCKED Blocked ERROR Error class ResourceLock: 资源锁信息简化版 def __init__(self, resource_type: str, resource_id: str, lock_type: str): self.resource_type resource_type self.resource_id resource_id self.lock_type lock_type class AgentStatus: Agent 状态对象对应 Rust 中的 AgentStatus 结构体 def __init__(self, agent_id: str, agent_role: str): self.agent_id agent_id self.agent_role agent_role # PA / DA / CA / AA / SA self.task_iri: Optional[str] None # 当前执行的任务 IRI self.status: AgentActivity AgentActivity.IDLE self.started_at: datetime datetime.now(timezone.utc) self.last_heartbeat: datetime self.started_at self.current_operation: Optional[str] None # 当前操作描述 self.resource_locks: List[ResourceLock] [] # 持有的资源锁 def update_heartbeat(self): 更新心跳时间戳Agent 定期调用 self.last_heartbeat datetime.now(timezone.utc) print(f[{self.agent_id}] 心跳已更新: {self.last_heartbeat.isoformat()}) def start_task(self, task_iri: str, operation: str): 开始执行任务 self.task_iri task_iri self.status AgentActivity.WORKING self.current_operation operation self.update_heartbeat() print(f[{self.agent_id}] 开始任务: {task_iri} → {operation}) def mark_blocked(self, reason: str): 标记为阻塞状态 self.status AgentActivity.BLOCKED self.current_operation reason self.update_heartbeat() print(f[{self.agent_id}] 阻塞: {reason}) def mark_error(self, error_msg: str): 标记为错误状态 self.status AgentActivity.ERROR self.current_operation error_msg self.update_heartbeat() print(f[{self.agent_id}] 错误: {error_msg}) class SchedulerAgent: 调度器SA负责监控所有 Agent 状态 HEARTBEAT_TIMEOUT 30 # 心跳超时阈值秒 def __init__(self): self.agents: dict[str, AgentStatus] {} def register_agent(self, agent: AgentStatus): 注册 Agent 到 SA 的监控列表 self.agents[agent.agent_id] agent print(f[SA] 注册 Agent: {agent.agent_id} ({agent.agent_role})) def detect_stale_agents(self) - List[str]: 检测超时 Agent对应 Rust 中的 detect_stale_agents() 返回所有超过 30 秒未心跳的 Agent ID 列表 now datetime.now(timezone.utc) stale [] for agent_id, status in self.agents.items(): elapsed (now - status.last_heartbeat).total_seconds() if elapsed self.HEARTBEAT_TIMEOUT: stale.append(agent_id) print(f[SA] ⚠️ 检测到僵尸 Agent: {agent_id}已离线 {elapsed:.1f} 秒) return stale def query_working_agents(self) - List[AgentStatus]: 模拟 SPARQL 查询获取所有 Working 状态的 Agent 对应原文中的 SPARQL SELECT ?agent ?role ?status ?operation WHERE { GRAPH blackboard:shared { ?agent a http://agent-os.org/type/AgentStatus ; http://agent-os.org/prop/status Working ; http://agent-os.org/prop/current_operation ?operation . } } return [a for a in self.agents.values() if a.status AgentActivity.WORKING] # 使用示例 if __name__ __main__: # 1. 创建 SA 调度器 sa SchedulerAgent() # 2. 创建三个 Agent 并注册 da1 AgentStatus(agent_idda-001, agent_roleDA) da1.start_task(task:code-gen, 生成用户模块代码) da2 AgentStatus(agent_idda-002, agent_roleDA) da2.start_task(task:db-migrate, 执行数据库迁移) ca1 AgentStatus(agent_idca-001, agent_roleCA) ca1.mark_blocked(等待代码审查结果) for agent in [da1, da2, ca1]: sa.register_agent(agent) # 3. 模拟心跳更新 time.sleep(1) da1.update_heartbeat() # DA-001 正常心跳 da2.update_heartbeat() # DA-002 正常心跳 # CA-001 故意不更新心跳模拟超时 # 4. 模拟等待 32 秒后检测超时 print(\n--- 等待 32 秒后检测超时 ---) # 手动将 ca1 的心跳时间调早模拟超时 ca1.last_heartbeat datetime.now(timezone.utc).replace(year2020) stale_agents sa.detect_stale_agents() print(f超时 Agent 列表: {stale_agents}) # 5. 查询当前正在工作的 Agent print(\n--- 查询 Working 状态的 Agent ---) working sa.query_working_agents() for w in working: print(f {w.agent_id} ({w.agent_role}) → {w.current_operation})心跳超时检测是这个子系统的关键机制。每个 Agent 定期更新自己的心跳时间戳SA 则通过detect_stale_agents()方法扫描所有超过阈值默认 30 秒未心跳的 Agent。一旦发现“僵尸” AgentSA 可以立即回收其持有的资源锁并将它负责的子任务重新分配给其他 Agent。所有状态数据同步写入 Oxigraph 的blackboard:shared命名图这意味着 SA 可以通过 SPARQL 直接查询实时态势SELECT ?agent ?role ?status ?operation WHERE { GRAPH blackboard:shared { ?agent a http://agent-os.org/type/AgentStatus ; http://agent-os.org/prop/status Working ; http://agent-os.org/prop/current_operation ?operation . } }这种设计让 SA 的态势感知从“被动等待通知”变为“主动实时查询”调度决策不再依赖猜测。三、资源锁防止 Agent 互相踩脚并行 Agent 最常见的冲突场景是资源竞争——两个 DA 同时试图修改同一个文件或者一个 Agent 正在读数据另一个 Agent 却开始写。传统的做法是通过消息队列串行化但这会牺牲并行性。流马的方案是三级资源锁直接在 L2 中实现pub struct ResourceLock { pub resource_type: String, // file, db, api, graph pub resource_id: String, // 如 file:///data/sales.csv pub acquired_at: DateTimeUtc, pub acquired_by: String, // agent_id pub lock_type: LockType, // Read / Write / Exclusive } pub enum LockType { Read, // 多个 Agent 可同时持有 Write, // 仅一个 Agent 可持有与其他 Write/Exclusive 互斥 Exclusive, // 仅一个 Agent 可持有与其他所有锁互斥 }锁冲突检测是实时的当一个 Agent 尝试获取资源锁时系统会检查该资源的当前锁状态。如果锁冲突例如两个 Agent 同时请求 Write 锁后来的请求会被拒绝Agent 需要等待或选择其他方案。所有锁信息同步到blackboard:shared图SA 可以随时查看“哪些资源正在被谁锁定”。四、跨任务依赖让任务树从“孤立”到“关联”在实际的软件工程流程中任务之间往往存在复杂的依赖关系——“设计文档”完成之后才能开始“编码”“数据库迁移”完成之后才能执行“API 测试”。如果 L2 只跟踪单个任务的子树SA 就无法判断跨任务的阻塞状态。我们在TaskTreeNode中新增了跨任务依赖边pub struct TaskTreeNode { pub task_iri: String, pub parent: OptionString, // 父任务 pub children: VecString, // 子任务 pub dependencies: VecString, // 依赖的其他任务 pub dependents: VecString, // 依赖此任务的其他任务反向索引 pub status: String, }通过add_task_dependency()方法任意两个任务之间可以建立依赖关系。get_task_dag()方法则利用拓扑排序将任务树展开为层级化的有向无环图DAG。这使得 SA 可以回答关键调度问题“任务 B 为什么还没开始”——因为它依赖的任务 A 还在执行。“如果我取消任务 C哪些任务会受影响”——查询dependents列表即可。五、SharedZoneAgent 间的“群聊”除了结构化的状态数据和资源锁Agent 有时还需要松耦合的沟通——比如一个 DA 发现了某个潜在问题希望通知 CA 特别关注或者 SA 广播一条紧急指令让所有 Agent 暂停当前操作。SharedZone 提供了这样的能力pub struct CoordinationMessage { pub from_agent: String, // 发送者 pub msg_type: CoordinationMsgType, // 消息类型 pub payload: serde_json::Value, // 消息内容 pub timestamp: DateTimeUtc, // 时间戳 } pub enum CoordinationMsgType { TaskAnnouncement, // 任务公告 ProgressUpdate, // 进度更新 ResourceRequest, // 资源请求 ConflictWarning, // 冲突警告 SyncRequest, // 同步请求 }Agent 可以发布协调消息到blackboard:shared其他 Agent 则可以按时间戳或发送者过滤读取。这相当于给 Agent 们开了一个“群聊”但消息是有结构的、可查询的、持久化的——不是简单的文本广播。六、给平台带来的核心优势能力传统方案L2 作战地图Agent 状态可见性通过日志或外部监控间接推断SA 可实时 SPARQL 查询每个 Agent 的状态、心跳、当前操作资源冲突处理事后检测人工介入锁冲突实时拒绝所有锁状态可视跨任务依赖靠文档或约定容易遗漏结构化依赖关系拓扑排序自动化调度Agent 间通信消息队列额外基础设施同图存储内协调消息零延迟可查询故障恢复手动排查心跳超时自动检测僵尸 Agent自动回收资源和任务一句话总结L2 作战地图让多 Agent 协作从“摸黑干活”变成了“开着雷达飞行”。调度器可以实时掌握全局态势Agent 之间可以在同一个图空间里安全地共享数据和协调行动而所有这些信息都是可追溯、可审计的。