Appearance
RabbitMQ 延迟队列
RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过 TTL(Time-To-Live)+ 死信队列(DLQ)的机制来实现延迟队列,或者通过 RabbitMQ 的插件 rabbitmq_delayed_message_exchange 实现。
下面我们介绍两种方法:
1. 基于 TTL + DLQ 实现延迟队列
通过设置消息的 TTL 和绑定死信队列,可以实现延迟投递消息的功能。
1.1. 原理
- 在一个队列(延迟队列)中设置消息的 TTL;
- 消息在 TTL 到期后,会进入绑定的死信队列(真正消费的队列);
- 消费者从死信队列中获取消息,达到延迟效果;
1.2. RabbitMQ 配置类
Java
@Configuration
public class RabbitMQConfig {
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
public static final String DEAD_LETTER_ROUTING_KEY = "dead_routing_key";
public static final String DELAY_EXCHANGE = "delay_exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DELAY_ROUTING_KEY = "delay_routing_key";
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 死信交换机
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) // 死信路由键
.withArgument("x-message-ttl", 10000) // 设置队列消息的默认 TTL(单位:毫秒)
.build();
}
@Bean
public Binding delayQueueBinding(Queue delayQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1.3. 生产者代码
生产者将消息发送到延迟队列。
Java
@Service
public class DelayQueueProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageWithDelay(String message) {
System.out.println("发送消息到延迟队列: " + message);
rabbitTemplate.convertAndSend(
RabbitMQConfig.DELAY_EXCHANGE,
RabbitMQConfig.DELAY_ROUTING_KEY,
message,
msg -> {
// 手动设置消息的 TTL(以毫秒为单位),如果同时指定了队列的 TTL 和消息的 TTL,
// 则哪个时间短以哪个为准,建议只设置队列的 TTL,而不是为每条消息单独设置 TTL。
// 另外需要注意的是,RabbitMQ 的消息过期检测是基于队列的顺序进行的。如果某条消
// 息的 TTL 已经过期,但前面有未到期的消息,过期消息可能不会被立即处理。
// msg.getMessageProperties().setExpiration(String.valueOf(10000));
return msg;
}
);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1.4. 消费者代码
消费者监听死信队列,接收延迟队列转发过来的消息。
Java
@Service
public class DeadLetterQueueConsumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void consumeMessage(String message) {
System.out.println("从死信队列收到消息: " + message);
}
}1
2
3
4
5
6
7
2
3
4
5
6
7
1.5. 测试代码
在 CommandLineRunner 中测试发送消息:
Java
@Component
public class AppRunner implements CommandLineRunner {
@Autowired
private DelayQueueProducer producer;
@Override
public void run(String... args) throws Exception {
// 发送一条延迟消息
producer.sendMessageWithDelay("测试延迟消息");
}
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
1.6. 运行结果
- 消息在延迟队列中等待到期(TTL 设置时间)。
- TTL 到期后,消息被路由到死信队列。
- 消费者从死信队列消费消息。
控制台输出示例:
Log
发送消息到延迟队列: 测试延迟消息
从死信队列收到消息: 测试延迟消息2. 基于 RabbitMQ 的延迟队列插件实现
该插件允许直接设置消息的延迟时间,而不需要通过 TTL 和死信队列的组合。
2.1. 安装插件
在 RabbitMQ 服务器上安装插件:
Bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange2.2. RabbitMQ 配置类
与方法 1 类似,但交换机需要声明为延迟类型。
Java
@Configuration
public class RabbitMQConfig {
public static final String DELAYED_EXCHANGE = "delayed_exchange";
public static final String DELAYED_QUEUE = "delayed_queue";
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
@Bean
public CustomExchange delayedExchange() {
// x-delayed-message 是 rabbitmq_delayed_message_exchange 插件提供的自定义交换机类型。
// x-delayed-type 是 x-delayed-message 类型交换机的特定参数,定义该延迟交换机的基础路由类型。
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, Map.of("x-delayed-type", "direct"));
}
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE);
}
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2.3. 生产者代码
直接通过交换机设置延迟时间:
Java
@Service
public class DelayQueueProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayedMessage(String message, int delayInMilliseconds) {
System.out.println("发送延迟消息: " + message + ",延迟时间: " + delayInMilliseconds + " 毫秒");
rabbitTemplate.convertAndSend(
RabbitMQConfig.DELAYED_EXCHANGE,
RabbitMQConfig.DELAYED_ROUTING_KEY,
message,
msg -> {
// 注意:需通过插件提供的 x-delay 属性,来定义延迟时间
msg.getMessageProperties().setHeader("x-delay", delayInMilliseconds);
return msg;
}
);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
警告
rabbitmq_delayed_message_exchange 插件在和 mandatory=true + ReturnsCallback 一起使用时,会把 “延迟消息” 也当成不可路由的消息触发返回。可以参考以下解决方案:
给
ReturnsCallback加上判断:如果是发到延迟交换机的返回,或者消息带有x-delay(或你自定义的标记头),就忽略它;JavarabbitTemplate.setReturnsCallback(returned -> { String exchange = returned.getExchange(); Message returnedMessage = returned.getMessage(); Map<String, Object> headers = returnedMessage.getMessageProperties().getHeaders(); // 根据交换机名判断,忽略延迟交换机产生的 return if ("my-delayed-exchange".equals(exchange)) return; // 或者根据 x-delay 头判断(延迟插件会带 x-delay) if (headers != null && headers.containsKey("x-delay")) return; // 如需更精细,可检查自定义 header: // if ("delayed".equals(headers.get("my-delay-flag"))) return; // 不是延迟消息,按原逻辑处理(日志/告警/重投等) handleRealReturn(returned); });为延迟发布创建一个单独的
RabbitTemplate,设置mandatory=false或者不设置ReturnsCallback;Java@Bean @Qualifier("delayedRabbitTemplate") public RabbitTemplate delayedRabbitTemplate(ConnectionFactory cf) { RabbitTemplate tpl = new RabbitTemplate(cf); tpl.setExchange("my-delayed-exchange"); // set default exchange tpl.setMandatory(false); // 关闭 mandatory,这样就不会触发 return callback return tpl; }然后发送延迟消息时用
@Qualifier("delayedRabbitTemplate")注入并调用。
2.4. 消费者代码
消费者监听延迟队列:
Java
@Service
public class DelayedQueueConsumer {
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE)
public void consumeMessage(String message) {
System.out.println("从延迟队列收到消息: " + message);
}
}1
2
3
4
5
6
7
2
3
4
5
6
7
2.5. 运行结果
与方法 1 类似,但实现更简洁,无需使用死信队列。
3. 总结
| 方案 | 优点 | 缺点 |
|---|---|---|
| 基于 TTL + DLQ | 1. 无需额外插件 2. 兼容性好,适用于大多数 RabbitMQ 版本 | 1. 灵活性有限,每个队列需统一 TTL 2. 额外的性能开销(两个队列) 3. 时间精度一般(通常为秒级) |
| 基于延迟队列插件 | 1. 灵活性高,可设置不同延迟时间 2. 时间精度高 3. 性能优化 | 1. 需要安装插件 2. 兼容性问题,需确保插件和 RabbitMQ 版本兼容 3. 延迟消息的调试和监控相对复杂 |