RabbitMQ延迟队列

11

延迟队列就是用到了死信交换机和TTL(消息存活时间)实现的。

如果消息超时未消费就会变成死信,在RabbitMQ中如果消息成为死信,队列可以绑定一个死信交换机,在死信交换机上可以绑定其他队列,在我们发消息的时候可以按照需求指定TTL的时间,这样就实现了延迟队列的功能了。

@Bean
public Queue ttlQueue() {
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
            .ttl(10000) // 设置队列的超时时间,10秒
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .build();
}

这里的TTL一个是消息本身设置的存活时间,还有一个是消息所在的队列设置了存活时间。以谁的时间短为准。

// 创建消息
Message message = MessageBuilder
        .withBody("hello,ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
​
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
​
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);

主要包含:

  1. 消息内容及编码

  2. 消息过期时间

  3. 消息ID

  4. 发给ttl.direct的路由键ttl

延迟队列插件

RabbitMQ还有一种方式可以实现延迟队列,在RabbitMQ中安装一个死信插件,这样更方便一些,我们只需要在声明交互机的时候,指定这个就是死信交换机,然后在发送消息的时候直接指定超时时间就行了,相对于死信交换机+TTL要省略了一些步骤

@Component
public class DelayedQueueConsumer {
    //使用@RabbitListener注解配置延迟队列监听
    @RabbitListener(bindings = @QueueBinding(   
        //通过@QueueBinding绑定队列、交换机和路由键
        value = @Queue(name = "delay.queue", durable = "true"),//队列配置为持久化(durable = "true")
        exchange = @Exchange(name = "delay.direct", delayed = "true"),//交换机启用延迟功能(delayed = "true")
        key = "delay"
    ))
    public void listenDelayedQueue(String msg) {
        log.info("接收到delay.queue的延迟消息:{}", msg);
    }
}