《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://www.iocoder.cn/Fight/Message-queue-reliability-duplicate-messages-message-backlog-the-use-of-distributed-transactions/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

一、如何确保消息不丢失?

1、检测消息丢失的方法

可以利用消息队列的有序性来验证是否有消息丢失。在Producer端给每个发出的消息附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。如果没有消息丢失,Consumer收到消息的序号必然是连续递增的,如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因

大多数消息队列的 客户端都支持拦截器机制,可以利用这个拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性

如果是在一个分布式系统中实现这个检测方法,有几个问题需要注意:

首先,像Kafka和RocketMQ这样的消息队列,是不保证Topic上的严格顺序的,只能保证分区上的消息是有序的,所以在发消息的时候必须指定分区,并且,在每个分区单独检测消息序号的连续性

如果系统中Producer是多实例的,由于并不好协调多个Producer之间的发送顺序,所以也需要每个Producer分别生成各自的消息序号,并且需要附加上Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性

Consumer实例的数量最好和分区数量一致,做到Consumer和分区一一对应,这样会比较方便地在Consumer内检测消息序号的连续性

2、确保消息可靠传递

一条消息从生产到消费完成这个过程,可以划分为三个阶段:

  • 生产阶段:在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端
  • 存储阶段:在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上
  • 消费阶段:在这个阶段,Consumer从Broker上拉取消息,经过网络传输发送到Consumer上

1)、生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当在代码中调用发送消息方法时,消息队列的客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送

只要Producer收到了Broker的确认响应就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失

以Kafka为例:

同步发送时,只要注意捕获异常即可

try {
producer.send(record).get();
System.out.println("消息发送成功");
} catch (Exception e) {
System.out.println("消息发送失败");
System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
System.out.println(exception);
}
}
});

或者

producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
System.out.println(exception);
}
});

2)、存储阶段

在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的

如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息

对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在RocketMQ中,需要将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘

如果Broker是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2个以上的节点,再给客户端回复发送确认响应。这样当某个Broker宕机后,其他的Broker可以替代宕机的Broker,也不会发生消息丢失

3)、消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确认消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失

在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

以SpringBoot整合RabbitMQ为例:

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
))
@RabbitHandler
//Order需要实现序列化接口
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
//处理业务逻辑
System.out.println("消费端:" + order);
//手工ACK
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}

3、小结

  • 在生产阶段,需要捕获消息发送的错误,并重发消息
  • 在存储阶段,可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个Broker宕机或者磁盘损坏而丢失
  • 在消费阶段,需要在处理完全部消费业务逻辑之后,再发送消费确认

二、如何处理消费过程中的重复消息?

1、消息重复的情况必然存在

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失
  • At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级

这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复

2、用幂等性解决重复消息问题

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变

从对系统的影响结果来说:At least once+幂等消费=Exactly once

几种常用的设计幂等操作的方法:

1)、利用数据库的唯一约束实现幂等

举个例子:将账户X的余额加100元。可以通过改造业务逻辑,让它具备幂等性

首先,可以限定对于每个转账单每个账户只可以执行一次变更操作,最简单的是在数据库建一张转账流水表,这个表有三个字段:转账单ID、账户ID和变更金额,然后给转账单ID和账户ID这两个字段联合起来创建一个唯一约束,这样对于相同的转账单ID和账户ID,表里至多只能存在一条记录

这样,消费消息的逻辑可以变为:在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。在转账流水表增加一条转账记录这个操作中,由于在这个表中预先定义了账户ID转账单ID的唯一索引,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作

只要是支持类似INSERT IF NOT EXIST语义的存储类系统都可以用于实现幂等,比如,可以用Redis的SETNX命令来替代数据库中的唯一约束,来实现幂等消费

2)、为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作

比如,将账户X的余额增加100元这个操作并不满足幂等性,可以把这个操作加上一个前置条件,变为:如果账户X当前的月为500元,将余额加100元,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候判断数据库中当前余额是否与消息中的余额相等,只有相等才执行变更操作

更加通用的方法是,给数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本号一直,如果不一致就拒绝更新数据,更新数据的同时将版本号+1,一样可以实现幂等更新

3)、记录并检查操作

还有一种通用性最强的实现幂等性方法:记录并检查操作,也称为Token机制或者GUID(全局唯一ID)机制,实现思路:在执行数据更新操作之前,先检查一下是否执行过这个更新操作

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费

