python: Bulkheads Pattern

📅 2026/6/26 23:52:54
python: Bulkheads Pattern
项目结构# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:15 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py # 舱壁资源配置各业务域独立线程池参数 BULKHEAD_CONFIG { inventory: { max_workers: 2, timeout: 3.0, thread_prefix: bulkhead-inventory }, order_pay: { max_workers: 3, timeout: 3.0, thread_prefix: bulkhead-order-pay }, logistics: { max_workers: 2, timeout: 3.0, thread_prefix: bulkhead-logistics } } # 业务常量 BUSINESS_TIMEOUT_MSG 请求超时舱壁隔离熔断 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:14 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : exceptions.py class BulkheadTimeoutException(Exception): 舱壁执行超时异常 pass class BulkheadPoolFullException(Exception): 舱壁线程池耗尽异常 pass # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:15 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : bulkhead_executor.py import time from concurrent.futures import ThreadPoolExecutor, TimeoutError from functools import wraps from typing import Dict, Any from .exceptions import BulkheadTimeoutException class BulkheadExecutor: 舱壁执行器每个业务域独立实例隔离线程资源 def __init__(self, config: Dict[str, Any]): self.max_workers config[max_workers] self.timeout config[timeout] self.pool ThreadPoolExecutor( max_workersself.max_workers, thread_name_prefixconfig[thread_prefix] ) def execute(self, func, *args, **kwargs): 提交任务至独立舱壁线程池执行 :param func: :param args: :param kwargs: :return: try: future self.pool.submit(func, *args, **kwargs) return future.result(timeoutself.timeout) except TimeoutError: raise BulkheadTimeoutException(f执行超时 {self.timeout}s) except Exception as e: raise e def bulkhead_decorator(self): 装饰器绑定当前舱壁实例执行能力 :return: def decorator(target_func): wraps(target_func) def wrapper(*args, **kwargs): try: return self.execute(target_func, *args, **kwargs) except BulkheadTimeoutException as te: return {success: False, code: 504, msg: str(te)} except Exception as e: return {success: False, code: 500, msg: f业务异常: {str(e)}} return wrapper return decorator def shutdown(self, wait: bool True): 程序退出释放线程池资源 :param wait: :return: self.pool.shutdown(waitwait) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:17 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys # 全局日志配置 logging.basicConfig( levellogging.INFO, format%(asctime)s [%(threadName)s] %(levelname)s | %(message)s, datefmt%Y-%m-%d %H:%M:%S, streamsys.stdout ) logger logging.getLogger(jewelry-bulkhead) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:17 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : inventory_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 初始化独立库存舱壁 inventory_bulkhead BulkheadExecutor(BULKHEAD_CONFIG[inventory]) bulkhead inventory_bulkhead.bulkhead_decorator() class InventoryService: 珠宝库存服务钻石、黄金、铂金库存查询独立舱壁隔离 bulkhead def query_gem_stock(self, jewelry_id: str): 查询宝石类珠宝库存模拟阻塞故障场景 :param jewelry_id: :return: logger.info(f查询珠宝库存 ID: {jewelry_id}) # 模拟数据库慢查询/第三方接口卡死超过超时阈值 time.sleep(5) return { success: True, code: 200, data: { jewelry_id: jewelry_id, stock_num: 15, material: 钻石, store: 深圳总店 } } bulkhead def query_gold_stock(self, jewelry_id: str): 黄金素圈库存查询正常逻辑 :param jewelry_id: :return: logger.info(f查询黄金库存 ID: {jewelry_id}) time.sleep(0.3) return { success: True, code: 200, data: { jewelry_id: jewelry_id, stock_num: 32, material: 足金999, store: 全国门店总仓 } } # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : order_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 订单支付独立舱壁 order_bulkhead BulkheadExecutor(BULKHEAD_CONFIG[order_pay]) bulkhead order_bulkhead.bulkhead_decorator() class OrderPayService: 珠宝订单与支付服务下单、收银、结算 bulkhead def create_order(self, order_no: str, amount: float, user_id: str): 创建珠宝订单 logger.info(f创建订单 {order_no} 用户:{user_id} 金额:{amount}) time.sleep(0.5) return { success: True, code: 200, data: { order_no: order_no, pay_amount: amount, status: 待支付 } } bulkhead def order_settle(self, order_no: str, pay_channel: str): 订单支付结算 :param order_no: :param pay_channel: :return: logger.info(f订单结算 {order_no} 渠道:{pay_channel}) time.sleep(0.4) return { success: True, code: 200, data: { order_no: order_no, pay_status: 已结清, transaction_id: fTX{hash(order_no) % 9999999} } } # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logistics_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 物流独立舱壁 logistics_bulkhead BulkheadExecutor(BULKHEAD_CONFIG[logistics]) bulkhead logistics_bulkhead.bulkhead_decorator() class LogisticsService: 珠宝物流服务保价出库、快递揽收、物流跟踪 bulkhead def ship_jewelry(self, order_no: str, receiver_addr: str): 珠宝保价发货 :param order_no: :param receiver_addr: :return: logger.info(f订单{order_no}安排发货收件地址:{receiver_addr}) time.sleep(0.4) return { success: True, code: 200, data: { order_no: order_no, express_no: fSF{hash(order_no) % 1000000}, insured_value: 20000, status: 已揽收 } } bulkhead def track_express(self, express_no: str): 物流轨迹查询 :param express_no: :return: logger.info(f查询物流单号 {express_no}) time.sleep(0.2) return { success: True, code: 200, data: {express_no: express_no, location: 深圳转运中心} }调用# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看言語成了邀功盡責的功臣還需要行爲每日來值班嗎 # 描述Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : BulkheadsBll.py import threading import time from BulkheadsPattern.common.logger import logger from BulkheadsPattern.business.inventory.inventory_service import InventoryService, inventory_bulkhead from BulkheadsPattern.business.order_pay.order_service import OrderPayService, order_bulkhead from BulkheadsPattern.business.logistics.logistics_service import LogisticsService, logistics_bulkhead class BulkheadsBll(object): # 实例化各业务服务 inv_svc InventoryService() ord_svc OrderPayService() log_svc LogisticsService() def run_scene(self): 模拟珠宝门店高并发流量场景 logger.info( 珠宝行业舱壁模式压测开始 ) start time.time() task_list [ # 库存域会超时故障 threading.Thread(targetlambda: logger.info(self.inv_svc.query_gem_stock(D001))), threading.Thread(targetlambda: logger.info(self.inv_svc.query_gem_stock(D002))), # 库存正常接口 threading.Thread(targetlambda: logger.info(self.inv_svc.query_gold_stock(G001))), # 订单支付域 threading.Thread(targetlambda: logger.info(self.ord_svc.create_order(O2026062601, 16888, U10086))), threading.Thread(targetlambda: logger.info(self.ord_svc.create_order(O2026062602, 9999, U10087))), threading.Thread(targetlambda: logger.info(self.ord_svc.order_settle(O2026062601, 微信支付))), # 物流域 threading.Thread(targetlambda: logger.info(self.log_svc.ship_jewelry(O2026062601, 深圳福田珠宝大厦))), threading.Thread(targetlambda: logger.info(self.log_svc.track_express(SF12345678))), ] # 启动所有并发任务 for t in task_list: t.start() for t in task_list: t.join() cost round(time.time() - start, 2) logger.info(f 压测结束总耗时 {cost}s ) def run_example(self): :return: try: self.run_scene() finally: # 优雅释放所有舱壁线程池资源 inventory_bulkhead.shutdown() order_bulkhead.shutdown() logistics_bulkhead.shutdown() logger.info(所有舱壁线程池已安全关闭) staticmethod def demo(): bll BulkheadsBll() bll.run_example()输出