当前位置: 首页> 科技> 能源 > 北京流感最新消息_东莞网站制作品牌祥奔科技_搜索引擎优化概述_爱站网seo综合查询工具

北京流感最新消息_东莞网站制作品牌祥奔科技_搜索引擎优化概述_爱站网seo综合查询工具

时间:2025/7/11 1:00:02来源:https://blog.csdn.net/lixiaonan0318/article/details/143078309 浏览次数:0次
北京流感最新消息_东莞网站制作品牌祥奔科技_搜索引擎优化概述_爱站网seo综合查询工具

目录

1. RabbitMQ 的 发布订阅模式

2. GRPC 服务间的实体同步

2.1 生产者服务

2.2 消费者服务

3. 可靠性

3.1 生产者丢失消息

3.2 消费者丢失消息

3.3 RabbitMQ 中间件丢失消息


1. RabbitMQ 的 发布订阅模式

https://www.rabbitmq.com/tutorials/tutorial-three-go

  • P 生产者:发送消息到一个特定的交换机(交换机类型是fanout),不需指定具体的目标队列
  • X 交换机:将消息分发给所有绑定到它的队列
  • C 消费者:订阅主题,通过绑定到交换机的队列接收消息

2. GRPC 服务间的实体同步

考虑以下业务需求——

  • 模拟消费者组机制:
    • 同一消费者组下的消费者(即一个服务的多个实例)监听同一个队列,是竞争关系
    • 不同消费者组(即不同服务)监听不同队列,这些队列绑定到同一个交换机,不同消费者组可以独立消费相同的数据
  • 消费历史数据:当生产者先启动,生产了一部分数据,消费者后启动时,也能消费到历史数据

服务之间的实体数据同步方案:

2.1 生产者服务

(1) 初始化

生产者初始化时需要负责绑定 RabbitMQ 的交换机和队列关系,即显式声明自己的实体有哪些消费者在消费。比如:

  • 声明交换机 exchange_user、exchange_group
  • 声明消费者 consumer_user_rpc、consumer_org_rpc
  • 创建队列 exchange_user_consumer_user_rpc、exchange_user_consumer_org_rpc、exchange_group_consumer_user_rpc、exchange_group_consumer_org_rpc,也就是对每个 topic-consumer 组合,创建一个相应的队列
  • 将交换机和队列绑定

(2) 实体变更时发送消息

发送消息到交换机,交换机会自动分发给所有绑定到它的队列,也就是发送一条消息至 exchange_user 交换机,那么消息会被投递给队列 exchange_user_consumer_user_rpc 和 队列 exchange_user_consumer_org_rpc。

2.2 消费者服务

消费者订阅一个 topic,处理 rabbitMQ 队列发来的消息。

  • 若消息处理成功(业务流程成功),发送 Ack 给 rabbitMQ 确认消费
  • 若消息处理失败(业务流程失败),发送 Nack 通知 rabbitMQ 处理失败,消息将放回队列等待下次消费

Ack 时 rabbitMQ 会记录消费者消费的 offset,下次会基于 offset 继续消费~

3. 可靠性

3.1 生产者丢失消息

(1) 生产者绑定交换机和队列

在生产者初始化时,需要先将交换机和队列的关系绑定好,以避免此场景发生:生产者先启动,未绑定交换机和队列,发送了消息到交换机,此时无法投递到具体队列。消费者后启动,即便做了交换机和队列绑定,也无法消费到历史消息。

func NewMQ(rabbitMQCfg *Config, option Option) (*RabbitMQ, error) {// ...// 初始化交换机和队列for topic, consumerGroups := range option.TopicConsumerGroupsBinding {err = initExchange(topic, consumerGroups, mq)if err != nil {return nil, err}}return mq, nil
}func initExchange(exchange, consumerGroups string, mq *RabbitMQ) error {// 1. 创建发送通道pch, err := mq.conn.Channel()if err != nil {return err}mq.produceChannels[exchange] = pch// 2. 开启消息确认机制if err := pch.Confirm(false); err != nil {return err}// 3. 创建交换机// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)if err != nil {return err}slog.Info("rabbitmq declared exchange", "exchange_name", exchange)// 4. 创建队列并绑定到交换机for _, consumerGroup := range strings.Split(consumerGroups, ",") {consumerGroup = strings.TrimSpace(consumerGroup)if consumerGroup == "" {continue}queue := queueName(exchange, consumerGroup)// 创建队列// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数_, err = pch.QueueDeclare(queue, true, false, false, false, nil)if err != nil {return err}// 将队列绑定到交换机// 参数 queue:队列名称, key:路由键, exchange:交换机名称, noWait:是否等待服务器确认, args:额外参数err = pch.QueueBind(queue, "", exchange, false, nil)if err != nil {return err}slog.Info("rabbitmq declared and bind queue", "queue", queue, "bind_exchange", exchange)// 创建接收通道cch, err := mq.conn.Channel()if err != nil {return err}mq.consumeChannels[queue] = cch}// 5. 开启消息确认事件监听、消息投递事件监听mq.publishWatcher[exchange] = &watcher{returnCh:  pch.NotifyReturn(make(chan amqp.Return)),confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),}// 监听未被交换机投递的消息go func() {for ret := range mq.publishWatcher[exchange].returnCh {// 尝试重新投递ctx, _ := context.WithTimeout(context.Background(), mq.config.Timeout)if err := mq.publish(ctx, ret.Exchange, ret.MessageId, ret.Body, ret.Timestamp); err != nil {slog.Error("rabbitmq republish failed.", "exchange", ret.Exchange, "msgID", ret.MessageId, "err", err)} else {slog.Warn("rabbitmq got exchange undelivered msg, republished.", "exchange", ret.Exchange, "msgID", ret.MessageId)}time.Sleep(time.Second * 3)}}()return nil
}

