🎯 导读:该文档介绍了Apache RocketMQ消息队列的基础应用,包括消息发送与接收的基本流程。首先通过创建生产者实例,并指定名称服务器地址,启动后即可发送消息至指定主题。然后创建消费者实例订阅相应主题,并设置监听器处理接收到的消息。文档中还提供了代码示例,展示了如何实现简单的生产和消费逻辑。此外,文档解释了消息队列在不同场景下的分发策略,如负载均衡与广播模式,并强调了队列数量与消费者数量之间的关系以确保消息的合理分配。
文章目录
- 消息发送和监听的流程
- 消息生产者
- 消息消费者
- 搭建RocketMQ入门案例
- 创建项目
- 加入依赖
- 编写生产者
- 编写消费者
- 说明
- 一个消费者组消费一个topic
- 两个消费者组消费一个topic
- 生产者的消息发送给主题的哪个队列
- 消费者如何从队列中拉取消息
- 只有一个消费者,要拉取所有队列的消息
- 两个消费者,每个消费者要负责两个队列
- 三个消费者(要求尽量平衡)
- 四个消费者,一人一个
- 五个消费者,第五个消费者永远不接收消息
RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等
消息发送和监听的流程
消息生产者
1、创建消息生产者 producer ,并指定生产者组名
2、指定 Nameserver 地址
3、启动 producer
4、创建消息对象,指定主题 Topic、Tag 和消息体等
5、发送消息
6、关闭 producer
消息消费者
1、创建消费者 consumer ,指定消费者组名
2、指定 Nameserver 地址
3、创建监听订阅主题 Topic和Tag 等
4、处理消息
5、启动消费者 consumer
搭建RocketMQ入门案例
创建项目
加入依赖
引入原生API,先不用spring-boot-starter版本
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version><!--docker的用下面这个版本--><version>4.4.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency>
</dependencies>
编写生产者
/*** 测试生产者** @throws Exception*/
@Test
public void testProducer() throws Exception {// 创建默认的生产者(指定生产者组名)DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();for (int i = 0; i < 1; i++) {// 创建消息// 第一个参数:主题的名字// 第二个参数:消息内容(要转化为字节数组)Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());// 发送结果SendResult send = producer.send(msg);// 打印发送状态System.out.println(send.getSendStatus());}// 关闭实例producer.shutdown();
}
为了连接方便,可以使用一个常量NAME_SRV_ADDR
来存储localhost:9876
【运行】
在控制台中可以看到创建了一个主题 testTopic
点击状态,一个主题默认4个队列
点击路由,可以查看 broker 的 ip 地址
在CONSUMER管理中,可以查看消费者
编写消费者
@Test
public void simpleConsumer() throws Exception {// 创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");// 连接 namesrvconsumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);// 订阅一个主题 * 表示订阅这个主题中所有的消息,后面会有消息过滤的教程consumer.subscribe("testTopic", "*");// 设置一个监听器 (一直监听的,异步回调方式,消费者线程和监听线程不是一个线程)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 这个就是消费的方法 (业务处理)System.out.println("我是消费者");// msgs 虽然是List,但是只有一条消息,所以get(0)就行System.out.println(msgs.get(0).toString());// 消息内容从字节数组转化为StringSystem.out.println("消息内容:" + new String(msgs.get(0).getBody()));System.out.println("消费上下文:" + context);// 返回值 CONSUME_SUCCESS成功,消息会从mq出队return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// RECONSUME_LATER(报错/null)失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});// 启动consumer.start();// 挂起当前的jvm,让监听一直存在System.in.read();
}
【运行】
说明
- 一个生产者组可以投递到多个主题
- 一个消费者组只能订阅一个主题
一个消费者组消费一个topic
【负载均衡模式】消息1给 C1 消费,消息2给 C2 消费,以此类推
【广播模式】同一条消息既给 C1 消费,又给 C2 消费
两个消费者组消费一个topic
同一消息,两个消费者组都获取到,但是组内要分配给哪个消费者,就看是负载【均衡模式】还是【广播模式】了
生产者的消息发送给主题的哪个队列
生产者会将消息轮询发送到主题的4个队列
消费者如何从队列中拉取消息
只有一个消费者,要拉取所有队列的消息
- 代理者:MQ
- 消费者:我们的程序
测试,生产者生产12个消息
差值:代理者位点-消费者位点。如果差值太大,说明消息堆积
两个消费者,每个消费者要负责两个队列
三个消费者(要求尽量平衡)
四个消费者,一人一个
五个消费者,第五个消费者永远不接收消息
队列数量最好大于等于消费者组内的消费者数量!!!