但在分布式系统中,这个方法非常难以实现。首先,给每个消息指定一个全局唯一的ID就是一件不那么简单的事情,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,检查消费状态,然后更新数据并且设置消费状态这三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现Bug

三、消息积压了该如何处理?

消息积压的直接原因一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压

1、优化性能来避免消息积压

1)、发送端性能优化

对于发送消息的业务逻辑,只需要设置合适的并发和批量大小,就可以达到很多的发送性能

Producer发送消息的过程包括:Producer发送消息给Broker,Broker收到消息返回确认响应。假设这一次交互的平均时延是1ms,这1ms包括了下面这些步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在网络请求之前的耗时
  • 发送消息和返回响应在网络传输中的耗时
  • Broker处理消息的时延

如果是单线程发送,每次只发送1条消息,那么每秒只能发送1000ms/1ms*1条/ms=1000条消息。无论是增加每次发送消息的批量大小,还是增加并发都能成倍地提升发送性能

比如说,消息发送端主要接收RPC请求处理在线业务,因为所有RPC框架都是多线程支持多并发的,自然就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送会影响RPC服务的时延

如果是一个离线系统,它在性能上更注重整个系统的吞吐量,发送端的数据都是来自于数据库,这种情况就更适合批量发送。可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量

2)、消费端性能优化

使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,只要消费单的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的

要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障

在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能

消费端的性能优化除了优化消费业务逻辑之外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容是无效的

2、消息积压了该如何处理?

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要在短时间内找到消息积压的原因,迅速解决问题

能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了

大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位事件发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例来提升总体的消费能力

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务

还有一种不太常见的情况,通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候需要检查一下消费端是不是消费失败导致的一条消息发福消费这种情况比较多,这种情况也会拖垮整个系统的消费速度

四、如何利用事务消息实现分布式事务?

消息队列中的事务主要解决的是消息生产者和消息消费者的数据一致性问题

拿电商来举个例子,一般来说,用户在电商APP上购物时,先把商品加到购物车里,然后几件商品一起下单,最后支付,完成购物流程,就可以等待收货了。这个过程中有一个需要用到消息队列的步骤,订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队里来异步清理购物车是更加合理的设计

对于订单系统来说,它创建订单的过程中实际上执行了2个步骤的操作:

  • 在订单库中插入一条订单数据,创建订单
  • 发消息给消息队列,消息的内容就是刚刚创建的订单

购物车系统订阅相应的主题,接收订单创建的消息,然后清理购物车,在购物车中删除订单中的商品

问题的关键点集中在订单系统,创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现

1、什么是分布式事务?

事务的4个特性(ACID):

  • 原子性:指一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情况
  • 一致性:指这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据
  • 隔离性:指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
  • 持久性:指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何影响

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。比如订单系统的例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有及时情况,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了

2、消息队列是如何实现分布式事务的?

回到订单和购物车这个例子,来看下如何用消息队列来实现分布式事务

首先,订单系统在消息队列上开启了一个事务。然后订单系统给消息服务器发送一个半消息,这个半消息包含的内容是完整的消息内容,和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了要么都成功,要么都失败的一致性要求

如果在第四步提交事务消息时失败了,Kafka会直接抛出异常,让用户自行处理,可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿

3、RocketMQ中的分布式事务实现

在RocketMQ中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务

为了支撑这个事务反查机制,业务代码中需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败

在订单系统的例子中,反查本地事务的逻辑只要根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ会自动根据事务反查的结果提交或者回滚事务消息

这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ依然可以通过其他订单服务的节点来执行反查,确保事务的完整性

使用RocketMQ事务消息功能实现分布式事务的流程如下图:

文章目录
  1. 1. 一、如何确保消息不丢失?
    1. 1.1. 1、检测消息丢失的方法
    2. 1.2. 2、确保消息可靠传递
    3. 1.3. 3、小结
  2. 2. 二、如何处理消费过程中的重复消息?
    1. 2.1. 1、消息重复的情况必然存在
    2. 2.2. 2、用幂等性解决重复消息问题
  3. 3. 三、消息积压了该如何处理?
    1. 3.1. 1、优化性能来避免消息积压
    2. 3.2. 2、消息积压了该如何处理?
  4. 4. 四、如何利用事务消息实现分布式事务?
    1. 4.1. 1、什么是分布式事务?
    2. 4.2. 2、消息队列是如何实现分布式事务的?
    3. 4.3. 3、RocketMQ中的分布式事务实现