《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud/Kafka/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文我们来学习 Spring Cloud Stream Kafka 组件,基于 Spring Cloud Stream 的编程模型,接入 Kafka 作为消息中间件,实现消息驱动的微服务。

FROM 《分布式发布订阅消息系统 Kafka》

Kafka 是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

  • 通过 O(1) 的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过 Kafka 服务器和消费机集群来分区消息。

在开始本文之前,胖友需要对 Kafka 进行简单的学习。可以阅读《Kafka 安装部署》文章,将第一二小节看完,在本机搭建一个 Kafka 服务。

2. Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。

友情提示:可能有胖友对 Broker 不太了解,我们来简单解释下。

一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。

例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。

public interface Binder<T, 
C extends ConsumerProperties, // 消费者配置
P extends ProducerProperties> { // 生产者配置

// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

最终整体交互如下图所示:Spring Cloud Stream Application

可能看完之后,胖友对 Spring Cloud Stream 还是有点懵逼,并且觉得概念怎么这么多呢?不要慌,我们先来快速入个门,会有更加具象的感受。

3. 快速入门

示例代码对应仓库:

友情提示:这可能是一个信息量有点大的入门内容,请保持耐心~

本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:项目结构

3.1 搭建生产者

创建 labx-11-sc-stream-kafka-producer-demo 项目,作为生产者。

3.1.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Stream Kafka 相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-11</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-11-sc-stream-kafka-producer-demo</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Stream Kafka 相关依赖,将 Kafka 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>

</project>

通过引入 spring-cloud-starter-stream-kafka 依赖,引入并实现 Stream Kafka 的自动配置。在该依赖中,已经帮我们自动引入 Kafka 的大量依赖,非常方便,如下图所示:`spring-cloud-starter-stream-kafka`

3.1.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Stream Kafka 相关配置。

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka 自定义 Binding 配置项,对应 KafkaBindingProperties Map
bindings:
demo01-output:
# Kafka Producer 配置项,对应 KafkaProducerProperties 类
producer:
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

spring.cloud.stream 为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。

spring.cloud.stream.bindings 为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input 还是 @Output 注解,才会有具体的区分。

这里,我们配置了一个名字为 demo01-output 的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。

  • destination:目的地。在 Kafka 中,使用 Topic 作为目的地。这里我们设置为 DEMO-TOPIC-01

    Topic(主题):每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic。

  • content-type:内容格式。这里使用 JSON 格式,因为稍后我们将发送消息的类型为 POJO,使用 JSON 进行序列化。

spring.cloud.stream.kafka 为 Spring Cloud Stream Kafka 配置项。

spring.cloud.stream.kafka.binder 为 Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类。

  • brokers:指定 Kafka Broker 地址,可以设置多个,以逗号分隔。

    Broker:Kafka 集群中的一台或多台服务器统称为 Broker。

spring.cloud.stream.kafka.bindings 为 Kafka 自定义 Binding 配置项,用于对通用的 spring.cloud.stream.bindings 配置项的增强,实现 Kafka Binding 独特的配置。该配置项对应 KafkaBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 demo01-output 的 Binding 进行增强,进行 Producer 的配置。其中,producer 为 Kafka Producer 配置项,对应 KafkaProducerProperties 类。

  • sync:是否同步发送消息,默认为 false 异步。一般业务场景下,使用同步发送消息较多,所以这里我们设置为 true 同步消息。

3.1.3 MySource

创建 MySource 接口,声明名字为 Output Binding。代码如下:

public interface MySource {

@Output("demo01-output")
MessageChannel demo01Output();

}

这里,我们通过 @Output 注解,声明了一个名字为 demo01-output 的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Output 注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。MessageChannel 提供的发送消息的方法如下:

@FunctionalInterface
public interface MessageChannel {

long INDEFINITE_TIMEOUT = -1;

default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}

boolean send(Message<?> message, long timeout);

}

那么,我们是否要实现 MySource 接口呢?答案是不需要,全部交给 Spring Cloud Stream 的 BindableProxyFactory 来解决。BindableProxyFactory 会通过动态代理,自动实现 MySource 接口。 而 @Output 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Output 注解的方法,自动进行创建。

例如说,#demo01Output() 方法被自动创建返回结果为 DirectWithAttributesChannel,它是 MessageChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.1.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。代码如下:

public class Demo01Message {

/**
* 编号
*/
private Integer id;

// ... 省略 setter/getter/toString 方法

}

3.1.5 Demo01Controller

创建 Demo01Controller 类,提供发送消息的 HTTP 接口。代码如下:

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

@Autowired
private MySource mySource; // <X>

@GetMapping("/send")
public boolean send() {
// <1> 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// <2> 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// <3> 发送消息
return mySource.demo01Output().send(springMessage);
}

}
  • <X> 处,使用 @Autowired 注解,注入 MySource Bean。
  • <1> 处,创建 Demo01Message 对象。
  • <2> 处,使用 MessageBuilder 创建 Spring Message 对象,并设置消息内容为 Demo01Message 对象。
  • <3> 处,通过 MySource 获得 MessageChannel 对象,然后发送消息。

3.1.6 ProducerApplication

创建 ProducerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySource 接口。

3.2 搭建消费者

创建 labx-11-sc-stream-kafka-consumer-demo 项目,作为消费者。

3.2.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Alibaba Kafka 相关依赖。

友情提示:和「3.1.1 引入依赖」基本一样,点击 链接 查看。

3.2.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Stream Kafka 相关配置。

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。

spring.cloud.stream.bindings 为 Binding 配置项。

这里,我们配置了一个名字为 demo01-input 的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。

  • group:消费者分组。

    消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。

对于消费队列的消费者,会有两种消费模式:集群消费(Clustering)和广播消费(Broadcasting)。

  • 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
  • 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

Kafka 的消费者两种模式都支持。因为这里我们配置了消费者组,所以采用集群消费。至于如何使用广播消费,我们稍后举例子。

这里一点要注意!!!艿艿加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:

  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被两个消费者分组 "consumer_group_01""consumer_group_02" 都各自消费一次。
  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被分组 A 的某个实例消费一次,被分组 B 的某个实例也消费一次

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。
  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
  • … 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度

友情提示:如果还不理解的话,没有关系,我们下面会演示下我们上面举的例子。

② 考虑到稍后我们要测试集群消费,所以我们要给 DEMO-TOPIC-01 Topic 创建多个 Partition。在 Kafka 中,Topic 是基于 Partition 分配到相同消费组下的消费者,从而进行消费消息的。如下图所示:消费者分区

Partition(分区):Topic 物理上的分组,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)。

这里,我们给 DEMO-TOPIC-01 Topic 创建 Partition 大小为 10。操作命令如下:

# 情况一,如果 `DEMO-TOPIC-01` Topic 未创建,则进行创建:
$ bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic my-topic --partitions 10 --replication-factor 1

# 情况二,如果 `DEMO-TOPIC-01` Topic 未创建,则进行修改 Partition 大小:
$ bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 10 --topic DEMO-TOPIC-01

3.2.3 MySink

创建 MySink 接口,声明名字为 Input Binding。代码如下:

public interface MySink {

String DEMO01_INPUT = "demo01-input";

@Input(DEMO01_INPUT)
SubscribableChannel demo01Input();

}

