摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-store/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1、概述

本文接《RocketMQ 源码分析 —— Message 发送与接收》
主要解析 CommitLog 存储消息部分。

2、CommitLog 结构

CommitLogMappedFileQueueMappedFile 的关系如下:

CommitLog、MappedFileQueue、MappedFile的关系
CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
total 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296

CommitLogMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等文件。
  • MappedFileQueueMappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。
    • 每个 MappedFile 统一文件大小。
    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 里默认为 1GB。
  • CommitLog :针对 MappedFileQueue 的封装使用。

CommitLog 目前存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。
  2. BLANK :文件不足以存储消息时的空白占位。

CommitLog 存储在 MappedFile的结构:

MESSAGE[1]MESSAGE[2]MESSAGE[n - 1]MESSAGE[n]BLANK

MESSAGECommitLog 存储结构:

第几位字段说明数据类型字节数
1MsgLen消息总长度Int4
2MagicCodeMESSAGE_MAGIC_CODEInt4
3BodyCRC消息内容CRCInt4
4QueueId消息队列编号Int4
5FlagflagInt4
6QueueOffset消息队列位置Long8
7PhysicalOffset物理位置。在 CommitLog 的顺序存储位置。Long8
8SysFlagMessageSysFlagInt4
9BornTimestamp生成消息时间戳Long8
10BornHost生效消息的地址+端口Long8
11StoreTimestamp存储消息时间戳Long8
12StoreHost存储消息的地址+端口Long8
13ReconsumeTimes重新消费消息次数Int4
14PreparedTransationOffsetLong8
15BodyLength + Body内容长度 + 内容Int + Bytes4 + bodyLength
16TopicLength + TopicTopic长度 + TopicByte + Bytes1 + topicLength
17PropertiesLength + Properties拓展字段长度 + 拓展字段Short + Bytes2 + PropertiesLength

BLANKCommitLog 存储结构:

第几位字段说明数据类型字节数
1maxBlank空白长度Int4
2MagicCodeBLANK_MAGIC_CODEInt4

3、CommitLog 存储消息

Broker存储发送消息顺序图

CommitLog#putMessage(…)

