⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

阅读源码最好的方式,是使用 IDEA 进行调试 RocketMQ 源码,不然会一脸懵逼。

胖友可以点击「芋道源码」扫码关注,回复 git001 关键字
获得艿艿添加了中文注释的 RocketMQ 源码地址。

阅读源码很孤单,加入源码交流群,一起坚持!

1、概述

本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:

  1. 上篇:Broker 相关源码。
  2. 下篇:Consumer 相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

消费逻辑图

再看消费逻辑精简的顺序图(实际情况会略有差别):

Consumer&Broker消费精简图.png

2、ConsumeQueue 结构

ConsumeQueueMappedFileQueueMappedFile 的关系如下:

ConsumeQueue、MappedFileQueue、MappedFile的关系 ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000


ConsumeQueueMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000等文件。
  • MappedFileQueueMappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。
    • 每个 MappedFile 统一文件大小。
    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 ConsumeQueue 里默认为 6000000B。
  • ConsumeQueue :针对 MappedFileQueue 的封装使用。
    • Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>

ConsumeQueue 存储在 MappedFile 的内容必须大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE ),有两种内容类型:

  1. MESSAGE_POSITION_INFO :消息位置信息。
  2. BLANK : 文件前置空白占位。当历史 Message 被删除时,需要用 BLANK占位被删除的消息。

MESSAGE_POSITION_INFOConsumeQueue 存储结构:

第几位 字段 说明 数据类型 字节数
1 offset 消息 CommitLog 存储位置 Long 8
2 size 消息长度 Int 4
3 tagsCode 消息tagsCode Long 8

BLANKConsumeQueue 存储结构:

第几位 字段 说明 数据类型 字节数
1 0 Long 8
2 Integer.MAX_VALUE Int 4
3 0 Long 8

3、ConsumeQueue 存储

CommitLog重放ConsumeQueue图

主要有两个组件:

  • ReputMessageService :write ConsumeQueue。
  • FlushConsumeQueueService :flush ConsumeQueue。

ReputMessageService

ReputMessageService顺序图

  1: class ReputMessageService extends ServiceThread {
2:
3: /**
4: * 开始重放消息的CommitLog物理位置
5: */
6: private volatile long reputFromOffset = 0;
7:
8: public long getReputFromOffset() {
9: return reputFromOffset;
10: }
11:
12: public void setReputFromOffset(long reputFromOffset) {
13: this.reputFromOffset = reputFromOffset;
14: }
15:
16: @Override
17: public void shutdown() {
18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
19: try {
20: Thread.sleep(100);
21: } catch (InterruptedException ignored) {
22: }
23: }
24:
25: if (this.isCommitLogAvailable()) {
26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
28: }
29:
30: super.shutdown();
31: }
32:
33: /**
34: * 剩余需要重放消息字节数
35: *
36: * @return 字节数
37: */
38: public long behind() {
39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
40: }
41:
42: /**
43: * 是否commitLog需要重放消息
44: *
45: * @return 是否
46: */
47: private boolean isCommitLogAvailable() {
48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
49: }
50:
51: private void doReput() {
52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
53:
54: // TODO 疑问:这个是啥
55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
57: break;
58: }
59:
60: // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
62: if (result != null) {
63: try {
64: this.reputFromOffset = result.getStartOffset();
65:
66: // 遍历MappedByteBuffer
67: for (int readSize = 0; readSize < result.getSize() && doNext; ) {
68: // 生成重放消息重放调度请求
69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
70: int size = dispatchRequest.getMsgSize(); // 消息长度
71: // 根据请求的结果处理
72: if (dispatchRequest.isSuccess()) { // 读取成功
73: if (size > 0) { // 读取Message
74: DefaultMessageStore.this.doDispatch(dispatchRequest);
75: // 通知有新消息
76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
80: dispatchRequest.getTagsCode());
81: }
82: // FIXED BUG By shijia
83: this.reputFromOffset += size;
84: readSize += size;
85: // 统计
86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
87: DefaultMessageStore.this.storeStatsService
88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
89: DefaultMessageStore.this.storeStatsService
90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
91: .addAndGet(dispatchRequest.getMsgSize());
92: }
93: } else if (size == 0) { // 读取到MappedFile文件尾
94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
95: readSize = result.getSize();
96: }
97: } else if (!dispatchRequest.isSuccess()) { // 读取失败
98: if (size > 0) { // 读取到Message却不是Message
99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100: this.reputFromOffset += size;
101: } else { // 读取到Blank却不是Blank
102: doNext = false;
103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105: this.reputFromOffset);
106:
107: this.reputFromOffset += result.getSize() - readSize;
108: }
109: }
110: }
111: }
112: } finally {
113: result.release();
114: }
115: } else {
116: doNext = false;
117: }
118: }
119: }
120:
121: @Override
122: public void run() {
123: DefaultMessageStore.log.info(this.getServiceName() + " service started");
124:
125: while (!this.isStopped()) {
126: try {
127: Thread.sleep(1);
128: this.doReput();
129: } catch (Exception e) {
130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131: }
132: }
133:
134: DefaultMessageStore.log.info(this.getServiceName() + " service end");
135: }
136:
137: @Override
138: public String getServiceName() {
139: return ReputMessageService.class.getSimpleName();
140: }
141:
142: }

  • 说明:重放消息线程服务。
    • 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
    • 该服务不断生成 消息索引 到 索引文件(IndexFile)
  • ReputMessageService用例图
    • 第 61 行 :获取 reputFromOffset 开始的 CommitLog 对应的 MappedFile 对应的 MappedByteBuffer
    • 第 67 行 :遍历 MappedByteBuffer
    • 第 69 行 :生成重放消息重放调度请求 (DispatchRequest) 。请求里主要包含一条消息 (Message) 或者 文件尾 (BLANK) 的基本信息。
    • 第 72 至 96 行 :请求是有效请求,进行逻辑处理。
      • 第 75 至 81 行 :当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener 会 调用 PullRequestHoldService#notifyMessageArriving(...) 方法,详细解析见:PullRequestHoldService
    • 第 73 至 92 行 :请求对应的是 Message,进行调度,生成 ConsumeQueueIndexFile 对应的内容。详细解析见:
    • 第 93 至 96 行 :请求对应的是 Blank,即文件尾,跳转指向下一个 MappedFile
    • 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG
  • 第 127 至 128 行 :每 1ms 循环执行重放逻辑。
  • 第 18 至 30 行 :shutdown时,多次 sleep(100) 直到 CommitLog 回放到最新位置。恩,如果未回放完,会输出警告日志。

DefaultMessageStore#doDispatch(...)

 1: /**
2: * 执行调度请求
3: * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
4: * 2. 建立 索引信息 到 IndexFile
5: *
6: * @param req 调度请求
7: */
8: public void doDispatch(DispatchRequest req) {
9: // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
11: switch (tranType) {
12: case MessageSysFlag.TRANSACTION_NOT_TYPE:
13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
16: break;
17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
19: break;
20: }
21: // 建立 索引信息 到 IndexFile
22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
23: DefaultMessageStore.this.indexService.buildIndex(req);
24: }
25: }
26:
27: /**
28: * 建立 消息位置信息 到 ConsumeQueue
29: *
30: * @param topic 主题
31: * @param queueId 队列编号
32: * @param offset commitLog存储位置
33: * @param size 消息长度
34: * @param tagsCode 消息tagsCode
35: * @param storeTimestamp 存储时间
36: * @param logicOffset 队列位置
37: */
38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
39: long logicOffset) {
40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
42: }

