《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 波波微课 「架构师杨波」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

介绍

系统架构微服务化以后,根据微服务独立数据源的思想,每个微服务一般具有各自独立的数据源,但是不同微服务之间难免需要通过数据分发来共享一些数据,这个就是微服务的数据分发问题。Netflix/Airbnb等一线互联网公司的实践[参考附录1/2/3]表明,数据一致性分发能力,是构建松散耦合、可扩展和高性能的微服务架构的基础。

本文解释分布式微服务中的数据一致性分发问题,应用场景,并给出常见的解决方法。本文主要面向互联网分布式系统架构师和研发经理。

为啥要分发数据?场景?

我们还是要从具体业务场景出发,为啥要分发数据?有哪些场景?在实际企业中,数据分发的场景其实是非常多的。假设某电商企业有这样一个订单服务Order Service,它有一个独立的数据库。同时,周边还有不少系统需要订单的数据,上图给出了一些例子:

  1. 一个是缓存系统,为了提升订单数据的访问性能,我们可以把频繁访问的订单数据,通过Redis缓存起来;
  2. 第二个是Fulfillment Service,也就是订单履行系统,它也需要一份订单数据,借此实现订单履行的功能;
  3. 第三个是ElasticSearch搜索引擎系统,它也需要一份订单数据,可以支持前台用户、或者是后台运营快速查询订单信息;
  4. 第四个是传统数据仓库系统,它也需要一份订单数据,支持对订单数据的分析和挖掘。

当然,为了获得一份订单数据,这些系统可以定期去订单服务查询最新的数据,也就是拉模式,但是拉模式有两大问题:

  1. 一个是拉数据通常会有延迟,也就是说拉到的数据并不实时;
  2. 如果频繁拉的话,考虑到外围系统众多(而且可能还会增加),势必会对订单数据库的性能造成影响,严重时还可能会把订单数据库给拉挂。

所以,当企业规模到了一定阶段,还是需要考虑数据分发技术,将业务数据同步分发到对数据感兴趣的其它服务。除了上面提到的一些数据分发场景,其实还有很多其它场景,例如:

  1. 第一个是数据复制(replication)。为了实现高可用,一般要将数据复制多分存储,这个时候需要采用数据分发。
  2. 第二个是支持数据库的解耦拆分。在单体数据库解耦拆分的过程中,为了实现不停机拆分,在一段时间内,需要将遗留老数据同步复制到新的数据存储,这个时候也需要数据分发技术。
  3. 第三个是实现CQRS,还有去数据库Join。这两个场景我后面有单独文章解释,这边先说明一下,实现CQRS和数据库去Join的底层技术,其实也是数据分发。
  4. 第四个是实现分布式事务。这个场景我后面也有单独文章讲解,这边先说明一下,解决分布式事务问题的一些方案,底层也是依赖于数据分发技术的。
  5. 其它还有流式计算、大数据BI/AI,还有审计日志和历史数据归档等场景,一般都离不开数据分发技术。

总之,波波认为,数据分发,是构建现代大规模分布式系统、微服务架构和异步事件驱动架构的底层基础技术。

双写?

对于数据分发这个问题,乍一看,好像并不复杂,稍有开发经验的同学会说,我在应用层做一个双写不就可以了吗?比方说,请看上图右边,这里有一个微服务A,它需要把数据写入DB,同时还要把数据写到MQ,对于这个需求,我在A服务中弄一个双写,不就搞定了吗?其实这个问题并没有那么简单,关键是你如何才能保证双写的事务性?

请看上图左边的代码,这里有一个方法updateDbThenSendMsgInTransaction,这个方法上加了事务性标注,也就是说,如果抛异常的话,数据库操作会回滚。我们来看这个方法的执行步骤:

第一步先更新数据库,如果更新成功,那么result设为true,如果更新失败,那么result设为false;

第二步,如果result为true,也就是说DB更新成功,那么我们就继续做第三步,向mq发送消息

如果发消息也成功,那么我们的流程就走到第四步,整个双写事务就成功了。

如果发消息抛异常,也就是发消息失败,那么容器会执行该方法的事务性回滚,上面的数据库更新操作也会回滚。

