Appearance
RabbitMQ 消息的可靠投递和消费
1. 基础配置
为了保证消息在 Broker 中的持久性,需要将交换机、队列和消息设置为持久化:
声明交换机时,设置其为持久化:
JavaExchange exchange = new DirectExchange("yourExchangeName", true, false);在定义队列时,设置其为持久化:
JavaQueue queue = new Queue("yourQueueName", true);确保发送的消息也标记为持久化:
JavaMessageProperties messageProperties = MessagePropertiesBuilder .withBody("yourMessage".getBytes()) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build();
信息
其实默认情况下,通过代码声明/发送的交换机、队列、消息都是持久化的,除非明确将其设置为非持久化。
2. 生产端
2.1. application.yml 配置
YAML
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启确认模式:确认消息是否成功发送到交换机
publisher-returns: true # 开启退回模式:确认消息从交换机成功路由到队列
template:
mandatory: true配置说明:
spring.rabbitmq.publisher-confirm-type:是否开启 Publisher Confirm,可选值:none:关闭(默认,性能最高,无回调);simple:开启同步 Confirm,线程阻塞;correlated(推荐):开启异步 Confirm,通过ConfirmCallback异步接收回调。
spring.rabbitmq.publisher-returns:在 Spring AMQP 框架层面开启 Publisher Returns 机制;spring.rabbitmq.template.mandatory:RabbitMQ 协议层面,发送消息时加上mandatory标志位,如果消息无法路由到任何队列,Broker 会把消息退回给生产者。
备份交换机
替代 Publisher Returns 机制的另一种方案是使用备份交换机:
- 首先需要创建一个备份交换机,类型为
fanout类型(因为没有路由键); - 创建备份队列,并将其绑定到备份交换机;
- 删除原交换机并重建,重建时
alternate-exchange参数输入刚才新建的备份交换机的名称;
但推荐的方案还是使用生产端的退回模式进行处理。
生产者 ACK 时机及含义
只有当消息被所有匹配的 Queue 成功入队(包括持久化到磁盘,如果是持久消息 + 持久队列)时,Broker 才会向生产者发送 basic.ack。
对于不可路由的消息,RabbitMQ 照样会返回 basic.ack,而不是返回 basic.nack。如果消息设置了 mandatory = true,RabbitMQ 会在发送 basic.ack 之前,先发送一个 basic.return。
2.2. 注册回调函数
为每条消息做不同的确认/返回处理;
JavaCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(confirm -> { // return 总是发生在 confirmation 之前,所以在 future 回调可以安全地读取 returned 信息 ReturnedMessage returned = correlationData.getReturned(); if (returned != null) { // 处理不可路由场景:记录/重发/落盘等 } else if (confirm.isAck()) { // 正常路由并被 broker 接受 } else { // nack 情况:记录/重试策略等 } }, ex -> { // 传输级故障 }); rabbitTemplate.convertAndSend("exchange", "key", "payload", correlationData);为
RabbitTemplate设置默认(全局)的确认/返回处理;Java@Configuration @Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { // 消息成功发送到交换机 } else { // 消息发送失败,处理重发逻辑 } } @Override public void returnedMessage(ReturnedMessage returned) { // 处理退回的消息 String message = new String(returned.getMessage().getBody()); int replyCode = returned.getReplyCode(); String replyText = returned.getReplyText(); String exchange = returned.getExchange(); String routingKey = returned.getRoutingKey(); } }
提示
单条消息上的确认回调并不会覆盖 RabbitTemplate 上的默认(全局)回调,两者都会被照常调用。
3. 消费端
YAML
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto信息
有些文章会误将 spring.rabbitmq.listener.simple.prefetch = 1 解释为可靠消费的必要条件,但实际上 prefetch 与可靠消费并无直接关联。
spring.rabbitmq.listener.simple.acknowledge-mode 支持三种确认模式:
3.1. manual 手动 ACK
手动 ACK 消费端代码示例:
Java
@RabbitListener(queues = "yourQueueName")
public void receiveMessage(Message message, Channel channel) {
try {
// 消息处理逻辑
// ...
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 可以通过 redelivered 来判断该消息是否是之前重新投递的,以此来避免无限重复投递
// Boolean redelivered = message.getMessageProperties().getRedelivered();
// 如果处理失败,拒绝并重投消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}警告
NACK 后,消息会立即被重新投递,从而可能导致消息无限循环投递。需妥善处理,如绑定死信交换机等。
3.2. auto 自动 ACK(推荐)
自动 ACK,没有异常则返回 ACK,抛出异常则返回 NACK;
我们可以结合 Spring 的 Retry 机制,在消费端出现异常时先进行本地重试,在达到最大重试次数之后,再将失败消息投递到指定的交换机,交由人工处理。
application.yml 配置:
YAML
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000ms
multiplier: 2
max-attempts: 3
stateless: true创建 MessageRecoverer 的接口实例,由它来处理重试次数耗尽的失败消息。默认提供三种实现:
RejectAndDontRequeueRecoverer:Reject,由 RabbitMQ 处理(如直接丢弃或转发到死信交换机);ImmediateRequeueMessageRecoverer:返回 NACK,并重新入队;RepublishMessageRecoverer:将失败消息投递到指定的交换机。
以下是一个 RepublishMessageRecoverer 的简单示例:
Java
@Bean
public DirectExchange errorExchange() {
return new DirectExchange(ERROR_EXCHANGE);
}
@Bean
public Queue errorQueue() {
return new Queue(ERROR_QUEUE);
}
@Bean
public Binding errorBinding(DirectExchange errorExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with(ERROR_ROUTING_KEY);
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROUTING_KEY);
}Auto ACK + Spring Retry + RepublishMessageRecoverer 方案,优点是不仅支持本地重试,而且当重试次数耗尽后,被投递到 Error Queue 的失败消息头部(x-exception-message、x-exception-stacktrace)还会有带有异常堆栈信息,极大方便了后续开发人员人工介入处理。
3.3. none 关闭 ACK
消息投递后被立即删除。