1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
2: // Set the storage time
3: msg.setStoreTimestamp(System.currentTimeMillis());
4: // Set the message body BODY CRC (consider the most appropriate setting
5: // on the client)
6: msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
7: // Back to Results
8: AppendMessageResult result = null;
9:
10: StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
11:
12: String topic = msg.getTopic();
13: int queueId = msg.getQueueId();
14:
15: // 事务相关 TODO 待读:事务相关
16: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
17: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
18: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
19: // Delay Delivery
20: if (msg.getDelayTimeLevel() > 0) {
21: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
22: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
23: }
24:
25: topic = ScheduleMessageService.SCHEDULE_TOPIC;
26: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
27:
28: // Backup real topic, queueId
29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
30: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
31: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
32:
33: msg.setTopic(topic);
34: msg.setQueueId(queueId);
35: }
36: }
37:
38: long eclipseTimeInLock = 0;
39:
40: // 获取写入映射文件
41: MappedFile unlockMappedFile = null;
42: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
43:
44: // 获取写入锁
45: lockForPutMessage(); //spin...
46: try {
47: long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
48: this.beginTimeInLock = beginLockTimestamp;
49:
50: // Here settings are stored timestamp, in order to ensure an orderly
51: // global
52: msg.setStoreTimestamp(beginLockTimestamp);
53:
54: // 当不存在映射文件时,进行创建
55: if (null == mappedFile || mappedFile.isFull()) {
56: mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
57: }
58: if (null == mappedFile) {
59: log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
60: beginTimeInLock = 0;
61: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
62: }
63:
64: // 存储消息
65: result = mappedFile.appendMessage(msg, this.appendMessageCallback);
66: switch (result.getStatus()) {
67: case PUT_OK:
68: break;
69: case END_OF_FILE: // 当文件尾时,获取新的映射文件,并进行插入
70: unlockMappedFile = mappedFile;
71: // Create a new file, re-write the message
72: mappedFile = this.mappedFileQueue.getLastMappedFile(0);
73: if (null == mappedFile) {
74: // XXX: warn and notify me
75: log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
76: beginTimeInLock = 0;
77: return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
78: }
79: result = mappedFile.appendMessage(msg, this.appendMessageCallback);
80: break;
81: case MESSAGE_SIZE_EXCEEDED:
82: case PROPERTIES_SIZE_EXCEEDED:
83: beginTimeInLock = 0;
84: return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
85: case UNKNOWN_ERROR:
86: beginTimeInLock = 0;
87: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
88: default:
89: beginTimeInLock = 0;
90: return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
91: }
92:
93: eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
94: beginTimeInLock = 0;
95: } finally {
96: // 释放写入锁
97: releasePutMessageLock();
98: }
99:
100: if (eclipseTimeInLock > 500) {
101: log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
102: }
103:
104: //
105: if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
106: this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
107: }
108:
109: PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
110:
111: // Statistics
112: storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
113: storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
114:
115: // 进行同步||异步 flush||commit
116: GroupCommitRequest request = null;
117: // Synchronization flush
118: if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
119: final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
120: if (msg.isWaitStoreMsgOK()) {
121: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
122: service.putRequest(request);
123: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
124: if (!flushOK) {
125: log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
126: + " client address: " + msg.getBornHostString());
127: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
128: }
129: } else {
130: service.wakeup();
131: }
132: }
133: // Asynchronous flush
134: else {
135: if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
136: flushCommitLogService.wakeup(); // important:唤醒commitLog线程,进行flush
137: } else {
138: commitLogService.wakeup();
139: }
140: }
141:
142: // Synchronous write double 如果是同步Master,同步到从节点 // TODO 待读:数据同步
143: if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
144: HAService service = this.defaultMessageStore.getHaService();
145: if (msg.isWaitStoreMsgOK()) {
146: // Determine whether to wait
147: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
148: if (null == request) {
149: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
150: }
151: service.putRequest(request);
152:
153: service.getWaitNotifyObject().wakeupAll();
154:
155: boolean flushOK =
156: // TODO
157: request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
158: if (!flushOK) {
159: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
160: + msg.getTags() + " client address: " + msg.getBornHostString());
161: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
162: }
163: }
164: // Slave problem
165: else {
166: // Tell the producer, slave not available
167: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
168: }
169: }
170: }
171:
172: return putMessageResult;
173: }
  • 说明 :存储消息,并返回存储结果。
  • 第 2 行 :设置存储时间等。
  • 第 16 至 36 行 :事务消息相关,暂未了解。
  • 第 45 & 97 行 :获取锁与释放锁。
  • 第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
  • 第 55 至 62 行 :获取 MappedFile,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(…)
  • 第 65 行 :插入消息MappedFile,解析解析见:MappedFile#appendMessage(…)
  • 第 69 至 80 行 :MappedFile 已满,创建新的,再次插入消息
  • 第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService
  • 第 143 至 173 行 :Broker 主从同步。后面的文章会详细解析😈。

MappedFileQueue#getLastMappedFile(…)

