⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/qq330983778/article/details/99341671 「大·风」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

设计

之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列 现在就尝试实现一下

业务流程

首先我们分析下这个流程

  1. 用户提交任务。首先将任务推送至延迟队列中。
  2. 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
  3. 然后生成延迟任务(仅仅包含任务id)放入某个桶中
  4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
  5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
  6. 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
  7. 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
  8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。

对象

我们现在可以了解到中间存在的几个组件

  1. 延迟队列,为Redis延迟队列。实现消息传递
  2. Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
  3. Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
  4. Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
  5. Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态

  • ready:可执行状态,
  • delay:不可执行状态,等待时钟周期。
  • reserved:已被消费者读取,但没有完成消费。
  • deleted:已被消费完成或者已被删除。

对外提供的接口

接口 描述 数据
add 添加任务 Job数据
pop 取出待处理任务 topic就是任务分组
finish 完成任务 任务ID
delete 删除任务 任务ID

额外的内容

  1. 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
  2. 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
  3. 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
  4. 文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成

任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)

任务对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {

/**
* 延迟任务的唯一标识,用于检索任务
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;

/**
* 任务类型(具体业务类型)
*/
private String topic;

/**
* 任务的延迟时间
*/
private long delayTime;

/**
* 任务的执行超时时间
*/
private long ttrTime;

/**
* 任务具体的消息内容,用于处理具体业务逻辑用
*/
private String message;

/**
* 重试次数
*/
private int retryCount;
/**
* 任务状态
*/
private JobStatus status;
}

任务引用对象

@Data
@AllArgsConstructor
public class DelayJob implements Serializable {


/**
* 延迟任务的唯一标识
*/
private long jodId;

/**
* 任务的执行时间
*/
private long delayDate;

/**
* 任务类型(具体业务类型)
*/
private String topic;


public DelayJob(Job job) {
this.jodId = job.getId();
this.delayDate = System.currentTimeMillis() + job.getDelayTime();
this.topic = job.getTopic();
}

public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}

容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

job任务池,为普通的K/V结构,提供基础的操作

@Component
@Slf4j
public class JobPool {

@Autowired
private RedisTemplate redisTemplate;

private String NAME = "job.pool";

private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}

/**
* 添加任务
* @param job
*/
public void addJob (Job job) {
log.info("任务池添加任务:{}", JSON.toJSONString(job));
getPool().put(job.getId(),job);
return ;
}

/**
* 获得任务
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}

/**
* 移除任务
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任务池移除任务:{}",jobId);
// 移除任务
getPool().delete(jobId);
}
}

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

@Slf4j
@Component
public class DelayBucket {

@Autowired
private RedisTemplate redisTemplate;

private static AtomicInteger index = new AtomicInteger(0);

@Value("${thread.size}")
private int bucketsSize;

private List <String> bucketNames = new ArrayList <>();

@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}

/**
* 获得桶的名称
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
return bucketNames.get(i1);
}

/**
* 获得桶集合
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}

/**
* 放入延时任务
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延迟任务:{}", JSON.toJSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(job,job.getDelayDate());
}

/**
* 获得最新的延期任务
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
Object value = typedTuple.getValue();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}

/**
* 移除延时任务
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
bucket.remove(delayJob);
}

}

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

@Component
@Slf4j
public class ReadyQueue {

@Autowired
private RedisTemplate redisTemplate;

private String NAME = "process.queue";

private String getKey(String topic) {
return NAME + topic;
}

/**
* 获得队列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}

/**
* 设置任务
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("执行队列添加任务:{}",delayJob);
BoundListOperations listOperations = getQueue(delayJob.getTopic());
listOperations.leftPush(delayJob);
}

/**
* 移除并获得任务
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}

}

轮询处理

设置了线程池为每个bucket设置一个轮询操作

@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {

@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool jobPool;
@Autowired
private ReadyQueue readyQueue;

@Value("${thread.size}")
private int length;

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());

for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}

}
}

测试请求

/**
* 测试用请求
* @author daify
* @date 2019-07-29 10:26
**/
@RestController
@RequestMapping("delay")
public class DelayController {

@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
return JSON.toJSONString(delayJob);
}

/**
* 获取
* @return
*/
@RequestMapping(value = "pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = jobService.getProcessJob(topic);
return JSON.toJSONString(process);
}

/**
* 完成一个执行的任务
* @param jobId
* @return
*/
@RequestMapping(value = "finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}

@RequestMapping(value = "delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}

}

测试

添加延迟任务

通过postman请求:localhost:8000/delay/add

此时这条延时任务被添加进了线程池中

2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

获得任务

这时候我们请求localhost:8000/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

任务的删除/消费

现在我们请求:localhost:8000/delay/delete

此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:3
2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}

源码地址

首先点击右下方在看,再长按下二维码关注哦,并后台回复【p045】即可获得使用示例 GitHub。

二维码

文章目录
  1. 1. 设计
    1. 1.0.1. 业务流程
    2. 1.0.2. 对象
    3. 1.0.3. 任务状态
    4. 1.0.4. 对外提供的接口
    5. 1.0.5. 额外的内容
  • 2. 实现
    1. 2.0.1. 任务及相关对象
    2. 2.0.2. 容器
    3. 2.0.3. 轮询处理
    4. 2.0.4. 测试请求
  • 3. 测试
  • 4. 源码地址