ConsumeQueue#putMessagePositionInfoWrapper(...)

  1: /**
2: * 添加位置信息封装
3: *
4: * @param offset commitLog存储位置
5: * @param size 消息长度
6: * @param tagsCode 消息tagsCode
7: * @param storeTimestamp 消息存储时间
8: * @param logicOffset 队列位置
9: */
10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
11: long logicOffset) {
12: final int maxRetries = 30;
13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
14: // 多次循环写,直到成功
15: for (int i = 0; i < maxRetries && canWrite; i++) {
16: // 调用添加位置信息
17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
18: if (result) {
19: // 添加成功,使用消息存储时间 作为 存储check point。
20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
21: return;
22: } else {
23: // XXX: warn and notify me
24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
25: + " failed, retry " + i + " times");
26:
27: try {
28: Thread.sleep(1000);
29: } catch (InterruptedException e) {
30: log.warn("", e);
31: }
32: }
33: }
34:
35: // XXX: warn and notify me 设置异常不可写入
36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
38: }
39:
40: /**
41: * 添加位置信息,并返回添加是否成功
42: *
43: * @param offset commitLog存储位置
44: * @param size 消息长度
45: * @param tagsCode 消息tagsCode
46: * @param cqOffset 队列位置
47: * @return 是否成功
48: */
49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
50: final long cqOffset) {
51: // 如果已经重放过,直接返回成功
52: if (offset <= this.maxPhysicOffset) {
53: return true;
54: }
55: // 写入位置信息到byteBuffer
56: this.byteBufferIndex.flip();
57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
58: this.byteBufferIndex.putLong(offset);
59: this.byteBufferIndex.putInt(size);
60: this.byteBufferIndex.putLong(tagsCode);
61: // 计算consumeQueue存储位置,并获得对应的MappedFile
62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
64: if (mappedFile != null) {
65: // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
67: this.minLogicOffset = expectLogicOffset;
68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
70: this.fillPreBlank(mappedFile, expectLogicOffset);
71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
72: + mappedFile.getWrotePosition());
73: }
74: // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
75: if (cqOffset != 0) {
76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
77: if (expectLogicOffset != currentLogicOffset) {
78: LOG_ERROR.warn(
79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
80: expectLogicOffset,
81: currentLogicOffset,
82: this.topic,
83: this.queueId,
84: expectLogicOffset - currentLogicOffset
85: );
86: }
87: }
88: // 设置commitLog重放消息到ConsumeQueue位置。
89: this.maxPhysicOffset = offset;
90: // 插入mappedFile
91: return mappedFile.appendMessage(this.byteBufferIndex.array());
92: }
93: return false;
94: }
95:
96: /**
97: * 填充前置空白占位
98: *
99: * @param mappedFile MappedFile
100: * @param untilWhere consumeQueue存储位置
101: */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103: // 写入前置空白占位到byteBuffer
104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105: byteBuffer.putLong(0L);
106: byteBuffer.putInt(Integer.MAX_VALUE);
107: byteBuffer.putLong(0L);
108: // 循环填空
109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111: mappedFile.appendMessage(byteBuffer.array());
112: }
113: }

  • #putMessagePositionInfoWrapper(...) 说明 :添加位置信息到 ConsumeQueue 的封装,实际需要调用 #putMessagePositionInfo(...) 方法。
    • 第 13 行 :判断 ConsumeQueue 是否允许写入。当发生Bug时,不允许写入。
    • 第 17 行 :调用 #putMessagePositionInfo(...) 方法,添加位置信息。
    • 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。StoreCheckpoint 的详细解析见:Store初始化与关闭
    • 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
    • 第 35 至 37 行 :写入失败时,标记 ConsumeQueue 写入异常,不允许继续写入。
  • #putMessagePositionInfo(...) 说明 :添加位置信息到 ConsumeQueue,并返回添加是否成功。
    • 第 51 至 54 行 :如果 offset(存储位置) 小于等于 maxPhysicOffset(CommitLog 消息重放到 ConsumeQueue 最大的 CommitLog 存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。
    • 第 55 至 60 行 :写 位置信息到byteBuffer。
    • 第 62 至 63 行 :计算 ConsumeQueue存储位置,并获得对应的MappedFile。
    • 第 65 至 73 行 :当 MappedFileConsumeQueue 当前第一个文件 && MappedFile 未写入内容 && 重放消息队列位置大于0,则需要进行 MappedFile 填充前置 BLANK
      • 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个 Topic 长期无消息产生,突然N天后进行发送,Topic 对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile需要前置占位。
    • 第 74 至 87 行 :校验 ConsumeQueue 存储位置是否合法,不合法则输出日志。
      • 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
    • 第 89 行 :设置 CommitLog 重放消息到 ConsumeQueue 的最大位置。
    • 第 91 行 :插入消息位置到 MappedFile

FlushConsumeQueueService

 1: class FlushConsumeQueueService extends ServiceThread {
2: private static final int RETRY_TIMES_OVER = 3;
3: /**
4: * 最后flush时间戳
5: */
6: private long lastFlushTimestamp = 0;
7:
8: private void doFlush(int retryTimes) {
9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
10:
11: // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
12: if (retryTimes == RETRY_TIMES_OVER) {
13: flushConsumeQueueLeastPages = 0;
14: }
15: // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
16: long logicsMsgTimestamp = 0;
17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
18: long currentTimeMillis = System.currentTimeMillis();
19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
20: this.lastFlushTimestamp = currentTimeMillis;
21: flushConsumeQueueLeastPages = 0;
22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
23: }
24: // flush消费队列
25: ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
26: for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
27: for (ConsumeQueue cq : maps.values()) {
28: boolean result = false;
29: for (int i = 0; i < retryTimes && !result; i++) {
30: result = cq.flush(flushConsumeQueueLeastPages);
31: }
32: }
33: }
34: // flush 存储 check point
35: if (0 == flushConsumeQueueLeastPages) {
36: if (logicsMsgTimestamp > 0) {
37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
38: }
39: DefaultMessageStore.this.getStoreCheckpoint().flush();
40: }
41: }
42:
43: public void run() {
44: DefaultMessageStore.log.info(this.getServiceName() + " service started");
45:
46: while (!this.isStopped()) {
47: try {
48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
49: this.waitForRunning(interval);
50: this.doFlush(1);
51: } catch (Exception e) {
52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
53: }
54: }
55:
56: this.doFlush(RETRY_TIMES_OVER);
57:
58: DefaultMessageStore.log.info(this.getServiceName() + " service end");
59: }
60:
61: @Override
62: public String getServiceName() {
63: return FlushConsumeQueueService.class.getSimpleName();
64: }
65:
66: @Override
67: public long getJointime() {
68: return 1000 * 60;
69: }
70: }

  • 说明 :flush ConsumeQueue(消费队列) 线程服务。
  • 第 11 至 14 行 :当 retryTimes == RETRY_TIMES_OVER 时,进行强制flush。用于 shutdown 时。
  • 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
  • 第 24 至 33 行 :flush ConsumeQueue(消费队列)。
  • 第 34 至 40 行 :flush StoreCheckpointStoreCheckpoint 的详细解析见:Store初始化与关闭
  • 第 43 至 59 行 :每 1000ms 执行一次 flush。如果 wakeup() 时,则会立即进行一次 flush。目前,暂时不存在 wakeup() 的调用。

4、Broker 提供[拉取消息]接口

PullMessageRequestHeader

 1: public class PullMessageRequestHeader implements CommandCustomHeader {
2: /**
3: * 消费者分组
4: */
5: @CFNotNull
6: private String consumerGroup;
7: /**
8: * Topic
9: */
10: @CFNotNull
11: private String topic;
12: /**
13: * 队列编号
14: */
15: @CFNotNull
16: private Integer queueId;
17: /**
18: * 队列开始位置
19: */
20: @CFNotNull
21: private Long queueOffset;
22: /**
23: * 消息数量
24: */
25: @CFNotNull
26: private Integer maxMsgNums;
27: /**
28: * 系统标识
29: */
30: @CFNotNull
31: private Integer sysFlag;
32: /**
33: * 提交消费进度位置
34: */
35: @CFNotNull
36: private Long commitOffset;
37: /**
38: * 挂起超时时间
39: */
40: @CFNotNull
41: private Long suspendTimeoutMillis;
42: /**
43: * 订阅表达式
44: */
45: @CFNullable
46: private String subscription;
47: /**
48: * 订阅版本号
49: */
50: @CFNotNull
51: private Long subVersion;
52: }

  • 说明:拉取消息请求Header
  • topic + queueId + queueOffset + maxMsgNums
  • sysFlag :系统标识。
    • 第 0 位 FLAG_COMMIT_OFFSET :标记请求提交消费进度位置,和 commitOffset 配合。
    • 第 1 位 FLAG_SUSPEND :标记请求是否挂起请求,和 suspendTimeoutMillis 配合。当拉取不到消息时, Broker 会挂起请求,直到有消息。最大挂起时间:suspendTimeoutMillis 毫秒。
    • 第 2 位 FLAG_SUBSCRIPTION :是否过滤订阅表达式,和 subscription 配置。
  • subVersion :订阅版本号。请求时,如果版本号不对,则无法拉取到消息,需要重新获取订阅信息,使用最新的订阅版本号。

PullMessageProcessor#processRequest(...)

  1: private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
2: throws RemotingCommandException {
3: RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
4: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
5: final PullMessageRequestHeader requestHeader =
6: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
7:
8: response.setOpaque(request.getOpaque());
9:
10: if (LOG.isDebugEnabled()) {
11: LOG.debug("receive PullMessage request command, {}", request);
12: }
13:
14: // 校验 broker 是否可读
15: if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
16: response.setCode(ResponseCode.NO_PERMISSION);
17: response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
18: return response;
19: }
20:
21: // 校验 consumer分组配置 是否存在
22: SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
23: if (null == subscriptionGroupConfig) {
24: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
25: response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
26: return response;
27: }
28: // 校验 consumer分组配置 是否可消费
29: if (!subscriptionGroupConfig.isConsumeEnable()) {
30: response.setCode(ResponseCode.NO_PERMISSION);
31: response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
32: return response;
33: }
34:
35: final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag()); // 是否挂起请求,当没有消息时
36: final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag()); // 是否提交消费进度
37: final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag()); // 是否过滤订阅表达式(subscription)
38: final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0; // 挂起请求超时时长
39:
40: // 校验 topic配置 存在
41: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
42: if (null == topicConfig) {
43: LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
44: response.setCode(ResponseCode.TOPIC_NOT_EXIST);
45: response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
46: return response;
47: }
48: // 校验 topic配置 权限可读
49: if (!PermName.isReadable(topicConfig.getPerm())) {
50: response.setCode(ResponseCode.NO_PERMISSION);
51: response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
52: return response;
53: }
54: // 校验 读取队列 在 topic配置 队列范围内
55: if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
56: String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
57: requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
58: LOG.warn(errorInfo);
59: response.setCode(ResponseCode.SYSTEM_ERROR);
60: response.setRemark(errorInfo);
61: return response;
62: }
63:
64: // 校验 订阅关系
65: SubscriptionData subscriptionData;
66: if (hasSubscriptionFlag) {
67: try {
68: subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
69: requestHeader.getSubscription());
70: } catch (Exception e) {
71: LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
72: requestHeader.getConsumerGroup());
73: response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
74: response.setRemark("parse the consumer's subscription failed");
75: return response;
76: }
77: } else {
78: // 校验 消费分组信息 是否存在
79: ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
80: if (null == consumerGroupInfo) {
81: LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
82: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
83: response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
84: return response;
85: }
86: // 校验 消费分组信息 消息模型是否匹配
87: if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
88: && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
89: response.setCode(ResponseCode.NO_PERMISSION);
90: response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
91: return response;
92: }
93:
94: // 校验 订阅信息 是否存在
95: subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
96: if (null == subscriptionData) {
97: LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
98: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
99: response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
100: return response;
101: }
102: // 校验 订阅信息版本 是否合法
103: if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
104: LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
105: subscriptionData.getSubString());
106: response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
107: response.setRemark("the consumer's subscription not latest");
108: return response;
109: }
110: }
111:
112: // 获取消息
113: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
114: requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
115: if (getMessageResult != null) {
116: response.setRemark(getMessageResult.getStatus().name());
117: responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
118: responseHeader.setMinOffset(getMessageResult.getMinOffset());
119: responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
120:
121: // TODO 待读
122: // 计算建议读取brokerId
123: if (getMessageResult.isSuggestPullingFromSlave()) {
124: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
125: } else {
126: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
127: }
128:
129: switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
130: case ASYNC_MASTER:
131: case SYNC_MASTER:
132: break;
133: case SLAVE:
134: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // 从节点不允许读取,告诉consumer读取主节点。
135: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
136: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
137: }
138: break;
139: }
140:
141: if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
142: // consume too slow ,redirect to another machine
143: if (getMessageResult.isSuggestPullingFromSlave()) {
144: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
145: }
146: // consume ok
147: else {
148: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
149: }
150: } else {
151: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
152: }
153:
154: switch (getMessageResult.getStatus()) {
155: case FOUND:
156: response.setCode(ResponseCode.SUCCESS);
157: break;
158: case MESSAGE_WAS_REMOVING:
159: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
160: break;
161: case NO_MATCHED_LOGIC_QUEUE:
162: case NO_MESSAGE_IN_QUEUE:
163: if (0 != requestHeader.getQueueOffset()) {
164: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
165:
166: // XXX: warn and notify me
167: LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
168: requestHeader.getQueueOffset(), //
169: getMessageResult.getNextBeginOffset(), //
170: requestHeader.getTopic(), //
171: requestHeader.getQueueId(), //
172: requestHeader.getConsumerGroup()//
173: );
174: } else {
175: response.setCode(ResponseCode.PULL_NOT_FOUND);
176: }
177: break;
178: case NO_MATCHED_MESSAGE:
179: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
180: break;
181: case OFFSET_FOUND_NULL:
182: response.setCode(ResponseCode.PULL_NOT_FOUND);
183: break;
184: case OFFSET_OVERFLOW_BADLY:
185: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
186: // XXX: warn and notify me
187: LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
188: break;
189: case OFFSET_OVERFLOW_ONE:
190: response.setCode(ResponseCode.PULL_NOT_FOUND);
191: break;
192: case OFFSET_TOO_SMALL:
193: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
194: LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
195: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
196: getMessageResult.getMinOffset(), channel.remoteAddress());
197: break;
198: default:
199: assert false;
200: break;
201: }
202:
203: // hook:before
204: if (this.hasConsumeMessageHook()) {
205: ConsumeMessageContext context = new ConsumeMessageContext();
206: context.setConsumerGroup(requestHeader.getConsumerGroup());
207: context.setTopic(requestHeader.getTopic());
208: context.setQueueId(requestHeader.getQueueId());
209:
210: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
211:
212: switch (response.getCode()) {
213: case ResponseCode.SUCCESS:
214: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
215: int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
216:
217: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
218: context.setCommercialRcvTimes(incValue);
219: context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
220: context.setCommercialOwner(owner);
221:
222: break;
223: case ResponseCode.PULL_NOT_FOUND:
224: if (!brokerAllowSuspend) {
225:
226: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
227: context.setCommercialRcvTimes(1);
228: context.setCommercialOwner(owner);
229:
230: }
231: break;
232: case ResponseCode.PULL_RETRY_IMMEDIATELY:
233: case ResponseCode.PULL_OFFSET_MOVED:
234: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
235: context.setCommercialRcvTimes(1);
236: context.setCommercialOwner(owner);
237: break;
238: default:
239: assert false;
240: break;
241: }
242:
243: this.executeConsumeMessageHookBefore(context);
244: }
245:
246: switch (response.getCode()) {
247: case ResponseCode.SUCCESS:
248:
249: this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
250: getMessageResult.getMessageCount());
251: this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
252: getMessageResult.getBufferTotalSize());
253: this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
254: // 读取消息
255: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { // 内存中
256: final long beginTimeMills = this.brokerController.getMessageStore().now();
257:
258: final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
259:
260: this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
261: requestHeader.getTopic(), requestHeader.getQueueId(),
262: (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
263: response.setBody(r);
264: } else { // zero-copy
265: try {
266: FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
267: channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
268: @Override
269: public void operationComplete(ChannelFuture future) throws Exception {
270: getMessageResult.release();
271: if (!future.isSuccess()) {
272: LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
273: }
274: }
275: });
276: } catch (Throwable e) {
277: LOG.error("Error occurred when transferring messages from page cache", e);
278: getMessageResult.release();
279: }
280:
281: response = null;
282: }
283: break;
284: case ResponseCode.PULL_NOT_FOUND:
285: // 消息未查询到 && broker允许挂起请求 && 请求允许挂起
286: if (brokerAllowSuspend && hasSuspendFlag) {
287: long pollingTimeMills = suspendTimeoutMillisLong;
288: if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
289: pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
290: }
291:
292: String topic = requestHeader.getTopic();
293: long offset = requestHeader.getQueueOffset();
294: int queueId = requestHeader.getQueueId();
295: PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
296: this.brokerController.getMessageStore().now(), offset, subscriptionData);
297: this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
298: response = null;
299: break;
300: }
301:
302: case ResponseCode.PULL_RETRY_IMMEDIATELY:
303: break;
304: case ResponseCode.PULL_OFFSET_MOVED:
305: if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
306: || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { // TODO 待博客补充
307: MessageQueue mq = new MessageQueue();
308: mq.setTopic(requestHeader.getTopic());
309: mq.setQueueId(requestHeader.getQueueId());
310: mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
311:
312: OffsetMovedEvent event = new OffsetMovedEvent();
313: event.setConsumerGroup(requestHeader.getConsumerGroup());
314: event.setMessageQueue(mq);
315: event.setOffsetRequest(requestHeader.getQueueOffset());
316: event.setOffsetNew(getMessageResult.getNextBeginOffset());
317: this.generateOffsetMovedEvent(event);
318: LOG.warn(
319: "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
320: requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
321: responseHeader.getSuggestWhichBrokerId());
322: } else {
323: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
324: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
325: LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
326: requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
327: responseHeader.getSuggestWhichBrokerId());
328: }
329:
330: break;
331: default:
332: assert false;
333: }
334: } else {
335: response.setCode(ResponseCode.SYSTEM_ERROR);
336: response.setRemark("store getMessage return null");
337: }
338:
339: // 请求要求持久化进度 && broker非主,进行持久化进度。
340: boolean storeOffsetEnable = brokerAllowSuspend;
341: storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
342: storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
343: if (storeOffsetEnable) {
344: this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
345: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
346: }
347: return response;
348: }

  • 说明:处理拉取消息请求,返回响应。
  • 第 14 至 19 行 :校验 Broker 是否可读。
  • 第 21 至 33 行 :校验 SubscriptionGroupConfig(订阅分组配置) 是否存在 && 可以消费。
  • 第 35 至 38 行 :处理 PullMessageRequestHeader.sysFlag 对应的标志位。
  • 第 40 至 62 行 :校验 TopicConfig(主题配置) 是否存在 && 可读 && 队列编号正确。
  • 第 64 至 110 行 :校验 SubscriptionData(订阅信息) 是否正确。
  • 第 113 行 :调用 MessageStore#getMessage(...) 获取 GetMessageResult(消息)。详细解析见:MessageStore#getMessage(...)
  • 第 122 至 152 行 :计算建议拉取消息 brokerId
  • 第 154 至 201 行 :PullMessageProcessor拉取消息状态图
  • 第 204 至 244 行 :Hook 逻辑,#executeConsumeMessageHookBefore(...)
  • 第 247 至 283 行 :拉取消息成功,即拉取到消息。
    • 第 255 至 263 行 :方式一 :调用 readGetMessageResult(...) 获取消息内容到堆内内存,设置到 响应body
    • 第 265 至 281 行 :方式二 :基于 zero-copy 实现,直接响应,无需堆内内存,性能更优。TODO :此处等对zero-copy有研究,再补充一些
  • 第 284 至 300 行 :拉取不到消息,当满足条件 (Broker 允许挂起 && 请求要求挂起),执行挂起请求。详细解析见:PullRequestHoldService
  • 第 304 至 328 行 :TODO :此处等对tools模块研究后再补充
  • 第 339 至 346 :持久化消费进度,当满足 (Broker 非主 && 请求要求持久化进度)。详细解析见:更新消费进度

MessageStore#getMessage(...)

  1: /**
2: * 获取消息结果
3: *
4: * @param group 消费分组
5: * @param topic 主题
6: * @param queueId 队列编号
7: * @param offset 队列位置
8: * @param maxMsgNums 消息数量
9: * @param subscriptionData 订阅信息
10: * @return 消息结果
11: */
12: public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
13: final SubscriptionData subscriptionData) {
14: // 是否关闭
15: if (this.shutdown) {
16: log.warn("message store has shutdown, so getMessage is forbidden");
17: return null;
18: }
19: // 是否可读
20: if (!this.runningFlags.isReadable()) {
21: log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
22: return null;
23: }
24:
25: long beginTime = this.getSystemClock().now();
26:
27: GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
28: long nextBeginOffset = offset;
29: long minOffset = 0;
30: long maxOffset = 0;
31:
32: GetMessageResult getResult = new GetMessageResult();
33:
34: final long maxOffsetPy = this.commitLog.getMaxOffset();
35:
36: // 获取消费队列
37: ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
38: if (consumeQueue != null) {
39: minOffset = consumeQueue.getMinOffsetInQueue(); // 消费队列 最小队列编号
40: maxOffset = consumeQueue.getMaxOffsetInQueue(); // 消费队列 最大队列编号
41:
42: // 判断 队列位置(offset)
43: if (maxOffset == 0) { // 消费队列无消息
44: status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
45: nextBeginOffset = nextOffsetCorrection(offset, 0);
46: } else if (offset < minOffset) { // 查询offset 太小
47: status = GetMessageStatus.OFFSET_TOO_SMALL;
48: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
49: } else if (offset == maxOffset) { // 查询offset 超过 消费队列 一个位置
50: status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
51: nextBeginOffset = nextOffsetCorrection(offset, offset);
52: } else if (offset > maxOffset) { // 查询offset 超过 消费队列 太多(大于一个位置)
53: status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
54: if (0 == minOffset) { // TODO blog 这里是??为啥0 == minOffset做了特殊判断
55: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
56: } else {
57: nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
58: }
59: } else {
60: // 获得 映射Buffer结果(MappedFile)
61: SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
62: if (bufferConsumeQueue != null) {
63: try {
64: status = GetMessageStatus.NO_MATCHED_MESSAGE;
65:
66: long nextPhyFileStartOffset = Long.MIN_VALUE; // commitLog下一个文件(MappedFile)对应的开始offset。
67: long maxPhyOffsetPulling = 0; // 消息物理位置拉取到的最大offset
68:
69: int i = 0;
70: final int maxFilterMessageCount = 16000;
71: final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
72: // 循环获取 消息位置信息
73: for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
74: long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // 消息物理位置offset
75: int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // 消息长度
76: long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // 消息tagsCode
77: // 设置消息物理位置拉取到的最大offset
78: maxPhyOffsetPulling = offsetPy;
79: // 当 offsetPy 小于 nextPhyFileStartOffset 时,意味着对应的 Message 已经移除,所以直接continue,直到可读取的Message。
80: if (nextPhyFileStartOffset != Long.MIN_VALUE) {
81: if (offsetPy < nextPhyFileStartOffset)
82: continue;
83: }
84: // 校验 commitLog 是否需要硬盘,无法全部放在内存
85: boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
86: // 是否已经获得足够消息
87: if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
88: isInDisk)) {
89: break;
90: }
91: // 判断消息是否符合条件
92: if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
93: // 从commitLog获取对应消息ByteBuffer
94: SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
95: if (selectResult != null) {
96: this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
97: getResult.addMessage(selectResult);
98: status = GetMessageStatus.FOUND;
99: nextPhyFileStartOffset = Long.MIN_VALUE;
100: } else {
101: // 从commitLog无法读取到消息,说明该消息对应的文件(MappedFile)已经删除,计算下一个MappedFile的起始位置
102: if (getResult.getBufferTotalSize() == 0) {
103: status = GetMessageStatus.MESSAGE_WAS_REMOVING;
104: }
105: nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
106: }
107: } else {
108: if (getResult.getBufferTotalSize() == 0) {
109: status = GetMessageStatus.NO_MATCHED_MESSAGE;
110: }
111:
112: if (log.isDebugEnabled()) {
113: log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
114: }
115: }
116: }
117: // 统计剩余可拉取消息字节数
118: if (diskFallRecorded) {
119: long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
120: brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
121: }
122: // 计算下次拉取消息的消息队列编号
123: nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
124: // 根据剩余可拉取消息字节数与内存判断是否建议读取从节点
125: long diff = maxOffsetPy - maxPhyOffsetPulling;
126: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
127: * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
128: getResult.setSuggestPullingFromSlave(diff > memory);
129: } finally {
130: bufferConsumeQueue.release();
131: }
132: } else {
133: status = GetMessageStatus.OFFSET_FOUND_NULL;
134: nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
135: log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
136: + maxOffset + ", but access logic queue failed.");
137: }
138: }
139: } else {
140: status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
141: nextBeginOffset = nextOffsetCorrection(offset, 0);
142: }
143: // 统计
144: if (GetMessageStatus.FOUND == status) {
145: this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
146: } else {
147: this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
148: }
149: long eclipseTime = this.getSystemClock().now() - beginTime;
150: this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
151: // 设置返回结果
152: getResult.setStatus(status);
153: getResult.setNextBeginOffset(nextBeginOffset);
154: getResult.setMaxOffset(maxOffset);
155: getResult.setMinOffset(minOffset);
156: return getResult;
157: }
158:
159: /**
160: * 根据 主题 + 队列编号 获取 消费队列
161: *
162: * @param topic 主题
163: * @param queueId 队列编号
164: * @return 消费队列
165: */
166: public ConsumeQueue findConsumeQueue(String topic, int queueId) {
167: // 获取 topic 对应的 所有消费队列
168: ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
169: if (null == map) {
170: ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<>(128);
171: ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
172: if (oldMap != null) {
173: map = oldMap;
174: } else {
175: map = newMap;
176: }
177: }
178: // 获取 queueId 对应的 消费队列
179: ConsumeQueue logic = map.get(queueId);
180: if (null == logic) {
181: ConsumeQueue newLogic = new ConsumeQueue(//
182: topic, //
183: queueId, //
184: StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
185: this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
186: this);
187: ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
188: if (oldLogic != null) {
189: logic = oldLogic;
190: } else {
191: logic = newLogic;
192: }
193: }
194:
195: return logic;
196: }
197:
198: /**
199: * 下一个获取队列offset修正
200: * 修正条件:主节点 或者 从节点开启校验offset开关
201: *
202: * @param oldOffset 老队列offset
203: * @param newOffset 新队列offset
204: * @return 修正后的队列offset
205: */
206: private long nextOffsetCorrection(long oldOffset, long newOffset) {
207: long nextOffset = oldOffset;
208: if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
209: nextOffset = newOffset;
210: }
211: return nextOffset;
212: }
213:
214: /**
215: * 校验 commitLog 是否需要硬盘,无法全部放在内存
216: *
217: * @param offsetPy commitLog 指定offset
218: * @param maxOffsetPy commitLog 最大offset
219: * @return 是否需要硬盘
220: */
221: private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
222: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
223: return (maxOffsetPy - offsetPy) > memory;
224: }
225:
226: /**
227: * 判断获取消息是否已经满
228: *
229: * @param sizePy 字节数
230: * @param maxMsgNums 最大消息数
231: * @param bufferTotal 目前已经计算字节数
232: * @param messageTotal 目前已经计算消息数
233: * @param isInDisk 是否在硬盘中
234: * @return 是否已满
235: */
236: private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
237: if (0 == bufferTotal || 0 == messageTotal) {
238: return false;
239: }
240: // 消息数量已经满足请求数量(maxMsgNums)
241: if ((messageTotal + 1) >= maxMsgNums) {
242: return true;
243: }
244: // 根据消息存储配置的最大传输字节数、最大传输消息数是否已满
245: if (isInDisk) {
246: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
247: return true;
248: }
249:
250: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
251: return true;
252: }
253: } else {
254: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
255: return true;
256: }
257:
258: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
259: return true;
260: }
261: }
262:
263: return false;
264: }

  • 说明 :根据 消息分组(group) + 主题(Topic) + 队列编号(queueId) + 队列位置(offset) + 订阅信息(subscriptionData) 获取 指定条数(maxMsgNums) 消息(Message)。
  • 第 14 至 18 行 :判断 Store 是否处于关闭状态,若关闭,则无法获取消息。
  • 第 19 至 23 行 :判断当前运行状态是否可读,若不可读,则无法获取消息。
  • 第 37 行 :根据 主题(Topic) + 队列编号(queueId) 获取 消息队列(ConsumeQueue)。
    • #findConsumeQueue(...) :第 159 至 196 行。
  • 第 43 至 58 行 :各种队列位置(offset) 无法读取消息,并针对对应的情况,计算下一次 Client 队列拉取位置。
    • 第 43 至 45 行 :消息队列无消息。
    • 第 46 至 48 行 :查询的消息队列位置(offset) 太小。
    • 第 49 至 51 行 :查询的消息队列位置(offset) 恰好等于 消息队列最大的队列位置。该情况是正常现象,相当于查询最新的消息。
    • 第 52 至 58 行 :查询的消息队列位置(offset) 超过过多。
    • #nextOffsetCorrection(...) :第 198 至 212 行。
  • 第 61 行 :根据 消费队列位置(offset) 获取 对应的MappedFile
  • 第 72 至 128 行 :循环获取 消息位置信息
    • 第 74 至 76 行 :读取每一个 消息位置信息
    • 第 79 至 83 行 :当 offsetPy 小于 nextPhyFileStartOffset 时,意味着对 应的 Message 已经移除,所以直接continue,直到可读取的 Message
    • 第 84 至 90 行 :判断是否已经获得足够的消息。
      • #checkInDiskByCommitOffset(...) :第 214 至 224 行。
      • #isTheBatchFull(...) :第 226 至 264 行。
  • 第 92 行 :判断消息是否符合条件。详细解析见:DefaultMessageFilter#isMessageMatched(...)
  • 第 94 行 :从 CommitLog 获取对应 消息的MappedByteBuffer
  • 第 95 至 99 行 :获取 消息MappedByteBuffer 成功。
  • 第 100 至 106 行 :获取 消息MappedByteBuffer 失败。从 CommitLog 无法读取到消息,说明 该消息对应的文件(MappedFile) 已经删除,此时计算下一个MappedFile的起始位置。该逻辑需要配合(第 79 至 83 行)一起理解。
  • 第 117 至 120 行 :统计剩余可拉取消息字节数。
  • 第 123 行 :计算下次拉取消息的消息队列编号。
  • 第 124 至 128 行 :根据剩余可拉取消息字节数与内存判断是否建议读取从节点。
  • 第 130 行 :释放 bufferConsumeQueueMappedFile 的指向。此处 MappedFileConsumeQueue 里的文件,不是 CommitLog 下的文件。
  • 第 133 至 136 行 :获得消费队列位置(offset) 获取 对应的MappedFile,计算ConsumeQueueoffset 开始的下一个 MappedFile 对应的位置。
  • 第 143 至 150 行 :记录统计信息:消耗时间、拉取到消息/未拉取到消息次数。
  • 第 151 至 156 行 :设置返回结果并返回。

DefaultMessageFilter#isMessageMatched(...)

 1: public class DefaultMessageFilter implements MessageFilter {
2:
3: @Override
4: public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
5: // 消息tagsCode 空
6: if (tagsCode == null) {
7: return true;
8: }
9: // 订阅数据 空
10: if (null == subscriptionData) {
11: return true;
12: }
13: // classFilter
14: if (subscriptionData.isClassFilterMode())
15: return true;
16: // 订阅表达式 全匹配
17: if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
18: return true;
19: }
20: // 订阅数据code数组 是否包含 消息tagsCode
21: return subscriptionData.getCodeSet().contains(tagsCode.intValue());
22: }
23:
24: }

  • 说明 :消息过滤器默认实现。

PullRequestHoldService

  1: public class PullRequestHoldService extends ServiceThread {
2:
3: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4:
5: private static final String TOPIC_QUEUEID_SEPARATOR = "@";
6:
7: private final BrokerController brokerController;
8:
9: private final SystemClock systemClock = new SystemClock();
10: /**
11: * 消息过滤器
12: */
13: private final MessageFilter messageFilter = new DefaultMessageFilter();
14: /**
15: * 拉取消息请求集合
16: */
17: private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
18: new ConcurrentHashMap<>(1024);
19:
20: public PullRequestHoldService(final BrokerController brokerController) {
21: this.brokerController = brokerController;
22: }
23:
24: /**
25: * 添加拉取消息挂起请求
26: *
27: * @param topic 主题
28: * @param queueId 队列编号
29: * @param pullRequest 拉取消息请求
30: */
31: public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
32: String key = this.buildKey(topic, queueId);
33: ManyPullRequest mpr = this.pullRequestTable.get(key);
34: if (null == mpr) {
35: mpr = new ManyPullRequest();
36: ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
37: if (prev != null) {
38: mpr = prev;
39: }
40: }
41:
42: mpr.addPullRequest(pullRequest);
43: }
44:
45: /**
46: * 根据 主题 + 队列编号 创建唯一标识
47: *
48: * @param topic 主题
49: * @param queueId 队列编号
50: * @return key
51: */
52: private String buildKey(final String topic, final int queueId) {
53: StringBuilder sb = new StringBuilder();
54: sb.append(topic);
55: sb.append(TOPIC_QUEUEID_SEPARATOR);
56: sb.append(queueId);
57: return sb.toString();
58: }
59:
60: @Override
61: public void run() {
62: log.info("{} service started", this.getServiceName());
63: while (!this.isStopped()) {
64: try {
65: // 根据 长轮训 还是 短轮训 设置不同的等待时间
66: if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
67: this.waitForRunning(5 * 1000);
68: } else {
69: this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
70: }
71: // 检查挂起请求是否有需要通知的
72: long beginLockTimestamp = this.systemClock.now();
73: this.checkHoldRequest();
74: long costTime = this.systemClock.now() - beginLockTimestamp;
75: if (costTime > 5 * 1000) {
76: log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
77: }
78: } catch (Throwable e) {
79: log.warn(this.getServiceName() + " service has exception. ", e);
80: }
81: }
82:
83: log.info("{} service end", this.getServiceName());
84: }
85:
86: @Override
87: public String getServiceName() {
88: return PullRequestHoldService.class.getSimpleName();
89: }
90:
91: /**
92: * 遍历挂起请求,检查是否有需要通知的请求。
93: */
94: private void checkHoldRequest() {
95: for (String key : this.pullRequestTable.keySet()) {
96: String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
97: if (2 == kArray.length) {
98: String topic = kArray[0];
99: int queueId = Integer.parseInt(kArray[1]);
100: final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
101: try {
102: this.notifyMessageArriving(topic, queueId, offset);
103: } catch (Throwable e) {
104: log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
105: }
106: }
107: }
108: }
109:
110: /**
111: * 检查是否有需要通知的请求
112: *
113: * @param topic 主题
114: * @param queueId 队列编号
115: * @param maxOffset 消费队列最大offset
116: */
117: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
118: notifyMessageArriving(topic, queueId, maxOffset, null);
119: }
120:
121: /**
122: * 检查是否有需要通知的请求
123: *
124: * @param topic 主题
125: * @param queueId 队列编号
126: * @param maxOffset 消费队列最大offset
127: * @param tagsCode 过滤tagsCode
128: */
129: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
130: String key = this.buildKey(topic, queueId);
131: ManyPullRequest mpr = this.pullRequestTable.get(key);
132: if (mpr != null) {
133: //
134: List<PullRequest> requestList = mpr.cloneListAndClear();
135: if (requestList != null) {
136: List<PullRequest> replayList = new ArrayList<>(); // 不符合唤醒的请求数组
137:
138: for (PullRequest request : requestList) {
139: // 如果 maxOffset 过小,则重新读取一次。
140: long newestOffset = maxOffset;
141: if (newestOffset <= request.getPullFromThisOffset()) {
142: newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
143: }
144: // 有新的匹配消息,唤醒请求,即再次拉取消息。
145: if (newestOffset > request.getPullFromThisOffset()) {
146: if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
147: try {
148: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
149: request.getRequestCommand());
150: } catch (Throwable e) {
151: log.error("execute request when wakeup failed.", e);
152: }
153: continue;
154: }
155: }
156: // 超过挂起时间,唤醒请求,即再次拉取消息。
157: if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
158: try {
159: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
160: request.getRequestCommand());
161: } catch (Throwable e) {
162: log.error("execute request when wakeup failed.", e);
163: }
164: continue;
165: }
166: // 不符合再次拉取的请求,再次添加回去
167: replayList.add(request);
168: }
169: // 添加回去
170: if (!replayList.isEmpty()) {
171: mpr.addPullRequest(replayList);
172: }
173: }
174: }
175: }
176: }

  • PullRequestHoldService 说明 :拉取消息请求挂起维护线程服务。
    • 当拉取消息请求获得不了消息时,则会将请求进行挂起,添加到该服务。
    • 当有符合条件信息时 或 挂起超时时,重新执行获取消息逻辑。
  • #suspendPullRequest(...) 说明 :添加拉取消息挂起请求到集合( pullRequestTable )。
  • #run(...) 说明 :定时检查挂起请求是否有需要通知重新拉取消息并进行通知。
    • 第 65 至 70 行 :根据长轮训or短轮训设置不同的等待时间。
    • 第 71 至 77 行 :检查挂起请求是否有需要通知的。
  • #checkHoldRequest(...) 说明 :遍历挂起请求,检查是否有需要通知的。
  • #notifyMessageArriving(...) 说明 :检查指定队列是否有需要通知的请求。
    • 第 139 至 143 行 :如果 maxOffset 过小,重新获取一次最新的。
    • 第 144 至 155 行 :有新的匹配消息,唤醒请求,即再次拉取消息。
    • 第 156 至 165 行 :超过挂起时间,唤醒请求,即再次拉取消息。
    • 第 148 || 159 行 :唤醒请求,再次拉取消息。原先担心拉取消息时间过长,导致影响整个挂起请求的遍历,后面查看#executeRequestWhenWakeup(...),实际是丢到线程池进行一步的消息拉取,不会有性能上的问题。详细解析见:PullMessageProcessor#executeRequestWhenWakeup(...)
    • 第 166 至 172 行 :不符合唤醒的请求重新添加到集合(pullRequestTable)。

PullMessageProcessor#executeRequestWhenWakeup(...)

 1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
2: Runnable run = new Runnable() {
3: @Override
4: public void run() {
5: try {
6: // 调用拉取请求。本次调用,设置不挂起请求。
7: final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
8:
9: if (response != null) {
10: response.setOpaque(request.getOpaque());
11: response.markResponseType();
12: try {
13: channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
14: @Override
15: public void operationComplete(ChannelFuture future) throws Exception {
16: if (!future.isSuccess()) {
17: LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
18: LOG.error(request.toString());
19: LOG.error(response.toString());
20: }
21: }
22: });
23: } catch (Throwable e) {
24: LOG.error("ProcessRequestWrapper process request over, but response failed", e);
25: LOG.error(request.toString());
26: LOG.error(response.toString());
27: }
28: }
29: } catch (RemotingCommandException e1) {
30: LOG.error("ExecuteRequestWhenWakeup run", e1);
31: }
32: }
33: };
34: // 提交拉取请求到线程池
35: this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
36: }

  • 说明 :执行请求唤醒,即再次拉取消息。该方法调用线程池,因此,不会阻塞。
  • 第 7 行 :调用拉取消息请求。本次调用,设置即使请求不到消息,也不挂起请求。如果不设置,请求可能被无限挂起,被 Broker 无限循环。
  • 第 35 行 :提交拉取消息请求到线程池

5、Broker 提供[更新消费进度]接口

Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
}

  • consumerOffset.json :消费进度存储文件。
  • consumerOffset.json.bak :消费进度存储文件备份。
  • 每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。实现见:MixAll#string2File(...)

BrokerController#initialize(...)

 1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

  • 说明 :每 5s 执行一次持久化逻辑。

ConfigManager

 1: public abstract class ConfigManager {
2: private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
3:
4: /**
5: * 编码内容
6: * @return 编码后的内容
7: */
8: public abstract String encode();
9:
10: /**
11: * 加载文件
12: *
13: * @return 加载是否成功
14: */
15: public boolean load() {
16: String fileName = null;
17: try {
18: fileName = this.configFilePath();
19: String jsonString = MixAll.file2String(fileName);
20: // 如果内容不存在,则加载备份文件
21: if (null == jsonString || jsonString.length() == 0) {
22: return this.loadBak();
23: } else {
24: this.decode(jsonString);
25: PLOG.info("load {} OK", fileName);
26: return true;
27: }
28: } catch (Exception e) {
29: PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
30: return this.loadBak();
31: }
32: }
33:
34: /**
35: * 配置文件地址
36: *
37: * @return 配置文件地址
38: */
39: public abstract String configFilePath();
40:
41: /**
42: * 加载备份文件
43: *
44: * @return 是否成功
45: */
46: private boolean loadBak() {
47: String fileName = null;
48: try {
49: fileName = this.configFilePath();
50: String jsonString = MixAll.file2String(fileName + ".bak");
51: if (jsonString != null && jsonString.length() > 0) {
52: this.decode(jsonString);
53: PLOG.info("load " + fileName + " OK");
54: return true;
55: }
56: } catch (Exception e) {
57: PLOG.error("load " + fileName + " Failed", e);
58: return false;
59: }
60:
61: return true;
62: }
63:
64: /**
65: * 解码内容
66: *
67: * @param jsonString 内容
68: */
69: public abstract void decode(final String jsonString);
70:
71: /**
72: * 持久化
73: */
74: public synchronized void persist() {
75: String jsonString = this.encode(true);
76: if (jsonString != null) {
77: String fileName = this.configFilePath();
78: try {
79: MixAll.string2File(jsonString, fileName);
80: } catch (IOException e) {
81: PLOG.error("persist file Exception, " + fileName, e);
82: }
83: }
84: }
85:
86: /**
87: * 编码存储内容
88: *
89: * @param prettyFormat 是否格式化
90: * @return 内容
91: */
92: public abstract String encode(final boolean prettyFormat);
93: }

MixAll#string2File(...)

 1: /**
2: * 将内容写到文件
3: * 安全写
4: * 1. 写到.tmp文件
5: * 2. 备份准备写入文件到.bak文件
6: * 3. 删除原文件,将.tmp修改成文件
7: *
8: * @param str 内容
9: * @param fileName 文件名
10: * @throws IOException 当IO发生异常时
11: */
12: public static void string2File(final String str, final String fileName) throws IOException {
13: // 写到 tmp文件
14: String tmpFile = fileName + ".tmp";
15: string2FileNotSafe(str, tmpFile);
16: //
17: String bakFile = fileName + ".bak";
18: String prevContent = file2String(fileName);
19: if (prevContent != null) {
20: string2FileNotSafe(prevContent, bakFile);
21: }
22:
23: File file = new File(fileName);
24: file.delete();
25:
26: file = new File(tmpFile);
27: file.renameTo(new File(fileName));
28: }
29:
30: /**
31: * 将内容写到文件
32: * 非安全写
33: *
34: * @param str 内容
35: * @param fileName 文件内容
36: * @throws IOException 当IO发生异常时
37: */
38: public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
39: File file = new File(fileName);
40: // 创建上级目录
41: File fileParent = file.getParentFile();
42: if (fileParent != null) {
43: fileParent.mkdirs();
44: }
45: // 写内容
46: FileWriter fileWriter = null;
47: try {
48: fileWriter = new FileWriter(file);
49: fileWriter.write(str);
50: } catch (IOException e) {
51: throw e;
52: } finally {
53: if (fileWriter != null) {
54: fileWriter.close();
55: }
56: }
57: }

ConsumerOffsetManager

 1: public class ConsumerOffsetManager extends ConfigManager {
2: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3: private static final String TOPIC_GROUP_SEPARATOR = "@";
4:
5: /**
6: * 消费进度集合
7: */
8: private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);
9:
10: private transient BrokerController brokerController;
11:
12: public ConsumerOffsetManager() {
13: }
14:
15: public ConsumerOffsetManager(BrokerController brokerController) {
16: this.brokerController = brokerController;
17: }
18:
19: /**
20: * 提交消费进度
21: *
22: * @param clientHost 提交client地址
23: * @param group 消费分组
24: * @param topic 主题
25: * @param queueId 队列编号
26: * @param offset 进度(队列位置)
27: */
28: public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
29: // topic@group
30: String key = topic + TOPIC_GROUP_SEPARATOR + group;
31: this.commitOffset(clientHost, key, queueId, offset);
32: }
33:
34: /**
35: * 提交消费进度
36: *
37: * @param clientHost 提交client地址
38: * @param key 主题@消费分组
39: * @param queueId 队列编号
40: * @param offset 进度(队列位置)
41: */
42: private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
43: ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
44: if (null == map) {
45: map = new ConcurrentHashMap<>(32);
46: map.put(queueId, offset);
47: this.offsetTable.put(key, map);
48: } else {
49: Long storeOffset = map.put(queueId, offset);
50: if (storeOffset != null && offset < storeOffset) {
51: log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
52: }
53: }
54: }
55:
56: public String encode() {
57: return this.encode(false);
58: }
59:
60: @Override
61: public String configFilePath() {
62: return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
63: }
64:
65: /**
66: * 解码内容
67: * 格式:JSON
68: *
69: * @param jsonString 内容
70: */
71: @Override
72: public void decode(String jsonString) {
73: if (jsonString != null) {
74: ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
75: if (obj != null) {
76: this.offsetTable = obj.offsetTable;
77: }
78: }
79: }
80:
81: /**
82: * 编码内容
83: * 格式为JSON
84: *
85: * @param prettyFormat 是否格式化
86: * @return 编码后的内容
87: */
88: public String encode(final boolean prettyFormat) {
89: return RemotingSerializable.toJson(this, prettyFormat);
90: }
91:
92: }

  • 说明 :消费进度管理器。

6、Broker 提供[发回消息]接口

大部分逻辑和 Broker 提供[接收消息]接口 类似,可以先看下相关内容。

SendMessageProcessor#consumerSendMsgBack(...)

  1: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