1: public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
2: long createOffset = -1; // 创建文件开始offset。-1时,不创建
3: MappedFile mappedFileLast = getLastMappedFile();
4:
5: if (mappedFileLast == null) { // 一个映射文件都不存在
6: createOffset = startOffset - (startOffset % this.mappedFileSize);
7: }
8:
9: if (mappedFileLast != null && mappedFileLast.isFull()) { // 最后一个文件已满
10: createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
11: }
12:
13: if (createOffset != -1 && needCreate) { // 创建文件
14: String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
15: String nextNextFilePath = this.storePath + File.separator
16: + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
17: MappedFile mappedFile = null;
18:
19: if (this.allocateMappedFileService != null) {
20: mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
21: nextNextFilePath, this.mappedFileSize);
22: } else {
23: try {
24: mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
25: } catch (IOException e) {
26: log.error("create mappedFile exception", e);
27: }
28: }
29:
30: if (mappedFile != null) {
31: if (this.mappedFiles.isEmpty()) {
32: mappedFile.setFirstCreateInQueue(true);
33: }
34: this.mappedFiles.add(mappedFile);
35: }
36:
37: return mappedFile;
38: }
39:
40: return mappedFileLast;
41: }
  • 说明 :获取最后一个 MappedFile,若不存在或文件已满,则进行创建。
  • 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的 createOffset
  • 第 14 行 :计算文件名。从此处我们可
    以得知,MappedFile的文件命名规则:

    fileName[n] = fileName[n - 1] + n * mappedFileSize
    fileName[0] = startOffset - (startOffset % this.mappedFileSize)

    目前 CommitLogstartOffset 为 0。
    此处有个疑问,为什么需要 (startOffset % this.mappedFileSize)。例如:

    | startOffset | mappedFileSize | createOffset |
    | — | :– | :– |
    | 5 | 1 | 5 |
    | 5 | 2 | 4 |
    | 5 | 3 | 3 |
    | 5 | 4 | 4 |
    | 5 | > 5 | 0 |

    如果有知道的同学,麻烦提示下。😈
    解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 this.mappedFileSize 为每个文件大小时,startOffset 所在文件的开始offset

  • 第 30 至 35 行 :设置 MappedFile是否是第一个创建的文件。该标识用于 ConsumeQueue 对应的 MappedFile ,详见 ConsumeQueue#fillPreBlank

MappedFile#appendMessage(…)

1: public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
2: assert msg != null;
3: assert cb != null;
4:
5: int currentPos = this.wrotePosition.get();
6:
7: if (currentPos < this.fileSize) {
8: ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
9: byteBuffer.position(currentPos);
10: AppendMessageResult result =
11: cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
12: this.wrotePosition.addAndGet(result.getWroteBytes());
13: this.storeTimestamp = result.getStoreTimestamp();
14: return result;
15: }
16:
17: log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
18: + this.fileSize);
19: return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
20: }
  • 说明 :插入消息MappedFile,并返回插入结果。
  • 第 8 行 :获取需要写入的字节缓冲区。为什么会有 writeBuffer != null 的判断后,使用不同的字节缓冲区,见:FlushCommitLogService
  • 第 9 至 11 行 :设置写入 position,执行写入,更新 wrotePosition(当前写入位置,下次开始写入开始位置)。

DefaultAppendMessageCallback#doAppend(…)

1: class DefaultAppendMessageCallback implements AppendMessageCallback {
2: // File at the end of the minimum fixed length empty
3: private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
4: /**
5: * 存储在内存中的消息编号字节Buffer
6: */
7: private final ByteBuffer msgIdMemory;
8: /**
9: * Store the message content
10: * 存储在内存中的消息字节Buffer
11: * 当消息传递到{@link #doAppend(long, ByteBuffer, int, MessageExtBrokerInner)}方法时,最终写到该参数
12: */
13: private final ByteBuffer msgStoreItemMemory;
14: /**
15: * The maximum length of the message
16: * 消息最大长度
17: */
18: private final int maxMessageSize;
19: /**
20: * Build Message Key
21: * {@link #topicQueueTable}的key
22: * 计算方式:topic + "-" + queueId
23: */
24: private final StringBuilder keyBuilder = new StringBuilder();
25: /**
26: * host字节buffer
27: * 用于重复计算host的字节内容
28: */
29: private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
30:
31: DefaultAppendMessageCallback(final int size) {
32: this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
33: this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
34: this.maxMessageSize = size;
35: }
36:
37: public ByteBuffer getMsgStoreItemMemory() {
38: return msgStoreItemMemory;
39: }
40:
41: public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
42: // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
43:
44: // PHY OFFSET
45: long wroteOffset = fileFromOffset + byteBuffer.position();
46:
47: // 计算commitLog里的msgId
48: this.resetByteBuffer(hostHolder, 8);
49: String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
50:
51: // Record ConsumeQueue information 获取队列offset
52: keyBuilder.setLength(0);
53: keyBuilder.append(msgInner.getTopic());
54: keyBuilder.append('-');
55: keyBuilder.append(msgInner.getQueueId());
56: String key = keyBuilder.toString();
57: Long queueOffset = CommitLog.this.topicQueueTable.get(key);
58: if (null == queueOffset) {
59: queueOffset = 0L;
60: CommitLog.this.topicQueueTable.put(key, queueOffset);
61: }
62:
63: // Transaction messages that require special handling // TODO 疑问:用途
64: final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
65: switch (tranType) {
66: // Prepared and Rollback message is not consumed, will not enter the
67: // consumer queue
68: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
69: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
70: queueOffset = 0L;
71: break;
72: case MessageSysFlag.TRANSACTION_NOT_TYPE:
73: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
74: default:
75: break;
76: }
77:
78: // 计算消息长度
79: final byte[] propertiesData =
80: msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
81: final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
82: if (propertiesLength > Short.MAX_VALUE) {
83: log.warn("putMessage message properties length too long. length={}", propertiesData.length);
84: return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
85: }
86: final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
87: final int topicLength = topicData.length;
88: final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
89: final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
90: // Exceeds the maximum message
91: if (msgLen > this.maxMessageSize) {
92: CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
93: + ", maxMessageSize: " + this.maxMessageSize);
94: return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
95: }
96:
97: // Determines whether there is sufficient(足够) free space
98: if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
99: this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
100: // 1 TOTAL_SIZE
101: this.msgStoreItemMemory.putInt(maxBlank);
102: // 2 MAGIC_CODE
103: this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
104: // 3 The remaining space may be any value
105: //
106:
107: // Here the length of the specially set maxBlank
108: final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
109: byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
110: return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
111: queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
112: }
113:
114: // Initialization of storage space
115: this.resetByteBuffer(msgStoreItemMemory, msgLen);
116: // 1 TOTAL_SIZE
117: this.msgStoreItemMemory.putInt(msgLen);
118: // 2 MAGIC_CODE
119: this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
120: // 3 BODY_CRC
121: this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
122: // 4 QUEUE_ID
123: this.msgStoreItemMemory.putInt(msgInner.getQueueId());
124: // 5 FLAG
125: this.msgStoreItemMemory.putInt(msgInner.getFlag());
126: // 6 QUEUE_OFFSET
127: this.msgStoreItemMemory.putLong(queueOffset);
128: // 7 PHYSICAL_OFFSET
129: this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
130: // 8 SYS_FLAG
131: this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
132: // 9 BORN_TIMESTAMP
133: this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
134: // 10 BORN_HOST
135: this.resetByteBuffer(hostHolder, 8);
136: this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
137: // 11 STORE_TIMESTAMP
138: this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
139: // 12 STORE_HOST_ADDRESS
140: this.resetByteBuffer(hostHolder, 8);
141: this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
142: //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
143: // 13 RECONSUME_TIMES
144: this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
145: // 14 Prepared Transaction Offset
146: this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
147: // 15 BODY
148: this.msgStoreItemMemory.putInt(bodyLength);
149: if (bodyLength > 0)
150: this.msgStoreItemMemory.put(msgInner.getBody());
151: // 16 TOPIC
152: this.msgStoreItemMemory.put((byte) topicLength);
153: this.msgStoreItemMemory.put(topicData);
154: // 17 PROPERTIES
155: this.msgStoreItemMemory.putShort((short) propertiesLength);
156: if (propertiesLength > 0)
157: this.msgStoreItemMemory.put(propertiesData);
158:
159: final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
160: // Write messages to the queue buffer
161: byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
162:
163: AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
164: msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
165:
166: switch (tranType) {
167: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
168: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
169: break;
170: case MessageSysFlag.TRANSACTION_NOT_TYPE:
171: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
172: // The next update ConsumeQueue information 更新队列的offset
173: CommitLog.this.topicQueueTable.put(key, ++queueOffset);
174: break;
175: default:
176: break;
177: }
178: return result;
179: }
180:
181: /**
182: * 重置字节缓冲区
183: *
184: * @param byteBuffer 字节缓冲区
185: * @param limit 长度
186: */
187: private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
188: byteBuffer.flip();
189: byteBuffer.limit(limit);
190: }
191: }
  • 说明 :插入消息到字节缓冲区。
  • 第 45 行 :计算物理位置。在 CommitLog 的顺序存储位置。
  • 第 47 至 49 行 :计算 CommitLog 里的 offsetMsgId。这里一定要和 msgId 区分开。
计算方式长度
offsetMsgIdBroker存储时生成Hex(storeHostBytes, wroteOffset)32
msgIdClient发送消息时生成Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列)32《RocketMQ 源码分析 —— Message 基础》
  • 第 51 至 61 行 :获取队列位置(offset)。
  • 第 78 至 95 行 :计算消息总长度。
  • 第 98 至 112 行 :当文件剩余空间不足时,写入 BLANK 占位,返回结果。
  • 第 114 至 161 行 :写入 MESSAGE
  • 第 173 行 :更新队列位置(offset)。

FlushCommitLogService

FlushCommitLogService类图

线程服务场景插入消息性能
CommitRealTimeService异步刷盘 && 开启内存字节缓冲区第一
FlushRealTimeService异步刷盘 && 关闭内存字节缓冲区第二
GroupCommitService同步刷盘第三

MappedFile#落盘

方式
方式一写入内存字节缓冲区(writeBuffer)从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel)文件通道(fileChannel)flush
方式二写入映射文件字节缓冲区(mappedByteBuffer)映射文件字节缓冲区(mappedByteBuffer)flush

MappedFile的position迁移图

flush相关代码

考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE 才进行 flush

1: /**
2: * flush
3: *
4: * @param flushLeastPages flush最小页数
5: * @return The current flushed position
6: */
7: public int flush(final int flushLeastPages) {
8: if (this.isAbleToFlush(flushLeastPages)) {
9: if (this.hold()) {
10: int value = getReadPosition();
11:
12: try {
13: //We only append data to fileChannel or mappedByteBuffer, never both.
14: if (writeBuffer != null || this.fileChannel.position() != 0) {
15: this.fileChannel.force(false);
16: } else {
17: this.mappedByteBuffer.force();
18: }
19: } catch (Throwable e) {
20: log.error("Error occurred when force data to disk.", e);
21: }
22:
23: this.flushedPosition.set(value);
24: this.release();
25: } else {
26: log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
27: this.flushedPosition.set(getReadPosition());
28: }
29: }
30: return this.getFlushedPosition();
31: }
32:
33: /**
34: * 是否能够flush。满足如下条件任意条件:
35: * 1. 映射文件已经写满
36: * 2. flushLeastPages > 0 && 未flush部分超过flushLeastPages
37: * 3. flushLeastPages = 0 && 有新写入部分
38: *
39: * @param flushLeastPages flush最小分页
40: * @return 是否能够写入
41: */
42: private boolean isAbleToFlush(final int flushLeastPages) {
43: int flush = this.flushedPosition.get();
44: int write = getReadPosition();
45:
46: if (this.isFull()) {
47: return true;
48: }
49:
50: if (flushLeastPages > 0) {
51: return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
52: }
53:
54: return write > flush;
55: }

commit相关代码:

考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE 才进行 commit

1: /**
2: * commit
3: * 当{@link #writeBuffer}为null时,直接返回{@link #wrotePosition}
4: *
5: * @param commitLeastPages commit最小页数
6: * @return 当前commit位置
7: */
8: public int commit(final int commitLeastPages) {
9: if (writeBuffer == null) {
10: //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
11: return this.wrotePosition.get();
12: }
13: if (this.isAbleToCommit(commitLeastPages)) {
14: if (this.hold()) {
15: commit0(commitLeastPages);
16: this.release();
17: } else {
18: log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
19: }
20: }
21:
22: // All dirty data has been committed to FileChannel. 写到文件尾时,回收writeBuffer。
23: if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
24: this.transientStorePool.returnBuffer(writeBuffer);
25: this.writeBuffer = null;
26: }
27:
28: return this.committedPosition.get();
29: }
30:
31: /**
32: * commit实现,将writeBuffer写入fileChannel。
33: * @param commitLeastPages commit最小页数。用不上该参数
34: */
35: protected void commit0(final int commitLeastPages) {
36: int writePos = this.wrotePosition.get();
37: int lastCommittedPosition = this.committedPosition.get();
38:
39: if (writePos - this.committedPosition.get() > 0) {
40: try {
41: // 设置需要写入的byteBuffer
42: ByteBuffer byteBuffer = writeBuffer.slice();
43: byteBuffer.position(lastCommittedPosition);
44: byteBuffer.limit(writePos);
45: // 写入fileChannel
46: this.fileChannel.position(lastCommittedPosition);
47: this.fileChannel.write(byteBuffer);
48: // 设置position
49: this.committedPosition.set(writePos);
50: } catch (Throwable e) {
51: log.error("Error occurred when commit data to FileChannel.", e);
52: }
53: }
54: }
55:
56: /**
57: * 是否能够commit。满足如下条件任意条件:
58: * 1. 映射文件已经写满
59: * 2. commitLeastPages > 0 && 未commit部分超过commitLeastPages
60: * 3. commitLeastPages = 0 && 有新写入部分
61: *
62: * @param commitLeastPages commit最小分页
63: * @return 是否能够写入
64: */
65: protected boolean isAbleToCommit(final int commitLeastPages) {
66: int flush = this.committedPosition.get();
67: int write = this.wrotePosition.get();
68:
69: if (this.isFull()) {
70: return true;
71: }
72:
73: if (commitLeastPages > 0) {
74: return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
75: }
76:
77: return write > flush;
78: }

FlushRealTimeService

消息插入成功时,异步刷盘时使用。

1: class FlushRealTimeService extends FlushCommitLogService {
2: /**
3: * 最后flush时间戳
4: */
5: private long lastFlushTimestamp = 0;
6: /**
7: * print计时器。
8: * 满足print次数时,调用{@link #printFlushProgress()}
9: */
10: private long printTimes = 0;
11:
12: public void run() {
13: CommitLog.log.info(this.getServiceName() + " service started");
14:
15: while (!this.isStopped()) {
16: boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
17: int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
18: int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
19: int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
20:
21: // Print flush progress
22: // 当时间满足flushPhysicQueueThoroughInterval时,即使写入的数量不足flushPhysicQueueLeastPages,也进行flush
23: boolean printFlushProgress = false;
24: long currentTimeMillis = System.currentTimeMillis();
25: if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
26: this.lastFlushTimestamp = currentTimeMillis;
27: flushPhysicQueueLeastPages = 0;
28: printFlushProgress = (printTimes++ % 10) == 0;
29: }
30:
31: try {
32: // 等待执行
33: if (flushCommitLogTimed) {
34: Thread.sleep(interval);
35: } else {
36: this.waitForRunning(interval);
37: }
38:
39: if (printFlushProgress) {
40: this.printFlushProgress();
41: }
42:
43: // flush commitLog
44: long begin = System.currentTimeMillis();
45: CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
46: long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
47: if (storeTimestamp > 0) {
48: CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
49: }
50: long past = System.currentTimeMillis() - begin;
51: if (past > 500) {
52: log.info("Flush data to disk costs {} ms", past);
53: }
54: } catch (Throwable e) {
55: CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
56: this.printFlushProgress();
57: }
58: }
59:
60: // Normal shutdown, to ensure that all the flush before exit
61: boolean result = false;
62: for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
63: result = CommitLog.this.mappedFileQueue.flush(0);
64: CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
65: }
66:
67: this.printFlushProgress();
68:
69: CommitLog.log.info(this.getServiceName() + " service end");
70: }
71:
72: @Override
73: public String getServiceName() {
74: return FlushRealTimeService.class.getSimpleName();
75: }
76:
77: private void printFlushProgress() {
78: // CommitLog.log.info("how much disk fall behind memory, "
79: // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
80: }
81:
82: @Override
83: @SuppressWarnings("SpellCheckingInspection")
84: public long getJointime() {
85: return 1000 * 60 * 5;
86: }
87: }
  • 说明:实时 flush线程服务,调用 MappedFile#flush 相关逻辑。
  • 第 23 至 29 行 :每 flushPhysicQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushCommitLogLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
  • 第 33 行 至 37 行 :根据 flushCommitLogTimed 参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用 commitLogService.wakeup()
  • 第 45 行 :调用 MappedFile 进行 flush
  • 第 61 至 65 行 :Broker 关闭时,强制 flush,避免有未刷盘的数据。

CommitRealTimeService

消息插入成功时,异步刷盘时使用。
FlushRealTimeService 类似,性能更好。

1: class CommitRealTimeService extends FlushCommitLogService {
2:
3: /**
4: * 最后 commit 时间戳
5: */
6: private long lastCommitTimestamp = 0;
7:
8: @Override
9: public String getServiceName() {
10: return CommitRealTimeService.class.getSimpleName();
11: }
12:
13: @Override
14: public void run() {
15: CommitLog.log.info(this.getServiceName() + " service started");
16: while (!this.isStopped()) {
17: int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
18: int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
19: int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
20:
21: // 当时间满足commitDataThoroughInterval时,即使写入的数量不足commitDataLeastPages,也进行flush
22: long begin = System.currentTimeMillis();
23: if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
24: this.lastCommitTimestamp = begin;
25: commitDataLeastPages = 0;
26: }
27:
28: try {
29: // commit
30: boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
31: long end = System.currentTimeMillis();
32: if (!result) { // TODO 疑问:未写入成功,为啥要唤醒flushCommitLogService
33: this.lastCommitTimestamp = end; // result = false means some data committed.
34: //now wake up flush thread.
35: flushCommitLogService.wakeup();
36: }
37:
38: if (end - begin > 500) {
39: log.info("Commit data to file costs {} ms", end - begin);
40: }
41:
42: // 等待执行
43: this.waitForRunning(interval);
44: } catch (Throwable e) {
45: CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
46: }
47: }
48:
49: boolean result = false;
50: for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
51: result = CommitLog.this.mappedFileQueue.commit(0);
52: CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
53: }
54: CommitLog.log.info(this.getServiceName() + " service end");
55: }
56: }

GroupCommitService

消息插入成功时,同步刷盘时使用。