初看这个双写流程没有问题,可以保证事务性。但是深入研究会发现它其实是有问题的。比方说在第三步,如果发消息抛异常了,并不保证说发消息失败了,可能只是由于网络异常抖动而造成的抛异常,实际消息可能是已经发到MQ中,但是抛异常会造成上面数据库更新操作的回滚,结果造成两边数据不一致。

模式一:事务性发件箱(Transactional Outbox)

对于事务性双写这个问题,业界沉淀下来比较实践的做法,其中一种,就是采用所谓事务性发件箱模式,英文叫Transactional Outbox。据说这个模式是eBay最早发明和使用的。事务性发件箱模式不难理解,请看上图。

我们仍然以订单Order服务为例。在数据库中,除了订单Order表,为了实现事务性双写,我们还需增加了一个发件箱Outbox表。Order表和Outbox表都在同一个数据库中,对它们进行同时更新的话,通过数据库的事务机制,是可以实现事务性更新的。

下面我们通过例子来展示这个流程,我们这里假定Order Service要添加一个新订单。

首先第一步,Order Service先将新订单数据写入Order表,然后它再向Outbox表中写入一条订单新增记录,这两个DB操作可以包在一个DB事务里头,也就是可以实现事务性写入。

然后第二步,我们再引入一个称为消息中继Message Relay的角色,它负责定期Poll拉取Outbox中的新数据,然后第三步再Publish发送到MQ。如果写入MQ确认成功,Message Relay就可以将Outbox中的对应记录标记为已消费。这里可能会出现一种异常情况,就是Message Relay在将消息发送到MQ时,发生了网络抖动,实际消息可能已经写入MQ,但是Message Relay并没有得到确认,这时候它会重发,直到明确成功为止。所以,这里也是一个At Least Once,也就是至少交付一次的消费语义,消息可能被重复投递。因此,MQ之后的消费方要做消息去重或幂等处理。

总之,事务性发件箱模式可以保证,对Order表的修改,然后将对应事件发送到MQ,这两个动作可以实现事务性,也就是实现数据分发的事务性。

注意,这里的Message Relay角色既可以是一个独立部署的服务,也可以和Order Service住在一起。生产实践中,需要考虑Message Relay的高可用部署,还有监控和告警,否则如果Message Relay挂了,消息就发不出来,然后,依赖于消息的各种消费方也将无法正常工作。

Transactional Outbox参考实现 ~ Killbill Common Queue

事务性发件箱的原理简单,实现起来也不复杂,波波这边推荐一个生产级的参考实现。这个实现源于一个叫killbill的项目,killbill是美国高朋(GroupOn)公司开源的订阅计费和支付平台,这个项目已经有超过8~9年的历史,在高朋等公司已经有不少落地案例,是一个比较成熟的产品。killbill项目里头有一些公共库,单独放在一个叫killbill-commons的子项目里头,其中有一个叫killbill common queue,它其实是事务性发件箱的一个生产级实现。上图有给出这个queue的github链接。

Killbill common queue也是一个基于DB实现的分布式的队列,它上层还包装了EventBus事件总线机制。killbill common queue的总体设计思路不难理解,请看上图:

在上图的左边,killbill common queue提供发送消息API,并且是支持事务的。比方说图上的postFromTransaction方法,它可以发送一个BusEvent事件到DB Queue当中,这个方法还接受一个数据库连接Connection参数,killbill common queue可以保证对事件event的数据库写入,和使用同一个Connection的其它数据库写入操作,发生在同一个事务中。这个做法其实就是一种事务性发件箱的实现,这里的发件箱存的就是事件event。

除了POST写入API,killbill common queue还支持类似前面提到的Message Relay的功能,并且是包装成EeventBus + Handler方式来实现的。开发者只需要实现事件处理器,并且注册订阅在EventBus上,就可以接收到DB Queue,也就是发件箱当中的新事件,并进行消费处理。如果事件处理成功,那么EvenbBus会将对应的事件从发件箱中移走;如果事件处理不成功,那么EventBus会负责重试,直到处理成功,或者超过最大重试次数,那么它会将该事件标记为处理失败,并移到历史归档表中,等待后续人工检查和干预。这个EventBus的底层,其实有一个Dispatcher派遣线程,它负责定期扫描DB Queue(也就是发件箱)中的新事件,有的话就批量拉取出来,并发送到内部EventBus的队列中,如果内部队列满了,那么Dispather Thread也会暂停拉取新事件。

在killbill common queue的设计中,每个节点上的Dispather线程只负责通过自己这个节点写入的事件,并且在一个节点上,Dispather线程也只有一个,这样才能保证消息消费的顺序性,并且也不会重复消费。

Reaper机制

killbill common queue,其实是一个基于集中式数据库实现的分布式队列,为什么说它是分布式队列呢?请看上图,killbill common queue的设计是这样的,它的每个节点,只负责消费处理从自己这个节点写入的事件。比方说上图中有蓝色/黄色和绿色3个节点,那么蓝色节点,只负责从蓝色节点写入,在数据库中标记为蓝色的事件。同样,黄色节点,只负责从黄色节点写入,在数据库中标记为黄色的事件。绿色节点也是类似。这是一种分布式的设计,如果处理容量不够,只需按需添加更多节点,就可以实现负载分摊。

这里有个问题,如果其中某个节点挂了,比方说上图的蓝色节点挂了,那么谁来继续消费数据库中蓝色的,还没有来得及处理的事件呢?为了解决这个问题,killbill common queue设计了一种称为reaper收割机的机制。每个节点上都还住了一个收割机线程,它们会定期检查数据库,看有没有长时间无人处理的事件,如果有,就抢占标记为由自己负责。比方说上图的右边,最终黄色节点上的收割机线程抢到了原来由蓝色节点负责的事件,那么它会把这些事件标记为黄色,也就是由自己来负责。

收割机机制,保证了killbill common queue的高可用性,相当于保证了事务性发件箱中的Message Relay的高可用性。

Killbill PersistentBus表结构

基于killbill common queue的EventBus,也被称为killbill PersistentBus。上图给出了它的数据库表结构,其中bus_events就是用来存放待处理事件的,相当于发件箱,主要的字段包括:

  1. event_json,存放json格式的原始数据。
  2. creating_owner,记录创建节点,也就是事件是由哪个节点写入的。
  3. processingowner,记录处理节点,也就是事件最终是由哪个节点处理的;通常由creatingowner自己处理,但也可能被收割,由其它节点处理。
  4. processing_state,当前的处理状态。
  5. error_count,处理错误计数,超过一定计数会被标记为处理失败。

当前处理状态主要包括6种:

  1. AVAILABLE,表示待处理
  2. IN_PROCESSING,表示已经被dispatcher线程取走,正在处理中
  3. PROCESSED,表示已经处理
  4. REMOVED,表示已经被删除
  5. FAILED,表示处理失败
  6. REPEATED,表示被其它节点收割了

除了bus_events待处理事件表,还有一个对应的bus-events-history事件历史记录表。不管成功还是失败,最终,事件会被写入历史记录表进行归档,作为事后审计或者人工干预的依据。

上图下方给出了数据库表的github链接,你可以进一步参考学习。

Killbill PersistentBus处理状态迁移

上图给出了killbill PersistentBus的事件处理状态迁移图。

  1. 刚开始事件处于AVAILABLE待处理状态;
  2. 之后事件被dispatcher线程拉取,进入IN_PROCESSING处理中状态;
  3. 之后,如果事件处理器成功处理了事件,那么事件就进入PROCESSED已经处理状态;
  4. 如果事件处理器处理事件失败,那么事件的错误计数会被增加1,如果错误计数还没有超过最大失败重试阀值,那么事件就会重新进入AVAILABLE状态;
  5. 如果事件的错误数量超过了最大失败重试阀值,那么事件就会进入FAILED失败状态;
  6. 如果负责待处理事件的节点挂了,那么到达一定的时间间隔,对应的事件会被收割进入REAPED被收割状态。

上图有一个通过API触发进入的REMOVED移除状态,这个是给通知队列用的,用户可以通过API移除对应的通知消息。顺便提一下,除了事件/消息队列,Killbill queue也是支持通知队列(或者说延迟消息队列)的。

模式二:变更数据捕获(Change Data Capture, CDC)

对于事务性双写这个问题,业界沉淀下来比较实践的做法,其中第二种,就是所谓的变更数据捕获,英文称为Change Data Capture,简称CDC。

变更数据捕获的原理也不复杂,它利用了数据库的事务日志记录。一般数据库,对于变更提交操作,都记录所谓事务日志Transaction Log,也称为提交日志Commit Log,比方说MySQL支持binlog,Postgres支持Write Ahead log。事务日志可以简单理解为数据库本地的一个文件队列,它记录了按时间顺序发生的对数据库表的变更提交记录。

下面我们通过例子来展示这个变更数据捕获的流程,我们这里假定Order Service要添加一个新订单。

第一步,Order Service将新订单记录写入Order表,并且提交。因为这是一次表变更操作,所以这次变更会被记录到数据库的事务日志当中,其中内容包括发生的变更数据。

第二步,我们还需要引入一个称为Transaction Log Miner这样的角色,这个Miner负责订阅在事务日志队列上,如果有新的变更记录,Miner就会捕获到变更记录。

然后第三步,Miner会将变更记录发送到MQ消息队列。同之前的Message Relay一样,这里的发送到MQ也是At Least Once语义,消息可能会被重复发送,所以MQ之后的消费者需要做去重或者幂等处理。

总之,CDC技术同样可以保证,对Order表的修改,然后将对应事件发送到MQ,这两个动作可以实现事务性,也就是实现数据分发的事务性。

注意,这里的CDC一般是一个独立部署的服务,生产中需要做好高可用部署,并且做好监控告警。否则如果CDC挂了,消息也就发不出来,然后,依赖于消息的各种消费方也将无法正常工作。

CDC开源项目(企业级)

当前,有几个比较成熟的企业级的CDC开源项目,我这边收集了一些,供大家学习参考:

  1. 第一个是阿里开源的Canal,目前在github上有超过1.4万颗星,这个项目在国内用得比较多,之前在拍拍贷的实时数据场景,Canal也有不少成功的应用。Canal主要支持MySQL binlog的增量订阅和消费。它是基于MySQL的Master/Slave机制,它的Miner角色是通过伪装成Slave来实现的。这个项目的使用文档相对比较完善,建议大家一步参考学习。
  2. 第二个是Redhat开源的Debezium,目前在github上有超过3.2k星,这个项目在国外用得较多。Debezium主要是在Kafka Connect的基础上开发的,它不仅支持mysql数据库,还支持postgres/sqlserver/mongodb等数据库。
  3. 第三个是Zendesk开源的Maxwell,目前在github上有超过2.1k星。Maxwell是一个轻量级的CDC Deamon,主要支持MySQL binlog的变更数据捕获和处理。
  4. 第四个是Airbnb开源的SpinalTap,目前在github上有两百多颗星。SpinalTap主要支持MySQL binlog的变更捕获和处理。这个项目的星虽然不多,但是它是在Airbnb SOA服务化过程中,通过实践落地出来的一个项目,值得参考。

对于上面的这些项目,如果你想生产使用的话,波波推荐的是阿里的Canal,因为这个项目毕竟是国内大厂阿里落地出来,而且在国内已经有不少企业落地案例。其它几个项目,你也可以参考研究。

学习参考 ~ Eventuate-Tram

既然谈到这个CDC,这里有必要提到一个人和一本书,这个人叫Chris Chardson,他是美国的老一辈的技术大牛,曾今是第一代的Cloud Foundry项目的创始人(后来Cloud Foundry被Pivotal所收购)。近几年,Chris Chardson开始转战微服务领域,这两年,他还专门写了一本书,叫《微服务设计模式》,英文名是《Microservices Patterns》。这本书主要是讲微服务架构和设计模式的,内容还不错,是我推荐大家阅读的。

Charis Chardson还专门开发了一个叫Eventuate-Tram的开源项目(这个项目也有商业版),另外他的微服务书里头也详细介绍了这个项目。这个项目可以说是一个大集成框架,它不仅实现了DDD领域驱动开发模式,CQRS命令查询职责分离模式,事件溯源模式,还实现了Saga事务状态机模式。当然,这个项目的底层也实现了CDC变更数据捕获模式。

波波认为,Charis的项目,作为学习研究还是有价值的,但是暂不建议生产级使用,因为他的东西不是一线企业落地出来的,主要是他个人开发的。至于说Charis的项目能否在一线企业落地,还有待时间的进一步检验。