(2) 发送重试

发送消息时增加重试机制。若超过重试上限,需记录日志或报警。

func (r *RabbitMQ) Produce(ctx context.Context, topic string, data map[string]any) error {body, _ := json.Marshal(data)msgID := uuid.New()var retried intfor {err := r.publish(ctx, topic, msgID, body, time.Now())if err == nil {return nil}retried++if retried > r.option.RetryNum {return err}time.Sleep(r.option.RetryInterval)}
}

(3) confirm 消息确认机制

生产端投递消息到 RabbitMQ 后,RabbitMQ 将发送一个确认事件,让生产端知晓消息已发送成功。监听 confirm 事件以确认消息的发送状态:

func initExchange(exchange string, mq *RabbitMQ) error {// ...// 开启消息确认机制if err := pch.Confirm(false); err != nil {return err}// 创建监听器mq.publishWatcher[exchange] = &watcher{confirmCh: pch.NotifyPublish(make(chan amqp.Confirmation)),}// ...
}func (r *RabbitMQ) publish(ctx context.Context, ...) error {// publish发送消息// ...// 等待rabbitmq返回消息确认select {case confirm := <-r.publishWatcher[exchange].confirmCh:if !confirm.Ack {return errors.New("publish failed, got nack from rabbitmq")}case <-ctx.Done():return errors.New("context deadline, publish to rabbitmq timeout")case <-time.After(r.config.Timeout):return errors.New("publish to rabbitmq timeout")}return nil
}

3.2 消费者丢失消息

消费者消费完成后,必须手动 Ack 通知 MQ,表示已经消费成功:

func (r *RabbitMQ) Ack(topic, consumerGroup, msgID string) error {// ...return consumeChannel.Ack(deliveryTag, false)
}

如果消费失败,需要手动 Nack,那此条消息会重新入队,等待下次消费:

func (r *RabbitMQ) Nack(topic, consumerGroup, msgID string) error {// ...return consumeChannel.Nack(deliveryTag, false, true)
}

3.3 RabbitMQ 中间件丢失消息

(1) 数据持久化到磁盘

交换机持久化(durable=true):

// 创建交换机
// 参数 exchange:交换机名称, kind:交换机类型 fanout为发布订阅模式, durable:是否持久化, autoDelete:是否自动删除, internal:是否内部交换机, noWait:是否等待服务器确认, args:额外参数
err = pch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)

队列持久化(durable=true):

// 创建队列
// 参数 queue:队列名称, durable:是否持久化, autoDelete:是否自动删除, exclusive:是否独占, noWait:是否等待服务器确认, args:额外参数
_, err = pch.QueueDeclare(queue, true, false, false, false, nil)

消息持久化(DeliveryMode=Persistent):

err := ch.Publish(exchange, // 交换机名称"",       // 路由键true,     // 如果消息不能路由到任何队列,是否返回未处理的消息. true将返回未处理通知, false将丢弃消息false,    // 是否立即交付给消费者. 若为true, 当队列中没有等待中的消费者时,消息会被丢弃amqp.Publishing{MessageId:    msgID,              // 消息IDContentType:  "application/json", // 消息内容类型Body:         body,               // 消息内容DeliveryMode: amqp.Persistent,    // 消息需要持久化Timestamp:    t,                  // 消息时间},
)

(2) RabbitMQ 本身的数据一致性保证

RabbitMQ 使用 raft 共识算法保证数据一致性:

https://www.rabbitmq.com/docs/clustering#replica-placement

关键字:北京流感最新消息_东莞网站制作品牌祥奔科技_搜索引擎优化概述_爱站网seo综合查询工具

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: