用 Go 实现一个轻量级事件总线,解耦智能工作流

📅 2026/6/22 15:13:29
用 Go 实现一个轻量级事件总线,解耦智能工作流
用 Go 实现一个轻量级事件总线解耦智能工作流做智能工作流系统时最麻烦的是业务逻辑太散。如果每个节点比如大模型调用、数据清洗、发通知都直接硬编码调用代码很快就会纠缠在一起。与其在核心业务里写满if-else和同步调用不如在中间加一层事件总线。让节点通过发布 - 订阅模式异步通信后续加新功能时不用动旧代码。一、为什么同步调用会卡住系统传统写法通常是一个主函数按顺序调完所有模块。比如工单处理完了顺便调个接口把摘要发到企业微信。一旦要加新功能就得打开核心文件改代码。这种写法有两个硬伤耦合太重违背了开闭原则改一个地方可能影响全局。阻塞风险下游通知模块如果网络超时会直接拖慢主业务的响应速度。对于长周期的 AI 任务通常需要支持异步回调和插件热插拔。如果没有一个抽象的事件中枢代码很容易变成难以维护的“意大利面”。二、事件驱动的工作流逻辑解耦的核心思路是节点执行完后把结果扔进事件总线不用管谁在听。架构逻辑大致如下发布者如工单节点发布order.created事件。事件总线收到后分发给所有订阅该主题的节点。订阅者如分类器、存储库、通知服务处理完后如果需要可以继续发布新事件如ticket.processed。插件管理器可以动态注册或注销订阅者无需重启服务。新增业务节点时只需要向总线注册订阅完全不用修改发布者的代码。三、Go 语言实现轻量级并发事件总线为了保持单机高性能这里用 Go 标准库实现了一个简单的并发安全事件总线。它支持基于 Topic 的发布订阅、带缓冲的 Channel、以及安全的动态注销。没有引入 RabbitMQ 或 Kafka 等外部中间件适合单机场景。package bus import ( context errors sync ) // Event 结构体定义了传递的数据实体 type Event struct { Topic string Data interface{} } // EventChannel 是订阅者接收事件的通道 type EventChannel chan Event // EventBus 管理所有的主题订阅关系 type EventBus struct { subscribers map[string][]EventChannel mu sync.RWMutex } // NewEventBus 初始化事件总线 func NewEventBus() *EventBus { return EventBus{ subscribers: make(map[string][]EventChannel), } } // Subscribe 注册对某个 Topic 的订阅返回一个接收通道 func (eb *EventBus) Subscribe(topic string, bufferSize int) EventChannel { eb.mu.Lock() defer eb.mu.Unlock() ch : make(EventChannel, bufferSize) eb.subscribers[topic] append(eb.subscribers[topic], ch) return ch } // Unsubscribe 注销订阅安全关闭通道并释放资源 func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) error { eb.mu.Lock() defer eb.mu.Unlock() subs, ok : eb.subscribers[topic] if !ok { return errors.New(topic not found) } for i, sub : range subs { if sub ch { // 安全关闭通道防止向已关闭通道发送导致的 panic close(ch) // 从切片中移除该通道 eb.subscribers[topic] append(subs[:i], subs[i1:]...) return nil } } return errors.New(subscriber channel not found in this topic) } // Publish 异步广播事件支持 context 超时控制防止下游阻塞拖垮总线 func (eb *EventBus) Publish(ctx context.Context, event Event) { eb.mu.RLock() subs, ok : eb.subscribers[event.Topic] eb.mu.RUnlock() if !ok { return // 当前没有订阅者 } var wg sync.WaitGroup for _, sub : range subs { wg.Add(1) go func(ch EventChannel) { defer wg.Done() select { case -ctx.Done(): // 上下文超时或被撤销放弃发送防止 goroutine 挂起泄漏 return case ch - event: // 事件成功写入缓冲通道 } }(sub) } // 等待本次广播的所有并发分发 goroutine 结束 wg.Wait() }四、实际落地时的几个坑直接用事件总线虽然方便但有几个问题需要自己处理顺序问题并发处理时后发布的事件可能先执行完。如果业务强依赖顺序比如先扣款后发货需要在 Event 里加序列号让订阅者自己排序。背压Backpressure如果某个订阅者比如调大模型 API处理太慢它的 Channel 缓冲区会满。这时候需要设置合理的 buffer 大小或者在 Publish 时加超时控制直接丢弃或重试别让慢节点拖死整个系统。消息丢失被丢弃或消费失败的事件不能直接消失。建议做个死信队列Dead Letter Queue把失败的消息存起来方便后续人工排查或重放。五、小结这套方案的核心就是“解耦”。在智能工作流底层加一层并发安全的发布 - 订阅机制配合超时控制能大幅降低代码的维护成本。后续加新功能时只需要注册新订阅者不用动老代码系统扩展起来也顺手。