66_Python多线程与并发

📅 2026/6/30 17:56:21
66_Python多线程与并发
Python多线程与并发编程突破GIL限制的实用指南文章目录Python多线程与并发编程突破GIL限制的实用指南前言一、线程基础1.1 创建并启动线程1.2 继承Thread类二、守护线程Daemon Thread三、线程同步Lock 锁机制3.1 为什么需要锁3.2 使用Lock保护共享资源3.3 可重入锁RLock四、线程间通信4.1 Event 事件4.2 Condition 条件变量五、线程池concurrent.futures六、理解GIL全局解释器锁6.1 GIL是什么6.2 绕过GIL的方案七、实战案例并发下载器总结✅ 亮点总结适用场景扩展方向前言在当今多核CPU普及的时代并发编程已成为提升程序性能的关键手段。Python提供了threading模块来支持多线程编程但由于**全局解释器锁GIL**的存在Python多线程又有其独特的限制和适用场景。本文将带你深入理解Python多线程的核心概念、实际应用以及如何正确使用线程池和锁机制。为什么GIL话题如此重要几乎所有Python面试都会问到GIL因为它是理解Python性能瓶颈的关键。但GIL并非一无是处——在I/O密集型场景如网络请求、文件读写中多线程依然能带来显著的性能提升。本文不仅会讲是什么更会深入为什么和怎么办帮你在面试和实战中都游刃有余。一、线程基础1.1 创建并启动线程Python中有两种方式创建线程。两种方式各有优劣直接使用Thread类更加灵活适合简单的任务函数继承Thread类则更好地封装了线程的状态和行为适合复杂的多线程应用。实际选型建议如果只是并发执行几个独立的任务直接用Thread类即可如果需要在线程中维护复杂的状态且多个线程需要共享同一套逻辑则推荐继承方式。importthreadingimporttime# 方式一直接使用Thread类defworker(name,delay):工作线程函数foriinrange(3):time.sleep(delay)print(f[{name}] 第{i1}次执行 ——{time.strftime(%H:%M:%S)})# 创建线程t1threading.Thread(targetworker,args(线程A,1))t2threading.Thread(targetworker,args(线程B,0.5))# 启动线程t1.start()t2.start()# 等待线程完成t1.join()t2.join()print(所有线程执行完毕)1.2 继承Thread类classDownloadThread(threading.Thread):下载线程类def__init__(self,url,filename):super().__init__()self.urlurl self.filenamefilenamedefrun(self):print(f开始下载:{self.url}-{self.filename})# 模拟下载过程time.sleep(2)print(f下载完成:{self.filename})# 使用threads[DownloadThread(http://example.com/a.jpg,a.jpg),DownloadThread(http://example.com/b.jpg,b.jpg),DownloadThread(http://example.com/c.jpg,c.jpg),]fortinthreads:t.start()fortinthreads:t.join()print(全部下载完毕)二、守护线程Daemon Thread守护线程是一种特殊线程当所有非守护线程结束时守护线程会自动终止。守护线程的典型应用场景包括后台日志写入、心跳检测、垃圾回收等辅助性工作。关键注意不要在守护线程中进行文件写入、数据库操作等关键任务——守护线程可能在操作中途被强制终止导致数据损坏。这也是面试中常见的问题。defbackground_task():后台持续运行的任务count0whileTrue:count1print(f后台运行中...{count})time.sleep(0.5)# 设置为守护线程daemonthreading.Thread(targetbackground_task,daemonTrue)daemon.start()# 主线程工作time.sleep(3)print(主线程结束守护线程也将自动终止)三、线程同步Lock 锁机制3.1 为什么需要锁多线程共享数据时如果不加保护会出现竞态条件Race Condition。这是并发编程中最经典的陷阱——多个线程同时读写同一变量导致结果不可预测。关键认知counter 1看起来像一次操作实际上它至少包含读取 → 加1 → 写回三步线程可能在任意步骤之间被切换从而导致数据错乱。这也是面试中高频出现的问题。# 危险示例没有锁的情况counter0defunsafe_increment():globalcounterfor_inrange(100000):counter1# 非原子操作threads[threading.Thread(targetunsafe_increment)for_inrange(10)]fortinthreads:t.start()fortinthreads:t.join()print(f预期结果: 1000000, 实际结果:{counter})# 实际结果往往小于1000000且每次运行结果不同3.2 使用Lock保护共享资源counter0lockthreading.Lock()defsafe_increment():globalcounterfor_inrange(100000):withlock:# 获取锁counter1# 离开with块自动释放锁threads[threading.Thread(targetsafe_increment)for_inrange(10)]fortinthreads:t.start()fortinthreads:t.join()print(f使用Lock后:{counter})# 10000003.3 可重入锁RLockrlockthreading.RLock()defrecursive_function(n):withrlock:ifn0:print(f递归层级:{n})recursive_function(n-1)# 同一线程可多次获取RLockrecursive_function(3)# 不会死锁四、线程间通信多线程编程中线程之间的协调和通信是核心挑战。Python提供了Event和Condition两种同步原语来优雅地解决这个问题。Event是最简单的线程通信方式——一个线程等待事件发生另一个线程触发事件Condition则更加灵活支持wait()和notify()模式可以实现典型的生产者-消费者模型。4.1 Event 事件eventthreading.Event()defwaiter():print(等待事件触发...)event.wait()# 阻塞直到事件被设置print(事件已触发继续执行)deftrigger():time.sleep(2)print(准备触发事件...)event.set()# 触发事件t_waiterthreading.Thread(targetwaiter)t_triggerthreading.Thread(targettrigger)t_waiter.start()t_trigger.start()t_waiter.join()t_trigger.join()4.2 Condition 条件变量importrandom conditionthreading.Condition()items[]defproducer():生产者生产数据foriinrange(5):time.sleep(random.uniform(0.5,1.5))withcondition:itemf产品-{i1}items.append(item)print(f生产:{item})condition.notify()# 通知等待的消费者defconsumer():消费者消费数据for_inrange(5):withcondition:whilenotitems:condition.wait()# 等待通知itemitems.pop(0)print(f消费:{item})pthreading.Thread(targetproducer)cthreading.Thread(targetconsumer)p.start()c.start()p.join()c.join()五、线程池concurrent.futures线程池是最实用的多线程工具它管理线程的创建、复用和销毁。手动管理线程创建 → 运行 → 销毁有显著的开销——创建线程需要分配栈空间、初始化内部数据结构等如果频繁创建和销毁线程这些开销可能超过实际计算时间。线程池通过复用已创建的线程解决了这个问题这也是生产环境中推荐使用ThreadPoolExecutor而非手动Thread的原因。fromconcurrent.futuresimportThreadPoolExecutor,as_completedimporturllib.request URLS[https://www.python.org,https://www.baidu.com,https://www.github.com,https://www.stackoverflow.com,https://www.example.com,]deffetch_url(url):模拟抓取URL实际使用时应导入urllibtime.sleep(1)# 模拟网络延迟returnf{url}: 200 OK (模拟)# 使用线程池withThreadPoolExecutor(max_workers3)asexecutor:# 方式一map 保持顺序resultsexecutor.map(fetch_url,URLS)forurl,resultinzip(URLS,results):print(f{url}-{result})print(-*40)# 方式二submit as_completed 按完成顺序处理futures{executor.submit(fetch_url,url):urlforurlinURLS}forfutureinas_completed(futures):urlfutures[future]try:resultfuture.result()print(f[最先完成]{url}-{result})exceptExceptionase:print(f{url}出错:{e})六、理解GIL全局解释器锁6.1 GIL是什么GILGlobal Interpreter Lock是CPython解释器中的互斥锁它确保同一时刻只有一个线程执行Python字节码。这意味着CPU密集型任务多线程无法利用多核甚至可能比单线程更慢因为线程切换有开销I/O密集型任务多线程仍有显著的性能提升I/O等待时会释放GILGIL的设计初衷Python的内存管理不是线程安全的——GIL作为全局锁大大简化了CPython的实现特别是内存管理和引用计数部分。它减少了CPython代码的复杂性并使其更不容易出错。虽然GIL被广泛诟病但它至今仍存在是因为移除它的代价极高已有多次尝试而替代方案Jython、IronPython、PyPy已经证明了无GIL的可行性。importmathdefcpu_bound_task():CPU密集型计算素数count0fornuminrange(2,50000):is_primeTrueforiinrange(2,int(math.sqrt(num))1):ifnum%i0:is_primeFalsebreakifis_prime:count1returncountdefbenchmark():# 单线程starttime.time()for_inrange(4):cpu_bound_task()single_timetime.time()-start# 4线程starttime.time()withThreadPoolExecutor(max_workers4)asexecutor:futures[executor.submit(cpu_bound_task)for_inrange(4)]forfinfutures:f.result()multi_timetime.time()-startprint(f单线程耗时:{single_time:.2f}s)print(f四线程耗时:{multi_time:.2f}s)print(f加速比:{single_time/multi_time:.2f}x)benchmark()# CPU密集型任务中多线程几乎没有加速效果6.2 绕过GIL的方案fromconcurrent.futuresimportProcessPoolExecutor# 方案一multiprocessing 用于CPU密集型defcpu_heavy_task(n):returnsum(i*iforiinrange(n))# 使用进程池绕过GILwithProcessPoolExecutor(max_workers4)asexecutor:resultsexecutor.map(cpu_heavy_task,[10**7]*4)print(list(results))# 方案二asyncio 用于I/O密集场景见下一篇文章# 方案三使用C扩展如numpy—— 底层C代码释放GIL七、实战案例并发下载器fromconcurrent.futuresimportThreadPoolExecutor,as_completedfromthreadingimportLockimporttimeimportrandomclassConcurrentDownloader:简易并发下载器def__init__(self,max_workers3):self.max_workersmax_workers self.lockLock()self.completed0self.failed0self.total0defdownload(self,task_id,url):模拟下载单个文件delayrandom.uniform(0.5,2.0)time.sleep(delay)# 模拟10%的失败率ifrandom.random()0.1:raiseConnectionError(f下载失败:{url})returnf文件{task_id}:{delay*100:.0f}KB 下载完成defprocess_tasks(self,urls):self.totallen(urls)metrics{start:time.time()}withThreadPoolExecutor(max_workersself.max_workers)asexecutor:futures{executor.submit(self.download,i,url):(i,url)fori,urlinenumerate(urls,1)}forfutureinas_completed(futures):task_id,urlfutures[future]try:resultfuture.result()withself.lock:self.completed1print(f[{self.completed}/{self.total}] ✓{result})exceptExceptionase:withself.lock:self.failed1print(f[{self.completedself.failed}/{self.total}] ✗{url}:{e})metrics[end]time.time()self._print_summary(metrics)def_print_summary(self,metrics):elapsedmetrics[end]-metrics[start]print(\n*50)print(下载统计)print(f总数:{self.total}, 成功:{self.completed}, 失败:{self.failed})print(f成功率:{self.completed/self.total*100:.1f}%)print(f总耗时:{elapsed:.2f}s)print(*50)# 使用urls[fhttps://cdn.example.com/file_{i}.zipforiinrange(12)]downloaderConcurrentDownloader(max_workers4)downloader.process_tasks(urls)总结Python多线程编程的关键要点I/O密集型任务使用多线程能显著提升性能网络请求、文件读写CPU密集型任务应使用multiprocessing进程池或C扩展Lock保护共享资源Event/Condition实现线程通信ThreadPoolExecutor是最推荐的线程池管理方式GIL不是Bug而是CPython的设计权衡理解它能帮你做出正确的并发方案选择下一篇我们将探索Python的异步编程asyncio看看协程如何在单线程中实现高并发。✅ 亮点总结清晰对比 Python 多线程的两种实现方式Thread类与ThreadPoolExecutor线程池深入解析 GIL全局解释器锁的工作原理及其对 I/O 密集与 CPU 密集型任务的影响完整讲解 Lock、RLock、Semaphore、Event 四类同步原语的使用场景实战案例并发下载器演示线程池 队列的生产者-消费者模型适用场景批量 API 请求同时调用多个外部接口显著缩短总响应时间日志写入服务多线程安全地将不同模块的日志写入同一文件定时任务调度多个后台任务并行执行如定时清理、数据同步扩展方向学习multiprocessing模块突破 GIL 限制实现真正的并行计算掌握concurrent.futures的 ProcessPoolExecutor对比线程池与进程池的性能差异探索 asyncio 协程编程理解单线程高并发的实现方式推荐阅读第67篇《异步编程asyncio》