⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 www.cnblogs.com/Finley/p/16400287.html 「finley」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

我们先看看以下业务场景:

  1. 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  2. 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?

上述场景最简单直接的解决方案是定时扫表。我们假设 10 分钟未支付则关闭订单、定时任务设置为 5 分钟一次,那么一个订单最晚会在 15 分钟关闭。高达 5 分钟的误差是业务难以接受的。另一方面频繁的扫表可能消耗过多数据库资源,影响线上交易吞吐量。

此外还有朋友使用 Redis 的过期通知、时间轮、Java 的 DelayQueue 等方式实现延时任务。我们在之前的文章中讨论过他们的缺陷:比如使用 Redis 过期通知不保证准时、发送即忘不保证送达,时间轮缺乏持久化机制容易丢失等。

总结一下,我们对于延时队列的要求有下列几条(从重要到不重要排列):

  1. 持久化: 服务重启或崩溃不能丢失任务
  2. 确认重试机制: 任务处理失败或超时应该有重试
  3. 定时尽量精确

最合适的解决方案是使用 Pulsa、RocketMQ 等专业消息队列的延时投递功能。不过引入新的中间件通常存在各种非技术方面的麻烦。Redis 作为广泛使用的中间件,何不用 Redis 来制作延时队列呢?

使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member 投递时间戳作为 score,使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。

除了基本的延时投递之外我们的消息队列具有下列优势:

  1. 提供 ACK 和重试机制
  2. 只需要 Redis 和消费者即可运行,无需其它组件
  3. 提供 At-Least-One 投递语义、并保证消息不会并发消费

本文的完整代码实现在hdt3213/delayqueue,可以直接 go get github.com/hdt3213/delayqueue 完成安装。

具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:

package main

import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)

func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注册处理消息的回调函数
// 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
return true
})
// 发送延时消息
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}

// 启动消费协程
done := queue.StartConsume()
// 阻塞等待消费协程退出
<-done
}

由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。

原理详解

消息队列涉及几个关键的 redis 数据结构:

  • msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免
  • pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。
  • readyKey: 列表类型,需要投递的消息 ID。
  • unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。
  • retryKey: 列表类型,已到重试时间的消息 ID
  • garbageKey: 集合类型,用于暂存已达重试上线的消息 ID
  • retryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数

流程如下图所示:

由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。

因此我们需要保证上图中五个操作满足三个条件:

  1. 都是原子性的
  2. 不会重复处理同一条消息
  3. 操作前后消息队列始终处于正确的状态

只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。

是不是听起来有点吓人?😂 其实简单的很,让我们一起来详细看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 扫描已到投递时间的消息ID并把它们移动到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中找出已到投递时间的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 将他们放入 ready key 中
for _,v in ipairs(msgs) do
table.insert(args2, v)
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 从 pending key 中删除已投递的消息

ready2UnackScript

ready2UnackScript 从 ready 或者 retry 中取出一条消息发送给消费者并放入 unack 中,类似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 从 retry 中找出所有已到重试时间的消息并把它们移动到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重试时间的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查询剩余重试次数
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if tonumber(v) > 0 then -- 剩余次数大于 0
redis.call("HIncrBy", KEYS[2], k, -1) -- 减少剩余重试次数
redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
else -- 剩余重试次数为 0
redis.call("HDel", KEYS[2], k) -- 删除重试次数记录
redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后续删除
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 将已处理的消息从 unack key 中删除

因为 redis 要求 lua 脚本必须在执行前在 KEYS 参数中声明自己要访问的 key, 而我们将每个 msg 有一个独立的 key,我们在执行 unack2RetryScript 之前是不知道哪些 msg key 需要被删除。所以 lua 脚本只将需要删除的消息记在 garbage key 中,脚本执行完后再通过 del 命令将他们删除:

func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
if len(msgIds) == 0 {
return nil
}
// allow concurrent clean
msgKeys := make([]string, 0, len(msgIds))
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
}

之前提到的 lua 脚本都是原子性执行的,不会有其它命令插入其中。 gc 函数由 3 条 redis 命令组成,在执行过程中可能会有其它命令插入执行过程中,不过考虑到一条消息进入垃圾回收流程之后不会复活所以不需要保证 3 条命令原子性。

ack

ack 只需要将消息彻底删除即可:

func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
return nil
}

否定确认只需要将 unack key 中消息的重试时间改为现在,随后执行的 unack2RetryScript 会立即将它移动到 retry key

func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
return nil
}

consume

消息队列的核心逻辑是每秒执行一次的 consume 函数,它负责调用上述脚本将消息转移到正确的集合中并回调 consumer 来消费消息:

func (q *DelayQueue) consume() error {
// 执行 pending2ready,将已到时间的消息转移到 ready
err := q.pending2Ready()
if err != nil {
return err
}
// 循环调用 ready2Unack 拉取消息进行消费
var fetchCount uint
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
// 将 nack 或超时的消息放入重试队列
err = q.unack2Retry()
if err != nil {
return err
}
// 清理已达到最大重试次数的消息
err = q.garbageCollect()
if err != nil {
return err
}
// 消费重试队列
fetchCount = 0
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
break
}
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
break
}
}
return nil
}

至此一个简单可靠的延时队列就做好了,何不赶紧开始试用呢😘😋?

文章目录
  1. 1. 原理详解
    1. 1.1. pending2ReadyScript
    2. 1.2. ready2UnackScript
    3. 1.3. unack2RetryScript
    4. 1.4. ack
    5. 1.5. consume