摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-schedule-and-retry/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

建议前置阅读内容:

😈 为什么把定时消息消息重试放在一起?你猜。
👻 你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是定时消息的处理逻辑图:

定时消息逻辑图.png

2.1 延迟级别

RocketMQ 目前只支持固定精度的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别:
延迟级别时间
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
  • 核心源码如下:

    1: // ⬇️⬇️⬇️【MessageStoreConfig.java】
    2: /**
    3: * 消息延迟级别字符串配置
    4: */
    5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    6:
    7: // ⬇️⬇️⬇️【ScheduleMessageService.java】
    8: /**
    9: * 解析延迟级别
    10: *
    11: * @return 是否解析成功
    12: */
    13: public boolean parseDelayLevel() {
    14: HashMap<String, Long> timeUnitTable = new HashMap<>();
    15: timeUnitTable.put("s", 1000L);
    16: timeUnitTable.put("m", 1000L * 60);
    17: timeUnitTable.put("h", 1000L * 60 * 60);
    18: timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    19:
    20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    21: try {
    22: String[] levelArray = levelString.split(" ");
    23: for (int i = 0; i < levelArray.length; i++) {
    24: String value = levelArray[i];
    25: String ch = value.substring(value.length() - 1);
    26: Long tu = timeUnitTable.get(ch);
    27:
    28: int level = i + 1;
    29: if (level > this.maxDelayLevel) {
    30: this.maxDelayLevel = level;
    31: }
    32: long num = Long.parseLong(value.substring(0, value.length() - 1));
    33: long delayTimeMillis = tu * num;
    34: this.delayLevelTable.put(level, delayTimeMillis);
    35: }
    36: } catch (Exception e) {
    37: log.error("parseDelayLevel exception", e);
    38: log.info("levelString String = {}", levelString);
    39: return false;
    40: }
    41:
    42: return true;
    43: }

2.2 Producer 发送定时消息

  • 🦅发送时,设置消息的延迟级别
Message msg = new Message(...);
msg.setDelayTimeLevel(level);

2.3 Broker 存储定时消息

  • 🦅 存储消息时,延迟消息进入 TopicSCHEDULE_TOPIC_XXXX
  • 🦅 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1

核心代码如下:

1: // ⬇️⬇️⬇️【CommitLog.java】
2: /**
3: * 添加消息,返回消息结果
4: *
5: * @param msg 消息
6: * @return 结果
7: */
8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
9: // ....(省略代码)
10:
11: // 定时消息处理
12: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
13: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
14: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
15: // Delay Delivery
16: if (msg.getDelayTimeLevel() > 0) {
17: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
18: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
19: }
20:
21: // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。
22: topic = ScheduleMessageService.SCHEDULE_TOPIC;
23:
24: // 延迟级别 与 消息队列编号 做固定映射
25: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
26:
27: // Backup real topic, queueId
28: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
30: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
31:
32: msg.setTopic(topic);
33: msg.setQueueId(queueId);
34: }
35: }
36:
37: // ....(省略代码)
38: }
39:
40: // ⬇️⬇️⬇️【ScheduleMessageService.java】
41: /**
42: * 根据 延迟级别 计算 消息队列编号
43: * QueueId = DelayLevel - 1
44: *
45: * @param delayLevel 延迟级别
46: * @return 消息队列编号
47: */
48: public static int delayLevel2QueueId(final int delayLevel) {
49: return delayLevel - 1;
50: }

  • 🦅 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样,ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

1: // ⬇️⬇️⬇️【CommitLog.java】
2: /**
3: * check the message and returns the message size
4: *
5: * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
6: */
7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
8: try {
9: // // ....(省略代码)
10:
11: // 17 properties
12: short propertiesLength = byteBuffer.getShort();
13: if (propertiesLength > 0) {
14: // ....(省略代码)
15: String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
16: if (tags != null && tags.length() > 0) {
17: tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
18: }
19:
20: // Timing message processing
21: {
22: String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
23: if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
24: int delayLevel = Integer.parseInt(t);
25:
26: if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
27: delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
28: }
29:
30: if (delayLevel > 0) {
31: tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
32: storeTimestamp);
33: }
34: }
35: }
36: }
37:
38: // ....(省略代码)
39:
40: return new DispatchRequest(//
41: topic, // 1
42: queueId, // 2
43: physicOffset, // 3
44: totalSize, // 4
45: tagsCode, // 5
46: storeTimestamp, // 6
47: queueOffset, // 7
48: keys, // 8
49: uniqKey, //9
50: sysFlag, // 9
51: preparedTransactionOffset// 10
52: );
53: } catch (Exception e) {
54: }
55:
56: return new DispatchRequest(-1, false /* success */);
57: }
58:
59: // ⬇️⬇️⬇️【ScheduleMessageService.java】
60: /**
61: * 计算 投递时间【计划消费时间】
62: *
63: * @param delayLevel 延迟级别
64: * @param storeTimestamp 存储时间
65: * @return 投递时间【计划消费时间】
66: */
67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
68: Long time = this.delayLevelTable.get(delayLevel);
69: if (time != null) {
70: return time + storeTimestamp;
71: }
72:
73: return storeTimestamp + 1000;
74: }

2.4 Broker 发送定时消息

  • 🦅 对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

定时消息定时逻辑

实现代码如下:

1: /**
2: * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务
3: */
4: class DeliverDelayedMessageTimerTask extends TimerTask {
5: /**
6: * 延迟级别
7: */
8: private final int delayLevel;
9: /**
10: * 位置
11: */
12: private final long offset;
13:
14: public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
15: this.delayLevel = delayLevel;
16: this.offset = offset;
17: }
18:
19: @Override
20: public void run() {
21: try {
22: this.executeOnTimeup();
23: } catch (Exception e) {
24: // XXX: warn and notify me
25: log.error("ScheduleMessageService, executeOnTimeup exception", e);
26: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
27: this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
28: }
29: }
30:
31: /**
32: * 纠正可投递时间。
33: * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。
34: *
35: * @param now 当前时间
36: * @param deliverTimestamp 投递时间
37: * @return 纠正结果
38: */
39: private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
40: long result = deliverTimestamp;
41:
42: long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
43: if (deliverTimestamp > maxTimestamp) {
44: result = now;
45: }
46:
47: return result;
48: }
49:
50: public void executeOnTimeup() {
51: ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
52:
53: long failScheduleOffset = offset;
54:
55: if (cq != null) {
56: SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
57: if (bufferCQ != null) {
58: try {
59: long nextOffset = offset;
60: int i = 0;
61: for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
62: long offsetPy = bufferCQ.getByteBuffer().getLong();
63: int sizePy = bufferCQ.getByteBuffer().getInt();
64: long tagsCode = bufferCQ.getByteBuffer().getLong();
65:
66: long now = System.currentTimeMillis();
67: long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
68:
69: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
70:
71: long countdown = deliverTimestamp - now;
72:
73: if (countdown <= 0) { // 消息到达可发送时间
74: MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
75: if (msgExt != null) {
76: try {
77: // 发送消息
78: MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
79: PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
80: if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
81: continue;
82: } else { // 发送失败
83: // XXX: warn and notify me
84: log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
85:
86: // 安排下一次任务
87: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
88:
89: // 更新进度
90: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
91: return;
92: }
93: } catch (Exception e) {
94: // XXX: warn and notify me
95: log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
96: + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
97: }
98: }
99: } else {
100: // 安排下一次任务
101: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
102:
103: // 更新进度
104: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
105: return;
106: }
107: } // end of for
108:
109: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
110:
111: // 安排下一次任务
112: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
113:
114: // 更新进度
115: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
116: return;
117: } finally {
118: bufferCQ.release();
119: }
120: } // end of if (bufferCQ != null)
121: else { // 消费队列已经被删除部分,跳转到最小的消费进度
122: long cqMinOffset = cq.getMinOffsetInQueue();
123: if (offset < cqMinOffset) {
124: failScheduleOffset = cqMinOffset;
125: log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
126: + cqMinOffset + ", queueId=" + cq.getQueueId());
127: }
128: }
129: } // end of if (cq != null)
130:
131: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
132: }
133:
134: /**
135: * 设置消息内容
136: *
137: * @param msgExt 消息
138: * @return 消息
139: */
140: private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
141: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
142: msgInner.setBody(msgExt.getBody());
143: msgInner.setFlag(msgExt.getFlag());
144: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
145:
146: TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
147: long tagsCodeValue =
148: MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
149: msgInner.setTagsCode(tagsCodeValue);
150: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
151:
152: msgInner.setSysFlag(msgExt.getSysFlag());
153: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
154: msgInner.setBornHost(msgExt.getBornHost());
155: msgInner.setStoreHost(msgExt.getStoreHost());
156: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
157:
158: msgInner.setWaitStoreMsgOK(false);
159: MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
160:
161: msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
162:
163: String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
164: int queueId = Integer.parseInt(queueIdStr);
165: msgInner.setQueueId(queueId);
166:
167: return msgInner;
168: }
169: }

2.5 Broker 持久化定时发送进度

  • 🦅 定时消息发送进度存储在文件(../config/delayOffset.json)里
  • 🦅 每 10s 定时持久化发送进度。

核心代码如下:

1: // ⬇️⬇️⬇️【ScheduleMessageService.java】
2: /**
3: public void start() {
4: // 定时发送消息
5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
6: Integer level = entry.getKey();
7: Long timeDelay = entry.getValue();
8: Long offset = this.offsetTable.get(level);
9: if (null == offset) {
10: offset = 0L;
11: }
12:
13: if (timeDelay != null) {
14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
15: }
16: }
17:
18: // 定时持久化发送进度
19: this.timer.scheduleAtFixedRate(new TimerTask() {
20:
21: @Override
22: public void run() {
23: try {
24: ScheduleMessageService.this.persist();
25: } catch (Exception e) {
26: log.error("scheduleAtFixedRate flush exception", e);
27: }
28: }
29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
30: }

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  • 🦅 Consumer 将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。

核心代码如下:

1: // ⬇️⬇️⬇️【SendMessageProcessor.java】
2: /**
3: * 消费者发回消息
4: *
5: * @param ctx ctx
6: * @param request 请求
7: * @return 响应
8: * @throws RemotingCommandException 当远程调用异常
9: */
10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
11: throws RemotingCommandException {
12: // ....(省略代码)
13: // 处理 delayLevel(独有)。
14: int delayLevel = requestHeader.getDelayLevel();
15: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
16: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
17: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
18: }
19: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
20: // ....(省略代码)
21: } else {
22: if (0 == delayLevel) {
23: delayLevel = 3 + msgExt.getReconsumeTimes();
24: }
25: msgExt.setDelayTimeLevel(delayLevel);
26: }
27:
28: // ....(省略代码)
29: return response;
30: }
文章目录
  1. 1. 1. 概述
  2. 2. 2. 定时消息
    1. 2.1. 2.1 延迟级别
    2. 2.2. 2.2 Producer 发送定时消息
    3. 2.3. 2.3 Broker 存储定时消息
    4. 2.4. 2.4 Broker 发送定时消息
    5. 2.5. 2.5 Broker 持久化定时发送进度
  3. 3. 3. 消息重试