Appearance
RabbitMQ 死信
1. 什么是死信消息?
RabbitMQ 中的死信(Dead Letter)是指在消息队列中无法被正常消费的消息。这些消息会被转发到一个特殊的死信队列(Dead Letter Queue, DLQ)。使用死信队列可以帮助我们监控、分析和处理无法正常消费的消息。
死信消息会被转发到死信队列的几种情况:
- 消息被拒绝(rejected):消费者明确使用
reject或nack且设置requeue=false; - 消息过期(TTL 到期):消息设置了 TTL(Time-To-Live),超过存活时间未被消费;
- 队列满了:队列设置了最大长度,超出限制时多余的消息变为死信;
2. 配置死信队列
声明一个死信交换机;
声明一个死信队列,并绑定到死信交换机;
声明主队列,并绑定死信交换机和死信路由键;
- 死信交换机(DLX, Dead Letter Exchange)
- 死信路由键(DLK, Dead Letter Routing Key)
| 参数名 | 说明 |
|---|---|
x-dead-letter-exchange | 指定死信交换机 |
x-dead-letter-routing-key | 指定死信路由键 |
x-message-ttl | 设置消息的存活时间(毫秒) |
x-max-length | 设置队列的最大消息数,超出部分变死信 |
2.1. 测试流程
- 向主队列发送一条消息;
- 消费者消费主队列,手动拒绝消息(使用
reject或nack); - 消息被路由到死信队列;
- 通过死信队列消费者监控该消息;
3. 代码示例
下面是基于 Spring Boot 的 Java 实现,展示如何配置和使用死信队列(DLQ)。代码包括配置类和生产者、消费者示例。
3.1. 引入依赖
XML
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>3.2. RabbitMQ 配置类
创建一个 Spring Boot 配置类来声明交换机、队列和绑定。
Java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static final String DEAD_ROUTING_KEY = "dead_routing_key";
public static final String MAIN_EXCHANGE = "main_exchange";
public static final String MAIN_QUEUE = "main_queue";
public static final String MAIN_ROUTING_KEY = "main_routing_key";
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_ROUTING_KEY);
}
@Bean
public DirectExchange mainExchange() {
return new DirectExchange(MAIN_EXCHANGE);
}
@Bean
public Queue mainQueue() {
return QueueBuilder.durable(MAIN_QUEUE)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 指定死信交换机
.withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY) // 指定死信路由键
.withArgument("x-message-ttl", 10000) // 可选:设置消息的 TTL(单位:毫秒)
.build();
}
@Bean
public Binding mainQueueBinding(Queue mainQueue, DirectExchange mainExchange) {
return BindingBuilder.bind(mainQueue).to(mainExchange).with(MAIN_ROUTING_KEY);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
3.3. 生产者代码
创建一个消息生产者,将消息发送到主队列。
Java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
System.out.println("发送消息到主队列: " + message);
rabbitTemplate.convertAndSend(RabbitMQConfig.MAIN_EXCHANGE, RabbitMQConfig.MAIN_ROUTING_KEY, message);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
3.4. 消费者代码
模拟从主队列接收消息并手动拒绝消息,使其进入死信队列。
Java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MainQueueConsumer {
@RabbitListener(queues = RabbitMQConfig.MAIN_QUEUE)
public void consumeMessage(Message message) {
String body = new String(message.getBody());
System.out.println("从主队列收到消息: " + body);
// 模拟拒绝消息并不重新入队(将消息转入死信队列)
throw new RuntimeException("模拟消费失败");
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
监听死信队列,处理转移过来的消息。
Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class DeadLetterQueueConsumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void consumeDeadLetterMessage(String message) {
System.out.println("从死信队列收到消息: " + message);
// 在这里可以实现日志记录、报警等操作
}
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
3.5. 测试代码
通过 Producer 向主队列发送消息:
Java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class AppRunner implements CommandLineRunner {
@Autowired
private Producer producer;
@Override
public void run(String... args) throws Exception {
producer.sendMessage("测试消息 1");
producer.sendMessage("测试消息 2");
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在主队列的消费者日志中:
Log
从主队列收到消息: 测试消息 1
Exception: 模拟消费失败
从主队列收到消息: 测试消息 2
Exception: 模拟消费失败在死信队列的消费者日志中:
Log
从死信队列收到消息: 测试消息 1
从死信队列收到消息: 测试消息 24. 实际场景应用
- 消息延迟投递:使用 TTL 和死信队列实现延迟消息。
- 监控与报警:分析死信队列中的消息,发现消费系统异常。
- 失败重试机制:将死信队列中的消息重新发布到主队列以实现重试。