实验目的与要求
(1)了解生产者/消费者问题。
(2)熟练使用 Python 标准库 threading 创建线程。
(3)熟练使用 Python 标准库 threading 中的 Condition 对象实现线程同步。
(4)理解使用列表模拟缓冲区的方法。
(5)理解缓冲区的重要性。
实验原理与内容
1、利用安装部署好的开发环境模拟多线程+Condition 对象模拟生产者/消费者问题。
2、编写程序,创建生产者线程和消费者线程以及大小为 5 的缓冲区。生产者每隔 1 至 3 秒钟就生产一个数字并放入缓冲区,如果缓冲区已满则等待;消费者每隔 1 至 3 秒就 从缓冲区里取出生产日期较早的数字进行消费,如果缓冲区已空就等待。
3、运行程序,观察并理解缓冲区内数字变化以及生产者和消费者线程之间的同步。
实验设备与软件环境
1、 Windows win7 / win8 / win10 操作系统。
2、 Visual Studio / Pycharm 及以上
实验过程与结果
1.定义 Producer 类、Consumer 类;
Producer类:class Producer:def __init__(self, name):self.name = namedef produce(self, item):print(f"{self.name} is producing {item}")# 示例用法producer1 = Producer("Producer 1")producer1.produce("Product A")
Consumer类:class Consumer:def __init__(self, name, age):self.name = nameself.age = agedef greet(self):print(f"Hello, my name is {self.name} and I'm {self.age} years old.")# 示例用法if __name__ == "__main__":# 创建一个 Consumer 实例consumer1 = Consumer("Alice", 30)# 调用 greet 方法consumer1.greet()
2.构造 run 函数,使用 sleep()、randint()函数来控制生产/消费的速度。
Producer类
import threading
import time
from random import randintclass Producer(threading.Thread):def __init__(self, buffer, max_items):threading.Thread.__init__(self)self.buffer = bufferself.max_items = max_itemsdef run(self):while True:# 生成一个随机数作为生产的物品item = randint(1, 100)# 模拟生产者生产物品的速度time.sleep(randint(1, 3))# 尝试向缓冲区中添加物品self.buffer.add_item(item)print(f"Produced item {item}. Buffer: {self.buffer}")# 如果达到了最大物品数量,则结束生产if self.buffer.size() >= self.max_items:print("Producer finished producing.")breakclass Buffer:def __init__(self):self. Items = []def add_item(self, item):self.items.append(item)def size(self):return len(self.items)if __name__ == "__main__":buffer = Buffer()max_items = 10producer = Producer(buffer, max_items)producer.start()
Consumer类
import time
from random import randintclass Consumer:def __init__(self, name):self.name = namedef run(self):while True:# 模拟消费过程print(f"{self.name} is consuming...")time.sleep(randint(1, 3)) # 随机休眠时间,模拟不同速度的消费print(f"{self.name} has finished consuming.")
3、在代码的后面还设置了停止线程。
import threading
import time
import randomclass Buffer:def __init__(self, capacity):self.capacity = capacityself.items = []self.lock = threading.Lock()def produce(self):while True:with self.lock:if len(self.items) < self.capacity:item = random.randint(1, 100)self.items.append(item)print(f"Produced: {item}")time.sleep(random.randint(1, 3))def consume(self):
while True:with self.lock:if self.items:item = self.items.pop(0)print(f"Consumed: {item}")time.sleep(random.randint(1, 3))def stop_threads(self):self.running = Falsebuffer = Buffer(capacity=5)producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)producer_thread.start()
consumer_thread.start()time.sleep(30) # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()
4、调试及运行已设计编写好的代码,具体如调试运行所示。
import threading
import time
import randomclass Buffer:def __init__(self, capacity):self.capacity = capacityself. Items = []
self.lock = threading.Lock()def produce(self):while True:with self.lock:if len(self.items) < self.capacity:item = random.randint(1, 100)self.items.append(item)print(f"Produced: {item}")time.sleep(random.randint(1, 3))def consume(self):while True:with self.lock:if self.items:item = self.items.pop(0)print(f"Consumed: {item}")time.sleep(random.randint(1, 3))def stop_threads(self):self.running = Falsebuffer = Buffer(capacity=5)producer_thread = threading.Thread(target=buffer.produce)
consumer_thread = threading.Thread(target=buffer.consume)producer_thread.start()
consumer_thread.start()time.sleep(30) # 让线程运行一段时间
buffer.stop_threads()
producer_thread.join()
consumer_thread.join()