Python asyncio 并发模式:从协程原理到 Rust 开发者的思维转换

📅 2026/6/15 19:51:58
Python asyncio 并发模式:从协程原理到 Rust 开发者的思维转换
Python asyncio 并发模式从协程原理到 Rust 开发者的思维转换一、Rust 开发者看 Python asyncio相似但不同我学 Rust 的 Tokio 之前先学了 Python 的 asyncio当时觉得两者差不多——都是事件循环 协程。后来深入对比才发现虽然概念相似实现差异很大。Python 的协程是基于生成器的运行时动态调度Rust 的 Future 是编译期状态机零成本抽象。最大的思维差异在于错误处理。Python 用 try/except协程里的异常会冒泡到事件循环Rust 用 Result编译器强制你处理每个可能的错误。在 asyncio 中忘记捕获异常整个事件循环可能崩溃在 Tokio 中JoinError 会被类型系统捕获。另一个差异是取消机制。Python 的协程取消通过asyncio.CancelledError实现但协程可以捕获并忽略这个异常Rust 的CancellationToken是协作式的Future 必须主动检查取消信号。Python 的方式更灵活但更危险Rust 的方式更安全但需要更多样板代码。二、asyncio 的底层机制事件循环与协程调度Python asyncio 的核心是事件循环Event Loop。事件循环不断从就绪队列中取出协程执行遇到 IO 操作时注册回调IO 完成后把协程放回就绪队列。协程的切换发生在await点和 Rust 的.await类似。flowchart TB A[事件循环 Event Loop] -- B[就绪队列br/Ready Queue] A -- C[IO 多路复用br/epoll/kqueue] A -- D[定时器队列br/Timer Heap] B -- E[取出就绪协程执行] E -- F{遇到 await} F --|IO 操作| G[注册回调到 epoll] F --|sleep| H[加入定时器队列] F --|完成| I[返回结果] G --|IO 完成| J[回调将协程加入就绪队列] H --|超时| J J -- B subgraph 协程状态 K[创建: coroutine object] L[运行: 正在执行] M[挂起: 等待 IO/定时器] N[完成: 返回结果] O[取消: CancelledError] end K -- L -- M -- N M -- O subgraph 与 Tokio 的对比 P[Python: 动态调度br/运行时决定] Q[Rust: 静态状态机br/编译期生成] R[Python: 绿色线程br/无 Send 约束] S[Rust: Send staticbr/编译期检查] end关键区别Python 的协程是懒的——创建后不会执行必须被 await 或调度。Rust 的 Future 也是懒的但通过.await驱动状态机转换。Python 的开销在运行时动态调度Rust 的开销在编译期代码膨胀。三、生产级代码实现asyncio 并发模式3.1 并发任务编排import asyncio from dataclasses import dataclass, field from typing import List, Optional, Any import time dataclass class TaskResult: 任务执行结果 task_name: str success: bool data: Any None error: Optional[str] None duration_ms: float 0.0 class ConcurrentRunner: 并发任务编排器 def __init__(self, max_concurrency: int 10): # 为什么限制并发数Python 的 asyncio # 虽然是协作式调度但并发量过大 # 会导致事件循环调度开销增加 # 内存占用上升10 是一个经验值 # 适用于大多数 IO 密集场景 self.semaphore asyncio.Semaphore( max_concurrency) async def run_with_limit( self, coro, name: str ) - TaskResult: 带并发限制的任务执行 async with self.semaphore: start time.perf_counter() try: result await coro duration (time.perf_counter() - start) * 1000 return TaskResult( task_namename, successTrue, dataresult, duration_msround(duration, 2)) except asyncio.CancelledError: # 取消不是错误直接传播 raise except Exception as e: duration (time.perf_counter() - start) * 1000 return TaskResult( task_namename, successFalse, errorstr(e), duration_msround(duration, 2)) async def gather_with_limit( self, coros: List, names: List[str] ) - List[TaskResult]: 带并发限制的批量执行 tasks [ self.run_with_limit(coro, name) for coro, name in zip(coros, names) ] # gather 的 return_exceptionsTrue # 确保单个任务失败不影响其他任务 # 为什么用 return_exceptions # 默认情况下 gather 会在任意 # 任务抛异常时立即取消其他任务 # return_exceptionsTrue 让 # 异常作为结果返回 results await asyncio.gather( *tasks, return_exceptionsTrue) return [ r if isinstance(r, TaskResult) else TaskResult( task_namenames[i], successFalse, errorstr(r)) for i, r in enumerate(results) ] async def first_completed( self, coros: List, names: List[str], timeout: float 5.0 ) - TaskResult: 竞速执行返回最先完成的结果 tasks { asyncio.create_task( self.run_with_limit(coro, name), namename ): name for coro, name in zip(coros, names) } try: done, pending await asyncio.wait( tasks.keys(), timeouttimeout, return_whenasyncio.FIRST_COMPLETED) # 取消未完成的任务 # 为什么必须取消未取消的 # 任务会继续占用资源 # 即使结果不再需要 for task in pending: task.cancel() # 等待取消完成 await asyncio.gather( *pending, return_exceptionsTrue) # 返回第一个完成的结果 for task in done: return task.result() except asyncio.TimeoutError: # 全部超时 for task in tasks: task.cancel() return TaskResult( task_nametimeout, successFalse, errorf全部任务超时 {timeout}s) return TaskResult( task_namenone, successFalse, error无任务完成)3.2 生产者-消费者模式import asyncio from typing import AsyncIterator class AsyncProducerConsumer: 异步生产者-消费者模式 def __init__( self, queue_size: int 100, consumer_count: int 3 ): # 为什么限制队列大小无界队列 # 在生产速度大于消费速度时会 # 导致内存溢出有界队列在满时 # 阻塞生产者实现自然背压 self.queue asyncio.Queue(maxsizequeue_size) self.consumer_count consumer_count self._stopped False async def producer( self, source: AsyncIterator ): 生产者从数据源读取并放入队列 try: async for item in source: if self._stopped: break # 队列满时自动背压 await self.queue.put(item) finally: # 发送结束信号 # 为什么用 None 作为哨兵 # 每个消费者需要一个哨兵 # 才能正常退出循环 for _ in range(self.consumer_count): await self.queue.put(None) async def consumer( self, consumer_id: int, process_fn ): 消费者从队列取出并处理 while True: item await self.queue.get() if item is None: # 收到结束信号 self.queue.task_done() break try: await process_fn(item, consumer_id) except Exception as e: # 消费者不能因单个 # 处理失败而退出 print(f消费者 {consumer_id} f处理失败: {e}) finally: self.queue.task_done() async def run( self, source: AsyncIterator, process_fn ): 运行生产者-消费者 producer_task asyncio.create_task( self.producer(source)) consumer_tasks [ asyncio.create_task( self.consumer(i, process_fn)) for i in range(self.consumer_count) ] # 等待生产者完成 await producer_task # 等待所有消费者完成 await asyncio.gather(*consumer_tasks)3.3 超时与取消处理import asyncio async def with_timeout( coro, timeout: float, task_name: str unnamed ): 带超时的协程执行 try: # 为什么用 wait_for 而非手动计时 # wait_for 在超时时自动取消任务 # 释放资源手动计时需要自己 # 管理任务取消和清理 result await asyncio.wait_for( coro, timeouttimeout) return result except asyncio.TimeoutError: print(f任务 {task_name} 超时 f({timeout}s)) raise except asyncio.CancelledError: # 取消时的清理逻辑 # 为什么单独捕获 CancelledError # 取消是正常的控制流 # 不是错误需要做清理 # 但不应该记录为错误 print(f任务 {task_name} 被取消) raise async def graceful_shutdown( tasks: List[asyncio.Task], timeout: float 5.0 ): 优雅关闭所有任务 # 发送取消请求 for task in tasks: task.cancel() # 等待任务响应取消 # 为什么给超时有些任务可能 # 不响应取消如 C 扩展中的 # 阻塞调用超时后强制放弃 results await asyncio.gather( *tasks, return_exceptionsTrue) for i, result in enumerate(results): if isinstance(result, Exception) and \ not isinstance(result, asyncio.CancelledError): print(f任务 {i} 关闭异常: {result})四、asyncio 的边界与 Rust Tokio 的关键差异性能差距Python asyncio 的单次协程切换开销约 1μsRust Tokio 约 0.1μs。在高并发场景下10 万 协程这个差距会累积。Python 的 GIL 虽然在 IO 密集场景下不阻塞但 CPU 密集操作会锁住整个解释器。类型安全Python 的协程没有 Send/Sync 约束多协程共享可变状态不会编译报错只在运行时出问题。Rust 的 Send static 约束在编译期就排除了数据竞争。取消语义Python 的取消通过抛出 CancelledError 实现协程可以捕获并忽略。Rust 的取消是协作式的Future 必须主动检查。Python 的方式更灵活但更危险——忽略取消会导致资源泄漏。调试难度Python 的异步调用栈比同步深得多异常信息难以追踪。Rust 的编译器错误虽然难读但至少在编译期就暴露了问题。五、总结Python asyncio 和 Rust Tokio 的核心概念相似事件循环 协程但实现和约束差异很大。Python 更灵活但更危险Rust 更安全但更严格。从 Rust 回来看 asyncio最需要注意三点用 Semaphore 控制并发数用 return_exceptionsTrue 防止级联失败用 CancelledError 做清理而非忽略。asyncio 的优势是生态成熟、开发速度快劣势是性能和类型安全。如果你的项目对性能和可靠性要求高Rust 是更好的选择如果追求开发速度Python asyncio 足够用。