Transactional Outbox vs CDC

好的,前面我介绍了解决数据的事务性分发的两种落地模式,一种是事务性发件箱模式,另外一种是变更数据捕获模式,这两种模式其实各有优劣,为了帮助大家做选型决策,我这边对这两种模式进行一个比较,请看上面的比较表格:

  1. 首先比较一下复杂性,事务性发件箱相对比较简单,简单做法只需要在数据库中增加一个发件箱表,然后再启一个Poller线程拉消息和发消息就可以了。CDC技术相对比较复杂,需要你深入理解数据库的事务日志格式和协议。另外Miner的实现也不简单,要保证不丢消息,如果生产部署的话,还要考虑Miner的高可以部署,还有监控告警等环节。
  2. 第二个比较的是Polling延迟和开销。事务性发件箱的Polling是近实时的,同时如果频繁拉数据库表,难免会有性能开销。CDC是比较实时的,同时它不侵入数据库和表,所以它的性能开销相对小。
  3. 第三个比较的是应用侵入性。事务性发件箱是有一定的应用侵入性的,应用在更新业务数据的同时,还要单独发送消息。CDC对应用是无侵入的,因为它拉取的是数据库事务日志,这个和应用是不直接耦合的。当然,CDC和事务性发件箱模式并不排斥,你可以在应用层采用事务性发件箱模式,同时仍然采用CDC到数据库去捕获和发件箱中的消息对应的事务日志。这个方法对应用有一定的侵入性,但是通过CDC可以获得较好的数据同步性能。
  4. 第四点是适用场合。事务性发件箱主要适用于中小规模的企业,因为做法比较简单,一个开发人员也可以搞定。CDC则主要适用于中大规模互联网企业,最好有独立框架团队负责CDC的治理和维护。像Netflix/Airbnb这样的一线互联网公司,也是在中后期才引入CDC技术的[参考附录1/2/3]。

Single Source of Truth

前面我解答了如何解决微服务的数据一致性分发问题,也给出了可落地的方案。最后,我特别说明在实践中进行数据分发的一个原则,叫Single Source of Truth,翻成中文就是单一真实数据源。它的意思是说,你要实现数据分发,目标服务可以有很多,但是一定要注意,数据的主人只能有一个,它是数据的权威记录系统(canonical system of record),其它的数据都是只读的,非权威的拷贝(read-only, non-authoritative copy)。

换句话说,任何时候,对于某类数据,它主人应该是唯一的,它是Single Source of Truth,只有它可以修改数据,其它的服务可以获得数据拷贝,做本地缓存也没问题,但是这些数据都是只读的,不能修改。

只有遵循这条原则,数据分发才能正常工作,不会产生不一致的情况。

结论

  1. Netflix和Airbnb等一线互联网公司的实践证明,企业要真正实现松散耦合、可扩展和高性能的微服务架构,那么底层的数据分发同步能力是非常关键的。
  2. 数据分发技术,简单的可以采用事务性发件箱模式来实现,重量级的可以考虑变更数据捕获CDC技术来实现。事务性发件箱可以参考Killbill Queue的实现,CDC可以参考阿里的Canal等开源产品来实现。
  3. 最简单的双写也是实现数据分发的一种方式,但是为了保证一致性,需要引入后台校验补偿程序。
  4. 最后,数据分发/同步的原则是:确保单一真实数据源(Single Source of Truth)。系统中数据的主人应该只有一个,只有主人可以写入数据,其它都是只读拷贝。
文章目录
  1. 1. 介绍
  2. 2. 为啥要分发数据?场景?
  3. 3. 双写?
  4. 4. 模式一:事务性发件箱(Transactional Outbox)
  5. 5. Transactional Outbox参考实现 ~ Killbill Common Queue
  6. 6. Reaper机制
  7. 7. Killbill PersistentBus表结构
  8. 8. Killbill PersistentBus处理状态迁移
  9. 9. 模式二:变更数据捕获(Change Data Capture, CDC)
  10. 10. CDC开源项目(企业级)
  11. 11. 学习参考 ~ Eventuate-Tram
  12. 12. Transactional Outbox vs CDC
  13. 13. Single Source of Truth
  14. 14. 结论