2: throws RemotingCommandException {
3:
4: // 初始化响应
5: final RemotingCommand response = RemotingCommand.createResponseCommand(null);
6: final ConsumerSendMsgBackRequestHeader requestHeader =
7: (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
8:
9: // hook(独有)
10: if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
11:
12: ConsumeMessageContext context = new ConsumeMessageContext();
13: context.setConsumerGroup(requestHeader.getGroup());
14: context.setTopic(requestHeader.getOriginTopic());
15: context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
16: context.setCommercialRcvTimes(1);
17: context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
18:
19: this.executeConsumeMessageHookAfter(context);
20: }
21:
22: // 判断消费分组是否存在(独有)
23: SubscriptionGroupConfig subscriptionGroupConfig =
24: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
25: if (null == subscriptionGroupConfig) {
26: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
27: response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
28: + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
29: return response;
30: }
31:
32: // 检查 broker 是否有写入权限
33: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
34: response.setCode(ResponseCode.NO_PERMISSION);
35: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
36: return response;
37: }
38:
39: // 检查 重试队列数 是否大于0(独有)
40: if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
41: response.setCode(ResponseCode.SUCCESS);
42: response.setRemark(null);
43: return response;
44: }
45:
46: // 计算retry Topic
47: String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
48:
49: // 计算队列编号(独有)
50: int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
51:
52: // 计算sysFlag(独有)
53: int topicSysFlag = 0;
54: if (requestHeader.isUnitMode()) {
55: topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
56: }
57:
58: // 获取topicConfig。如果获取不到,则进行创建
59: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
60: newTopic, //
61: subscriptionGroupConfig.getRetryQueueNums(), //
62: PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
63: if (null == topicConfig) { // 没有配置
64: response.setCode(ResponseCode.SYSTEM_ERROR);
65: response.setRemark("topic[" + newTopic + "] not exist");
66: return response;
67: }
68: if (!PermName.isWriteable(topicConfig.getPerm())) { // 不允许写入
69: response.setCode(ResponseCode.NO_PERMISSION);
70: response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
71: return response;
72: }
73:
74: // 查询消息。若不存在,返回异常错误。(独有)
75: MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
76: if (null == msgExt) {
77: response.setCode(ResponseCode.SYSTEM_ERROR);
78: response.setRemark("look message by offset failed, " + requestHeader.getOffset());
79: return response;
80: }
81:
82: // 设置retryTopic到拓展属性(独有)
83: final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
84: if (null == retryTopic) {
85: MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
86: }
87:
88: // 设置消息不等待存储完成(独有) TODO 疑问:如果设置成不等待存储,broker设置成同步落盘,岂不是不能批量提交了?
89: msgExt.setWaitStoreMsgOK(false);
90:
91: // 处理 delayLevel(独有)。
92: int delayLevel = requestHeader.getDelayLevel();
93: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
94: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
95: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
96: }
97: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
98: || delayLevel < 0) { // 如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue)
99: newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
100: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
101:
102: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
103: DLQ_NUMS_PER_GROUP, //
104: PermName.PERM_WRITE, 0
105: );
106: if (null == topicConfig) {
107: response.setCode(ResponseCode.SYSTEM_ERROR);
108: response.setRemark("topic[" + newTopic + "] not exist");
109: return response;
110: }
111: } else {
112: if (0 == delayLevel) {
113: delayLevel = 3 + msgExt.getReconsumeTimes();
114: }
115: msgExt.setDelayTimeLevel(delayLevel);
116: }
117:
118: // 创建MessageExtBrokerInner
119: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
120: msgInner.setTopic(newTopic);
121: msgInner.setBody(msgExt.getBody());
122: msgInner.setFlag(msgExt.getFlag());
123: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
124: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
125: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
126: msgInner.setQueueId(queueIdInt);
127: msgInner.setSysFlag(msgExt.getSysFlag());
128: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
129: msgInner.setBornHost(msgExt.getBornHost());
130: msgInner.setStoreHost(this.getStoreHost());
131: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
132:
133: // 设置原始消息编号到拓展字段(独有)
134: String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
135: MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
136:
137: // 添加消息
138: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
139: if (putMessageResult != null) {
140: switch (putMessageResult.getPutMessageStatus()) {
141: case PUT_OK:
142: String backTopic = msgExt.getTopic();
143: String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
144: if (correctTopic != null) {
145: backTopic = correctTopic;
146: }
147:
148: this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
149:
150: response.setCode(ResponseCode.SUCCESS);
151: response.setRemark(null);
152:
153: return response;
154: default:
155: break;
156: }
157:
158: response.setCode(ResponseCode.SYSTEM_ERROR);
159: response.setRemark(putMessageResult.getPutMessageStatus().name());
160: return response;
161: }
162:
163: response.setCode(ResponseCode.SYSTEM_ERROR);
164: response.setRemark("putMessageResult is null");
165: return response;
166: }

  • 说明 :当 Consumer 消费某条消息失败时,会调用该接口发回消息。Broker 会存储发回的消息。这样,下次 Consumer 拉取该消息,能够从 CommitLogConsumeQueue 顺序读取。
  • [x] 因为大多数逻辑和 Broker 接收普通消息 很相似,时候 TODO 标记成独有的逻辑。
  • 第 4 至 7 行 :初始化响应。
  • [x] 第 9 至 20 行 :Hook逻辑。
  • [x] 第22 至 30 行 :判断消费分组是否存在。
  • 第 32 至 37 行 :检查 Broker 是否有写入权限。
  • [x] 第 39 至 44 行 :检查重试队列数是否大于0。
  • 第 47 行 :计算 retry topic。
  • [x] 第 50 行 :随机分配队列编号,依赖 retryQueueNums
  • [x] 第 52 至 56 行 :计算 sysFlag
  • 第 58 至 72 行 :获取 TopicConfig。如果不存在,则创建。
  • [x] 第 74 至 80 行 :查询消息。若不存在,返回异常错误。
  • [x] 第 82 至 86 行 :设置 retryTopic 到消息拓展属性。
  • [x] 第 89 行 :设置消息不等待存储完成。
    • Broker 刷盘方式为同步,会导致同步落盘不能批量提交,这样会不会存在问题?有知道的同学麻烦告知下。😈。
  • [x] 第 91 至 116 行 :处理 delayLevel
  • 第 118 至 131 行 :创建 MessageExtBrokerInner
  • [x] 第 133 至 135 行 :设置原始消息编号到拓展属性。
  • 第 137 至 161 行 :添加消息。

7、结尾

知识星球

感谢同学们对本文的阅读、收藏、点赞。

😈如果解析存在问题或者表达误解的,表示抱歉。如果方便的话,可以一起沟通下。让我们来一场 1 :1 交流(搞基)。

再次表示十分感谢。

文章目录
  1. 1. 1、概述
  2. 2. 2、ConsumeQueue 结构
  3. 3. 3、ConsumeQueue 存储
    1. 3.1. ReputMessageService
      1. 3.1.1. DefaultMessageStore#doDispatch(...)
      2. 3.1.2. ConsumeQueue#putMessagePositionInfoWrapper(...)
    2. 3.2. FlushConsumeQueueService
  4. 4. 4、Broker 提供[拉取消息]接口
    1. 4.1. PullMessageRequestHeader
    2. 4.2. PullMessageProcessor#processRequest(...)
    3. 4.3. MessageStore#getMessage(...)
    4. 4.4. DefaultMessageFilter#isMessageMatched(...)
    5. 4.5. PullRequestHoldService
    6. 4.6. PullMessageProcessor#executeRequestWhenWakeup(...)
  5. 5. 5、Broker 提供[更新消费进度]接口
    1. 5.1. BrokerController#initialize(...)
    2. 5.2. ConfigManager
      1. 5.2.1. MixAll#string2File(...)
    3. 5.3. ConsumerOffsetManager
  6. 6. 6、Broker 提供[发回消息]接口
    1. 6.1. SendMessageProcessor#consumerSendMsgBack(...)
  7. 7. 7、结尾