由某一次真实生产环境rabbitMQ故障引发血案,下面复盘问题发生原因以及问题解决方法。

📅 2026/7/2 3:28:12
由某一次真实生产环境rabbitMQ故障引发血案,下面复盘问题发生原因以及问题解决方法。
试验队列阻塞某天周末在家里找个测试环境安装rabbitmq尝试重现这过程并做模拟测试。写两个测试应用Demo假设是两个项目应用分别有生产者和消费者并分别使用队列testA和testB。为了尽可能还原生产的情况一开始测试使用了同一个vhost后面分别设置不同vhost。生产者A示例代码如下消费者AMQ配置生产者B每次生产10万条消息消费者B代码故意写错模拟出现异常的情况不是正常的json串导致解释json时抛出异常先了解一下Rabbitmq客户端启动连接工作过程通过wireshark抓包分析如下先对AMQP做一个简单的介绍请求的AMQP协议方法信息AMQP协议方法包含类名方法名参数这一列主要展示了类名和方法名Connection.Start请求服务端开始建立连接Channel.Open请求服务端建立信道Queue.Declare声明队列Basic.Consume开始一个消费者请求指定队列的消息详细方法可以查看amqp官网https://www.rabbitmq.com/amqp-0-9-1-reference.html工作过程分析Basic.Publish客户端发送Basic.Publish方法请求将消息发布到exchangerabbitmq server会根据路由规则转发到队列中Basic.Deliver服务端发送Basic.Deliver方法请求投递消息到监听队列的客户端消费者Basic.Ack客户端发送Basic.Ack方法请求告知rabbimq server,消息已接收处理。两个应用程序启动后通过rabbitmq管理控制台可以观察一些参数和监控指标一开始A应用生产和消费都是正常的。B消费端错误代码异常狂刷报错信息经过大概30分钟运行观察A生产者应用控制台也有出现异常信息查看服务端连接状态出现blocked情况与生产故障发生情景很类似。此时客户端即本机器CPU和内存上涨明显风扇声音很响明显卡顿再过30分钟应用基本不可用状态。分析原因上面错误代码展示了消费者B无法ack由于没有进行ack导致队里阻塞。那么问题来了这是为什么呢其实这是RabbitMQ的一种保护机制。防止当消息激增的时候海量的消息进入consumer而引发consumer宕机。RabbitMQ提供了一种QOS(服务质量保证)功能即在非自动确认的消息的前提下限制信道上的消费者所能保持的最大未确认的数量。可以通过设置prefetchCount实现自动确认prefetchCount设置无效。举例说明:可以理解为在consumer前面加了一个缓冲容器容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除从而放入新的消息。通过上面的配置发现prefetch初始我只配置了2并且concurrency配置的只有1所以当我发送了2条错误消息以后由于解析失败这2条消息一直没有被ack。将缓冲区沾满了这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了所以就造成了队列阻塞。判断队列是否有阻塞的风险。当ack模式为manual并且线上出现了unacked消息这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount *消费节点数量得出。channlCount就是由concurrency,max-concurrency决定的。min concurrency * prefetch *消费节点数量max max-concurrency * prefetch *消费节点数量由此可以得出结论unacked_msg_count min队列不会阻塞。但需要及时处理unacked的消息。unacked_msg_count min可能会出现堵塞。unacked_msg_count max队列一定阻塞。重点注意1、unacked的消息在consumer切断连接后(如重启)再连接会自动回到队头。2、若将ack模式改成auto自动这样会使QOS不生效。会出现大量消息涌入consumer从而可能造成consumer宕机风险。再回看程序配置做一些分析和调整对B消费端问题代码加个try-catch-finally不管中间有何问题都进行消息签收ACK。代码调整之后两个队列正常运行客户端两个应用也正常运行。经过一段时间消费B消费者端已经把堆积的消息消费完了。3、第三个问题原因分析还是查看抓包信息Basic.Reject 客户端发送Basic.Reject方法请求表示无法处理消息拒绝消息此时的requeue参数为true将消息返回原来的队列Basic.Deliver 服务端调用Basic.Deliver方法和第一次Basic.Deliver方法不同的是此时的redeliver参数为true表示重新投递消息到监听队列的消费者然后这两步会一直重复下去。RabbitMQ消息监听程序异常时consumer会向rabbitmq server发送Basic.Reject表示消息拒绝接受由于Spring默认requeue-rejected配置为true消息会重新入队然后rabbitmq server重新投递。就相当于死循环了所以容易导致消费端资源占用过高特别是TCP连接数、线程数、IO飙升如果个别程序带事务或数据库操作等连接资源得不到释放也会占满导致应用假死状态出现问题的时候查看问题应用出现大量的connection timeout错误报错日志。因此针对性的有些业务场景不强调数据强一致性的场景比如日志收集可以设置default-requeue-rejected: false即可。factory.setDefaultRequeueRejected(false);会根据异常类型选择直接丢弃或加入dead-letter-exchange中。消费者端正确的使用手动确认示例结构代码很重要try { // 业务逻辑。 }catch (Exception e){ // 输出错误日志。 }finally { // 消息签收。 }4、验证队列设置最大长度限制设置queueLengthLimit队列最大长度限制 x-max-length5