这里,我们通过 @Input 注解,声明了一个名字为 demo01-input 的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Input 注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:

public interface SubscribableChannel extends MessageChannel {

boolean subscribe(MessageHandler handler); // 订阅

boolean unsubscribe(MessageHandler handler); // 取消订阅

}

那么,我们是否要实现 MySink 接口呢?答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input 注解的方法,自动进行创建。

例如说,#demo01Input() 方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.2.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。

友情提示:和「3.1.4 Demo01Message」基本一样,点击 链接 查看。

3.2.5 Demo01Consumer

创建 Demo01Consumer 类,消费消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。这里,我们使用 MySink.DEMO01_INPUT

又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。

3.2.6 ConsumerApplication

创建 ConsumerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySink 接口。

3.3 测试单集群多实例的场景

本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group 下有两个消费者实例。

友情提示:因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。如下图所示:Allow parallel run

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口十次,发送十条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-03-08 20:59:50.461 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=1983864145}]
2020-03-08 20:59:53.081 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-2014337623}]
2020-03-08 20:59:53.475 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-250644839}]
2020-03-08 20:59:53.844 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=2143820238}]
2020-03-08 20:59:54.289 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=1421045645}]

// ConsumerApplication 控制台 02
2020-03-08 20:59:51.009 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=132504622}]
2020-03-08 20:59:51.416 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=2052532135}]
2020-03-08 20:59:51.824 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-223534414}]
2020-03-08 20:59:52.262 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=1525635094}]
2020-03-08 20:59:52.666 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-688399661}]

符合预期。从日志可以看出,每条消息仅被消费一次。

3.4 测试多集群多实例的场景

本小节,我们会在二个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group 下有两个消费者实例。

② 修改 labx-11-sc-stream-kafka-consumer-demo 项目的配置文件,修改 spring.cloud.stream.bindings.demo01-input.group 配置项,将消费者分组改成 demo02-consumer-group

然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 demo02-consumer-group 下有两个消费者实例。

③ 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口十次,发送十条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// 消费者分组 `demo01-consumer-group` 的 ConsumerApplication 控制台 01
2020-03-08 21:07:34.728 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1737808914}]
2020-03-08 21:07:37.728 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-2140451405}]
2020-03-08 21:07:38.203 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=813702607}]
2020-03-08 21:07:38.601 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1028557655}]
2020-03-08 21:07:39.024 INFO 65124 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-399853121}]

// 消费者分组 `demo01-consumer-group` 的 ConsumerApplication 控制台 02
2020-03-08 21:07:35.196 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1461873560}]
2020-03-08 21:07:35.717 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1947944757}]
2020-03-08 21:07:36.176 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=151575160}]
2020-03-08 21:07:36.789 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=88228856}]
2020-03-08 21:07:37.272 INFO 65177 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=790199000}]

// 消费者分组 `demo02-consumer-group` 的 ConsumerApplication 控制台 01
2020-03-08 21:07:35.196 INFO 65419 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1461873560}]
2020-03-08 21:07:35.717 INFO 65419 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1947944757}]
2020-03-08 21:07:36.176 INFO 65419 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=151575160}]
2020-03-08 21:07:36.789 INFO 65419 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=88228856}]
2020-03-08 21:07:37.272 INFO 65419 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=790199000}]

// 消费者分组 `demo02-consumer-group` 的 ConsumerApplication 控制台 02
2020-03-08 21:07:34.728 INFO 65422 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1737808914}]
2020-03-08 21:07:37.728 INFO 65422 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-2140451405}]
2020-03-08 21:07:38.204 INFO 65422 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=813702607}]
2020-03-08 21:07:38.601 INFO 65422 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1028557655}]
2020-03-08 21:07:39.024 INFO 65422 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-399853121}]

符合预期。从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。

3.5 小结

至此,我们已经完成了 Stream Kafka 的快速入门,是不是还是蛮简答的噢。现在胖友可以在回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。

4. 定时消息

Kafka 并未提供定时消息的功能,需要我们自行拓展

例如说《基于 Kafka 的定时消息/任务服》文章,提供的方案。

当然,也可以考虑基于 MySQL 存储定时消息,Job 扫描到达时间的定时消息,发送给 Kafka 。

5. 消费重试

示例代码对应仓库:

Spring-Kafka 提供消费重试的机制。在消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

友情提示:Spring Cloud Stream Kafka 是基于 Spring-Kafka 操作 Kafka,它仅仅是上层的封装哟。

当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。

每条消息的失败重试,是可以配置一定的间隔时间。具体,我们在示例的代码中,来进行具体的解释。

下面,我们来实现一个 Consumer 消费重试的示例。最终项目如下图所示:项目结构

5.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

5.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-retry 项目作为消费者。

5.2.1 配置文件

修改 application.yml 配置文件,增加消费重试相关的配置项。最终配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
max-attempts: 3 # 重试次数,默认为 3 次。
back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Binding 配置项,对应 KafkaBindingProperties 类
bindings:
demo01-input:
# Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
consumer:
enable-dlq: true # 是否开启死信队列,默认为 false 关闭
dlq-name: # 死信队列名,默认为 `errors.{topicName}.{consumerGroup}`

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

spring.cloud.stream.bindings.<bindingName>.consumer 为 Spring Cloud Stream Consumer 通用配置项,对应 ConsumerProperties 类。

  • max-attempts:最大重试次数,默认为 3 次。如果想要禁用掉重试,可以设置为 1。

    max-attempts 配置项要注意,是一条消息一共尝试消费总共 max-attempts 次,包括首次的正常消费。

  • back-off-initial-interval:重试间隔的初始值,单位毫秒,默认为 1000。

  • back-off-multiplier:重试间隔的递乘系数,默认为 2.0。
  • back-off-max-interval:重试间隔的最大值,单位毫秒,默认为 10000。

将四个参数组合在一起,我们来看一个消费重试的过程:

  • 第一次 00:00:00:首次消费,失败。
  • 第二次 00:00:03:3 秒后重试,因为重试间隔的初始值为 back-off-initial-interval,等于 3000 毫秒。
  • 第三次 00:00:09:6 秒后重试,因为有重试间隔的递乘系数 back-off-multiplier,所以是 2.0 * 3000 等于 6000 毫秒。
  • 第四次,没有,因为到达最大重试次数,等于 3。

spring.cloud.stream.kafka.bindings.<bindingName>.consumer 为 Spring Cloud Stream Kafka Consumer 专属配置项,我们新增了两个配置项:

  • enable-dlq:是否开启死信队列,默认为 false 关闭。这里,问问们设置为 true 来进行开启。
  • dlq-name:死信队列名,默认为 errors.{topicName}.{consumerGroup}。这里我们并未设置,直接使用默认,即本小节的示例对应 errors.DEMO-TOPIC-01.demo01-consumer-group

5.2.2 Demo01Consumer

修改 Demo01Consumer 类,直接抛出异常,模拟消费失败,从而演示消费重试的功能。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

}

5.3 简单测试

① 执行 ConsumerApplication,启动一个消费者的实例。

我们打开 Kafka 运维界面,可以看到多了一个 errors.DEMO-TOPIC-01.demo01-consumer-group Topic,即本小节的死信队列。如下图所示:`errors.DEMO-TOPIC-01.demo01-consumer-group` Topic

友情提示:Kafka 运维界面,可以看看《芋道 Kafka 安装部署》文章的「4. Kafka Manager」小节。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。IDEA 控制台输出日志如下:

// 第一次消费
2020-03-08 22:14:55.465 INFO 67252 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1264911073}]
// 第二次消费,3 秒后
2020-03-08 22:14:58.467 INFO 67252 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1264911073}]
// 第三次消费,6 秒后
2020-03-08 22:15:04.471 INFO 67252 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1264911073}]

// 内置的 LoggingHandler 打印异常日志
2020-03-08 22:15:04.473 ERROR 67252 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常 // ... 省略异常堆栈
Caused by: java.lang.RuntimeException: 我就是故意抛出一个异常 // ... 省略异常堆栈

测试 Consumer 消费重试成功~

不过要注意,目前我们看到的重试方案,是通过 RetryTemplate 来实现客户端级别的消费冲水。而 RetryTemplate 又是通过 sleep 来实现消费间隔的时候,这样将影响 Consumer 的整体消费速度,毕竟 sleep 会占用掉线程。

6. 消费异常处理机制

示例代码对应仓库:

在 Spring Cloud Stream 中,提供了通用的消费异常处理机制,可以拦截到消费者消费消息时发生的异常,进行自定义的处理逻辑。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。最终项目如下图所示:项目结构

6.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

6.2 搭建消费者

「5. 消费重试」小节的 labx-11-sc-stream-kafka-consumer-retry 项目,复制出 labx-11-sc-stream-kafka-consumer-error-handler 项目作为消费者。

6.2.1 Demo01Consumer

修改 Demo01Consumer 类,增加消费异常处理方法。完整代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
}

@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}

}

① 在 Spring Integration 的设定中,若 #onMessage(@Payload Demo01Message message) 方法消费消息发生异常时,会发送错误消息(ErrorMessage)到对应的错误 Channel(<destination>.<group>.errors中。同时,所有错误 Channel 都桥接到了 Spring Integration 定义的全局错误 Channel(errorChannel)

友情提示:先暂时记住 Spring Integration 这样的设定,艿艿也没去深究 T T,也是一脸懵逼。

因此,我们有两种方式来实现异常处理:

  • 局部的异常处理:通过订阅指定错误 Channel
  • 全局的异常处理:通过订阅全局错误 Channel

② 在 #handleError(ErrorMessage errorMessage) 方法上,我们声明了 @ServiceActivator 注解,订阅指定错误 Channel的错误消息,实现 #onMessage(@Payload Demo01Message message) 方法的局部异常处理。如下图所示:对应关系

③ 在 #globalHandleError(ErrorMessage errorMessage) 方法上,我们声明了 @StreamListener 注解,订阅全局错误 Channel的错误消息,实现全局异常处理。

④ 在全局局部异常处理都定义的情况下,错误消息仅会被符合条件局部错误异常处理。如果没有符合条件的,错误消息才会被全局异常处理。

6.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:

// onMessage 方法,一共 3 次,包括重试
2020-03-09 07:38:23.037 INFO 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1512001860}]
2020-03-09 07:38:26.045 INFO 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1512001860}]
2020-03-09 07:38:32.046 INFO 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1512001860}]

// handleError 方法
2020-03-09 07:38:32.054 ERROR 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [handleError][payload:Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常]
2020-03-09 07:38:32.054 ERROR 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[18], headers={kafka_offset=45, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e32f669, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=DEMO-TOPIC-01, kafka_receivedTimestamp=1583710702910, kafka_groupId=demo01-consumer-group}]]
2020-03-09 07:38:32.054 ERROR 68126 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [handleError][headers:{kafka_data=ConsumerRecord(topic = DEMO-TOPIC-01, partition = 0, leaderEpoch = 0, offset = 45, CreateTime = 1583710702910, serialized key size = -1, serialized value size = 18, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@57ae40f0), id=e6852311-62b5-cc85-1f63-cef9a01847a2, sourceData=ConsumerRecord(topic = DEMO-TOPIC-01, partition = 0, leaderEpoch = 0, offset = 45, CreateTime = 1583710702910, serialized key size = -1, serialized value size = 18, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@57ae40f0), timestamp=1583710712046}]

😆 不过要注意,如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,所以就不会发到死信队列了噢。

7. 广播消费

示例代码对应仓库:

在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

使用场景?

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Kafka 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Kafka 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

如何实现?

不过 Kafka 并不直接提供内置的广播消费的功能!!!此时,我们只能退而求其次,每个 Consumer 独有一个 Consumer Group ,从而保证都能接收到全量的消息

恰好,Spring Cloud Stream RabbitMQ 在设置 Consumer 的消费者分组为空时,会为该 Consumer 生成一个独有随机的消费者分组,从而实现广播消费的功能。

下面,我们来实现一个 Consumer 广播消费的示例。最终项目如下图所示:项目结构

7.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

7.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-broadcasting 项目作为消费者。

7.2.1 配置文件

修改 application.yml 配置文件,删除 Consumer 的消费者分组配置项 group 即可。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
# group: demo01-consumer-group # 消费者分组
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

7.3 简单测试

① 执行 ConsumerApplication 两次,启动两个消费者的实例。

我们打开 Kafka 运维界面,可以看到 Spring Cloud Stream Kafka 生成的以 anonymous. 开头的消费者分组。如下图所示:运维界面

同时我们在 IDEA 控制台的日志中,也可以看 Spring Cloud Stream Kafka 生成的以 anonymous. 开头的消费者分组。如下所示:

// ConsumerApplication 控制台 01
2020-03-09 08:13:54.091 INFO 68839 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-1092051199}]
2020-03-09 08:13:54.422 INFO 68839 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-618964293}]
2020-03-09 08:13:54.721 INFO 68839 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=94760781}]

// ConsumerApplication 控制台 02
2020-03-09 08:13:54.092 INFO 68852 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=-1092051199}]
2020-03-09 08:13:54.422 INFO 68852 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=-618964293}]
2020-03-09 08:13:54.721 INFO 68852 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=94760781}]

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。IDEA 控制台输出日志如下:

// ConsumerApplication 控制台 01
2020-03-07 15:43:35.883 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2084635466}]
2020-03-07 15:43:37.278 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-2118253111}]
2020-03-07 15:43:37.652 INFO 46486 --- [Z-_87KO2Pl-WQ-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1956010289}]

// ConsumerApplication 控制台 02
2020-03-07 15:43:35.884 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=2084635466}]
2020-03-07 15:43:37.278 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-2118253111}]
2020-03-07 15:43:37.652 INFO 46527 --- [2e8iPDhSVKdcg-1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=1956010289}]

符合预期。从日志可以看出,每条消息仅被每个消费者消费了一次。

8. 并发消费

示例代码对应仓库:

在上述的示例中,我们配置的每一个 Binding 的 Consumer,都是串行消费的。显然,这在监听的 Topic 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。

虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?答案是

通过在配置文件中的 spring.cloud.stream.bindings.<bindingName>.consumer.concurrency 配置项,可以指定该 Binder 并发消费的线程数。例如说,如果设置 concurrency=10 时,Spring Cloud Stream Kafka 就会为 Binder 创建 10 个线程,进行并发消费。

考虑到让胖友能够更好的理解 concurrency 属性,我们来简单说说 Spring-Kafka 在这块的实现方式。我们来举个例子:

  • 首先,我们来创建一个 Topic 为 "DEMO-TOPIC-01" ,并且设置其 Partition 分区数为 10
  • 然后,我们创建一个用于 Consumer 的 Binding 配置,并设置 concurrency 配置项为 2。
  • 再然后,我们启动项目。Spring-Kafka 会根据 Binding 的 concurrency 配置项为 2,为该 Binding 创建 2 个 Kafka Consumer 。注意噢,是 2 个 Kafka Consumer 呢!!!后续,每个 Kafka Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。
  • 之后,Kafka Broker 会将 Topic 为 "DEMO-TOPIC-01" 分配给创建的 2 个 Kafka Consumer 各 5 个 Partition 。😈 如果不了解 Kafka Broker “分配区分”机制单独胖友,可以看看 《Kafka 消费者如何分配分区》 文章。
  • 这样,因为用于 Consumer 的 Binding 的 concurrency 配置项为 2,创建 2 个 Kafka Consumer ,就在各自的线程中,拉取各自的 Topic 为 "DEMO-TOPIC-01" 的 Partition 的消息,各自串行消费。从而,实现多线程的并发消费。

酱紫讲解一下,胖友对 Spring-Kafka 实现多线程的并发消费的机制,是否理解了。不过有一点要注意,不要配置 concurrency 属性过大,则创建的 Kafka Consumer 分配不到消费 Topic 的 Partition 分区,导致不断的空轮询。

友情提示:可以选择不看。

在理解 Spring-Kafka 提供的并发消费机制,花费了好几个小时,主要陷入到了一个误区。

如果胖友有使用过 RocketMQ 的并发消费,会发现只要创建一个 RocketMQ Consumer 对象,然后 Consumer 拉取完消息之后,丢到 Consumer 的线程池中执行消费,从而实现并发消费。

而在 Spring-Kafka 提供的并发消费,会发现需要创建多个 Kafka Consumer 对象,并且每个 Consumer 都单独分配一个线程,然后 Consumer 拉取完消息之后,在各自的线程中执行消费。

又或者说,Spring-Kafka 提供的并发消费,很像 RocketMQ 的顺序消费。😈 从感受上来说,Spring-Kafka 的并发消费像 BIO ,RocketMQ 的并发消费像 NIO 。

不过,理论来说,在原生的 Kafka 客户端,也是能封装出和 RocketMQ Consumer 一样的并发消费的机制。

也因此,在使用 Kafka 的时候,每个 Topic 的 Partition 在消息量大的时候,要注意设置的相对大一些。

下面,我们来实现一个 Consumer 并发消费的示例。最终项目如下图所示:项目结构

8.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

8.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-concurrency 项目作为消费者。

8.2.1 配置文件

修改 application.yml 配置文件,增加并发消费的配置项。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
concurrency: 2 # 每个 Consumer 消费线程数的初始大小,默认为 1
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

spring.cloud.stream.bindings.<bindingName>.consumer.concurrency 配置项,指定该 Binder 的 Consumer 消费线程数的初始大小,默认为 1。

这里我们设置为 2,表示该 Consumer 初始使用 2 个线程并发消费。

8.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

此时我们在 IDEA 控制台的日志中,可以看到创建了两个 Kafka Consumer 和它们所分配到的 Partition,如下所示:

// Spring-Kafka 打印的日志,两个 Consumer 分别消费的 Partition
2020-03-09 08:34:42.039 INFO 69381 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-1 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.039 INFO 69381 --- [container-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-9 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-7 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-2 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-8 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-0 to the committed offset FetchPosition{offset=47, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-5 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-3 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-6 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}
2020-03-09 08:34:42.040 INFO 69381 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=demo01-consumer-group] Setting offset for partition DEMO-TOPIC-01-4 to the committed offset FetchPosition{offset=6, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 0 rack: null), epoch=0}}

// Spring Cloud Stream Kafka 打印的日志,两个 Consumer 分别消费的 Partition
2020-03-09 08:34:42.045 INFO 69381 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : demo01-consumer-group: partitions assigned: [DEMO-TOPIC-01-9, DEMO-TOPIC-01-7, DEMO-TOPIC-01-8, DEMO-TOPIC-01-5, DEMO-TOPIC-01-6]
2020-03-09 08:34:42.045 INFO 69381 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : demo01-consumer-group: partitions assigned: [DEMO-TOPIC-01-3, DEMO-TOPIC-01-4, DEMO-TOPIC-01-1, DEMO-TOPIC-01-2, DEMO-TOPIC-01-0]

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口四次,发送四条消息。IDEA 控制台输出日志如下:

// 线程编号为 26
2020-03-09 08:43:07.450 INFO 69381 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=1381530661}]
2020-03-09 08:43:08.239 INFO 69381 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=-1440386434}]

// 线程编号为 28
2020-03-09 08:43:08.626 INFO 69381 --- [container-1-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-279884815}]
2020-03-09 08:43:08.907 INFO 69381 --- [container-1-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:28 消息内容:Demo01Message{id=-1848103858}]

我们可以看到,两个线程在消费 DEMO-TOPIC-01 Topic 下的消息。

9. 顺序消息

示例代码对应仓库:

我们先来一起了解下顺序消息的顺序消息的定义:

  • 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
  • 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。

消息有序,指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 Sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

注意,分区顺序就是普通顺序消息,全局顺序就是完全严格顺序。

📚 如何实现? 📚

在上述的示例中,我们看到 Spring-Kafka 在 Consumer 消费消息时,天然就支持按照 Topic 下的 Partition 下的消息,顺序消费。即使在「8. 并发消费」时,也能保证如此。

那么此时,我们只需要考虑将 Producer 将相关联的消息发送到 Topic 下的相同的 Partition 即可。在 Spring Cloud Stream 中,支持从消息中获取 Sharding key,从而发送消息到 Topic 下对应的 Partition 中。

下面,我们来实现一个 Spring Cloud Stream RabbitMQ 下的顺序消息的示例。最终项目如下图所示:项目结构

9.1 搭建生产者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目,复制出 labx-11-sc-stream-kafka-producer-partitioning 项目作为生产者。

9.1.1 配置文件

修改 application.yml 配置文件,添加 partition-key-expression 配置项,设置 Producer 发送顺序消息的 Sharding key。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 key。
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka 自定义 Binding 配置项,对应 KafkaBindingProperties Map
bindings:
demo01-output:
# Kafka Producer 配置项,对应 KafkaProducerProperties 类
producer:
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

spring.cloud.stream.bindings.<bindingName>.producer.partition-key-expression 配置项,该表达式基于 Spring EL,从消息中获得 Sharding key。

友情提示:Sharding Key 和 Partition Key 是等价的,有些文章喜欢叫分片键,有些文章喜欢叫分区键。

艿艿自己的习惯,是叫 Sharding Key,奈何 Spring Cloud Stream 是 Partition Key,所以下文胖友看到两个词存在混用的情况,知道是一个意思哈~

这里,我们设置该配置项为 payload['id'],表示从 Spring Message 的 payload 的 id。稍后我们发送的消息的 payload 为 Demo01Message,那么 id 就是 Demo01Message.id

如果我们想从消息的 headers 中获得 Sharding key,可以设置为 headers['partitionKey']

② Spring Cloud Stream 使用 PartitionHandler 进行 Sharding key 的获得与计算,最终 Sharding key 的结果为 key.hashCode() % partitionCount

感兴趣的胖友,可以阅读 PartitionHandler 的 #determinePartition(Message<?> message) 方法。

我们以发送一条 id 为 1 的 Demo01Message 消息为示例,最终会发送到对应 Kafka Topic 的 Partition 为 1。计算过程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表达式,从 Message 中获得 Sharding key
key => 1

// 第二步,PartitionHandler 计算最终的 Sharding key
// 这里 Partition 数量为 10 的原因是,在「3. 快速入门」小节,我设置 `DEMO-TOPIC-01` Topic 的 Partition 大小为 10.
key => key.hashCode() % partitionCount = 1.hashCode() % 10 = 1 % 10 = 1

// 第三步,Kafka 发送到 `DEMO-TOPIC-01` Topic 的顺序为 key Partition 中
// 这里,因为 key 为 1,所以 Partition 顺序为 1。

这样,我们就能保证相同 Sharding Key 的消息,发送到相同的对应 Kafka Topic 的 Partition 中。当前,前提是该 Topic 的 Partition 总数不能变噢,不然计算的 Sharding Key 会发生变化。

9.1.2 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条顺序消息的 HTTP 接口。代码如下:

@GetMapping("/send_orderly")
public boolean sendOrderly() {
// 发送 3 条相同 id 的消息
int id = new Random().nextInt();
for (int i = 0; i < 3; i++) {
// 创建 Message
Demo01Message message = new Demo01Message().setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

每次发送的 3 条消息使用相同的 id,配合上我们使用它作为 Sharding key,就可以发送对应 Topic 的相同 Partition 中。

另外,整列发送的虽然是顺序消息,但是和发送普通消息的代码是一模一样的。

9.2 搭建消费者

直接使用「8. 并发消费」小节的 labx-11-sc-stream-kafka-consumer-concurrency 项目即可。

9.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_orderly 接口,发送顺序消息。IDEA 控制台输出日志如下:

2020-03-08 10:37:05.351  INFO 71912 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer      : [onMessage][线程编号:26 消息内容:Demo01Message{id=1414772641}]
2020-03-08 10:37:05.354 INFO 71912 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=1414772641}]
2020-03-08 10:37:05.358 INFO 71912 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:26 消息内容:Demo01Message{id=1414772641}]

id 为 1414772641 的消息被发送到 Kafka Topic 的 Partition 为 1,并且在线程编号为 26 的线程中消费。😈 胖友可以自己在多调用几次接口,继续尝试。

10. 消息过滤

示例代码对应仓库:

Spring Cloud Stream 提供了通用Consumer 级别的效率过滤器机制。我们只需要使用 @StreamListener 注解的 condition 属性,设置消息满足指定 Spring EL 表达式的情况下,才进行消费。

下面,我们来实现一个 Spring Cloud Stream Kafka 下的消息过滤的示例。最终项目如下图所示:项目结构

10.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

10.1.1 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条tag 消息头的消息的 HTTP 接口。代码如下:

@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"yunai", "yutou", "tudou"}) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("tag", tag) // <X> 设置 Tag
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

<X> 处,设置发送消息的 tag 消息头。

10.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-filter 项目作为消费者。

10.2.1 Demo01Consumer

修改 Demo01Consumer 类,使用 @StreamListener 注解的 condition 属性来过滤消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['tag'] == 'yunai'")
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

这里我们设置消息的 Header 带有的 tag 值为 yunai 时,才进行消费。

10.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:

// 消息头 tag 为 `yunai` 的消息被消费
2020-03-09 19:25:28.495 INFO 80717 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:25 消息内容:Demo01Message{id=-8065193}]

// 消息头 tag 为 `yutou` 和 `tudou` 的消息被过滤
2020-03-09 19:25:28.500 WARN 80717 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: null
2020-03-09 19:25:28.502 WARN 80717 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: null

只消费了一条消息头为 yunai 的消息,而消息头为 yutoutudou 的消息被 Consumer 过滤。要注意,被过滤掉的消息,后续是无法被消费掉了,效果和消费成功是一样的。

11. 事务消息

示例代码对应仓库:

Kafka 内置提供事务消息的支持。对事务消息的概念不了解的胖友,可以看看 《事务消息组件的套路》 文章。

不过 Kafka 提供的并不是完整的的事务消息的支持,缺少了回查机制。关于这一点,刚推荐的文章也有讲到。目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持,具体的可以看看《芋道 Spring Boot 消息队列 RocketMQ 入门》「9. 事务消息」小节,😈 暂时不拓展开来讲。

下面,我们来实现一个 Spring Cloud Stream Kafka 下的事务消息的示例。最终项目如下图所示:项目结构

11.1 搭建生产者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目,复制出 labx-11-sc-stream-kafka-producer-transaction 项目作为生产者。

11.1.1 配置文件

修改 application.yml 配置文件,添加事务相关配置项,开启发送事务消息的功能。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
transaction:
transaction-id-prefix: demo. # 事务编号前缀
producer:
configuration:
retries: 1 # 发送失败时,重试发送的次数
acks: all # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
# Kafka 自定义 Binding 配置项,对应 KafkaBindingProperties Map
bindings:
demo01-output:
# Kafka Producer 配置项,对应 KafkaProducerProperties 类
producer:
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

spring.cloud.stream.kafka.binder.transaction 为 Spring Cloud Stream Kafka 事务配置项,对应 KafkaBinderConfigurationProperties.Transaction 类。

  • transaction-id-prefix 配置项,事务编号的前缀。需要保证相同应用配置相同,不同应用配置不同。具体可以看看《How to choose Kafka transaction id for several applications》的讨论。
  • producer 配置项,Producer 在事务中的附加配置项。
    • retries 配置项,发送失败时,重试发送的次数。
    • acks 配置项,必须设置为 all,不然在启动时会报 "Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence." 错误。因为,Kafka 的事务消息需要基于幂等性来实现,所以必须保证所有节点都写入成功。

11.1.2 TransactionConfig

创建 TransactionConfig 类,创建 KafkaTransactionManager Bean,Kafka 的事务管理器,集成到 Spring 的事务体系中,这样就可以使用 @Transactional 声明式事务。代码如下:

@Configuration
@EnableTransactionManagement
public class TransactionConfig {

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
// 获得 Kafka ProducerFactory 对象
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
// 创建 KafkaTransactionManager 事务管理器
assert pf != null;
return new KafkaTransactionManager<>(pf);
}

}

11.1.3 Demo01Controller

修改 Demo01Controller 类,增加发送事务消息的 HTTP 接口。代码如下:

// Demo01Controller .java

@Transactional
@GetMapping("/send_transaction")
public void sendTransaction() throws InterruptedException {
// 创建 Message
int id = new Random().nextInt();
Demo01Message message = new Demo01Message()
.setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
logger.info("[sendTransaction][发送编号:[{}] 发送成功]", id);

// <X> 等待
Thread.sleep(10 * 1000L);
}

在发送消息方法上,我们添加了 @Transactional 注解,声明事务。因为我们创建了 KafkaTransactionManager 事务管理器,所以这里会创建 Kafka 事务。

<X> 处,我们故意等待 Thread#sleep(long millis) 10 秒,判断 Kafka 事务是否生效。

  • 如果同步发送消息成功后,Consumer 立即消费到该消息,说明未生效。
  • 如果 Consumer 是 10 秒之后,才消费到该消息,说明已生效。

11.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-transaction 项目作为消费者。

11.2.1 配置文件

修改 application.yml 配置文件,添加事务相关配置项,仅消费已提交的消息。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组

# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Binding 配置项,对应 KafkaBindingProperties 类
bindings:
demo01-input:
# Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
consumer:
configuration:
isolation:
level: read_committed # 读取已提交的消息

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

添加 spring.cloud.stream.kafka.bindings.<bindingName>.consumer.configuration.isolation.levelread_committed,设置 Consumer 读取已提交的消息。😈 一定要配置!!!被坑惨了,当时以为自己的事务消息怎么就是不生效,原来少加了这个。

11.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_transaction 接口,发送事务消息。IDEA 控制台输出日志如下:

// Producer 成功同步发送了 1 条消息。此时,事务并未提交
2020-03-09 22:31:34.863 INFO 84414 --- [io-18080-exec-1] c.i.s.l.k.k.controller.Demo01Controller : [send_transaction][发送编号:[629326486] 发送成功]

// 10 秒后,Producer 提交事务。
// 此时,Consumer 消费到该消息。
2020-03-09 22:31:44.952 INFO 84408 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=629326486}]

Consumer 在事务消息提交后,消费到该消息。符合预期~

12. 消费进度的提交机制

示例代码对应仓库:

📚 原生 Kafka 的提交机制

原生 Kafka Consumer 消费端,有两种消费进度提交的提交机制:

  • 【默认】自动提交,通过配置 enable.auto.commit=true ,每过 auto.commit.interval.ms 时间间隔,都会自动提交消费消费进度。而提交的时机,是在 Consumer 的 #poll(...) 方法的逻辑里完成,在每次从 Kafka Broker 拉取消息时,会检查是否到达自动提交的时间间隔,如果是,那么就会提交上一次轮询拉取的位置。
  • 手动提交,通过配置 enable.auto.commit=false ,后续通过 Consumer 的 #commitSync(...)#commitAsync(...) 方法,同步或异步提交消费进度。

📚 Spring-Kafka 的提交机制

Spring-Kafka Consumer 消费端,提供了更丰富的消费者进度的提交机制,更加灵活。当然,也是分成自动提交和手动提交两个大类。在 AckMode 枚举类中,可以看到每一种具体的方式。代码如下:

// ContainerProperties#AckMode.java

public enum AckMode {

// ========== 自动提交 ==========

/**
* Commit after each record is processed by the listener.
*/
RECORD, // 每条消息被消费完成后,自动提交

/**
* Commit whatever has already been processed before the next poll.
*/
BATCH, // 每一次消息被消费完成后,在下次拉取消息之前,自动提交

/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME, // 达到一定时间间隔后,自动提交。
// 不过要注意,它并不是一到就立马提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度。

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT, // 消费成功的消息数到达一定数量后,自动提交。
// 不过要注意,它并不是一到就立马提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度。

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME, // TIME 和 COUNT 的结合体,满足任一都会自动提交。

// ========== 手动提交 ==========

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL, // 调用时,先标记提交消费进度。等到当前消息被消费完成,然后在提交消费进度。

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE, // 调用时,立即提交消费进度。

}
  • 看下每种方式,艿艿都添加了注释哟。

那么,既然现在存在原生 Kafka 和 Spring-Kafka 提供的两种消费进度的提交机制,我们应该怎么配置呢?

  • 使用原生 Kafka 的方式,通过配置 spring.kafka.consumer.enable-auto-commit=true 。然后,通过 spring.kafka.consumer.auto-commit-interval 设置自动提交的频率。
  • 使用 Spring-Kafka 的方式,通过配置 spring.kafka.consumer.enable-auto-commit=false 。然后通过 spring.kafka.listener.ack-mode 设置具体模式。另外,还有 spring.kafka.listener.ack-timespring.kafka.listener.ack-count 可以设置自动提交的时间间隔和消息条数。

默认什么都不配置的情况下,使用 Spring-Kafka 的 BATCH 模式:每一次消息被消费完成后,在下次拉取消息之前,自动提交

📚 Spring Cloud Stream Kafka 的提交机制

Spring Cloud Stream Kafka 在 Spring-Kafka 上进一步封装,在 spring.cloud.stream.kafka.bindings.<bindingName>.consumer 下提供了两个配置项:

  • auto-commit-offset 配置项,是否自动提交消费进度,默认为 true 自动提交。
  • ack-each-record 配置项,是否每一条消息都进行提交消费进度,默认为 false 在每一批消费完成后一起提交。

我们进行下整理,将 Spring Cloud Stream Kafka 这两个配置项,和 Spring-Kafka 的 AckMode 对应上,如下表格:

AckMode auto-commit-offset ack-each-record
自动 RECORD true false
自动 BATCH true true
手动 MANUAL false false
手动 MANUAL_IMMEDIATE false true

因此,默认什么都不配置的情况下,也使用 Spring-Kafka 的 BATCH 模式:每一次消息被消费完成后,在下次拉取消息之前,自动提交

下面,我们来实现一个 Spring Cloud Stream Kafka 下的手动提交消费进度的示例。最终项目如下图所示:项目结构

12.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

12.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-ack 项目作为消费者。

12.2.1 配置文件

修改 application.yml 配置文件,设置 auto-commit-offset 配置项为 falseack-each-record 配置项为 true,即使用 Spring-Kafka 的 MANUAL 模式,手动提交消费进度。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka Binding 配置项,对应 KafkaBindingProperties 类
bindings:
demo01-input:
# Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
consumer:
auto-commit-offset: false # 是否自动提交消费进度,默认为 true 自动提交。
ack-each-record: true # 是否每一条消息都进行提交消费进度,默认为 false 在每一批消费完成后一起提交。

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

12.2.2 Demo01Consumer

修改 Demo01Consumer 类,增加手动提交消费进度的代码。代码如下:代码如下:

// Demo08Consumer.java

@Component
public class Demo08Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@KafkaListener(topics = Demo08Message.TOPIC,
groupId = "demo08-consumer-group-" + Demo08Message.TOPIC)
public void onMessage(Demo08Message message, Acknowledgment acknowledgment) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// 提交消费进度
if (message.getId() % 2 == 1) {
acknowledgment.acknowledge();
}
}

}

① 在消费方法上,我们增加了第二个方法参数,类型为 Acknowledgment 类。通过调用其 #acknowledge() 方法,可以提交当前消息的 Topic 的 Partition 的消费进度。

② 在消费逻辑中,我们故意只提交消费的第一条消息。😈 这样,我们只需要发送两条消息,如果第二条的消费进度没有被提交,就可以说明手动提交消费进度成功。

12.3 简单测试

友情提示:这里为了测试方便,避免其它示例污染,因此艿艿先直接删除了 DEMO-TOPIC-01 Topic,然后重新创建。

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送两条消息。IDEA 控制台输出日志如下:

// Consumer 消费 2 条消息成功
2020-03-10 08:28:52.274 INFO 86430 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1390450417}]
2020-03-10 08:28:53.101 INFO 86430 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=34018866}]

我们打开 Kafka 运维界面,查看下 DEMO-TOPIC-01 Topic 的消息进度情况,会看到一条消息的消费进度未被提交,符合预期。如下图所示:`DEMO-TOPIC-01` Topic

13. 批量发送消息

示例代码对应仓库:

在一些业务场景下,我们希望使用 Producer 批量发送消息,提高发送性能。不同于我们在《芋道 Spring Boot 消息队列 RocketMQ 入门》「4. 批量发送消息」 功能,RocketMQ 是提供了一个可以批量发送多条消息的 API 。

而 Kafka 提供的批量发送消息,它提供了一个 RecordAccumulator 消息收集器,将发送给相同 Topic 的相同 Partition 分区的消息们,“偷偷”收集在一起,当满足条件时候,一次性批量发送提交给 Kafka Broker 。通过在 spring.cloud.stream.kafka.bindings.<bindingName>.producer 下提供了两个配置项,满足任一即会批量发送:

  • 【时间】batch-timeout :超过收集的时间的最大等待时长,单位:毫秒。
  • 【空间】buffer-memory :超过收集的消息占用的最大内存。

下面,我们来实现一个 Spring Cloud Stream Kafka 下的批量发送消息的示例。最终项目如下图所示:项目结构

13.1 搭建生产者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目,复制出 labx-11-sc-stream-kafka-producer-batch 项目作为生产者。

13.1.1 配置文件

修改 application.yaml 配置文件,增加批量发送消息相关的配置项。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
# Kafka 自定义 Binding 配置项,对应 KafkaBindingProperties Map
bindings:
demo01-output:
# Kafka Producer 配置项,对应 KafkaProducerProperties 类
producer:
batch-timeout: 30000 # 批处理延迟时间上限。这里配置为 30 * 1000 ms 过后,不管是否消息数量是否到达 batch-size 或者消息大小到达 buffer-memory 后,都直接发送一次请求
buffer-size: 33554432 # 每次批量发送消息的最大内存

server:
port: 18080

具体 batch-timeoutbuffer-size 配置项的数值配置多少,根据自己的应用来。这里,我们故意将 batch-timeout 配置成了 30 秒,主要为了演示之用。

13.1.2 Demo01Controller

修改 Demo01Controller 类,增加发送三条消息的 HTTP 接口,方便测试。代码如下:

// Demo01Controller.java

@GetMapping("/send_batch")
public boolean sendBatch() {
for (int i = 0; i < 3; i++) {
// 创建 Message
int id = new Random().nextInt();
Demo01Message message = new Demo01Message()
.setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
logger.info("[send_transaction][发送编号:[{}] 发送成功]", id);
}
return true;
}

就是普通的发送消息的代码,多套了一层循环~

13.2 搭建消费者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目即可。

13.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_batch 接口,发送三条消息。IDEA 控制台输出日志如下:

// Producer 发送了 3 条消息,被 RecordAccumulator 收集
2020-03-10 08:58:52.736 INFO 87258 --- [io-18080-exec-1] c.i.s.l.k.k.controller.Demo01Controller : [send_batch][发送编号:[-936892120] 发送成功]
2020-03-10 08:58:52.736 INFO 87258 --- [io-18080-exec-1] c.i.s.l.k.k.controller.Demo01Controller : [send_batch][发送编号:[128684651] 发送成功]
2020-03-10 08:58:52.737 INFO 87258 --- [io-18080-exec-1] c.i.s.l.k.k.controller.Demo01Controller : [send_batch][发送编号:[-1940691507] 发送成功]

// 30 秒后,Producer 批量发送消息。
// 此时,Consumer 消费到该消息。
2020-03-10 08:59:22.753 INFO 87236 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-936892120}]
2020-03-10 08:59:22.753 INFO 87236 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=128684651}]
2020-03-10 08:59:22.753 INFO 87236 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:27 消息内容:Demo01Message{id=-1940691507}]

Consumer 在消息批量发送后,才消费到该消息。符合预期~

14. 批量消费消息

示例代码对应仓库:

在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。要注意,Consumer 的批量消费消息,和 Producer 的「13. 批量发送消息」 没有直接关联哈。

其实现方式是,Consumer 阻塞等待最多 fetch.max.wait.ms 毫秒,至少拉取 fetch.min.bytes 数据量的消息,至多拉取 max.poll.records 数量的消息,进行批量消费。

  • 如果在 fetch.max.wait.ms 秒内已经成功拉取到 max.poll.records 条消息,则直接进行批量消费消息。
  • 如果在 fetch.max.wait.ms 秒还没拉取到 max.poll.records 条消息,不再等待,而是进行批量消费消息。

下面,我们来实现一个 Spring Cloud Stream Kafka 下的 Consumer 的批量消费消息的示例。最终项目如下图所示:项目结构

14.1 搭建生产者

直接使用「3. 快速入门」小节的 labx-11-sc-stream-kafka-producer-demo 项目即可。

14.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-batch 项目作为消费者。

14.2.1 配置文件

修改 application.yaml 配置文件,增加批量消费消息相关的配置项。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binder 配置项,对应 BinderProperties Map
# binders:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 Kafka Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group # 消费者分组
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
batch-mode: true # 是否批量消费默认,默认为 false
# Spring Cloud Stream Kafka 配置项
kafka:
# Kafka Binder 配置项,对应 KafkaBinderConfigurationProperties 类
binder:
brokers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
configuration:
fetch.max.wait.ms: 10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
fetch.min.bytes: 1024 # poll 一次消息拉取的最小数据量,单位:字节
max.poll.records: 100 # poll 一次消息拉取的最大数量

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

① 具体 fetch.max.wait.msfetch.min.bytesmax.poll.records 配置项的数值配置多少,根据自己的应用来。这里,我们故意将 fetch.max.wait.ms 配置成了 10 秒,主要为了演示之用。

② 设置 spring.cloud.stream.bindings.<bindingName>.consumer.batch-mode 配置项为 true,开启 Consumer 批量消费模式。

14.2.2 Demo01Consumer

修改 Demo01Consumer 类,将消费消息的方法的参数改为 List<?>,从而批量消费消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload List<Demo01Message> messages) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), messages);
}

}

14.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。IDEA 控制台输出日志如下:

// Producer 成功同步发送了 3 条消息
2020-03-14 15:36:42.630 INFO 92203 --- [io-18080-exec-6] c.i.s.l.k.k.controller.Demo01Controller : [send][发送编号:[-536147214] 发送成功]
2020-03-14 15:36:42.877 INFO 92203 --- [io-18080-exec-7] c.i.s.l.k.k.controller.Demo01Controller : [send][发送编号:[651899347] 发送成功]
2020-03-14 15:36:43.071 INFO 92203 --- [io-18080-exec-8] c.i.s.l.k.k.controller.Demo01Controller : [send][发送编号:[-1217020146] 发送成功]

