Python的多线程是通过threading
模块实现的,它允许你创建线程并行执行任务。
多线程适用于I/O密集型任务,如网络请求或文件操作。由于Python的全局解释器锁(GIL),多线程可能不适用于CPU密集型任务。
关键特性
-
全局解释器锁(GIL):Python的GIL确保同一时刻只有一个线程执行Python字节码,这意味着在CPU密集型任务中,多线程可能不会带来性能提升。
-
I/O密集型任务:多线程非常适合I/O密集型任务,如网络请求、文件读写等,因为线程可以在等待I/O操作完成时被挂起,让其他线程运行。
-
线程安全:由于线程共享内存空间,需要小心处理共享数据,避免竞态条件和数据不一致。
-
同步机制:可以使用锁(
Lock
)、事件(Event
)、条件(Condition
)和信号量(Semaphore
)等同步原语来控制线程间的交互。
import time
import threading# 定义线程要执行的函数
def print_numbers():for i in range(1, 6):time.sleep(1)print(i)# 创建线程
thread = threading.Thread(target=print_numbers)# 启动线程
thread.start()# 等待线程完成
thread.join()print("线程执行完毕")
线程同步
在Python中,多线程同步机制是确保多个线程在访问共享资源时保持一致性和防止数据竞争的关键工具。
1. Lock(锁)
Lock
是最基本的同步原语,用于防止多个线程同时访问共享资源。
import threading# 创建一个锁
lock = threading.Lock()def critical_section():with lock: # 使用with语句自动获取和释放锁# 临界区代码,只有获取到锁的线程可以执行pass# 创建线程
thread1 = threading.Thread(target=critical_section)
thread2 = threading.Thread(target=critical_section)# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()
2. RLock(可重入锁)
RLock
(Reentrant Lock)允许一个线程多次获取锁。
import threading# 创建一个可重入锁
rlock = threading.RLock()def f():with rlock:print(1)with rlock:print(2)f()
3. Semaphore(信号量)
Semaphore
用于限制对共享资源的访问数量。
import threading# 创建一个信号量,最大值为2
semaphore = threading.Semaphore(2)def access_resource():with semaphore:# 访问共享资源pass# 创建线程
threads = [threading.Thread(target=access_resource) for _ in range(5)]# 启动线程
for thread in threads:thread.start()# 等待线程完成
for thread in threads:thread.join()
4. Event(事件)
Event
用于线程间的通信,一个线程等待事件被设置,其他线程在某个条件满足时设置事件。
import threading# 创建一个事件
event = threading.Event()def wait_for_event():event.wait() # 等待事件被设置print("事件被设置")def set_event():print("设置事件")event.set()# 创建线程
thread1 = threading.Thread(target=wait_for_event)
thread2 = threading.Thread(target=set_event)# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()
5. Condition(条件)
Condition
用于更复杂的线程同步场景,允许一个或多个线程等待,直到被通知。
import threading# 创建一个条件变量
condition = threading.Condition()def wait_for_condition():with condition:condition.wait() # 等待条件被通知print("条件被通知")def notify_condition():with condition:print("通知条件")condition.notify() # 通知一个等待的线程condition.notify_all() # 通知所有等待的线程# 创建线程
thread1 = threading.Thread(target=wait_for_condition)
thread2 = threading.Thread(target=notify_condition)# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()
6. Barrier(屏障)
Barrier
用于让一定数量的线程在继续执行之前等待。
import threading# 创建一个屏障,需要3个线程到达后才能继续
barrier = threading.Barrier(3)def do_work(n):print(f"线程 {n} 开始工作")barrier.wait() # 等待其他线程print(f"线程 {n} 继续工作")# 创建线程
threads = [threading.Thread(target=do_work, args=(i,)) for i in range(3)]# 启动线程
for thread in threads:thread.start()# 等待线程完成
for thread in threads:thread.join()
线程池
Python中的线程池是通过concurrent.futures
模块中的ThreadPoolExecutor
类实现的。
线程池是一种执行器(Executor),它维护着一个线程池,允许你并发地执行多个线程任务。
from concurrent.futures import ThreadPoolExecutordef task(n):print(f"处理任务 {n}")return n * n# 创建线程池并提交任务
with ThreadPoolExecutor(max_workers=5) as executor:# submit方法提交任务并获取Future对象future = executor.submit(task, 10)# 等待任务完成并获取结果result = future.result()print(f"任务结果:{result}")# 并发执行多个任务,返回一个迭代器
with ThreadPoolExecutor(max_workers=5) as executor:# 并发执行多个任务results = executor.map(task, [1, 2, 3, 4, 5])for result in results:print(f"任务结果:{result}")
高级用法
-
as_completed
:返回一个迭代器,按照任务完成的顺序产生Future对象。from concurrent.futures import as_completedwith ThreadPoolExecutor(max_workers=5) as executor:futures = [executor.submit(task, i) for i in range(10)]for future in as_completed(futures):print(future.result())
-
wait
:等待多个Future对象中的任意一个或全部完成。from concurrent.futures import waitwith ThreadPoolExecutor(max_workers=5) as executor:futures = [executor.submit(task, i) for i in range(10)]done, _ = wait(futures, return_when='ALL_COMPLETED')for future in done:print(future.result())
- 线程安全:共享资源时需要考虑线程安全,使用锁或其他同步机制。
- 资源限制:创建过多的线程可能会导致资源耗尽。
- 调试难度:多线程程序的调试通常比单线程程序更复杂,因为需要考虑线程间的交互和状态。
多线程使用
- 避免死锁:确保在代码中不会出现死锁的情况,例如,避免在持有一个锁的同时尝试获取另一个锁。
- 资源限制:创建大量线程可能会导致资源耗尽,因为每个线程都需要一定的内存和系统资源。
- 调试难度:多线程程序的调试通常比单线程程序更复杂,因为需要考虑线程间的交互和状态。