扫码关注公众号:芋道源码

发送: 百事可乐
获取永久解锁本站全部文章的链接

《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/spring-cloud-integration/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

坑已经挖好,就看 Spring Cloud Alibaba 更新这块。

貌似 https://github.com/spring-cloud-incubator/spring-cloud-alibaba/tree/master/spring-cloud-stream-binder-rocketmq 已经可以看到了。

看了下开发者是小马哥,瞬间就放心了~

如下内容,暂时先转载 《Spring Cloud Alibaba RocketMQ Binder 原理》

RocketMQ 介绍

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

RocketMQ 基本使用

  • 下载 RocketMQ

下载 RocketMQ最新的二进制文件,并解压

解压后的目录结构如下:

apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
  • 启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  • 发送、接收消息

发送消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

发送成功后显示:SendResult [sendStatus=SEND_OK, msgId= …

接收消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

接收成功后显示:ConsumeMessageThread_%d Receive New Messages: [MessageExt…

  • 关闭 Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

SCSt overview

Figure 1. Spring Cloud Stream

使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:

MessageChannel messageChannel = new DirectChannel();

// 消息订阅
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});

// 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

这段代码所有的消息类都是 spring-messaging 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。

Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。

Spring Cloud Alibaba RocketMQ Binder 实现原理

1543560843558 24525bf4 1d0e 4e10 be5f bdde7127f6e6

Figure 2. RocketMQ Binder处理流程

RocketMQ Binder 的核心主要就是这3个类:RocketMQMessageChannelBinderRocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageChannelBinder 是个标准的 Binder 实现,其内部构建 RocketMQInboundChannelAdapterRocketMQMessageHandler

RocketMQMessageHandler 用于 RocketMQ Producer 的启动以及消息的发送,其内部会根据 spring-messaging 模块内 org.springframework.messaging.Message 消息类,去创建 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message

在构造 org.apache.rocketmq.common.message.Message 的过程中会根据 org.springframework.messaging.Message 的 Header 构造成 RocketMQMessageHeaderAccessor。然后再根据 RocketMQMessageHeaderAccessor 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message 中。RocketMQMessageHeaderAccessor 中的 Key 可以参考下面的表格进行获取或设置:

配置项 含义
ACKNOWLEDGEMENT 获取 Acknowledgement
TAGS Message Tags
KEYS Message Keys
ORIGINAL_ROCKETMQ_MESSAGE RocketMQ 消息对象 MessageExt
DELAY Message Delay Level
ROCKETMQ_FLAG Message Flag
ROCKETMQ_TRANSACTIONAL_ARG 事务消息中 LocalTransactionExecuter 中使用的参数
ROCKETMQ_SEND_RESULT 消息发送结果

RocketMQInboundChannelAdapter 用于 RocketMQ Consumer 的启动以及消息的接收。其内部还支持 spring-retry 的使用。

在消费消息的时候可以从 Header 中获取 Acknowledgement 并进行一些设置。

比如使用 MessageListenerConcurrently 进行异步消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER);
acknowledgement.setConsumeConcurrentlyDelayLevel(1);
}

比如使用 MessageListenerOrderly 进行顺序消费的时候,可以设置延迟消费:

@StreamListener("input")
public void receive(Message message) {
RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message);
acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000);
}

Provider端支持的配置:

配置项 含义 默认值
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled 是否启用consumer true
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 “\ \ “ 分割(不填表示不进行tags的过滤,订阅所有消息)
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting Consumer是否是广播模式 false
spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly 顺序消费 or 异步消费 false

Endpoint支持

在使用Endpoint特性之前需要在 Maven 中添加 spring-boot-starter-actuator 依赖,并在配置中允许 Endpoints 的访问。

  • Spring Boot 1.x 中添加配置 management.security.enabled=false。暴露的 endpoint 路径为 /rocketmq_binder
  • Spring Boot 2.x 中添加配置 management.endpoints.web.exposure.include=*。暴露的 endpoint 路径为 /actuator/rocketmq-binder

Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。

{
"runtime": {
"lastSend.timestamp": 1542786623915
},
"metrics": {
"scs-rocketmq.consumer.test-topic.totalConsumed": {
"count": 11
},
"scs-rocketmq.consumer.test-topic.totalConsumedFailures": {
"count": 0
},
"scs-rocketmq.producer.test-topic.totalSentFailures": {
"count": 0
},
"scs-rocketmq.consumer.test-topic.consumedPerSecond": {
"count": 11,
"fifteenMinuteRate": 0.012163847780107841,
"fiveMinuteRate": 0.03614605351360527,
"meanRate": 0.3493213353657594,
"oneMinuteRate": 0.17099243039490175
},
"scs-rocketmq.producer.test-topic.totalSent": {
"count": 5
},
"scs-rocketmq.producer.test-topic.sentPerSecond": {
"count": 5,
"fifteenMinuteRate": 0.005540151995103271,
"fiveMinuteRate": 0.01652854617838251,
"meanRate": 0.10697493212602836,
"oneMinuteRate": 0.07995558537067671
},
"scs-rocketmq.producer.test-topic.sentFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
},
"scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": {
"count": 0,
"fifteenMinuteRate": 0.0,
"fiveMinuteRate": 0.0,
"meanRate": 0.0,
"oneMinuteRate": 0.0
}
}
}

注意:要想查看统计数据需要在pom里加上 metrics-core依赖。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息:

{
"warning": "please add metrics-core dependency, we use it for metrics"
}
文章目录
  1. 1. RocketMQ 介绍
  2. 2. RocketMQ 基本使用
  3. 3. Spring Cloud Stream 介绍
  4. 4. Spring Cloud Alibaba RocketMQ Binder 实现原理
  5. 5. Endpoint支持