⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 不一样的科技宅 「不一样的科技宅」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前言

那天我和同事一起吃完晚饭回公司加班,然后就群里就有人@我说xxx商户说收不到推送,一开始觉得没啥。我第一反应是不是极光没注册上,就让客服通知商户,重新登录下试试。这边打开极光推送的后台进行检查。后面反应收不到推送的越来越多,我就知道这事情不简单。

事故经过

由于大量商户反应收不到推送,我第一反应是不是推送系统挂了,导致没有进行推送。于是让运维老哥检查推送系统各节点的情况,发现都正常。于是打开RabbitMQ的管控台看了一下,人都蒙了。已经有几万条消息处于ready状态,还有几百条unacked的消息。

我以为推送服务和MQ连接断开了,导致无法推送消息,于是让运维重启推送服务,将所有的推送服务重启完,发现unacked的消息全部变成ready,但是没过多久又有几百条unacked的消息了,这个就很明显了能消费,没有进行ack呀。

当时我以为是网络问题,导致mq无法接收到ack,让运维老哥检查了一下,发现网络没问题。现在看是真的是傻,网络有问题连接都连不上。由于确定的是无法ack造成的,立马将ack模式由原来的manual 改成auto紧急发布。将所有的节点升级好以后,发现推送正常了。

你以为这就结束了其实并没有,没过多久发现有一台MQ服务出现异常,由于生产采用了镜像队列,立即将这台有问题的MQ从集群中移除。直接进行重置,然后加入回集群。这事情算是告一段落了。此时已经接近24:00了。

时间来到第二天上午10:00,运维那边又出现报警了,说推送系统有台机器,磁盘快被写满了,并且占用率很高。我的乖乖从昨晚到现在写了快40G的日志,一看报错信息瞬间就明白问题出在哪里了。麻溜的把bug修了紧急发布。

吐槽一波公司的ELK,压根就没有收集到这个报错信息,导致我没有及时发现。

事故重现-队列阻塞

MQ配置

spring:
# 消息队列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次请求中预处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

try {
// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}

}

看起来好像没啥问题。由于和交易系统约定好,订单数据需要先转换json串,然后再使用AES进行加密,所以这边需要,先进行解密然后在进行解析。才能得到订单数据。

为了防止消息丢失,交易系统做了失败重发机制,防止消息丢失,不巧的是重发的时候没有对订单数据进行加密。这就导致推送系统,在解密的时候出异常,从而无法进行ack

默默的吐槽一句:人在家中坐,锅从天上来。

模拟推送

推送代码

发送3条正常的消息

curl http://localhost:8080/sendMsg/3

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

观察日志发下,虽然有报错,但是还能正常进行推送。但是RabbitMQ已经出现了一条unacked的消息。

继续发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

这个时候你会发现控制台报错,当然错误信息是解密失败,但是正常的消息却没有被消费,这个时候其实队列已经阻塞了。

RabbitMQ管控台也可以看到,刚刚发送的的3条消息处于ready状态。这个时候就如果一直有消息进入,都会堆积在队里里面无法被消费。

再发送3条正常的消息

curl http://localhost:8080/sendMsg/3

分析原因

上面说了是由于没有进行ack导致队里阻塞。那么问题来了,这是为什么呢?其实这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认的消息的前提下,限制信道上的消费者所能保持的最大未确认的数量。可以通过设置PrefetchCount实现。

举例说明:可以理解为在consumer前面加了一个缓冲容器,容器能容纳最大的消息数量就是PrefetchCount。如果容器没有满RabbitMQ就会将消息投递到容器内,如果满了就不投递了。当consumer对消息进行ack以后就会将此消息移除,从而放入新的消息。

listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: manual

prefetch参数就是PrefetchCount

通过上面的配置发现prefetch我只配置了2,并且concurrency配置的只有1,所以当我发送了2条错误消息以后,由于解密失败这2条消息一直没有被ack。将缓冲区沾满了,这个时候RabbitMQ认为这个consumer已经没有消费能力了就不继续给它推送消息了,所以就造成了队列阻塞。

