Appearance
本地消息表
1. 核心思想
由于数据库操作与消息发送难以在同一事务中实现原子性,我们可以将待发送消息先行写入数据库,利用数据库事务确保消息记录与业务操作的原子性。随后,通过定时任务扫描数据库,读取待发送消息并投递至 RabbitMQ。
同样,在消费端,为保障消息消费与数据库操作的原子性,可将消费消息与业务操作一并写入数据库,利用数据库事务实现一致性。
然而,此方案引入新挑战,例如:
生产端消息表需包含哪些状态与字段?定时任务如何高效查询仅 “待发送” 消息?
消费端消息表需包含哪些状态与字段?消费失败时,是返回 NACK 还是记录失败的消息后返回 ACK?
消息可能重复投递(如定时任务发送成功但更新状态前故障)。谁负责处理重复投递?生产端、消费端,还是两者?
若系统每秒产生 1000 条消息,但定时任务仅处理 500 条,导致堆积,该如何应对?
分布式部署下,多定时任务实例并发扫描,如何避免资源竞争?
随时间推移,消息表积累大量历史记录,如何处理?
监控与告警需关注哪些指标与场景?
为什么保留已发送/消费的消息记录优于直接删除?
便于后续排查、对账或审计,例如订单创建但库存未扣减的数据不一致问题。
1.1. 生产端消息表
- 消息 Id:分布式 Id 生成,确保全局唯一;
- 交换机:交换机名称;
- 路由键:消息路由键,用于路由规则;
- 消息内容:JSON 字符串,便于灵活存储与阅读;
- 消息状态:待发送(
PENDING)、发送中(SENDING)、发送成功(SENT)、发送失败(FAILED); - 重试次数:当前重试计数;
- 最大重试次数:超限后判定失败,需人工干预;
- 下次重试时间:默认当前时间,支持指数退避;
- 发送失败原因:便于故障诊断;
- 创建时间;
- 更新时间:便于时间线排查。
在消息状态与下次重试时间创建联合索引,提升查询 “待发送” 消息效率。
1.2. 消费端消息表
- 消息 Id:业务消息 Id,用于幂等判断与唯一索引;
- 交换机:交换机名称;
- 路由键:路由键,用于重发;
- 消息内容:JSON 字符串,便于重发与排查;
- 消息状态:消费成功(
SUCCESS)、待重发(PENDING_RETRY)、重发中(RETRYING)、消费失败(FAILED); - 重试次数:当前重试计数;
- 最大重试次数:超限后判定失败,需人工干预;
- 下次重试时间:默认当前时间,支持指数退避;
- 消费失败原因:便于故障诊断;
- 首次消费时间:首次接收时间;
- 最后消费时间:最近消费/重试时间;
- 成功时间:消费成功时间;
- 版本号:用于乐观锁,实现并发控制。
在消息 Id 创建唯一索引。
消费失败时,应记录失败消息后返回 ACK,避免消息在 RabbitMQ 中无限循环(僵尸消息)。
1.3. 幂等性保证
消费端(例如库存服务)负责幂等性。即使生产端确保 “仅发一次”,网络传输、RabbitMQ 处理或消费环节仍可能重复。
通过消费端消息表,结合唯一索引、乐观锁与事务实现幂等。
1.4. 消息堆积
部署多定时任务实例并行处理,并在业务允许时对上游限流。
1.5. 并发竞争
利用状态机 + SKIP LOCKED 避免多实例竞争。
SQL
SELECT id, version
FROM local_message
WHERE status = 'PENDING'
AND next_retry_time <= NOW()
ORDER BY create_time
LIMIT #{batchSize}
FOR UPDATE SKIP LOCKED1
2
3
4
5
6
7
2
3
4
5
6
7
注意:若任务标记 SENDING 后宕机,消息卡住。需另设任务,将 SENDING 超 5 分钟的消息重置为 PENDING。虽可能重复投递,但消费端幂等性保障无虞。
消费端重发任务类似。
1.6. 历史记录
定期归档或删除历史数据。
1.7. 监控与告警
| 监控层次 | 监控指标 | 示例 | 频率 |
|---|---|---|---|
| 生产端 | 消息堆积量 | PENDING 状态消息 > X 条 | 60s |
| 生产端 | 失败消息数量 | FAILED 状态消息 > 1 条 | 60s |
| 生产端 | 重试率分布 | 1 小时内重试 3 次及以上消息占比 > 20% | 10min |
| 定时任务 | RabbitMQ 队列堆积 | 使用 AmqpAdmin/RabbitAdmin 查询 QUEUE_MESSAGE_COUNT > X 条 | 60s |
| 定时任务 | RabbitMQ 连接状态 | Java | 30s |
| 消费端 | 消息堆积量 | PENDING_RETRY 状态消息 > X 条 | 60s |
| 消费端 | 失败消息数量 | FAILED 状态消息 > 1 条 | 60s |
| 消费端 | 重试率分布 | 1 小时内重试 3 次及以上消息占比 > 20% | 10min |
2. 业务流程图
3. 代码实现
3.1. 生产端
3.1.1. 生产端消息表
SQL
CREATE TABLE producer_message (
id BIGINT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
exchange VARCHAR(100) NOT NULL,
routing_key VARCHAR(100) NOT NULL,
content TEXT NOT NULL,
status VARCHAR(20) NOT NULL,
retry_count INT DEFAULT 0,
max_retry_count INT DEFAULT 5,
next_retry_time DATETIME NULL,
error_msg TEXT,
create_time DATETIME NOT NULL,
update_time DATETIME,
version INT NOT NULL DEFAULT 0,
INDEX idx_status_retry_time (status, next_retry_time)
) ENGINE=InnoDB;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
3.1.2. 生产端消息表实体
Java
@Data
@Table(name = "producer_message")
public class ProducerMessage {
@Id
private Long id; // 分布式 Id(如雪花算法)
private String messageId; // 业务消息 Id(UUID)
private String exchange; // 交换机名称
private String routingKey; // 路由键
private String content; // 消息内容(JSON)
@Enumerated(EnumType.STRING)
private MessageStatus status; // PENDING, SENDING, SENT, FAILED
private Integer retryCount; // 当前重试次数
private Integer maxRetryCount; // 最大重试次数
private LocalDateTime nextRetryTime; // 下次重试时间
private String errorMsg; // 失败原因
private LocalDateTime createTime;
private LocalDateTime updateTime;
@Version
private Integer version; // 乐观锁版本(数据库列:version)
}
enum MessageStatus {
PENDING, // 待发送
SENDING, // 发送中
SENT, // 已发送
FAILED // 失败(超最大重试)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
3.1.3. 生产端消息表 Mapper
Java
@Data
public class ProducerMessageIdVersion {
private Long id;
private Integer version;
}1
2
3
4
5
2
3
4
5
Java
@Mapper
public interface ProducerMessageMapper {
// 插入消息(与业务操作在同一事务中)
void insert(ProducerMessage message);
/**
* 短事务:选取待发送记录的 id 与当前 version(使用 FOR UPDATE SKIP LOCKED)
* 返回值是生产端 id + version 的数据结构(Mapper 会映射到 ProducerMessageIdVersion)
*/
List<ProducerMessageIdVersion> selectPendingIdAndVersionForUpdate(@Param("batchSize") int batchSize);
/**
* 短事务:按 id + version 列表将记录从 PENDING -> SENDING(并做 version++)
* 这里使用单条 UPDATE,WHERE 中使用多个 (id = ? AND version = ?) 的 OR 条件一次更新多个记录,
* 返回成功更新的行数(理想上等于 items.size())
*/
int updateStatusToSendingByIdAndVersion(@Param("items") List<ProducerMessageIdVersion> items);
// 在短事务内确认哪些 id 已经变为 SENDING,并读取其当前 version
List<ProducerMessageIdVersion> selectIdAndVersionByIdsAndStatus(@Param("ids") List<Long> ids,
@Param("status") String status);
// 根据 ids 查询完整记录(事务外调用)
List<ProducerMessage> selectByIds(@Param("ids") List<Long> ids);
/**
* 有条件的状态迁移:只有当当前 version == expectVersion 并且当前 status == expectStatus 时才更新(并 version++)
* 返回 1 表示更新成功,0 表示版本或状态不匹配(需重试/处理冲突)
*/
int updateStatusIfVersionMatches(@Param("id") Long id,
@Param("expectVersion") Integer expectVersion,
@Param("expectStatus") MessageStatus expectStatus,
@Param("newStatus") MessageStatus newStatus,
@Param("retryCount") Integer retryCount,
@Param("nextRetryTime") LocalDateTime nextRetryTime,
@Param("errorMsg") String errorMsg);
// 重置僵尸消息(卡在 SENDING 状态超时的消息)—— 此操作也应 version++
int resetZombieMessages(@Param("zombieThreshold") LocalDateTime zombieThreshold);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
XML
<!-- selectPendingIdAndVersionForUpdate -->
<select id="selectPendingIdAndVersionForUpdate" resultType="com.example.ProducerMessageIdVersion" parameterType="int">
SELECT id, version
FROM producer_message
WHERE status = 'PENDING'
AND next_retry_time <= NOW()
ORDER BY create_time
LIMIT #{batchSize}
FOR UPDATE SKIP LOCKED
</select>
<!-- updateStatusToSendingByIdAndVersion: 使用单条 UPDATE,WHERE 使用多个 (id=... AND version=...) OR ... -->
<update id="updateStatusToSendingByIdAndVersion" parameterType="list">
UPDATE producer_message
SET status = 'SENDING',
update_time = NOW(),
version = version + 1
WHERE
<foreach collection="items" item="item" index="idx" open="(" separator=" OR " close=")">
(id = #{item.id} AND version = #{item.version})
</foreach>
</update>
<!-- selectIdAndVersionByIdsAndStatus: 在短事务内确认哪些记录被成功置为 SENDING,并返回当前的 id + version -->
<select id="selectIdAndVersionByIdsAndStatus" resultType="com.example.ProducerMessageIdVersion" parameterType="map">
SELECT id, version
FROM producer_message
WHERE status = #{status}
AND id IN
<foreach item="id" index="i" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</select>
<!-- selectByIds: 根据 ids 批量查询 -->
<select id="selectByIds" resultType="com.example.ProducerMessage">
SELECT *
FROM producer_message
WHERE id IN
<foreach item="id" index="i" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</select>
<!-- updateStatusIfVersionMatches: 单条条件更新,包含期望状态检查,避免非法状态迁移 -->
<update id="updateStatusIfVersionMatches" parameterType="map">
UPDATE producer_message
SET status = #{newStatus},
retry_count = #{retryCount},
next_retry_time = #{nextRetryTime},
error_msg = #{errorMsg},
update_time = NOW(),
version = version + 1
WHERE id = #{id}
AND version = #{expectVersion}
AND status = #{expectStatus}
</update>
<!-- resetZombieMessages: 将超时的 SENDING 改回 PENDING,并 version++ -->
<update id="resetZombieMessages" parameterType="java.time.LocalDateTime">
UPDATE producer_message
SET status = 'PENDING',
next_retry_time = NOW(),
update_time = NOW(),
version = version + 1
WHERE status = 'SENDING'
AND update_time < #{zombieThreshold}
</update>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
3.1.4. 订单服务(生产者)
Java
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProducerMessageMapper messageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建订单(本地事务保证订单和消息的原子性)
* 将消息初始状态置为 SENDING,version = 0(在 insert 时设置)
* 事务提交后立即尝试发送一次(afterCommit)
*/
@Transactional
public void createOrder(Order order) {
// 1. 构建库存扣减消息
Map<String, Object> msgContent = new HashMap<>();
msgContent.put("orderId", order.getId());
msgContent.put("productId", order.getProductId());
msgContent.put("quantity", order.getQuantity());
// 2. 插入消息记录(初始设为 SENDING,version = 0)
ProducerMessage message = new ProducerMessage();
message.setId(SnowflakeIdGenerator.nextId());
message.setMessageId(UUID.randomUUID().toString());
message.setExchange("inventory.exchange");
message.setRoutingKey("inventory.deduct");
message.setContent(JSON.toJSONString(msgContent));
message.setStatus(MessageStatus.SENDING); // 初始放为 SENDING(准备立刻发送)
message.setRetryCount(0);
message.setMaxRetryCount(5);
message.setNextRetryTime(LocalDateTime.now());
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
message.setVersion(0); // 初始版本 0
messageMapper.insert(message);
// 3. 插入订单(与消息在同一事务)
orderMapper.insert(order);
// 4. 注册事务提交后的发送操作(确保业务数据已提交)
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 事务提交后尝试发送一次(此处在事务外)
try {
rabbitTemplate.convertAndSend(
message.getExchange(),
message.getRoutingKey(),
message.getContent(),
msg -> {
msg.getMessageProperties().setMessageId(message.getMessageId());
return msg;
}
);
// 发送成功 -> 尝试将 SENDING -> SENT(只有当 version 与期望 version 且当前 status 为 SENDING 时才更新)
int updated = messageMapper.updateStatusIfVersionMatches(
message.getId(),
message.getVersion(), // expectVersion = 0
MessageStatus.SENDING, // expectStatus
MessageStatus.SENT,
message.getRetryCount(),
null,
null
);
if (updated == 0) {
// 未能更新(可能被其他任务改过),不强制覆盖
// 可以选择记录日志或告警
}
} catch (Exception ex) {
// 发送失败 -> 回退为 PENDING 并设置下次重试时间(指数退避)
// 这里同样使用条件更新:只在 version 等于插入时的 version(0)且 status 为 SENDING 时回退
ProducerMessage fallback = new ProducerMessage();
fallback.setId(message.getId());
fallback.setRetryCount(message.getRetryCount() + 1);
if (fallback.getRetryCount() >= message.getMaxRetryCount()) {
// 直接尝试把状态置为 FAILED(条件更新)
messageMapper.updateStatusIfVersionMatches(
message.getId(),
message.getVersion(),
MessageStatus.SENDING, // expectStatus
MessageStatus.FAILED,
fallback.getRetryCount(),
null,
ex.getMessage()
);
} else {
long delaySeconds = (long) Math.pow(2, fallback.getRetryCount());
messageMapper.updateStatusIfVersionMatches(
message.getId(),
message.getVersion(),
MessageStatus.SENDING, // expectStatus
MessageStatus.PENDING,
fallback.getRetryCount(),
LocalDateTime.now().plusSeconds(delaySeconds),
ex.getMessage()
);
}
}
}
}
);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
3.1.5. 消息发送定时任务
Java
@Component
public class MessageSendTask {
@Autowired
private ProducerMessageMapper messageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final int BATCH_SIZE = 100;
private static final int ZOMBIE_TIMEOUT_MINUTES = 5;
/**
* 定时扫描并发送消息
* 支持多实例并行执行(通过 FOR UPDATE SKIP LOCKED 防止重复处理)
* 短事务只做 select id + version + update status -> SENDING(version++),并在短事务内返回实际被标记为 SENDING 的 id + 当前 version(已 ++)
* 真正发送在事务外进行,发送结果以带 version + expectStatus 的条件更新提交
*/
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
// 1. 短事务:选取待发送 id + version,并将其状态设置为 SENDING(并 version++)
List<ProducerMessageIdVersion> items = selectAndMarkPendingAsSending(BATCH_SIZE);
if (items == null || items.isEmpty()) return;
// 2. 事务外查询具体记录并逐条发送
List<Long> ids = items.stream().map(ProducerMessageIdVersion::getId).collect(Collectors.toList());
List<ProducerMessage> messages = messageMapper.selectByIds(ids);
// items 是在短事务内实际被置为 SENDING 的 id + currentVersion(已 ++)
Map<Long, Integer> expectVersionMap = items.stream()
.collect(Collectors.toMap(ProducerMessageIdVersion::getId, ProducerMessageIdVersion::getVersion));
for (ProducerMessage message : messages) {
Integer expectVersion = expectVersionMap.get(message.getId());
try {
rabbitTemplate.convertAndSend(
message.getExchange(),
message.getRoutingKey(),
message.getContent(),
msg -> { msg.getMessageProperties().setMessageId(message.getMessageId()); return msg; }
);
// 发送成功 —— 以条件更新把 SENDING -> SENT(仅当版本与 expectVersion 且当前 status 为 SENDING 匹配)
int updated = messageMapper.updateStatusIfVersionMatches(
message.getId(),
expectVersion,
MessageStatus.SENDING, // expectStatus
MessageStatus.SENT,
message.getRetryCount(),
null,
null
);
if (updated == 0) {
// 版本不匹配或已被其他任务处理,记录日志即可(不重试)
}
} catch (Exception e) {
// 发送失败,回退到 PENDING 或 FAILED(带版本校验)
int newRetryCount = (message.getRetryCount() == null ? 0 : message.getRetryCount()) + 1;
if (newRetryCount >= message.getMaxRetryCount()) {
// 置为 FAILED(条件更新)
messageMapper.updateStatusIfVersionMatches(
message.getId(),
expectVersion,
MessageStatus.SENDING, // expectStatus
MessageStatus.FAILED,
newRetryCount,
null,
e.getMessage()
);
} else {
long delaySeconds = (long) Math.pow(2, newRetryCount);
messageMapper.updateStatusIfVersionMatches(
message.getId(),
expectVersion,
MessageStatus.SENDING, // expectStatus
MessageStatus.PENDING,
newRetryCount,
LocalDateTime.now().plusSeconds(delaySeconds),
e.getMessage()
);
}
}
}
}
/**
* 在短事务中选 id + version 并把它们更新为 SENDING(version++),并返回实际被置为 SENDING 的 id + 当前 version(已 ++)
*/
@Transactional
protected List<ProducerMessageIdVersion> selectAndMarkPendingAsSending(int batchSize) {
// 1. SELECT id, version FOR UPDATE SKIP LOCKED
List<ProducerMessageIdVersion> items = messageMapper.selectPendingIdAndVersionForUpdate(batchSize);
if (items == null || items.isEmpty()) return items;
// 2. 对这些条目做一次批量更新(单条 UPDATE,WHERE 使用 OR 多条件),将其置为 SENDING 并 version++
int updatedCount = messageMapper.updateStatusToSendingByIdAndVersion(items);
// 3. 在同一事务内再次查询这些 id 中实际被置为 SENDING 的记录,返回 id + 当前 version(已 ++)
List<Long> ids = items.stream().map(ProducerMessageIdVersion::getId).collect(Collectors.toList());
List<ProducerMessageIdVersion> actualMarked = messageMapper.selectIdAndVersionByIdsAndStatus(ids, "SENDING");
return actualMarked;
}
/**
* 清理僵尸消息(卡在 SENDING 状态超过一定时间的消息)
*/
@Scheduled(cron = "0 */5 * * * ?")
@Transactional
public void cleanZombieMessages() {
LocalDateTime zombieThreshold = LocalDateTime.now().minusMinutes(ZOMBIE_TIMEOUT_MINUTES);
messageMapper.resetZombieMessages(zombieThreshold);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
3.2. 消费端
3.2.1. 消费端消息表
SQL
CREATE TABLE consumer_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE,
status VARCHAR(20) NOT NULL,
content TEXT NOT NULL,
exchange VARCHAR(100) NOT NULL,
routing_key VARCHAR(100) NOT NULL,
retry_count INT DEFAULT 0,
max_retry_count INT DEFAULT 5,
next_retry_time DATETIME,
error_msg TEXT,
first_consume_time DATETIME NOT NULL,
last_consume_time DATETIME,
success_time DATETIME,
version INT NOT NULL DEFAULT 0,
INDEX idx_message_id (message_id),
INDEX idx_status_retry_time (status, next_retry_time)
) ENGINE=InnoDB;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
3.2.2. 消费端消息表实体
Java
@Data
@Table(name = "consumer_message")
public class ConsumerMessage {
@Id
private Long id;
@Column(unique = true)
private String messageId; // 消息 Id(用于幂等性判断)
@Enumerated(EnumType.STRING)
private ConsumeStatus status; // SUCCESS, PENDING_RETRY, RETRYING, FAILED
private String content; // 内容(重发用)
private String exchange;
private String routingKey;
private Integer retryCount;
private Integer maxRetryCount;
private LocalDateTime nextRetryTime;
private String errorMsg;
private LocalDateTime firstConsumeTime;
private LocalDateTime lastConsumeTime;
private LocalDateTime successTime;
@Version
private Integer version;
}
enum ConsumeStatus {
SUCCESS, // 消费成功
PENDING_RETRY, // 待重发(业务失败,等待重试)
RETRYING, // 重发中(已被重发任务取走)
FAILED // 消费失败(超过最大重试次数)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
3.2.3. 消费端消息表 Mapper
Java
@Data
public class ConsumerMessageIdVersion {
private Long id;
private Integer version;
}1
2
3
4
5
2
3
4
5
Java
@Mapper
public interface ConsumerMessageMapper {
void insert(ConsumerMessage message);
// 使用乐观锁更新(version 字段会自动递增)
@Update("UPDATE consumer_message SET " +
"status = #{status}, " +
"retry_count = #{retryCount}, " +
"next_retry_time = #{nextRetryTime}, " +
"last_consume_time = #{lastConsumeTime}, " +
"success_time = #{successTime}, " +
"error_msg = #{errorMsg}, " +
"version = #{version} + 1 " +
"WHERE id = #{id} AND version = #{version}")
int updateWithVersion(ConsumerMessage message);
// 检查消息是否已被消费
@Select("SELECT COUNT(*) > 0 FROM consumer_message WHERE message_id = #{messageId}")
boolean existsByMessageId(@Param("messageId") String messageId);
// 根据消息 Id 查询
@Select("SELECT * FROM consumer_message WHERE message_id = #{messageId}")
ConsumerMessage selectByMessageId(@Param("messageId") String messageId);
// 短事务第一步:选取待重发的 id + version(FOR UPDATE SKIP LOCKED)
List<ConsumerMessageIdVersion> selectPendingRetryIdAndVersionForUpdate(@Param("batchSize") int batchSize);
// 短事务第二步:按 id + version 条件更新为 RETRYING,并 version++
int updateStatusToRetryingByIdAndVersion(@Param("items") List<ConsumerMessageIdVersion> items);
// 在短事务内确认哪些 id 已经变为 RETRYING,并读取其当前 version(即 oldVersion+1)
List<ConsumerMessageIdVersion> selectIdAndVersionByIdsAndStatus(@Param("ids") List<Long> ids,
@Param("status") String status);
// 事务外按 ids 查询完整记录
List<ConsumerMessage> selectByIds(@Param("ids") List<Long> ids);
// 重置僵尸消息(卡在 RETRYING 状态超时的消息)
@Update("UPDATE consumer_message " +
"SET status = 'PENDING_RETRY', " +
" next_retry_time = NOW(), " +
" version = version + 1 " +
"WHERE status = 'RETRYING' " +
"AND last_consume_time < #{zombieThreshold}")
int resetZombieMessages(@Param("zombieThreshold") LocalDateTime zombieThreshold);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
XML
<!-- selectPendingRetryIdAndVersionForUpdate -->
<select id="selectPendingRetryIdAndVersionForUpdate"
resultType="com.example.ConsumerMessageIdVersion"
parameterType="int">
SELECT id, version
FROM consumer_message
WHERE status = 'PENDING_RETRY'
AND next_retry_time <= NOW()
ORDER BY first_consume_time
LIMIT #{batchSize}
FOR UPDATE SKIP LOCKED
</select>
<!-- updateStatusToRetryingByIdAndVersion:对每条按 id + version 条件更新为 RETRYING 并 version++ -->
<update id="updateStatusToRetryingByIdAndVersion" parameterType="list">
<foreach collection="items" item="item" separator=";">
UPDATE consumer_message
SET status = 'RETRYING',
last_consume_time = NOW(),
version = version + 1
WHERE id = #{item.id}
AND version = #{item.version}
</foreach>
</update>
<!-- 在短事务内确认哪些记录被成功置为 RETRYING,并返回当前的 id + version(version 已 ++) -->
<select id="selectIdAndVersionByIdsAndStatus"
resultType="com.example.ConsumerMessageIdVersion"
parameterType="map">
SELECT id, version
FROM consumer_message
WHERE status = #{status}
AND id IN
<foreach item="id" index="i" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</select>
<!-- selectByIds: 事务外按 ids 批量查询完整记录 -->
<select id="selectByIds" resultType="com.example.ConsumerMessage">
SELECT *
FROM consumer_message
WHERE id IN
<foreach item="id" index="i" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</select>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
3.2.4. 库存服务(消费者)
Java
@Service
public class InventoryService {
@Autowired
private InventoryMapper inventoryMapper;
@Autowired
private ConsumerMessageMapper messageMapper;
private static final int MAX_RETRY_COUNT = 5;
/**
* 消费扣减库存消息(保证幂等性 + 失败重试管理)
*/
@RabbitListener(queues = "inventory.deduct")
public void deductInventory(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
String content = new String(message.getBody());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 幂等性检查:判断该消息是否已经被处理过
ConsumerMessage existMessage = messageMapper.selectByMessageId(messageId);
if (existMessage != null) {
// 已经记录过此消息
if (existMessage.getStatus() == ConsumeStatus.SUCCESS ||
existMessage.getStatus() == ConsumeStatus.FAILED) {
// 已成功消费,直接 ACK(防止重复扣减库存)
// 已标记为最终失败,不再处理,直接 ACK 避免一直重复投递
channel.basicAck(deliveryTag, false);
return;
}
// 剩下的是 PENDING_RETRY 和 RETRYING 的记录
// 正在重试中,说明是重发任务推送的消息,继续处理
// 注意:这里不 return,继续执行业务逻辑
}
// 2. 执行业务逻辑(在事务中)
processMessage(messageId, content, existMessage);
// 3. 业务成功,发送 ACK
channel.basicAck(deliveryTag, false);
} catch (IdempotenceGuaranteeException e1) {
// 4. 幂等性保证,已被其它线程处理,发送 ACK
channel.basicAck(deliveryTag, false);
} catch (Exception e2) {
// 5. 业务失败,记录失败信息并决定是否重试
handleConsumeFailure(messageId, content, message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), e2);
// 6. 发送 ACK(不让 RabbitMQ 自动重发,由我们的重发任务控制重试)
channel.basicAck(deliveryTag, false);
}
}
/**
* 处理消息业务逻辑(事务方法)
*/
@Transactional
private void processMessage(String messageId, String content, ConsumerMessage existMessage) {
if (existMessage != null) {
// 重试场景:先尝试乐观锁更新状态为 SUCCESS
existMessage.setStatus(ConsumeStatus.SUCCESS);
existMessage.setSuccessTime(LocalDateTime.now());
existMessage.setLastConsumeTime(LocalDateTime.now());
if (messageMapper.updateWithVersion(existMessage) == 0) {
// 乐观锁更新失败,说明已被其他线程修改,抛出异常回滚事务,避免重复消费
throw new IdempotenceGuaranteeException("消息状态已被修改,messageId: " + messageId);
}
} else {
try {
ConsumerMessage newMessage = new ConsumerMessage();
newMessage.setMessageId(messageId);
newMessage.setStatus(ConsumeStatus.SUCCESS);
newMessage.setContent(content);
newMessage.setVersion(0);
newMessage.setFirstConsumeTime(LocalDateTime.now());
newMessage.setSuccessTime(LocalDateTime.now());
messageMapper.insert(newMessage);
} catch (DuplicateKeyException e) {
// 并发插入冲突,说明其他线程已插入,抛出异常回滚事务,避免重复消费
throw new IdempotenceGuaranteeException("并发插入冲突,messageId: " + messageId);
}
}
JSONObject json = JSON.parseObject(content);
Long productId = json.getLong("productId");
Integer quantity = json.getInteger("quantity");
// 执行业务逻辑:扣减库存(在消息锁定后执行,确保幂等性)
inventoryMapper.deduct(productId, quantity);
}
/**
* 处理消费失败(记录失败信息,准备重试)
* 使用纯乐观锁方案
*/
@Transactional
private void handleConsumeFailure(String messageId, String content, String exchange, String routingKey, Exception e) {
ConsumerMessage message = messageMapper.selectByMessageId(messageId);
if (message == null) {
try {
message = new ConsumerMessage();
message.setMessageId(messageId);
message.setContent(content);
message.setExchange(exchange);
message.setRoutingKey(routingKey);
message.setStatus(ConsumeStatus.PENDING_RETRY);
message.setRetryCount(1);
message.setMaxRetryCount(MAX_RETRY_COUNT);
message.setVersion(0);
message.setFirstConsumeTime(LocalDateTime.now());
message.setLastConsumeTime(LocalDateTime.now());
message.setErrorMsg(e.getMessage());
message.setNextRetryTime(LocalDateTime.now().plusSeconds(2));
messageMapper.insert(message);
return;
} catch (DuplicateKeyException ex) {
// 并发插入冲突,说明其他线程已插入,直接返回
return;
}
}
if (message.getStatus() == ConsumeStatus.SUCCESS || message.getStatus() == ConsumeStatus.FAILED) {
return;
}
message.setRetryCount(message.getRetryCount() + 1);
message.setLastConsumeTime(LocalDateTime.now());
message.setErrorMsg(e.getMessage());
if (message.getRetryCount() >= message.getMaxRetryCount()) {
message.setStatus(ConsumeStatus.FAILED);
message.setNextRetryTime(null);
} else {
message.setStatus(ConsumeStatus.PENDING_RETRY);
long delaySeconds = (long) Math.pow(2, message.getRetryCount());
message.setNextRetryTime(LocalDateTime.now().plusSeconds(delaySeconds));
}
if (messageMapper.updateWithVersion(message) == 0) {
// 乐观锁失败,说明其他线程已修改,直接返回
return;
}
}
}
/**
* 自定义异常:用于标识幂等性保证导致的回滚
*/
class IdempotenceGuaranteeException extends RuntimeException {
public IdempotenceGuaranteeException(String message) {
super(message);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
3.2.5. 消息重发定时任务
Java
@Component
public class MessageRetryTask {
@Autowired
private ConsumerMessageMapper messageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final int BATCH_SIZE = 100;
private static final int ZOMBIE_TIMEOUT_MINUTES = 5;
/**
* 定时扫描失败的消息并重新发送
*/
@Scheduled(fixedDelay = 10000)
public void retryFailedMessages() {
// 1. 短事务:选取并标记为 RETRYING,返回实际被标记的 id + 当前 version(已 ++)
List<ConsumerMessageIdVersion> items = selectAndMarkPendingAsRetrying(BATCH_SIZE);
if (items == null || items.isEmpty()) return;
// 2. 事务外:按 ids 查询完整记录(这些就是被短事务成功标记为 RETRYING 的记录)
List<Long> ids = items.stream().map(ConsumerMessageIdVersion::getId).collect(Collectors.toList());
List<ConsumerMessage> messages = messageMapper.selectByIds(ids);
// 3. 重新发送到原队列
for (ConsumerMessage message : messages) {
try {
rabbitTemplate.convertAndSend(
message.getExchange(),
message.getRoutingKey(),
message.getContent(),
msg -> {
msg.getMessageProperties().setMessageId(message.getMessageId());
return msg;
}
);
// 发送成功:保持 RETRYING 状态,由消费者消费后写 SUCCESS;若想在这里记录发送时间/日志可做补充
} catch (Exception e) {
// 重发失败(比如 RabbitMQ 不可用),回退到 PENDING_RETRY
message.setStatus(ConsumeStatus.PENDING_RETRY);
message.setErrorMsg("重发失败: " + e.getMessage());
long delaySeconds = (long) Math.pow(2, message.getRetryCount());
message.setNextRetryTime(LocalDateTime.now().plusSeconds(delaySeconds));
messageMapper.updateWithVersion(message);
}
}
}
/**
* 短事务:选取待重发 id + version(FOR UPDATE SKIP LOCKED)
* 并把它们置为 RETRYING (version++)
* 返回在短事务中真正成功被置为 RETRYING 的 id + 当前 version(已 ++)
*/
@Transactional
protected List<ConsumerMessageIdVersion> selectAndMarkPendingAsRetrying(int batchSize) {
// 1. SELECT id, version FOR UPDATE SKIP LOCKED
List<ConsumerMessageIdVersion> items = messageMapper.selectPendingRetryIdAndVersionForUpdate(batchSize);
if (items == null || items.isEmpty()) return items;
// 2. 对每条按 id + version 做条件更新,将其置为 RETRYING 并 version++
int updatedCount = messageMapper.updateStatusToRetryingByIdAndVersion(items);
// updatedCount 表示成功更新的行数(<= items.size())
// 3. 在同一事务内再次查询这些 id 中实际被置为 RETRYING 的记录,返回 id + 当前 version(已 ++)
List<Long> ids = items.stream().map(ConsumerMessageIdVersion::getId).collect(Collectors.toList());
List<ConsumerMessageIdVersion> actualMarked = messageMapper.selectIdAndVersionByIdsAndStatus(ids, "RETRYING");
return actualMarked;
}
/**
* 清理僵尸消息(卡在 RETRYING 状态超过一定时间的消息)
* 直接重置为 PENDING_RETRY,依赖幂等性保证不会重复消费
*/
@Scheduled(cron = "0 */5 * * * ?")
@Transactional
public void cleanZombieMessages() {
LocalDateTime zombieThreshold = LocalDateTime.now().minusMinutes(ZOMBIE_TIMEOUT_MINUTES);
messageMapper.resetZombieMessages(zombieThreshold);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82