python: Worker Pool Pattern

📅 2026/6/25 14:42:03
python: Worker Pool Pattern
项目经构Worker Pool工作池是控制并发的经典模式固定数量的「工人」线程 / 进程不随任务数量无限创建工人从「任务队列」中领取任务执行核心价值限制并发数、优化资源、稳定可控、优雅关闭# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py 全局配置文件所有可变参数统一管理 符合企业级配置与代码分离 import os # 工作池配置 WORKER_COUNT 3 # 固定生产线数量可动态修改 TASK_QUEUE_MAXSIZE 100 # 任务队列最大长度 # 业务模拟配置 SIMULATE_TASK_DELAY True # 是否模拟任务耗时 RANDOM_DELAY_RANGE (0.5, 2.5) # 模拟IO延迟 # 日志配置 LOG_LEVEL INFO LOG_FORMAT %(asctime)s - %(levelname)s - %(message)s # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:20 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : exceptions.py 统一异常体系职责单一 专门处理工作池 业务异常 class WorkerPoolException(Exception): 工作池基础异常 pass class TaskExecutionException(WorkerPoolException): 任务执行失败异常 pass class TaskNotFoundException(WorkerPoolException): 任务不存在异常 pass # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:21 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py 日志工具全局唯一日志实例 职责单一只做日志输出 import logging from WorkerPoolPattern.config.settings import LOG_LEVEL, LOG_FORMAT def get_logger(name: str jewelry_production) - logging.Logger: logger logging.getLogger(name) logger.setLevel(LOG_LEVEL) if not logger.handlers: handler logging.StreamHandler() # Windows 必加解决中文/编码报错 handler.stream.reconfigure(encodingutf-8) formatter logging.Formatter(LOG_FORMAT) handler.setFormatter(formatter) logger.addHandler(handler) return logger # 全局可用的日志对象 logger get_logger() # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:23 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : worker.py 核心工作池通用、可扩展、线程安全 职责单一只管理工人 任务队列 不耦合任何珠宝业务可直接复用于其他项目 import queue import threading from typing import Callable, Any from WorkerPoolPattern.utils.logger import logger from WorkerPoolPattern.core.exceptions import TaskExecutionException class Worker(threading.Thread): 单个工作者无限从队列取任务执行 def __init__(self, task_queue: queue.Queue, worker_id: int): super().__init__(daemonTrue) self.queue task_queue self.worker_id worker_id self.name fWorker-{worker_id} def run(self): :return: logger.info(f工人【{self.worker_id}】已启动等待任务...) while True: try: task_func, task_id, args, kwargs self.queue.get() logger.info(f工人【{self.worker_id}】领取任务【{task_id}】) try: task_func(*args, **kwargs) logger.info(f工人【{self.worker_id}】完成任务【{task_id}】) except Exception as e: logger.error(f任务【{task_id}】执行失败{str(e)}) raise TaskExecutionException(f任务执行异常{e}) from e finally: self.queue.task_done() except Exception as e: logger.error(f工人【{self.worker_id}】异常{str(e)}) class WorkerPool: 工作池管理器职责单一只负责启动/管理/等待工人 def __init__(self, worker_count: int, queue_maxsize: int 0): self.worker_count worker_count self.task_queue queue.Queue(maxsizequeue_maxsize) self.workers: list[Worker] [] def start(self): 启动所有工作者 :return: logger.info(f启动工作池共 {self.worker_count} 个工作者) for i in range(1, self.worker_count 1): worker Worker(self.task_queue, i) worker.start() self.workers.append(worker) def submit(self, task_id: str, task_func: Callable, *args, **kwargs) - None: 提交任务到队列通用接口 :param task_id: :param task_func: :param args: :param kwargs: :return: self.task_queue.put((task_func, task_id, args, kwargs)) def wait_completion(self): 等待所有任务执行完成优雅关闭 :return: self.task_queue.join() logger.info(所有任务已完成工作池优雅关闭)# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:25 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : check_tasks.py 原料质检 / 成品质检 import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def raw_material_check(order_id: str): :param order_id: :return: logger.info(f[{order_id}] 原料质检钻石4C、真伪、纯度检测) time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def finished_goods_check(order_id: str): :param order_id: :return: logger.info(f[{order_id}] 成品质检工艺、成色、尺寸、镶口精度全检) time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:27 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : process_tasks.py 首饰加工、设计、镶嵌 import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def jewelry_process(order_id: str): :param order_id: :return: logger.info(f[{order_id}] 首饰加工设计 → 执模 → 镶嵌 → 抛光 → 电金) time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logistics_tasks.py 库存录入、物流发货 import time import random from WorkerPoolPattern.config.settings import RANDOM_DELAY_RANGE from WorkerPoolPattern.utils.logger import logger def inventory_record(order_id: str): :param order_id: :return: logger.info(f[{order_id}] 库存录入生成唯一防伪码、ERP入库、标签打印) time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) def order_delivery(order_id: str): :param order_id: :return: logger.info(f[{order_id}] 订单发货物流下单、保价、短信通知) time.sleep(random.uniform(*RANDOM_DELAY_RANGE)) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : production_service.py 珠宝生产业务服务 只负责接收订单 → 编排任务 → 提交到工作池 from WorkerPoolPattern.tasks import JEWELRY_FULL_PROCESS from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.utils.logger import logger class JewelryProductionService(object): 业务编排层职责单一 def __init__(self, pool: WorkerPool): self.pool pool # 依赖注入解耦 def create_order_task(self, order_id: str): 为单个订单创建全流程任务链 :param order_id: :return: logger.info(f 开始创建订单【{order_id}】的全流程任务) for task_name, task_func in JEWELRY_FULL_PROCESS: task_id f{order_id}-{task_name} self.pool.submit(task_id, task_func, order_id) def batch_create_orders(self, order_count: int): 批量创建订单任务 :param order_count: :return: logger.info(f 开始批量创建 {order_count} 个珠宝生产订单) for i in range(1, order_count 1): order_id f珠宝订单-{i:03d} self.create_order_task(order_id)调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Worker Pool Pattern 工作池模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/21 8:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : WorkerPoolBll.py 项目主入口 职责单一只负责启动 → 执行业务 → 等待结束 from WorkerPoolPattern.config.settings import WORKER_COUNT, TASK_QUEUE_MAXSIZE from WorkerPoolPattern.core.worker import WorkerPool from WorkerPoolPattern.service.production_service import JewelryProductionService from WorkerPoolPattern.utils.logger import logger class WorkerPoolBll(object): def demo(self): :return: logger.info( * 60) logger.info(珠宝企业级生产系统启动Worker Pool 模式) logger.info( * 60) pool WorkerPool( worker_countWORKER_COUNT, queue_maxsizeTASK_QUEUE_MAXSIZE ) pool.start() production_service JewelryProductionService(pool) production_service.batch_create_orders(order_count10) pool.wait_completion() logger.info(珠宝生产系统全部执行完成)输出