当前位置: 首页> 汽车> 行情 > 怎么开通个人网站_宁波网站推广渠道_seo是什么职位的简称_seo关键词搜索优化

怎么开通个人网站_宁波网站推广渠道_seo是什么职位的简称_seo关键词搜索优化

时间:2025/7/12 9:12:07来源:https://blog.csdn.net/c123728529/article/details/147193520 浏览次数: 0次
怎么开通个人网站_宁波网站推广渠道_seo是什么职位的简称_seo关键词搜索优化

以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:


1. 经典观察者模式实现

from abc import ABC, abstractmethod
from typing import List, Anyclass Observer(ABC):"""观察者抽象基类"""@abstractmethoddef update(self, subject: Any) -> None:passclass Subject:"""被观察对象基类"""def __init__(self):self._observers: List[Observer] = []def attach(self, observer: Observer) -> None:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""同步通知所有观察者"""for observer in self._observers:observer.update(self)# 使用示例
class TemperatureSensor(Subject):"""具体被观察者:温度传感器"""def __init__(self):super().__init__()self._temperature = 0.0@propertydef temperature(self) -> float:return self._temperature@temperature.setterdef temperature(self, value: float) -> None:self._temperature = valueself.notify()  # 温度变化时通知观察者class Display(Observer):"""具体观察者:显示屏"""def update(self, subject: TemperatureSensor) -> None:print(f"当前温度: {subject.temperature}°C")# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)sensor.temperature = 25.5  # 输出: 当前温度: 25.5°C

2. 线程安全增强版

import threading
from typing import List, Anyclass ThreadSafeSubject:"""线程安全的被观察对象"""def __init__(self):self._observers: List[Observer] = []self._lock = threading.RLock()def attach(self, observer: Observer) -> None:with self._lock:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:with self._lock:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""线程安全的通知"""with self._lock:observers = self._observers.copy()for observer in observers:observer.update(self)

3. 异步观察者模式

import asyncio
from abc import ABC, abstractmethod
from typing import List, Anyclass AsyncObserver(ABC):"""异步观察者接口"""@abstractmethodasync def update(self, subject: Any) -> None:passclass AsyncSubject:"""支持异步通知的被观察对象"""def __init__(self):self._observers: List[AsyncObserver] = []def attach(self, observer: AsyncObserver) -> None:if observer not in self._observers:self._observers.append(observer)async def notify(self) -> None:"""异步通知所有观察者"""await asyncio.gather(*[observer.update(self) for observer in self._observers])# 使用示例
class AsyncTemperatureSensor(AsyncSubject):def __init__(self):super().__init__()self._temp = 0.0async def set_temperature(self, value: float) -> None:self._temp = valueawait self.notify()class CloudLogger(AsyncObserver):async def update(self, subject: AsyncTemperatureSensor) -> None:print(f"云端记录温度: {subject._temp}°C")await asyncio.sleep(0.1)  # 模拟网络请求async def main():sensor = AsyncTemperatureSensor()sensor.attach(CloudLogger())await sensor.set_temperature(28.5)  # 输出: 云端记录温度: 28.5°Casyncio.run(main())

4. 事件总线实现(发布-订阅模式)

from typing import Dict, List, Callable, Any
import inspectclass EventBus:"""事件总线(高级观察者模式)"""_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)cls._instance._subscriptions: Dict[str, List[Callable]] = {}return cls._instancedef subscribe(self, event_type: str, callback: Callable) -> None:if not inspect.iscoroutinefunction(callback):callback = self._sync_to_async(callback)if event_type not in self._subscriptions:self._subscriptions[event_type] = []self._subscriptions[event_type].append(callback)async def publish(self, event_type: str, **data) -> None:if event_type in self._subscriptions:await asyncio.gather(*[callback(**data) for callback in self._subscriptions[event_type]])@staticmethoddef _sync_to_async(func: Callable) -> Callable:async def wrapper(*args, **kwargs):return func(*args, **kwargs)return wrapper# 使用示例
bus = EventBus()@bus.subscribe("temperature_change")
async def log_temp_change(value: float):print(f"温度变化记录: {value}°C")async def trigger_events():await bus.publish("temperature_change", value=30.0)asyncio.run(trigger_events())  # 输出: 温度变化记录: 30.0°C

5. 带过滤器的观察者模式

from typing import Callable, Anyclass FilteredObserver:"""带条件过滤的观察者"""def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):self.callback = callbackself.filter = filter_conditiondef update(self, subject: Any) -> None:if self.filter(subject):self.callback(subject)# 使用示例
sensor = TemperatureSensor()def alert(temp: float):print(f"警报!当前温度过高: {temp}°C")# 只接收温度>30的通知
high_temp_observer = FilteredObserver(callback=alert,filter_condition=lambda s: s.temperature > 30
)sensor.attach(high_temp_observer)
sensor.temperature = 25  # 无输出
sensor.temperature = 35  # 输出: 警报!当前温度过高: 35°C

方案对比

实现方式特点适用场景
经典实现简单直接单线程简单场景
线程安全版避免竞态条件多线程环境
异步实现非阻塞通知I/O密集型应用
事件总线松耦合,支持多对多复杂事件系统
过滤观察者条件触发需要选择性通知的场景

最佳实践建议

  1. 生命周期管理

    # 使用上下文管理器自动取消注册
    class ObserverContext:def __init__(self, subject: Subject, observer: Observer):self.subject = subjectself.observer = observerdef __enter__(self):self.subject.attach(self.observer)return selfdef __exit__(self, *args):self.subject.detach(self.observer)with ObserverContext(sensor, display):sensor.temperature = 20
    
  2. 性能优化

    • 对于高频事件,考虑使用弱引用(weakref.WeakSet
    • 批量通知时使用@dataclass封装事件数据
  3. 异常处理

    def safe_notify(self):for observer in self._observers:try:observer.update(self)except Exception as e:print(f"Observer failed: {e}")
    
  4. 与Python生态集成

    • 使用PyPubSub等现成库
    • 结合asyncio.Queue实现生产者-消费者模式

根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。

关键字:怎么开通个人网站_宁波网站推广渠道_seo是什么职位的简称_seo关键词搜索优化

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: