海量消息堆积MQ

9

一、提高消费者的消费能力 ,可以使用多线程消费任务

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(5);  // 并发消费者数量
    factory.setMaxConcurrentConsumers(10); // 最大并发消费者数量
    return factory;
}
​
// 消费者示例
@RabbitListener(queues = "work.queue")
public void listen(String message) {
    System.out.println("线程 " + Thread.currentThread().getName() + " 处理消息: " + message);
}

二、增加更多消费者,提高消费速度 ,使用工作队列模式,设置多个消费者消费消费同一个队列中的消息

# application.yml 配置多个消费者
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 每个消费者每次只预取1条消息(公平分发)
// 消费者1
@RabbitListener(queues = "work.queue")
public void worker1(String message) {
    System.out.println("Worker1 处理: " + message);
}
​
// 消费者2
@RabbitListener(queues = "work.queue")
public void worker2(String message) {
    System.out.println("Worker2 处理: " + message);
}

三、扩大队列容积,提高堆积上限

@Bean
public Queue lazyQueue() {
    return QueueBuilder
            .durable("lazy.queue")
            .lazy()  // 启用惰性队列
            .build();
}

可以使用RabbitMQ惰性队列,惰性队列的好处主要是

①接收到消息后直接存入磁盘而非内存

②消费者要消费消息时才会从磁盘中读取并加载到内存

③支持数百万条的消息存储

方案

优点

缺点

适用场景

多线程消费

单消费者即可并行处理

线程管理复杂

CPU密集型任务

工作队列模式

横向扩展简单

需要部署多个消费者实例

消费者可水平扩展的场景

惰性队列

支持海量消息堆积

吞吐量降低(磁盘IO瓶颈)

消息堆积严重的场景