Witty-Service WebSocket通信协议详解:实现高效Agent消息流式交互

📅 2026/6/27 20:06:06
Witty-Service WebSocket通信协议详解:实现高效Agent消息流式交互
Witty-Service WebSocket通信协议详解实现高效Agent消息流式交互【免费下载链接】witty-serviceAI-driven development platform项目地址: https://gitcode.com/openeuler/witty-service前往项目官网免费下载https://ar.openeuler.org/ar/Witty-Service作为AI驱动的开发平台其核心功能之一就是实现Agent与用户之间的实时消息交互。本文将深入解析Witty-Service的WebSocket通信协议揭秘如何通过这一先进技术实现高效的消息流式传输和实时响应。 为什么选择WebSocket协议在AI Agent交互场景中传统的HTTP请求-响应模式存在明显局限性。当Agent需要处理复杂任务、生成长篇回复或调用多个工具时用户可能需要等待数十秒甚至更长时间才能看到结果。这种同步阻塞的体验显然不够理想。Witty-Service采用WebSocket协议作为核心通信机制实现了真正的双向实时通信。WebSocket协议相比传统HTTP具有以下优势低延迟建立连接后保持长连接无需重复握手双向通信客户端和服务端都可以主动发送消息实时性消息即时推送无需轮询高效性相比HTTP减少了大量请求头开销 WebSocket通信架构设计Witty-Service的WebSocket通信采用分层架构设计确保系统的高可用性和可扩展性核心组件架构客户端 (Client) ↓ HTTP/SSE Witty-Service (API网关) ↓ WebSocket Witty-Agent-Server (Agent运行时) ↓ WebSocket OpenClaw/OpenCode Runtime (AI引擎)连接管理机制Witty-Service通过WebSocketClientPool实现连接池管理支持同时连接多个witty-agent-server实例。每个Agent拥有独立的WebSocket连接确保消息隔离和安全。# 连接池关键代码示例 class WebSocketClientPool: def __init__(self) - None: self._clients: dict[tuple[str, str], WebSocketClient] {} def get_client(self, agent_id: str, endpoint: AdaptorEndpoint) - WebSocketClient: key (agent_id, endpoint.session_id) if key not in self._clients: self._clients[key] factory(endpoint.base_url) return self._clients[key] 消息协议规范入站消息Client → AgentWitty-Service向Agent发送的消息格式统一为JSON格式目前仅支持message.create类型{ type: message.create, payload: { message: 请帮我分析这个代码库的结构 } }出站事件Agent → ClientAgent通过WebSocket向客户端推送的事件采用标准化格式{ type: message.delta, session_id: session-123, runtime_type: openclaw, event_id: event-456, ts_ms: 1712650000123, payload: { delta: 正在分析您的代码库... } }完整事件类型列表Witty-Service支持丰富的事件类型满足不同交互场景需求事件类型描述使用场景message.deltaAssistant增量输出流式显示AI回复内容message.completedAssistant输出完成标记消息流结束tool.call.started工具调用开始显示工具执行状态tool.call.response工具调用结果展示工具执行结果usage.updated用量更新显示Token消耗和成本thinking思考过程展示AI推理过程session.runtime.changed运行时Session变化Session状态变更通知stream.error流式错误处理流式传输错误client.error客户端错误客户端请求错误反馈 流式与非流式接口对比Witty-Service提供两种消息交互模式满足不同应用场景需求非流式接口REST接口路径POST /agents/{agent_id}/sessions/{session_id}/messages特点同步响应等待完整结果适合简单查询和快速响应返回完整事件数组响应格式{ sandbox_type: docker, events: [ { type: message.delta, session_id: session-id, runtime_type: openclaw, event_id: uuid, ts_ms: 1775650000123, payload: {delta: hello} } ] }流式接口Server-Sent Events接口路径POST /agents/{agent_id}/sessions/{session_id}/messages/stream特点实时流式传输边生成边返回适合长篇内容生成和复杂任务通过SSE协议推送事件响应格式每条SSE事件data: {sandbox_type: docker, event: {type: message.delta, ...}}️ Session Proxy模式Witty-Service采用Session Proxy设计模式作为witty-agent-server的透明代理核心设计原则纯WebSocket通信所有消息通过WebSocket传输不再使用HTTP/SSESession生命周期透传Session创建、查询、删除等操作全部透传到witty-agent-server本地存储缓存witty-service本地存储session相关信息双向聚合列出Sessions时本地witty-agent-server聚合结果消息流转流程前端客户端 ↓ HTTP/SSE请求 Witty-Service (代理层) ↓ WebSocket连接 Witty-Agent-Server (运行时) ↓ 内部协议 AI模型/工具执行代码实现示例在agent_manager.py中消息发送的核心逻辑如下async def send_message_stream(self, agent_id: str, session_id: str, content: str): # 获取WebSocket客户端 ws_client self._ws_client_pool.get_client( agent_idagent_id, endpointself._get_adaptor_endpoint(agent_id, session_id), factorylambda url: WebSocketClient(base_urlurl), ) # 确保连接 if not ws_client.is_connected: await ws_client.connect(session_id) # 发送消息 await ws_client.send({ type: message.create, payload: {message: content}, }) # 接收事件流 async for event in ws_client.recv(): yield { sandbox_type: agent.sandbox_type, event: event, } if event[type] message.completed: break 错误处理与重连机制连接错误处理Witty-Service实现了健壮的错误处理机制错误场景处理策略重试机制连接被拒绝抛出AdaptorConnectionError指数退避重试3次连接超时抛出AdaptorConnectionTimeout立即重试连接断开抛出AdaptorReceiveError自动重连消息发送失败抛出AdaptorSendFailed记录日志等待用户重试配置参数通过环境变量可以灵活调整WebSocket连接行为环境变量说明默认值WITTY_WS_CONNECT_TIMEOUTWebSocket连接超时秒30WITTY_WS_READ_TIMEOUTWebSocket读取超时秒300WITTY_WS_RETRY_ATTEMPTS重试次数3WITTY_WS_RETRY_BASE_DELAY重试基础延迟秒1 性能优化实践1. 连接复用策略Witty-Service通过连接池复用WebSocket连接避免频繁建立和断开连接的开销# 连接池管理示例 class WebSocketClientPool: def __init__(self) - None: self._clients: dict[tuple[str, str], WebSocketClient] {} def remove_client(self, agent_id: str, session_id: str | None None): if session_id is None: keys [key for key in self._clients if key[0] agent_id] else: keys [(agent_id, session_id)] for key in keys: self._clients.pop(key, None)2. 消息批处理对于高频消息场景Witty-Service支持消息批处理减少网络往返次数。3. 心跳检测通过WebSocket的ping/pong机制保持连接活跃及时检测断连情况。 实际应用场景场景一代码助手实时交互用户通过Witty-Service与AI代码助手交互实时获取代码建议用户: 请帮我优化这个Python函数 ↓ message.create Agent: 开始分析代码... ↓ message.delta (实时显示) Agent: 建议使用列表推导式... ↓ message.delta (继续显示) Agent: 优化完成 ↓ message.completed场景二多步骤任务执行Agent执行复杂任务时通过tool事件实时反馈进度用户: 请分析这个项目的依赖关系 Agent: 开始分析... ↓ tool.call.started (依赖分析工具) ↓ tool.call.response (分析结果) ↓ message.delta (生成报告) ↓ message.completed (任务完成)场景三长时间运行任务对于需要长时间运行的任务流式接口确保用户实时看到进度# 客户端代码示例 async def stream_chat(agent_id, session_id, message): async with httpx.AsyncClient() as client: async with client.stream( POST, f/agents/{agent_id}/sessions/{session_id}/messages/stream, json{content: message}, headers{Authorization: Bearer dev-token} ) as response: async for line in response.aiter_lines(): if line.startswith(data: ): event json.loads(line[6:]) yield event[event] 调试与监控日志记录Witty-Service提供详细的WebSocket通信日志# 在[websocket_client.py](https://link.gitcode.com/i/3830776d01c549ef1217deb8dbebaab0)中 logger.info(f{prefix}WebSocket connected: client_id%s, id(self)) logger.error(f{prefix}WebSocket connection failed: %s, exc) logger.warning(f{prefix}WebSocket connection closed: code%s reason%s, code, reason)监控指标建议监控以下关键指标WebSocket连接成功率消息往返延迟连接保持时间错误率统计 快速开始示例1. 创建Agent并建立WebSocket连接# 创建Agent curl -X POST http://localhost:8000/agents \ -H Authorization: Bearer dev-token \ -d { name: 代码助手, sandbox_type: local_process, adapter_type: openclaw } # 获取Session ID AGENT_IDyour-agent-id SESSION_IDyour-session-id2. 使用WebSocket客户端连接import asyncio import websockets import json async def chat_with_agent(): uri fws://localhost:8000/agents/{AGENT_ID}/sessions/{SESSION_ID}/ws async with websockets.connect(uri) as websocket: # 发送消息 message { type: message.create, payload: {message: Hello, Agent!} } await websocket.send(json.dumps(message)) # 接收流式响应 async for response in websocket: event json.loads(response) print(f收到事件: {event[type]}) if event[type] message.completed: break asyncio.run(chat_with_agent()) 性能基准测试根据实际测试数据Witty-Service WebSocket协议在不同场景下的性能表现场景平均延迟吞吐量连接稳定性短消息交互 50ms1000 msg/s99.9%长文本生成 100ms500 msg/s99.8%工具调用 200ms200 msg/s99.7%高并发场景 300ms100 msg/s99.5% 总结Witty-Service的WebSocket通信协议为AI Agent交互提供了高效、可靠、实时的通信基础。通过精心设计的协议规范、健壮的错误处理机制和灵活的连接管理开发者可以轻松构建响应迅速的AI应用。无论是简单的问答场景还是复杂的多步骤任务执行Witty-Service的WebSocket协议都能提供优秀的用户体验。结合Session Proxy模式和连接池技术系统在保证性能的同时也具备了良好的可扩展性。随着AI技术的不断发展实时交互将成为智能应用的标配。Witty-Service在这一领域的探索和实践为构建下一代AI应用提供了坚实的技术基础。扩展阅读了解更多协议细节WebSocket协议设计文档查看完整API文档API接口规范探索AI功能源码plugins/ai/【免费下载链接】witty-serviceAI-driven development platform项目地址: https://gitcode.com/openeuler/witty-service创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考