并发下的agent功能调用策略

📅 2026/6/28 4:11:39
并发下的agent功能调用策略
并发下的agent功能调用策略模式一 采用轻量级异步队列模式利用py中的asyncio 的事件循环在一个进程内处理所用用户的并发请求每个请求独立运行但是受限于全局的并发控制信号量他不需要外部依赖例如redis以及RabbitMQ来进行处理主要使用内存来处理这样它处理的并发数要低。客户端 POST 请求↓FastAPI 解析请求体 - req↓进入 async with GLOBAL_AGENT_SEMAPHORE:↓(如果当前并发数 5) 立即获得许可继续(如果当前并发数 5) 在此等待直到有任务完成释放信号量↓执行 asyncio.wait_for(run_agent(…), timeout60)↓run_agent 开始处理调用LLM、工具等↓如果 60 秒内完成 - 返回 result如果超时 - 抛出 TimeoutError↓成功 - 返回 200 JSON失败(超时) - 返回 504 错误↓最终释放信号量退出 with 块允许下一个等待的请求进入示例代码全局信号量假设最大并发处理5个 Agent 任务 GLOBAL_AGENT_SEMAPHOREasyncio.Semaphore(5)# 这里使用的是信号量Semaphore来进行处理最大设置为5fromfastapiimportFastAPI,HTTPExceptionfrompydanticimportBaseModel appFastAPI()classQueryRequest(BaseModel):user_id:strprompt:strapp.post(/v1/chat)asyncdefchat_endpoint(req:QueryRequest):# 在这里我们没有使用 asyncio.create_task 直接跑而是利用 Semaphore 控制进入。# 如果超过 5 个并发后续请求会在此处 await 等待直到有空位。asyncwithGLOBAL_AGENT_SEMAPHORE: with 等同于 await GLOBAL_AGENT_SEMAPHORE.acquire() # 尝试获取 try: # 执行代码块 finally: GLOBAL_AGENT_SEMAPHORE.release() # 无论是否异常都会释放 try:# 设置超时防止单个任务卡死例如 60 秒resultawaitasyncio.wait_for(run_agent(req.user_id,req.prompt),timeout60.0)return{status:success,data:result}exceptasyncio.TimeoutError:raiseHTTPException(status_code504,detailAgent processing timeout)并发与排队的底层逻辑重点在上面的代码中async with GLOBAL_AGENT_SEMAPHORE:是关键所在。未达到并发上限当只有 3 个用户同时请求时信号量内部计数器为 53 个请求立刻获得许可直接进入run_agent执行。达到并发上限当第 6 个用户请求到达时信号量计数器为 0该请求会在async with这一行挂起await事件循环会立即处理第 7、第 8 个请求它们也会依次排队。空出位置当排在前面的一个任务执行完毕释放信号量事件循环会从等待队列中唤醒一个排队的请求让它进入run_agent。这种机制实现了公平的先进先出FIFO排队且不需要引入额外的第三方组件。解释逐步演示-如何实现“5个并发”假设系统同时收到 6 个请求编号 R1 ~ R6并且它们几乎同时开始处理。R1 进入async with调用acquire()当前计数器 5 0计数器立即减为 4R1 获得许可进入内部执行run_agent。R2 进入计数器变为 3允许。R3 进入计数器变为 2。R4 进入计数器变为 1。R5 进入计数器变为 0。R6 进入调用acquire()但当前计数器 0所以 R6 被挂起await事件循环会转而处理其他可运行的任务比如正在执行的 R1~R5 中的 I/O 操作。假设 R1 在 30 秒后执行完毕代码退出async with块自动调用release()计数器从 0 变为 1。此时事件循环发现有一个等待中的协程R6在等待该信号量于是唤醒 R6R6 的acquire()返回计数器立刻又减为 0R6 进入内部执行。这样一来任何时候同时执行run_agent的请求数量都不会超过 5 个。为什么该设计比asyncio.create_task要好如果我们直接使用asyncio.create_task(run_agent(...))所有请求会立即创建协程并同时运行可能瞬间有几千个协程并发导致LLM API 被限流或崩溃。内存暴涨因为每个协程都持有上下文。无法控制下游服务的负载。而信号量提供了背压backpressure机制让系统在负载过高时自动排队保护后端资源。补充细节如何设置最优并发数并发数应参考下游 API 的限制如 OpenAI 的 RPM/TPM、工具服务的承载能力以及服务器自身资源内存、CPU。通常从较小值开始如 5通过压力测试逐步调整。你也可以使用多个信号量进行分层控制例如一个全局信号量限制整体 Agent 数量另一个专门限制外部搜索工具并发数。关于在任务排队的问题例如任务是否会丢失任务是否会无限期的等待的问题。任务丢失问题思考关于任务的丢失–在一般情况来说任务的丢失主要存在于用户直接断开连接由于 TCP的断开会导致FastAPI会检测到然后自动取消该请求对应的协程。该种断开时用户自己取消的如何正常情况服务器重启/崩溃的问题 如果服务器突然断电、进程崩溃或主动重启部署新版本内存中排队的所有任务都会彻底消失。 这是内存模式的最大劣势没有持久化。所以后面的模式需要解决这些问题。任务无限期等待思考如果前端没有超时设置或者超时时间很长并且前 5 个任务死锁或执行极慢排队任务会越积越多连接占用每个 HTTP 连接即使挂起都要占用文件描述符和内存。太多挂起连接会导致 Nginx 或操作系统报错。内存爆炸每个排队的请求都携带req.prompt可能很大和协程栈信息堆积上千个就会吃掉大量内存。雪崩效应排队越长积压越久新进来的请求等待时间更长最终导致大量超时系统整体不可用。如何优雅地解决“一直排队”问题针对这个问题工程上有几种成熟的应对策略你可以择一或组合使用策略一给“排队”本身也加超时推荐在async with外层包一个wait_for控制整个“排队 执行”的总时长asyncdefchat_endpoint(req:QueryRequest):try:# 总超时 65 秒5秒排队缓冲 60秒执行resultawaitasyncio.wait_for(_process_with_semaphore(req),timeout65.0)return{status:success,data:result}exceptasyncio.TimeoutError:raiseHTTPException(status_code504,detailRequest timeout (including queue wait))asyncdef_process_with_semaphore(req):asyncwithGLOBAL_AGENT_SEMAPHORE:returnawaitrun_agent(req.user_id,req.prompt)这样如果排队等了 10 秒还没轮到客户端会收到 504避免无限挂起。策略二使用asyncio.Queue 拒绝策略不直接用信号量而是用一个异步队列并设置队列最大长度例如maxsize100。如果队列满了直接返回 429 Too Many Requestsrequest_queueasyncio.Queue(maxsize100)app.post(/v1/chat)asyncdefchat_endpoint(req):try:# 尝试立即放入队列若满则抛异常awaitrequest_queue.put(req)exceptasyncio.QueueFull:raiseHTTPException(status_code429,detailServer busy, please retry later)# ... 后台 worker 消费策略三引入外部消息队列终极方案如果是企业级场景直接用Redis Streams或RabbitMQ替代内存排队。这样即使重启服务任务也保存在 Redis 里不会丢失且支持分布式消费。模式二 Actor模型Actor模型介绍Actor 模型的核心思想可以概括为一切皆 Actor。Actor 是计算的基本单元它封装了状态和行为彼此之间不共享任何内存仅通过异步消息进行通信。你可以把 Actor 想象成一个个独立的、有信箱的“小人”独立的状态每个“小人”都有自己的记忆状态和笔记本行为别人无法直接查看或修改异步通信“小人”之间不直接对话而是通过写信发送消息来交流。信被放进对方的信箱消息队列收信人空闲时会处理。顺序处理每个“小人”一次只处理一封信信被顺序处理这从根本上避免了多线程编程中的数据竞争和锁问题。Actor 模型 vs. 模式一异步队列与模式一相比Actor 模型的核心优势在于它将“状态”和“行为”封装在了 Actor 内部。特性模式一异步队列 (asyncio Queue)模式二Actor 模型状态管理集中式或外部存储。用户状态通常由中央数据结构如字典管理或依赖 Redis 等外部存储。分布式内置。每个 Actor 自带状态天然支持持久化和恢复。并发模型竞争式。多个任务通过asyncio并发执行但共享全局状态时仍需小心锁竞争。隔离式。每个 Actor 独立运行通过消息通信从根本上避免了锁和竞态条件。扩展性垂直扩展。受限于单进程水平扩展需引入额外组件如 Redis 做分布式锁。水平扩展。Actor 的位置透明可以透明地分布在单机多线程、多进程乃至多台服务器上。容错性脆弱。一个任务的崩溃可能影响整个服务。健壮。一个 Actor 的故障可以被封装和隔离不会导致系统整体崩溃。实现复杂度低。概念简单上手快。中高。需要理解 Actor 模型及其生命周期管理学习曲线相对陡峭。在 Python 中落地主流框架一览在 Python 生态中有几个基于 Actor 模型的优秀框架可以帮你快速落地ai-query一个统一的 Python SDK能将 AI 模型直接转化为有状态的 Actor。它原生支持多用户路由、持久化存储SQLite, Redis和类型安全的 RPC 调用是构建生产级分布式 AI Agent 的强力工具。actor-ai构建在成熟的 Actor 模型框架Pykka之上。每个 Agent 运行在独立线程中并提供了自然语言指令、工具调用、长期记忆等开箱即用的 AI 功能。Agency一个极简、灵活的 Actor 模型框架。它通过action装饰器暴露智能体的能力并支持通过 AMQP 扩展到分布式环境适合需要高度定制化多智能体协作的场景。Wactorz一个异步、可在运行时动态生成 Agent的框架。其核心思想是由 LLM 作为编排器在运行时根据自然语言指令**动态生成并启动新的 Agent 代码非常灵活。Thespian一个经典的 Python Actor 模型框架适用于构建分布式应用如何使用ai-query快速开始以功能全面的ai-query为例构建一个多用户 AI 服务非常简单。1. 定义一个有状态的 Agent你只需定义一个继承自Agent的类并指定存储后端如 SQLiteAgent 就会自动管理对话历史。importasynciofromai_query.agentsimportAgent,SQLiteStoragefromai_query.providersimportopenai创建一个具有持久化存储的 AgentagentAgent(my-assistant,modelopenai(gpt-4o),storageSQLiteStorage(agents.db)# 状态自动保存到 SQLite)asyncdefmain():asyncwithagent:responseawaitagent.chat(Hi, Im Alice!)print(response)# 输出: Hello Alice! How can I help you today?responseawaitagent.chat(Whats my name?)print(response)# 输出: Your name is Alice.2. 启动一个多用户服务器使用AgentServer你可以轻松地将你的 Agent 类部署为一个支持多用户路由的 Web 服务。fromai_query.agentsimportAgent,AgentServerfromai_query.providersimportgoogleclassUserAssistant(Agent):def__init__(self,id):super().__init__(id,modelgoogle(gemini-2.0-flash),systemYou are a personal assistant.)启动服务器自动为每个用户路由到独立的 Agent 实例AgentServer(UserAssistant).serve(port8080)之后你就可以通过http://your-server/agent/{user_id}/chat这样的接口来为不同用户提供服务了。对比模式一信号量内存字典特性模式一手动实现模式二ai-query实现状态隔离手动用user_id作为字典键需自行加锁框架自动路由到独立 Actor无需额外加锁持久化需自己实现存储逻辑如写文件或 Redis只需指定storage自动保存并发控制手动用 Semaphore 限制全局并发Actor 框架内部可通过配置或 actor 池控制并发扩展性单进程水平扩展需改造支持分布式部署如通过 Redis 共享 Actor 路由开发复杂度低但需处理很多细节高一层抽象但框架封装了大部分复杂性实际使用时的注意事项环境变量使用openai或googleprovider 时需要在环境变量中设置对应的 API Key如OPENAI_API_KEY。存储位置agents.db会生成在当前目录多个 Agent 实例可以共享同一个数据库文件通过id区分。服务器路由启动后你可以通过如下方式调用curl-XPOST http://localhost:8080/agent/alice/chat\-HContent-Type: application/json\-d{message: Hi!}其中alice就是user_id服务器会为alice维护独立的会话。模式三分布式任务队列引入 Redis/RabbitMQ Celery实现流量削峰、高可用和水平扩展。适合企业级场景。