AgentScope2.0 源码分析- 事件系统

📅 2026/7/2 2:23:04
AgentScope2.0 源码分析- 事件系统
AgentScope2.0 源码分析- 事件系统版本:v2.0.3适用对象:开发者、架构师、技术面试官文档目标:理解事件系统的设计原理、实现机制、使用方法和面试要点一、核心概念1.1 什么是事件系统?事件系统是 AgentScope 中用于流式传输 Agent 执行状态的机制。消息 (Msg) ←→ 事件 (Event) 静态视图 动态视图 完整快照 流式传输 持久化存储 实时渲染关键理解:消息= 一次完整的对话回合(如 Agent 的一次回复)事件= 消息的流式传输单元(如逐字输出的文本)关系:一个消息由多个事件组成,事件可以重建为消息1.2 为什么需要事件系统?需求消息事件存到数据库,下次继续聊✅❌发给 LLM 当上下文✅❌前端打字机效果实时显示太慢✅断线后恢复进度只能从头✅ 从检查点继续人工确认(暂停/恢复)✅核心矛盾:存数据需要完整(消息)传数据需要实时(事件)AgentScope 的解法:事件可以组装成消息,消息可以拆成事件。二、事件类型详解2.1 事件类型枚举# src/agentscope/event/_event.py:20classEventType(StrEnum):# 生命周期事件REPLY_START="REPLY_START"REPLY_END="REPLY_END"EXCEED_MAX_ITERS="EXCEED_MAX_ITERS"# 模型调用事件MODEL_CALL_START="MODEL_CALL_START"MODEL_CALL_END="MODEL_CALL_END"# 文本块事件TEXT_BLOCK_START="TEXT_BLOCK_START"TEXT_BLOCK_DELTA="TEXT_BLOCK_DELTA"TEXT_BLOCK_END="TEXT_BLOCK_END"# 思考块事件THINKING_BLOCK_START="THINKING_BLOCK_START"THINKING_BLOCK_DELTA="THINKING_BLOCK_DELTA"THINKING_BLOCK_END="THINKING_BLOCK_END"# 数据块事件DATA_BLOCK_START="DATA_BLOCK_START"DATA_BLOCK_DELTA="DATA_BLOCK_DELTA"DATA_BLOCK_END="DATA_BLOCK_END"# 工具调用事件TOOL_CALL_START="TOOL_CALL_START"TOOL_CALL_DELTA="TOOL_CALL_DELTA"TOOL_CALL_END="TOOL_CALL_END"# 工具结果事件TOOL_RESULT_START="TOOL_RESULT_START"TOOL_RESULT_TEXT_DELTA="TOOL_RESULT_TEXT_DELTA"TOOL_RESULT_DATA_DELTA="TOOL_RESULT_DATA_DELTA"TOOL_RESULT_END="TOOL_RESULT_END"# 人工介入事件REQUIRE_USER_CONFIRM="REQUIRE_USER_CONFIRM"REQUIRE_EXTERNAL_EXECUTION="REQUIRE_EXTERNAL_EXECUTION"USER_CONFIRM_RESULT="USER_CONFIRM_RESULT"EXTERNAL_EXECUTION_RESULT="EXTERNAL_EXECUTION_RESULT"# 其他HINT_BLOCK="HINT_BLOCK"CUSTOM="CUSTOM"关键点🔴:共30+ 种事件类型使用StrEnum保证类型安全所有事件继承EventBase2.2 事件基类classEventBase(BaseModel):id:str=Field(default_factory=_generate_id)created_at:str=Field(default_factory=lambda:datetime.now().isoformat())metadata:Dict[str,Any]=Field(default_factory=dict)关键点🔴:使用Pydantic模型,自动序列化/反序列化每个事件有唯一id和创建时间metadata用于扩展自定义数据2.3 事件分类详解类别 1:生命周期事件事件触发时机关键字段ReplyStartEventAgent 开始回复session_id,reply_id,nameReplyEndEventAgent 完成回复session_id,reply_idExceedMaxItersEvent达到最大循环次数reply_id,name作用:标记一次回复的开始和结束类别 2:模型调用事件事件触发时机关键字段ModelCallStartEvent开始调用 LLMreply_id,model_nameModelCallEndEventLLM 返回完成reply_id,input_tokens,output_tokens作用:记录 LLM 调用的 Token 消耗类别 3:内容块事件(Start/Delta/End 模式)文本块:事件作用关键字段TextBlockStartEvent开始输出文本reply_id,block_idTextBlockDeltaEvent文本增量(逐字)reply_id,block_id,deltaTextBlockEndEvent文本输出结束reply_id,block_id思考块:事件作用关键字段ThinkingBlockStartEvent开始思考reply_id,block_idThinkingBlockDeltaEvent思考增量reply_id,block_id,deltaThinkingBlockEndEvent思考结束reply_id,block_id数据块(多模态):事件作用关键字段DataBlockStartEvent开始传输数据reply_id,block_id,media_typeDataBlockDeltaEvent数据增量(base64)reply_id,block_id,data,media_typeDataBlockEndEvent数据传输结束reply_id,block_id关键点🔴:遵循Start → Delta → End模式block_id用于关联同一内容块的事件delta字段携带增量数据类别 4:工具调用事件工具调用请求:事件触发时机关键字段ToolCallStartEvent开始调用工具reply_id,tool_call_id,tool_call_nameToolCallDeltaEvent工具参数增量reply_id,tool_call_id,deltaToolCallEndEvent工具调用结束reply_id,tool_call_id工具执行结果:事件触发时机关键字段ToolResultStartEvent开始接收结果reply_id,tool_call_id,tool_call_nameToolResultTextDeltaEvent文本结果增量reply_id,tool_call_id,deltaToolResultDataDeltaEvent数据结果增量reply_id,tool_call_id,block_id,media_typeToolResultEndEvent结果接收完成reply_id,tool_call_id,state关键点🔴:tool_call_id用于关联工具调用和结果state字段表示执行状态(success/error/denied/interrupted)类别 5:人工介入事件(HITL)事件触发时机作用RequireUserConfirmEvent需要用户确认暂停执行,等待确认UserConfirmResultEvent用户已确认恢复执行RequireExternalExecutionEvent需要外部执行暂停,等待外部系统ExternalExecutionResultEvent外部执行完成恢复执行关键点🔴:这是控制流,不仅仅是数据流RequireUserConfirmEvent会暂停Agent 执行UserConfirmResultEvent会恢复Agent 执行类别 6:其他事件事件作用HintBlockEvent注入隐藏提示(给 LLM 的)CustomEvent自定义事件(中间件扩展)三、设计原理3.1 三级 ID 关联机制reply_id ← 这次回复的唯一标识 │ ├── block_id ← 这个内容块的唯一标识(文本块/思考块/数据块) │ │ │ └── TextBlockStartEvent(reply_id="r1", block_id="b1") │ └── TextBlockDeltaEvent(reply_id="r1", block_id="b1", delta="你") │ ── TextBlockEndEvent(reply_id="r1", block_id="b1") │ ── tool_call_id ← 这次工具调用的唯一标识 │ ├── ToolCallStartEvent(reply_id="r1", tool_call_id="t1",) ├── ToolCallDeltaEvent(reply_id="r1", tool_call_id="t1", delta='{"city":"北京"}') └── ToolCallEndEvent(reply_id="r1", tool_call_id="t1")关键点🔴:reply_id:每次回复开始时生成block_id:每个内容块开始时生成tool_call_id:LLM 返回时自带(模型生成的)3.2 事件与消息的转换事件 → 消息(append_event)msg=AssistantMsg(name="agent",content=[],id=reply_id)asyncforeventinagent.reply_stream(user_msg):msg.append_event(event)# 每个事件追加到消息# 循环结束后,msg 就是完整的回复关键点🔴:append_event根据block_id/tool_call_id自动归类文本 Delta 事件会拼接到对应文本块工具结果事件会设置工具调用的状态消息 → 事件(重放时)foreventinmsg.to_events():yieldevent关键点🔴:消息可以拆分为事件序列用于断点续传时的重放3.3 事件产生机制(埋点)关键点🔴:所有事件都是预先埋点的# src/agentscope/agent/_agent.py:782asyncdef_reasoning_impl(self,tool_choice):# 埋点 1:模型调用开始yieldModelCallStartEvent(reply_id=self.state.reply_id,model_name=self.model.model,)# 调用 LLMres=awaitself._call_model(**kwargs)# 埋点 2:文本块开始iftext_blocks:yieldTextBlockStartEvent(...)# 埋点 3:文本增量yieldTextBlockDeltaEvent(delta="...")# 埋点 4:文本块结束yieldTextBlockEndEvent(...)# 埋点 5:模型调用结束yieldModelCallEndEvent(input_tokens=...,output_tokens=...,)关键点🔴:使用yield关键字让函数变成异步生成器每次yield都会暂停函数,把事件传给调用者调用者处理完后,函数从暂停处继续执行3.4 事件传输机制(SSE)# src/agentscope/app/_router/_session.py:530@session_router.get("/{session_id}/stream")asyncdefstream_session_events(session_id,...):asyncdef_sse_generator()-AsyncGenerator[str,None]:# 1. 重放缓冲的事件(断线重连)for_entry_id,eventinawaitmessage_bus.log_read(...):yieldf"data:{json.dumps(event)}\n\n"# 2. 订阅实时事件queue=asyncio.Queue()awaitmessage_bus.subscribe(...,queue)# 3. 实时推送whileTrue:item=awaitqueue.get()yieldf"data:{json.dumps(item)}\n\n"returnStreamingResponse(_sse_generator(),media_type="text/event-stream",)关键点🔴:使用SSE(Server-Sent Events)传输支持断线重连(重放缓冲事件)每 30 秒发送心跳保持连接四、实现原理与流程4.1 整体架构┌─────────────────────────────────────────────────────────────┐ │ 1. 事件定义层(Pydantic 模型) │ │ - EventBase(基类) │ │ - 30+ 种具体事件类 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ 2. 事件产生层(AsyncGenerator + yield) │ │ - Agent._reply() → yield 事件 │ │ - Agent._reasoning() → yield 事件 │ │ - Agent._execute_tool() → yield 事件 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ─────────────────────────────────────────────────────────────┐ │ 3. 事件传输层(SSE / StreamingResponse) │ │ - FastAPI Router: /{session_id}/stream │ │ - MessageBus(Redis Pub/Sub) │ └─────────────────────────────────────────────────────────────┘ │ ▼ ─────────────────────────────────────────────────────────────┐ │ 4. 事件消费层(前端 appendEvent) │ │ - EventSource 接收 SSE │ │ - msg.appendEvent(event) 重建消息 │ └─────────────────────────────────────────────────────────────┘4.2 事件定义层实现4.2.1 事件类型枚举# src/agentscope/event/_event.py:20classEventType(StrEnum):"""Event type enumeration."""REPLY_START="REPLY_START"REPLY_END="REPLY_END"# ... 30+ 种类型实现要点:使用StrEnum保证类型安全每个事件类型是字符串常量前端可以用switch(event.type)穷举匹配4.2.2 事件基类# src/agentscope/event/_event.py:63classEventBase(BaseModel):"""Base event class."""model_config=ConfigDict(use_enum_values=True)id:str=Field(default_factory=_generate_id)"""Unique event identifier."""created_at:str=Field(default_factory=lambda:datetime.now().isoformat())"""ISO 8601 timestamp of when the event was created."""metadata:Dict[str,Any]=Field(default_factory=dict)"""Optional metadata attached to the event."""实现要点:继承Pydantic BaseModel,自动序列化/反序列化id使用_generate_id()生成唯一标识created_at自动记录创建时间metadata用于扩展自定义数据4.2.3 具体事件类# src/agentscope/event/_event.py:137classTextBlockDeltaEvent(EventBase):"""Text block delta event."""type:Literal[EventType.TEXT_BLOCK_DELTA]=EventType.TEXT_BLOCK_DELT