1: class GroupCommitService extends FlushCommitLogService {
2: /**
3: * 写入请求队列
4: */
5: private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
6: /**
7: * 读取请求队列
8: */
9: private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
10:
11: /**
12: * 添加写入请求
13: *
14: * @param request 写入请求
15: */
16: public synchronized void putRequest(final GroupCommitRequest request) {
17: // 添加写入请求
18: synchronized (this.requestsWrite) {
19: this.requestsWrite.add(request);
20: }
21: // 切换读写队列
22: if (hasNotified.compareAndSet(false, true)) {
23: waitPoint.countDown(); // notify
24: }
25: }
26:
27: /**
28: * 切换读写队列
29: */
30: private void swapRequests() {
31: List<GroupCommitRequest> tmp = this.requestsWrite;
32: this.requestsWrite = this.requestsRead;
33: this.requestsRead = tmp;
34: }
35:
36: private void doCommit() {
37: synchronized (this.requestsRead) {
38: if (!this.requestsRead.isEmpty()) {
39: for (GroupCommitRequest req : this.requestsRead) {
40: // There may be a message in the next file, so a maximum of
41: // two times the flush (可能批量提交的messages,分布在两个MappedFile)
42: boolean flushOK = false;
43: for (int i = 0; i < 2 && !flushOK; i++) {
44: // 是否满足需要flush条件,即请求的offset超过flush的offset
45: flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
46: if (!flushOK) {
47: CommitLog.this.mappedFileQueue.flush(0);
48: }
49: }
50: // 唤醒等待请求
51: req.wakeupCustomer(flushOK);
52: }
53:
54: long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
55: if (storeTimestamp > 0) {
56: CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
57: }
58:
59: // 清理读取队列
60: this.requestsRead.clear();
61: } else {
62: // Because of individual messages is set to not sync flush, it
63: // will come to this process 不合法的请求,比如message上未设置isWaitStoreMsgOK。
64: // 走到此处的逻辑,相当于发送一条消息,落盘一条消息,实际无批量提交的效果。
65: CommitLog.this.mappedFileQueue.flush(0);
66: }
67: }
68: }
69:
70: public void run() {
71: CommitLog.log.info(this.getServiceName() + " service started");
72:
73: while (!this.isStopped()) {
74: try {
75: this.waitForRunning(10);
76: this.doCommit();
77: } catch (Exception e) {
78: CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
79: }
80: }
81:
82: // Under normal circumstances shutdown, wait for the arrival of the
83: // request, and then flush
84: try {
85: Thread.sleep(10);
86: } catch (InterruptedException e) {
87: CommitLog.log.warn("GroupCommitService Exception, ", e);
88: }
89:
90: synchronized (this) {
91: this.swapRequests();
92: }
93:
94: this.doCommit();
95:
96: CommitLog.log.info(this.getServiceName() + " service end");
97: }
98:
99: /**
100: * 每次执行完,切换读写队列
101: */
102: @Override
103: protected void onWaitEnd() {
104: this.swapRequests();
105: }
106:
107: @Override
108: public String getServiceName() {
109: return GroupCommitService.class.getSimpleName();
110: }
111:
112: @Override
113: public long getJointime() {
114: return 1000 * 60 * 5;
115: }
116: }
  • 说明:批量写入线程服务。
  • 第 16 至 25 行 :添加写入请求。方法设置了 sync 的原因:this.requestsWrite 会和 this.requestsRead 不断交换,无法保证稳定的同步。
  • 第 27 至 34 行 :读写队列交换。
  • 第 38 至 60 行 :循环写入队列,进行 flush
    • 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个 MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。
    • 第 51 行 :唤醒等待写入请求线程,通过 CountDownLatch 实现。
  • 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的 isWaitStoreMsgOK 未设置成 TRUE ,导致未走批量提交。
  • 第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果 wakeup() 时,则会立即进行一次批量提交。当 Broker 设置成同步落盘 && 消息 isWaitStoreMsgOK=true,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。

结尾

写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。

《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。

推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。😈认真啃读《Linux内核设计与实现(原书第3版)》,day day up。

再次感谢大家的阅读、点赞、收藏。

下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!

文章目录
  1. 1. 1、概述
  2. 2. 2、CommitLog 结构
  3. 3. 3、CommitLog 存储消息
    1. 3.1. CommitLog#putMessage(…)
    2. 3.2. MappedFileQueue#getLastMappedFile(…)
    3. 3.3. MappedFile#appendMessage(…)
    4. 3.4. DefaultAppendMessageCallback#doAppend(…)
    5. 3.5. FlushCommitLogService
      1. 3.5.1. MappedFile#落盘
      2. 3.5.2. FlushRealTimeService
      3. 3.5.3. CommitRealTimeService
      4. 3.5.4. GroupCommitService
  4. 4. 结尾