判断队列是否有阻塞的风险。

ack模式为manual,并且线上出现了unacked消息,这个时候不用慌。由于QOS是限制信道channel上的消费者所能保持的最大未确认的数量。所以允许出现unacked的数量可以通过channelCount * prefetchCount * 节点数量 得出。

channlCount就是由concurrency,max-concurrency决定的。

  • min = concurrency * prefetch * 节点数量
  • max = max-concurrency * prefetch * 节点数量

由此可以的出结论

  • unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。
  • unacked_msg_count >= min 可能会出现堵塞。
  • unacked_msg_count >= max 队列一定阻塞。

这里需要好好理解一下。

处理方法

其实处理的方法很简单,将解密和解析的方法放入try catch中就解决了这样不管解密正常与否,消息都会被签收。如果出错将会输出错误日志,让开发人员进行处理了。

对于这个就需要有日志监控系统,来及时告警了。

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
try {

// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}

}

注意的点

unacked的消息在consumer切断连接后(重启),会自动回到队头。

事故重现-磁盘占用飙升

一开始我不知道代码有问题,就是以为单纯的没有进行ack所以将ack模式改成auto自动,紧急升级了,这样不管正常与否,消息都会被签收,所以在当时确实是解决了问题。

其实现在回想起来是非常危险的操作的,将ack模式改成auto自动,这样会使QOS不生效。会出现大量消息涌入consumer从而造成consumer宕机,可以是因为当时在晚上,交易比较少,并且推送系统有多个节点,才没出现问题。

问题代码

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

try {

// 模拟推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失败-错误信息:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息签收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}

}

配置文件

listener:
simple:
# 消费端最小并发数
concurrency: 1
# 消费端最大并发数
max-concurrency: 5
# 一次处理的消息数量
prefetch: 2
# 手动应答
acknowledge-mode: auto

由于当时不知道交易系统的重发机制,重发时没有对订单数据加密的bug,所以还是会发出少量有误的消息。

发送1条错误的消息

curl http://localhost:8080/sendErrorMsg/1

原因

RabbitMQ消息监听程序异常时,consumer会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递。就相当于死循环了,所以控制台在疯狂刷错误日志造成磁盘利用率飙升的原因。

解决方法

default-requeue-rejected: false即可。

总结

  • 个人建议,生产环境不建议使用自动ack,这样会QOS无法生效。
  • 在使用手动ack的时候,需要非常注意消息签收。
  • 其实在将有问题的MQ重置时,是将错误的消息给清除才没有问题了,相当于是消息丢失了。

try {
// 业务逻辑。
}catch (Exception e){
// 输出错误日志。
}finally {
// 消息签收。
}

参考资料

  • RabbitMQ消息监听异常问题探究

代码地址

https://gitee.com/huangxunhui/rabbitmq_accdient.git

结尾

如果有人告诉你遇到线上事故不要慌,除非是超级大佬久经沙场。否则就是瞎扯淡,你让他来试试,看看他会不会大脑一片空白,直冒汗。

如果觉得对你有帮助,可以多多评论,多多点赞哦,也可以到我的主页看看,说不定有你喜欢的文章,也可以随手点个关注哦,谢谢。

文章目录
  1. 1. 前言
  2. 2. 事故经过
  3. 3. 事故重现-队列阻塞
    1. 3.1. MQ配置
    2. 3.2. 问题代码
    3. 3.3. 模拟推送
    4. 3.4. 分析原因
    5. 3.5. 判断队列是否有阻塞的风险。
    6. 3.6. 处理方法
    7. 3.7. 注意的点
  4. 4. 事故重现-磁盘占用飙升
    1. 4.1. 问题代码
    2. 4.2. 原因
  5. 5. 解决方法
  6. 6. 总结
  7. 7. 参考资料
  8. 8. 代码地址
  9. 9. 结尾