⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.longyb.com/2019/01/23/transaction_mq/ 「longyb」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

事务消息组件的套路

本文介绍一些互联网产品的服务端关于处理事务消息的套路

套路1:最终一致性消息模型

该方案关键是要有个消息表。另外,一般会有个队列,而且我们一般都会假设这个MQ不丢消息。

基本思路如下

  • 消息生产方

需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交(有时实现时为了简单,可以只是增加一个字段。新增字段会跟业务强耦合)

  • 消息消费方 需要处理这个消息,并完成自己的业务逻辑。

如果本地事务处理成功,那发送给生产方一个confirm消息,表明已经处理成功了。

如果处理失败,可以发送给生产方一个失败消息,或者记录本地表。以后重试。

生产方定时扫描本地消息表,把还没处理完成的消息重新发送一遍。如果有其他的对账补账逻辑,这一步也可以省略。

整体流程如下图所示:

对一致性要求不高的或者有其他兜底方案的场景(比如较为频繁的对账补账机制),我们就不需要关心消息的confirm等情况,只要扔给消息,就认为OK,一般也是可取的。

但是这个方案存在一个小问题

如果发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢,还是消息已经收到了只是返回response的时候失败了

  1. 如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致发送方事务回滚,接受方却把后续逻辑做掉了
  2. 把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务,存在风险

套路2:事务消息模型

国内常用方案

事务消息与上面提到的最终一致性模型一致,只不过是把记录消息发送状态这一步在中间件内部做掉了,从而无需业务方针对每个业务自己手动实现。

具体的实现如下图所示。

现在的步骤如下:

  1. 业务应用把要发送的消息给协调器

    1.1 协调器在他自己的DB中记录下这条消息

    1.2 协调器返回对应的msgid

  2. 业务应用自行本地事务更新DB

  3. 业务系统如果本地事务执行成功,告诉协调器这条消息Commit,如果本地事务执行失败,则rollback

    3.1 协调器更新自己的数据库,标记消息已经Commit/Rollback

  4. 协调器对于成功submit的消息,开始往MQ进行投递,等待ack,如果一段时间没有收到ack,会继续投递该消息

  5. MQ将消息投递给接收方,接收方执行事务更新DB

  6. 接收方应用给协调器做ack,告诉这条消息消费成功.如果长时间没有收到ACK,协调器重投到MQ,这里需要接受方做幂等实现

  7. 如果协调器没有收到Commit/rollback,则会询问业务应用消息的状态,是要Commit还是Rollback

对于网易内部的TMC来说,1、3、7三步都是由dubbo实现的(7走的Dubbo的泛化调用)。

阿里的Rocketmq则是做在了产品内部,不依赖外部实现。

RocketMQ的实现细节见此

对比而言,方案2的最大不一样就是把“扫描消息表”这个事情,不需要业务方做,消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”或者说业务自己的业务表有对应的信息,记录事物执行状态。

KAFKA的事务消息实现方案

Kafka的transaction最开始是给kafka steaming实现exactly once语义设计的,后面又有一些扩展。

Kafka的实现方案(不考虑kafka streaming的情况下,kafka streaming对消费者有处理)对比上面是没有回查步骤的,同时消费者也需要业务代码来保证幂等。

流程图如下:

上图来自matt的博客

KAFKA的事务消息具体实现比较复杂,简单来说分为如下几个步骤:

  1. 生产者像事务协调器注册自己的TransactionalID,这个ID需要保证每台生产者是唯一的。这时如果对应的TransactionalID的机器有没有结束的transaction,会直接被终止。
  2. beginTransaction是本地做的,对于服务端没有交互
  3. 向协调器注册要发消息的partition(要解决跨Topic,跨Partition的事务,对应上图4.1a,协调器开始写ongoing的txLog到Transaction_Log的内部topic中)
  4. 正常发送消息到对应的partition去
  5. 调用commit/abord到协调器,协调器开始[两阶段提交]
  6. 协调器写prepare消息到Transaction_Log,这时就算提交了。然后再会给发送到的partition后面补一个不可见的commit消息(主要是用来处理read committed),然后会在写一个commited到到Transaction_Log。到这里就算一个事务结束。

由于这个方案没有引入一个唯一的消息ID(非kafka streaming 模式),且没有回查机制,对于方案1中的最后两个问题也是没有解决。

Kafka自己的kafka streaming则是需要把消费源头的offset作为transaction的一部分(会写到到Transaction_Log里)提交给协调器来实现幂等。而对接一般的外部业务系统,则没有对应机制可以做这件事。

下面是相关的资料:

confluent的简单介绍

原始KIP

matt33的博客(中文)写的很详细,推荐一看

文章目录
  1. 1. 事务消息组件的套路
  2. 2. 套路1:最终一致性消息模型
  3. 3. 套路2:事务消息模型
    1. 3.1. 国内常用方案
    2. 3.2. KAFKA的事务消息实现方案