// Consumer 拉取 30 秒超时后,获取到发送的 3 条消息,并批量消费了 3 条消息
2020-03-10 15:36:48.881 INFO 92667 --- [container-0-C-1] c.i.s.l.k.c.listener.Demo01Consumer : [onMessage][线程编号:30 消息内容:[[B@5cb66abe, [B@66be19cc, [B@63f992da]]

从日志中,我们可以看出,发送的 3 条消息被 Demo01Consumer 批量消费了。

15. 监控端点

示例代码对应仓库:

Spring Cloud Stream 的 endpoint 模块,基于 Spring Boot Actuator,提供了自定义监控端点 bindingschannels,用于获取 Spring Cloud Stream 的 Binding 和 Channel 信息。

同时,Spring Cloud Stream Kafka 拓展了 Spring Boot Actuator 内置的 health 端点,通过自定义的 KafkaBinderHealthIndicator,获取 Kafka 客户端的健康状态。

友情提示:对 Spring Boot Actuator 不了解的胖友,可以后续阅读《芋道 Spring Boot 监控端点 Actuator 入门》文章。

我们来搭建一个 Spring Cloud Stream RocketMQ 监控端点的使用示例。最终项目如下图所示:项目结构

15.1 搭建生产者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-producer-actuator 项目作为生产者。

15.1.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

15.1.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

15.1.3 简单测试

① 使用 ProducerApplication 启动生产者。

② 访问应用的 bindings 监控端点 http://127.0.0.1:18080/actuator/bindings,返回结果如下图:`bindings` 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:18080/actuator/channels,返回结果如下图:`channels` 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:18080/actuator/health,返回结果如下图:`health` 监控端点

15.2 搭建消费者

「3. 快速入门」小节的 labx-11-sc-stream-kafka-consumer-demo 项目,复制出 labx-11-sc-stream-kafka-consumer-actuator 项目作为消费者。

15.2.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

15.2.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

15.2.3 简单测试

① 使用 ConsumerApplication 启动消费者,随机端口为 15748。

② 访问应用的 bindings 监控端点 http://127.0.0.1:15748/actuator/bindings,返回结果如下图:`bindings` 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:15748/actuator/channels,返回结果如下图:`channels` 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:15748/actuator/health,返回结果如下图:`health` 监控端点

666. 彩蛋

至此,我们已经完成 Spring Cloud Stream Kafka 的学习。如下是 Kafka 相关的官方文档:

另外,想要在 Spring Boot 项目中使用 Kafka 作为消息队列的胖友,可以阅读《芋道 Spring Boot 消息队列 Kafka 入门》文章。

文章目录
  1. 1. 1. 概述
  2. 2. 2. Spring Cloud Stream 介绍
  3. 3. 3. 快速入门
    1. 3.1. 3.1 搭建生产者
      1. 3.1.1. 3.1.1 引入依赖
      2. 3.1.2. 3.1.2 配置文件
      3. 3.1.3. 3.1.3 MySource
      4. 3.1.4. 3.1.4 Demo01Message
      5. 3.1.5. 3.1.5 Demo01Controller
      6. 3.1.6. 3.1.6 ProducerApplication
    2. 3.2. 3.2 搭建消费者
      1. 3.2.1. 3.2.1 引入依赖
      2. 3.2.2. 3.2.2 配置文件
      3. 3.2.3. 3.2.3 MySink
      4. 3.2.4. 3.2.4 Demo01Message
      5. 3.2.5. 3.2.5 Demo01Consumer
      6. 3.2.6. 3.2.6 ConsumerApplication
    3. 3.3. 3.3 测试单集群多实例的场景
    4. 3.4. 3.4 测试多集群多实例的场景
    5. 3.5. 3.5 小结
  4. 4. 4. 定时消息
  5. 5. 5. 消费重试
    1. 5.1. 5.1 搭建生产者
    2. 5.2. 5.2 搭建消费者
      1. 5.2.1. 5.2.1 配置文件
      2. 5.2.2. 5.2.2 Demo01Consumer
    3. 5.3. 5.3 简单测试
  6. 6. 6. 消费异常处理机制
    1. 6.1. 6.1 搭建生产者
    2. 6.2. 6.2 搭建消费者
      1. 6.2.1. 6.2.1 Demo01Consumer
    3. 6.3. 6.3 简单测试
  7. 7. 7. 广播消费
    1. 7.1. 7.1 搭建生产者
    2. 7.2. 7.2 搭建消费者
      1. 7.2.1. 7.2.1 配置文件
    3. 7.3. 7.3 简单测试
  8. 8. 8. 并发消费
    1. 8.1. 8.1 搭建生产者
    2. 8.2. 8.2 搭建消费者
      1. 8.2.1. 8.2.1 配置文件
    3. 8.3. 8.3 简单测试
  9. 9. 9. 顺序消息
    1. 9.1. 9.1 搭建生产者
      1. 9.1.1. 9.1.1 配置文件
      2. 9.1.2. 9.1.2 Demo01Controller
    2. 9.2. 9.2 搭建消费者
    3. 9.3. 9.3 简单测试
  10. 10. 10. 消息过滤
    1. 10.1. 10.1 搭建生产者
      1. 10.1.1. 10.1.1 Demo01Controller
    2. 10.2. 10.2 搭建消费者
      1. 10.2.1. 10.2.1 Demo01Consumer
    3. 10.3. 10.3 简单测试
  11. 11. 11. 事务消息
    1. 11.1. 11.1 搭建生产者
      1. 11.1.1. 11.1.1 配置文件
      2. 11.1.2. 11.1.2 TransactionConfig
      3. 11.1.3. 11.1.3 Demo01Controller
    2. 11.2. 11.2 搭建消费者
      1. 11.2.1. 11.2.1 配置文件
    3. 11.3. 11.3 简单测试
  12. 12. 12. 消费进度的提交机制
    1. 12.1. 12.1 搭建生产者
    2. 12.2. 12.2 搭建消费者
      1. 12.2.1. 12.2.1 配置文件
      2. 12.2.2. 12.2.2 Demo01Consumer
    3. 12.3. 12.3 简单测试
  13. 13. 13. 批量发送消息
    1. 13.1. 13.1 搭建生产者
      1. 13.1.1. 13.1.1 配置文件
      2. 13.1.2. 13.1.2 Demo01Controller
    2. 13.2. 13.2 搭建消费者
    3. 13.3. 13.3 简单测试
  14. 14. 14. 批量消费消息
    1. 14.1. 14.1 搭建生产者
    2. 14.2. 14.2 搭建消费者
      1. 14.2.1. 14.2.1 配置文件
      2. 14.2.2. 14.2.2 Demo01Consumer
    3. 14.3. 14.3 简单测试
  15. 15. 15. 监控端点
    1. 15.1. 15.1 搭建生产者
      1. 15.1.1. 15.1.1 引入依赖
      2. 15.1.2. 15.1.2 配置文件
      3. 15.1.3. 15.1.3 简单测试
    2. 15.2. 15.2 搭建消费者
      1. 15.2.1. 15.2.1 引入依赖
      2. 15.2.2. 15.2.2 配置文件
      3. 15.2.3. 15.2.3 简单测试
  16. 16. 666. 彩蛋