RabbitMQ延迟队列
延迟队列就是用到了死信交换机和TTL(消息存活时间)实现的。
如果消息超时未消费就会变成死信,在RabbitMQ中如果消息成为死信,队列可以绑定一个死信交换机,在死信交换机上可以绑定其他队列,在我们发消息的时候可以按照需求指定TTL的时间,这样就实现了延迟队列的功能了。
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
这里的TTL一个是消息本身设置的存活时间,还有一个是消息所在的队列设置了存活时间。以谁的时间短为准。
// 创建消息
Message message = MessageBuilder
.withBody("hello,ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
主要包含:
消息内容及编码
消息过期时间
消息ID
发给ttl.direct的路由键ttl
延迟队列插件
RabbitMQ还有一种方式可以实现延迟队列,在RabbitMQ中安装一个死信插件,这样更方便一些,我们只需要在声明交互机的时候,指定这个就是死信交换机,然后在发送消息的时候直接指定超时时间就行了,相对于死信交换机+TTL要省略了一些步骤
@Component
public class DelayedQueueConsumer {
//使用@RabbitListener注解配置延迟队列监听
@RabbitListener(bindings = @QueueBinding(
//通过@QueueBinding绑定队列、交换机和路由键
value = @Queue(name = "delay.queue", durable = "true"),//队列配置为持久化(durable = "true")
exchange = @Exchange(name = "delay.direct", delayed = "true"),//交换机启用延迟功能(delayed = "true")
key = "delay"
))
public void listenDelayedQueue(String msg) {
log.info("接收到delay.queue的延迟消息:{}", msg);
}
}