⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.ahao.moe/posts/Two_implementations_of_the_RabbitMQ_delay_queue.html 「ahao」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前言

RabbitMQ是没有延迟队列, 但是我们可以通过TTL和死信队列间接来实现.

  1. Message指定TTL后放入队列中.
  2. 等超时后, Message放入死信队列.
  3. 死信队列将Message转发到目标队列.

很麻烦. 幸运的是, RabbitMQ官方提供了一个rabbitmq-delayed-message-exchange延迟消息插件. 本文基于Spring Boot AMQP来操作.

使用官方延迟插件 rabbitmq-delayed-message-exchange

要求版本 >= 3.5.8. GitHub地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 下载地址: https://www.rabbitmq.com/community-plugins.html

我这里用的是3.6.x的版本.

# 1. 下载 plugin
cd /opt/
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

# 2. 移动到 plugins 文件夹内, 不同操作系统 plugins 位置不同
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins/
cp /opt/rabbitmq_delayed_message_exchange-20171215-3.6.x.ez ./

# 3. 启动延时插件
rabbitmq-plugins list | grep delayed
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后再声明一个延迟交换机Exchange.

@Configuration
@EnableRabbit
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "ahao_delayed_exchange";
@Bean(DELAY_EXCHANGE_NAME)
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
}

然后我们需要将Queue队列绑定到交换机上

@Configuration
@EnableRabbit
public class RabbitMQConfig {
public static final String QUEUE_NAME = "ahao_queue";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}

@Bean
public Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME).noargs();
}
}

绑定后, 就可以直接发送消息了.

@Service
public class RabbitService {
public static final String DELAY_EXCHANGE_NAME = "ahao_delayed_exchange";

@Autowired
private RabbitTemplate rabbitTemplate;

public void doSendDelay(String queueName, Object data, long delayMilliSeconds) throws IllegalArgumentException {
if(delayMilliSeconds > 0xffffffffL) {
throw new IllegalArgumentException("超时过长, 只支持 < 4294967296 的延时值");
}

CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, queueName, data, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.getHeaders().put("x-delay", delayMilliSeconds);
return message;
}, correlationId);
}
}

坑点1 延时最长为 2^32-1 毫秒

根据官方文档来看, 本插件的延时时长最长为2^32-1毫秒, 也就是0xffffffff毫秒. 换算一下, 大约是49天. 如果超过2^32-1毫秒, 那么延时值就会溢出, 也就是会立即消费.

Issue#122也有提到. 这应该是Erlang本身的限制.

In Erlang a timer can be set up to (2^32)-1 milliseconds in the future

坑点2 队列需要和延时 Exchange 绑定

之前以为指定了x-delayed-typedirect, 就可以不用绑定Queue到这个延时Exchange交换机上. 结果发的消息接收不到, 还是需要绑定一下.

使用原生死信队列实现延时队列

原生方法就是利用死信队列.

  1. Message指定TTL后放入队列中.
  2. 等超时后, Message放入死信队列.
  3. 死信队列将Message转发到目标队列.

我们先设计下消息流转流程图 消息流转流程图

[消息流转流程图](https://yuml.me/diagram/nofunky;dir:LR/activity/(start)

-Msg>(delay_exchange)-fanout>(delay_queue)-dead>(biz_exchange)->[key1]->(biz_queue1)->(consumer),[key2]->(biz_queue2)->(consumer)->(end))

  1. 用户发送带着RoutingKeybiz_queue1的一条消息到延时交换机delay_exchange上(注意, 这个延时交换机就是一个普通交换机).
  2. 延时交换机delay_exchange将消息fanout到队列delay_queue, 这个队列配置了一堆死信参数.
  3. 等待消息在delay_queue超时, 然后将消息转发到该队列的死信交换机biz_exchange上.
  4. 因为delay_queue没有指定x-dead-letter-routing-key, 所以使用的还是原来的biz_queue1. 路由到biz_queue1队列上.
  5. 延时消费成功.

设计完毕开始编码实战. 我们需要初始化交换机Exchange和队列Queue

@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
public Exchange delayExchange() {
return new FanoutExchange("delay_exchange", true, false, null);
}
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", bizExchange().getName()); // 声明死信交换机
// args.put("x-dead-letter-routing-key", ""); // 声明死信路由键
args.put("x-message-ttl", 10000); // 所有消息的默认超时时间
return new Queue("delay_queue", true, false, false, args);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue().getName()).noargs();
}

