《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud/Bus-Kafka/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labslabx-19 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

友情提示:在开始本文之前,胖友需要对 Kafka 进行简单的学习。可以阅读《Kafka 极简入门》文章,将第一二小节看完,在本机搭建一个 Kafka 服务。

Kafka 是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

  • 通过 O(1) 的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过 Kafka 服务器和消费机集群来分区消息。

本文我们来学习 Spring Cloud Bus Kafka 组件,基于 Spring Cloud Bus 的编程模型,接入 Kafka 消息队列,实现事件总线的功能。

Spring Cloud Bus 是事件、消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与 Spring Cloud Config 联合实现热部署。

《芋道 Spring Boot 事件机制 Event 入门》文章,我们已经了解到,Spring 内置了事件机制,可以实现 JVM 进程内的事件发布与监听。但是如果想要跨 JVM 进程的事件发布与监听,此时它就无法满足我们的诉求了。

因此,Spring Cloud Bus 在 Spring 事件机制的基础之上进行拓展,结合 RabbitMQ、Kafka、RocketMQ 等等消息队列作为事件的“传输器”,通过发送事件(消息)到消息队列上,从而广播到订阅该事件(消息)的所有节点上。最终如下图所示:整体模型

Spring Cloud Bus 定义了 RemoteApplicationEvent 类,远程的 ApplicationEvent 的抽象基类。核心代码如下:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonIgnoreProperties("source") // <2>
public abstract class RemoteApplicationEvent extends ApplicationEvent { // <1>

private final String originService;

private final String destinationService;

private final String id;

// ... 省略一大撮代码
}
  • 显然,我们使用 Spring Cloud Bus 发送的自定义事件,必须要继承 RemoteApplicationEvent 类。
  • <1> 处,继承 Spring 事件机制定义的 ApplicationEvent 抽象基类。
  • <2> 处,通过 Jackson 的 @JsonIgnoreProperties 注解,设置忽略继承自 ApplicationEvent 的 source 属性,避免序列化问题。
  • id 属性,事件编号。一般情况下,RemoteApplicationEvent 会使用 UUID.randomUUID().toString() 代码,自动生成 UUID 即可。
  • originService 属性,来源服务。Spring Cloud Bus 提供好了 ServiceMatcher#getServiceId() 方法,获取服务编号作为 originService 属性的值。

    友情提示:这个属性非常关键,艿艿稍后会详细讲一下,都是眼泪啊!!!

  • destinationService 属性,目标服务。该属性的格式是 {服务名}:{服务实例编号}

    举个板栗:

    • 如果想要广播给所有服务的所有实例,则设置为 **:**
    • 如果想要广播给 users 服务的所有实例,则设置为 users:**
    • 如如果想要广播给 users 服务的指定实例,则设置为 users:bc6d27d7-dc0f-4386-81fc-0b3363263a15

2. 快速入门

示例代码对应仓库:

哔哔再多,不如撸个 Spring Cloud Bus 快速入门的示例。我们会新建两个项目:

项目结构

2.1 事件发布器项目

创建 labx-19-sc-bus-kafka-demo-publisher 项目,扮演事件发布器的角色,使用 Spring Cloud Bus 发送事件。

2.1.1 引入依赖

创建 pom.xml 文件,引入 Spring Cloud Bus 相关依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-19</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-19-sc-bus-kafka-demo-publisher</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入基于 Kafka 的 Spring Cloud Bus 的实现的依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
</dependencies>

</project>

2.1.2 配置文件

创建 application.yml 配置文件,添加 Spring Cloud Bus 相关配置:

spring:
application:
name: publisher-demo

# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

# Bus 相关配置项,对应 BusProperties
cloud:
bus:
enabled: true # 是否开启,默认为 true
destination: springCloudBus # 目标消息队列,默认为 springCloudBus

spring.kafka 配置项,为 Kafka 相关配置项。

友情提示:感兴趣的胖友,可以阅读《芋道 Spring Boot 消息队列 Kafka 入门》文章。

spring.cloud.bus 配置项,为 Spring Cloud Bus 配置项,对应 BusProperties 类。一般情况下,使用默认值即可。

2.1.3 UserRegisterEvent

创建 UserRegisterEvent 类,用户注册事件。代码如下:

public class UserRegisterEvent extends RemoteApplicationEvent {

/**
* 用户名
*/
private String username;

public UserRegisterEvent() { // 序列化
}

public UserRegisterEvent(Object source, String originService, String destinationService, String username) {
super(source, originService);
this.username = username;
}

public String getUsername() {
return username;
}

}

① 继承 RemoteApplicationEvent 抽象基类。

② 创建一个空的构造方法,毕竟要序列化。

2.1.4 DemoController

创建 DemoController 类,提供 /demo/register 注册接口,发送 UserRegisterEvent 事件。代码如下:

@RestController
@RequestMapping("/demo")
public class DemoController {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@Autowired
private ServiceMatcher busServiceMatcher;

@GetMapping("/register")
public String register(String username) {
// ... 执行注册逻辑
logger.info("[register][执行用户({}) 的注册逻辑]", username);

// ... <2> 发布
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, busServiceMatcher.getServiceId(),
null, username)); // <1>
return "success";
}


}

<1> 处,创建 UserRegisterEvent 对象。

  • originService 属性,通过 ServiceMatcher#getServiceId() 方法,获得服务编号。
  • destinationService 属性,我们传入 null 值。RemoteApplicationEvent 会自动转换成 **,表示广播给所有监听该消息的实例。

<2> 处,和 Spring 事件机制一样,通过 ApplicationEventPublisher 的 #publishEvent(event) 方法,直接发送事件到 Spring Cloud Bus 消息总线。好奇的胖友,可以打开 BusAutoConfiguration 的代码,如下图所示:BusAutoConfiguration 源码

友情提示:如果胖友仔细看的话,还可以发现 Spring Cloud Bus 是使用 Spring Cloud Stream 进行消息的收发的。

2.1.5 PublisherDemoApplication

创建 PublisherDemoApplication 类,作为启动类。代码如下:

@SpringBootApplication
public class PublisherDemoApplication {

public static void main(String[] args) {
SpringApplication.run(PublisherDemoApplication.class, args);
}

}

2.2 事件监听器项目

创建 labx-19-sc-bus-kafka-demo-listener 项目,扮演事件监听器的角色,使用 Spring Cloud Bus 监听事件。

2.2.1 引入依赖

创建 pom.xml 文件,引入相关的依赖。和「2.1.1 引入依赖」是一致的,就不重复“贴”出来了,胖友点击 pom.xml 文件查看。

2.2.2 配置文件

创建 application.yaml 配置文件,添加相关的配置项。和「2.1.2 配置文件」是一致的,就不重复“贴”出来了,胖友点击 application.yaml 文件查看。

2.2.3 UserRegisterEvent

创建 UserRegisterEvent 类,用户注册事件。和「2.1.3 UserRegisterEvent」是一致的,就不重复“贴”出来了,胖友点击 UserRegisterEvent 文件查看。

2.2.4 UserRegisterListener

创建 UserRegisterListener 类,监听 UserRegisterEvent 事件。代码如下:

/**
* 用户注册事件的监听器
*/
@Component
public class UserRegisterListener implements ApplicationListener<UserRegisterEvent> {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void onApplicationEvent(UserRegisterEvent event) {
logger.info("[onApplicationEvent][监听到用户({}) 注册]", event.getUsername());
}

}

和 Spring 事件机制一样,只需要监听指定事件即可。好奇的胖友,可以打开 BusAutoConfiguration 的代码,如下图所示:BusAutoConfiguration 源码

2.2.5 ListenerDemoApplication

创建 ListenerDemoApplication 类,作为启动类。代码如下:

@SpringBootApplication
@RemoteApplicationEventScan
public class ListenerDemoApplication {

public static void main(String[] args) {
SpringApplication.run(ListenerDemoApplication.class, args);
}

}

在类上,添加 Spring Cloud Bus 定义的 @RemoteApplicationEventScan 注解,声明要从 Spring Cloud Bus 监听 RemoteApplicationEvent 事件。

2.3 简单测试

① 执行 PublisherDemoApplication 一次,启动一个事件发布器

② 执行 ListenerDemoApplication 两次,启动两个事件监听器。需要将「Allow parallel run」进行勾选,如下图所示:IDEA

此时,我们可以在 Kafka 运维界面看到 springCloudBus 这个 Topic,如下图所示:

  • Kafka 运维界面 - Topic
  • Kafka 运维界面 - Consumer

③ 调用 http://127.0.0.1:8080/demo/register?username=yudaoyuanma 接口,进行注册。IDEA 控制台打印日志如下:

# PublisherDemoApplication 控制台
2020-04-09 07:42:03.417 INFO 31050 --- [nio-8080-exec-1] c.i.s.l.p.controller.DemoController : [register][执行用户(haha) 的注册逻辑]

# ListenerDemoApplication 控制台 01
2020-04-09 07:42:03.603 INFO 31027 --- [container-0-C-1] c.i.s.l.l.listener.UserRegisterListener : [onApplicationEvent][监听到用户(haha) 注册]

# ListenerDemoApplication 控制台 02
2020-04-09 07:42:03.603 INFO 31040 --- [container-0-C-1] c.i.s.l.l.listener.UserRegisterListener : [onApplicationEvent][监听到用户(haha) 注册]

发布的 UserRegisterEvent 事件,被两个事件监听器的进程都监听成功。

3. 监控端点

示例代码对应仓库:labx-19-sc-bus-kafka-demo-listener-actuator

Spring Cloud Bus 的 endpoint 模块,基于 Spring Boot Actuator,提供了两个自定义监控端点:

同时,Spring Cloud Bus 拓展了 Spring Boot Actuator 内置的 httptrace 端点,会监听 Spring Cloud Bus 发送消息时产生的 SentApplicationEvent 事件和确认消息的产生 AckRemoteApplicationEvent 事件,配合 TraceListener 监听器,记录相应的跟踪信息。不过因为 httptrace 端点改版了,所以目前该功能已经失效,而且失效了 2 年多了,具体代码如下:TraceListener 源码

友情提示:对 Spring Boot Actuator 不了解的胖友,可以后续阅读《芋道 Spring Boot 监控端点 Actuator 入门》文章。

我们来搭建一个 Spring Cloud Bus 监控端点的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,从 labx-19-sc-bus-kafka-demo-listener 复制出 labx-19-sc-bus-kafka-demo-listener-actuator,测试 Spring Cloud Bus 的监控端点结果。最终项目如下图所示:项目结构

友情提示:不使用 labx-19-sc-bus-kafka-demo-publisher 的原因是,未添加 @RemoteApplicationEventScan 注解,不会从 Spring Cloud Bus 监听 RemoteApplicationEvent 事件。

3.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

3.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

spring:
application:
name: listener-demo

# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

server:
port: 18080 # 随机端口,方便启动多个消费者

management:
endpoints:
# Actuator HTTP 配置项,对应 WebEndpointProperties 配置类
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。

新增 management 配置项,设置 Spring Boot Actuator 配置项。这里先不详细解析,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

3.3 简单测试

执行 ListenerDemoApplication 启动项目。

① 使用 Postman 模拟请求 bus-env 端点,如下图所示:Postman `bus-env` 端口

此时,我们在控制台可以看到 EnvironmentChangeListener 打印日志如下,说明成功接收到 EnvironmentChangeRemoteApplicationEvent 事件:

2020-04-09 07:53:33.737  INFO 31712 --- [io-18080-exec-1] o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {test-property=test-value}

② 使用 Postman 模拟请求 bus-refresh 端点,如下图所示:Postman `bus-refresh` 端口

此时,我们在控制台可以看到 RefreshListener 打印日志如下,说明成功接收到 RefreshRemoteApplicationEvent 事件:

2020-04-09 07:53:46.409  INFO 31712 --- [io-18080-exec-2] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed []

4. 集成到 Spring Cloud Config

实际上,Spring Cloud Bus 在日常开发中,基本不会使用到。绝大多数情况下,我们通过使用 Spring Cloud Stream 即可实现它所有的功能,并且更加强大和灵活。同时,艿艿也找了一些在使用 Spring Cloud 作为微服务解决方案的胖友,确实一个都没有在使用 Spring Cloud Bus 的 = =。

倔强的艿艿又翻阅了网上的相关资料,绝大多数都是提到通过 Spring Cloud Bus,实现 Spring Cloud Config 配置中心的自动配置刷新的功能。因此,可能我们不是很必要去学习它,哈哈哈。

不过良心的艿艿,还是在《芋道 Spring Cloud 配置中心 Spring Cloud Config 入门》文章的「5. 自动配置刷新(第二弹)」小节中,将 Spring Cloud Bus 集成到 Spring Cloud Config 中,实现配置中心的自动配置刷新的功能。

Spring Cloud Config + Spring Cloud Bus

示例代码对应仓库:

感兴趣的胖友可以去看看,不过貌似国内采用 Spring Cloud Config 作为配置中心的公司越来越少,更多的都是采用 Nacos 或者 Apollo 嘿嘿~

友情提示:目前艿艿团队,采用 Nacos 作为配置中心 + 注册中心。

666. 彩蛋

至此,我们已经完成 Spring Cloud Bus Kafka 的学习。如下是 Bus Kafka 相关的官方文档:

文章目录
  1. 1. 1. 概述
  2. 2. 2. 快速入门
    1. 2.1. 2.1 事件发布器项目
      1. 2.1.1. 2.1.1 引入依赖
      2. 2.1.2. 2.1.2 配置文件
      3. 2.1.3. 2.1.3 UserRegisterEvent
      4. 2.1.4. 2.1.4 DemoController
      5. 2.1.5. 2.1.5 PublisherDemoApplication
    2. 2.2. 2.2 事件监听器项目
      1. 2.2.1. 2.2.1 引入依赖
      2. 2.2.2. 2.2.2 配置文件
      3. 2.2.3. 2.2.3 UserRegisterEvent
      4. 2.2.4. 2.2.4 UserRegisterListener
      5. 2.2.5. 2.2.5 ListenerDemoApplication
    3. 2.3. 2.3 简单测试
  3. 3. 3. 监控端点
    1. 3.1. 3.1 引入依赖
    2. 3.2. 3.2 配置文件
    3. 3.3. 3.3 简单测试
  4. 4. 4. 集成到 Spring Cloud Config
  5. 5. 666. 彩蛋