Appearance
RabbitMQ 客户端限流
1. Prefetch 介绍
Prefetch Count 指的是单个消费者在 RabbitMQ 中最多可以同时接收的未确认消息的数量。这个数量决定了 RabbitMQ 在向消费者发送新消息之前,必须等待消费者确认(acknowledge)已经处理的消息。
2. 功能与作用
流量控制:通过设置 prefetch count,可以有效控制消息的流量,避免消费者因处理能力有限而被过多的消息淹没。例如,如果设置为 10,消费者最多可以同时处理 10 条未确认的消息。
提高效率:合理设置 prefetch count 能够提高系统的整体吞吐量。若设置过高,可能导致某些消费者处理较慢,从而影响其他消费者的工作;若设置过低,则可能导致资源浪费和处理延迟。
3. 配置方式
在 RabbitMQ 中,prefetch count 通过
basic.qos信令进行设置。该信令是在建立 channel 后发送的,可以针对每个 channel 进行配置。示例代码:
Javachannel.basicQos(10); // 设置 prefetch count 为 10可以在 Spring Boot 的配置文件
application.yml中设置prefetch值,示例如下:YAMLspring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: concurrency: 1 # 最小并发消费者数 max-concurrency: 10 # 最大并发消费者数 prefetch: 10 # 设置预取数量为 101
2
3
4
5
6
7
8
9
10
11如果需要在特定的消费者方法中覆盖全局配置,可以直接在
@RabbitListener注解中设置prefetch:Java@Component public class RabbitMQConsumer { @RabbitListener(queues = "prefetch_queue", concurrency = "3", prefetch = "1") public void receiveMessage(String message) { // 处理消息逻辑 } }1
2
3
4
5
6
7自定义
RabbitListenerContainerFactory:Java@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(10); factory.setPrefetchCount(10); // 设置预取数量 return factory; }1
2
3
4
5
6
7
8
9
4. 实际应用
最佳实践:根据 RabbitMQ 官方建议,prefetch count 通常设置为 30,以达到较好的性能平衡。在高延迟或低吞吐量情况下,可以适当调整该值以优化性能。
自动确认与手动确认:prefetch count 仅在手动确认模式下生效。在自动确认模式下,RabbitMQ 会立即将消息发送给消费者,而不考虑其处理能力。