@Bean
public Exchange bizExchange() {
return new DirectExchange("biz_exchange", true, false, null);
}
@Bean
public Queue bizQueue1() {
return new Queue("biz_queue1", true, false, false, null);
}
@Bean
public Binding bizBinding1() {
return BindingBuilder.bind(bizQueue1()).to(bizExchange()).with(bizQueue1().getName()).noargs();
}
@Bean
public Queue bizQueue2() {
return new Queue("biz_queue2", true, false, false, null);
}
@Bean
public Binding bizBinding2() {
return BindingBuilder.bind(bizQueue2()).to(bizExchange()).with(bizQueue2().getName()).noargs();
}
}

然后写一个单元测试, 我用的Junit5

@Service
public class DirectConsumer {
public static final String QUEUE_NAME = "biz_queue1";
public static Object value;
@RabbitListener(queuesToDeclare = @Queue(QUEUE_NAME))
@RabbitHandler
public void directQueue(String msg) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息接收时间:"+sdf.format(new Date()));
System.out.println("接收到的消息:"+msg);
Thread.sleep(1000);
value = msg;
}
}

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ContextConfiguration(classes = {RabbitMQConfig.class, RabbitAutoConfiguration.class, SpringContextHolder.class, DirectConsumer.class})
public class DirectProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectConsumer consumer;
@Test
public void send() throws Exception {
Assert.assertNotNull(rabbitTemplate);
Assert.assertNotNull(consumer);

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:" + sdf.format(new Date()));
String msg = "send()";

rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("3000");
return message;
});

Assert.assertNull(DirectConsumer.value);
Thread.sleep(5000);
Assert.assertEquals(msg, DirectConsumer.value);
}
}

我们可以给队列设置x-message-ttl, 也可以给每条消息设置expiration, RabbitMQ会取两者最小值作为消息过期时间.

用死信队列来实现延迟队列, 只要套多几个死信队列, 就可以绕过官方延时插件的只能延时2^32-1毫秒的bug. 但是和官方延时插件一样, 还是得每个队列都绑定到延时交换机上.

并且! 推荐给队列设置x-message-ttl, 而不是给消息设置expiration.

坑点 同一队列的延时时长不一样导致消息阻塞

我们先看下面这个单元测试, 比起上面那个单元测试, 就是连续发送了两条消息.

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ContextConfiguration(classes = {RabbitMQConfig.class, RabbitAutoConfiguration.class, SpringContextHolder.class, DirectConsumer.class})
public class DirectProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectConsumer consumer;
@Test
public void sendFailure() throws Exception {
Assert.assertNotNull(rabbitTemplate);
Assert.assertNotNull(consumer);

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:" + sdf.format(new Date()));

String msg1 = "sendFailure(1)";
rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg1, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("1000000");
return message;
});
String msg2 = "sendFailure(2)";
rabbitTemplate.convertAndSend("delay_exchange", DirectConsumer.QUEUE_NAME, msg2, message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration("3000");
return message;
});

Assert.assertNull(DirectConsumer.value);
Thread.sleep(5000);
Assert.assertNull(msg, DirectConsumer.value);
}
}

执行后可以发现, 5000毫秒后, 消费者仍然不能接受到sendFailure(2)这条消息. 因为消息队列是先进先出的, 当第一条消息没有被消费, 后面的消息也会阻塞不能消费.

所以推荐还是使用给队列设置x-message-ttl的形式来设置延时时长. 当然, 官方延时插件就没这个问题了.

总结

使用官方插件

  • 优点:
    1. 使用简单
    2. 不会出现因为前一条消息没有消费, 导致后面的消息阻塞的情况
  • 缺点:
    1. 延时时长不能超过2^32-1毫秒, 大约49天.

使用原生死信队列

  • 优点:
    1. 使用死信队列套死信队列, 可以突破2^32-1毫秒的官方插件限制.
  • 缺点:
    1. 实现复杂.
    2. 如果给每条消息设置expiration, 则前一条消息会阻塞后一条消息.

然后我写了个工具类RabbitMQHelper可以拿来用下.

参考资料

文章目录
  1. 1. 前言
  2. 2. 使用官方延迟插件 rabbitmq-delayed-message-exchange
    1. 2.1. 坑点1 延时最长为 2^32-1 毫秒
    2. 2.2. 坑点2 队列需要和延时 Exchange 绑定
  3. 3. 使用原生死信队列实现延时队列
    1. 3.1. 坑点 同一队列的延时时长不一样导致消息阻塞
  4. 4. 总结
  5. 5. 参考资料