海量消息堆积MQ
一、提高消费者的消费能力 ,可以使用多线程消费任务
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5); // 并发消费者数量
factory.setMaxConcurrentConsumers(10); // 最大并发消费者数量
return factory;
}
// 消费者示例
@RabbitListener(queues = "work.queue")
public void listen(String message) {
System.out.println("线程 " + Thread.currentThread().getName() + " 处理消息: " + message);
}
二、增加更多消费者,提高消费速度 ,使用工作队列模式,设置多个消费者消费消费同一个队列中的消息
# application.yml 配置多个消费者
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每个消费者每次只预取1条消息(公平分发)
// 消费者1
@RabbitListener(queues = "work.queue")
public void worker1(String message) {
System.out.println("Worker1 处理: " + message);
}
// 消费者2
@RabbitListener(queues = "work.queue")
public void worker2(String message) {
System.out.println("Worker2 处理: " + message);
}
三、扩大队列容积,提高堆积上限
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() // 启用惰性队列
.build();
}
可以使用RabbitMQ惰性队列,惰性队列的好处主要是
①接收到消息后直接存入磁盘而非内存
②消费者要消费消息时才会从磁盘中读取并加载到内存
③支持数百万条的消息存储