Appearance
本地消息表框架 - Java 版
1. 数据库表结构
1.1. 普通版
SQL
-- Local message table, plain (non-partitioned) variant.
-- One row per message; message_id alone is the primary key.
CREATE TABLE local_message_tbl
(
    message_id      BINARY(16)   NOT NULL,
    category        VARCHAR(32)  NOT NULL DEFAULT '',
    exchange        VARCHAR(100) NULL,
    routing_key     VARCHAR(100) NULL,
    message         TEXT         NOT NULL,
    state           VARCHAR(20)  NOT NULL COMMENT 'PENDING, PROCESSING, PROCESSED, FAILED',
    retry_count     INT          NOT NULL DEFAULT 0,
    max_retry_count INT          NOT NULL DEFAULT 3,
    -- Earliest moment at which the message may be picked up (again).
    next_retry_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    err_msg         TEXT         NULL,
    create_time     DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time     DATETIME     NULL,
    -- Incremented by every state change; used as an optimistic-lock token.
    version         INT          NOT NULL DEFAULT 0,
    PRIMARY KEY (message_id),
    -- Supports polling by state and due time.
    INDEX idx_state_next_retry_time (state, next_retry_time)
) ENGINE = InnoDB;
1.2. LIST 分区版
1.2.1. 表结构
SQL
-- Local message table, LIST-partitioned variant: one partition per category value.
-- The primary key is (category, message_id) because MySQL requires the
-- partitioning column to be part of every unique key of a partitioned table.
CREATE TABLE local_message_tbl
(
    category        VARCHAR(32)  NOT NULL DEFAULT '',
    message_id      BINARY(16)   NOT NULL,
    exchange        VARCHAR(100) NULL,
    routing_key     VARCHAR(100) NULL,
    message         TEXT         NOT NULL,
    state           VARCHAR(20)  NOT NULL COMMENT 'PENDING, PROCESSING, PROCESSED, FAILED',
    retry_count     INT          NOT NULL DEFAULT 0,
    max_retry_count INT          NOT NULL DEFAULT 3,
    -- Earliest moment at which the message may be picked up (again).
    next_retry_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    err_msg         TEXT         NULL,
    create_time     DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time     DATETIME     NULL,
    -- Incremented by every state change; used as an optimistic-lock token.
    version         INT          NOT NULL DEFAULT 0,
    PRIMARY KEY (category, message_id),
    INDEX idx_state_next_retry_time (state, next_retry_time)
) ENGINE = InnoDB
    PARTITION BY LIST COLUMNS (category) (
        -- '' is the column default, so rows inserted without a category land here.
        PARTITION p_default VALUES IN (''),
        PARTITION p_producer VALUES IN ('producer'),
        PARTITION p_consumer VALUES IN ('consumer')
    );
差异对比
diff
@@ -1,6 +1,6 @@
CREATE TABLE local_message_tbl (
- message_id BINARY(16) PRIMARY KEY,
category VARCHAR(32) NOT NULL DEFAULT '',
+ message_id BINARY(16) NOT NULL,
exchange VARCHAR(100) NULL,
routing_key VARCHAR(100) NULL,
message TEXT NOT NULL,
@@ -12,5 +12,11 @@
create_time DATETIME NOT NULL DEFAULT NOW(),
update_time DATETIME NULL,
version INT NOT NULL DEFAULT 0,
+ PRIMARY KEY (category, message_id),
INDEX idx_state_next_retry_time (state, next_retry_time)
-) ENGINE = InnoDB;
+) ENGINE = InnoDB
+ PARTITION BY LIST COLUMNS (category) (
+ PARTITION p_default VALUES IN (''),
+ PARTITION p_producer VALUES IN ('producer'),
+ PARTITION p_consumer VALUES IN ('consumer')
+ );
1.2.2. 查询所有分区
SQL
-- Lists every partition of your_db.local_message_tbl in declaration order.
SELECT
    p.TABLE_SCHEMA,
    p.TABLE_NAME,
    p.PARTITION_NAME,
    p.PARTITION_ORDINAL_POSITION,
    p.PARTITION_METHOD,
    p.PARTITION_EXPRESSION,
    p.PARTITION_DESCRIPTION,
    p.PARTITION_COMMENT,
    p.CREATE_TIME,
    p.UPDATE_TIME
FROM information_schema.PARTITIONS AS p
WHERE p.TABLE_SCHEMA = 'your_db'
  AND p.TABLE_NAME = 'local_message_tbl'
ORDER BY p.PARTITION_ORDINAL_POSITION;
1.2.3. 检查分区是否存在
SQL
-- cnt is 1 when partition p_producer exists on your_db.local_message_tbl, otherwise 0.
SELECT
    COUNT(*) AS cnt
FROM information_schema.PARTITIONS AS p
WHERE p.TABLE_SCHEMA = 'your_db'
  AND p.TABLE_NAME = 'local_message_tbl'
  AND p.PARTITION_NAME = 'p_producer';
1.2.4. 新增分区
SQL
-- Adds one LIST partition per new category value.
-- NOTE(review): the statement is rejected if a partition with the same name
-- already exists, so check information_schema.PARTITIONS first.
ALTER TABLE your_db.local_message_tbl
    ADD PARTITION (
        PARTITION p_producer VALUES IN ('producer'),
        PARTITION p_consumer VALUES IN ('consumer')
    );
1.2.5. 迁移分区
SQL
-- Merges p_producer and p_consumer into a single partition p_merged that
-- holds both category values.
ALTER TABLE your_db.local_message_tbl
    REORGANIZE PARTITION p_producer, p_consumer INTO (
        PARTITION p_merged VALUES IN ('producer', 'consumer')
    );
1.2.6. 删除分区
SQL
-- Removes partition p_merged. Dropping a partition also deletes the rows stored in it.
ALTER TABLE your_db.local_message_tbl
    DROP PARTITION p_merged;
2. 流程图
2.1. 状态流程图
2.2. 可靠消息最终一致性流程图
3. 代码实现
3.1. LocalMessageState.java
Java
/**
 * Lifecycle states of a row in {@code local_message_tbl}.
 * The constant names are the exact values stored in the {@code state} column.
 */
public enum LocalMessageState {
    /** Waiting to be taken by a worker. */
    PENDING,
    /** Taken by a worker and currently being handled. */
    PROCESSING,
    /** Handled successfully. */
    PROCESSED,
    /** Given up after the retry budget was used. */
    FAILED
}
3.2. LocalMessageInsertDto.java
Java
/**
 * Values supplied when a new local message is inserted.
 * Nullable fields left unset fall back to defaults in the insert statement.
 */
@Data
@Accessors(chain = true)
public class LocalMessageInsertDto {

    /** Message category; {@code null} selects the column default. */
    private String category;

    /** Unique message identifier. */
    private @NonNull Tmid messageId;

    /** Target exchange, if any. */
    private String exchange;

    /** Routing key, if any. */
    private String routingKey;

    /** Serialized message payload. */
    private @NonNull String message;

    /** Retry budget; {@code null} selects the column default. */
    private Integer maxRetryCount;

    /** Earliest pick-up time; {@code null} means "now". */
    private LocalDateTime nextRetryTime;
}
3.3. ProcessingLocalMessageDto.java
Java
/**
 * Snapshot of a message that has just been taken for processing.
 * {@code version} is the value to pass back as the optimistic-lock token
 * when the outcome is written.
 */
@Data
@Accessors(chain = true)
public class ProcessingLocalMessageDto {

    /** Unique message identifier. */
    private @NonNull Tmid messageId;

    /** Exchange stored with the message, if any. */
    private String exchange;

    /** Routing key stored with the message, if any. */
    private String routingKey;

    /** Serialized message payload. */
    private @NonNull String message;

    /** Number of failed attempts so far. */
    private int retryCount;

    /** Maximum number of attempts allowed. */
    private int maxRetryCount;

    /** Row version read when the message was taken. */
    private int version;
}
3.4. LocalMessageStatelessCriteriaDto.java
Java
/**
 * Filter on the state-independent columns of a local message:
 * category, exchange and routing key.
 */
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class LocalMessageStatelessCriteriaDto {

    /** Category to match; {@code null} selects the column default. */
    private String category;

    /** Exchange to match; ignored when {@code null} or empty. */
    private String exchange;

    /** Routing key to match; ignored when {@code null} or empty. */
    private String routingKey;

    /**
     * Creates a criteria that filters by category only.
     *
     * @param category category to match
     */
    public LocalMessageStatelessCriteriaDto(String category) {
        this(category, null, null);
    }
}
3.5. LocalMessageStatusUpdateDto.java
Java
/**
 * Argument of the status update statement: the columns to write ({@link Update})
 * and the row to address ({@link Criteria}).
 */
@Data
@Accessors(chain = true)
public class LocalMessageStatusUpdateDto {

    /** Columns to overwrite. */
    private @NonNull Update update;

    /** Row selector, including the optimistic-lock version. */
    private @NonNull Criteria criteria;

    /** New column values; a {@code null} field leaves its column untouched. */
    @Data
    @Accessors(chain = true)
    public static class Update {
        private LocalMessageState state;
        private Integer retryCount;
        private LocalDateTime nextRetryTime;
        private String errMsg;
    }

    /**
     * Identifies the row to update and the version it must still have.
     */
    @Data
    // chain = true is required: callers build this object with chained setters,
    // which only compiles when the generated setters return "this".
    @Accessors(chain = true)
    public static class Criteria {
        /** Category of the row; {@code null} selects the column default. */
        private String category;
        private @NonNull Tmid messageId;
        /** Version the row is expected to have; the update is a no-op otherwise. */
        private int version;
    }
}
3.6. LocalMessageMapper.java
Java
/**
 * MyBatis mapper for {@code local_message_tbl}.
 * The "take" methods run several statements and lock rows, so they are meant
 * to be called inside a transaction.
 */
@Mapper
public interface LocalMessageMapper {

    /** Inserts a new message in state {@code PENDING}. */
    void insert(LocalMessageInsertDto message);

    /**
     * Moves one specific pending message to {@code PROCESSING} and returns it.
     *
     * @param category  message category; {@code null} selects the column default
     * @param messageId identifier of the message to take
     * @return the taken message, or {@code null} if it could not be taken
     */
    @Nullable
    ProcessingLocalMessageDto takePending(@Nullable String category, Tmid messageId);

    /**
     * Moves up to {@code batchSize} due pending messages to {@code PROCESSING}
     * and returns them ordered by message id. Rows locked by others are skipped.
     */
    List<ProcessingLocalMessageDto> batchTakePending(LocalMessageStatelessCriteriaDto criteria, int batchSize);

    /**
     * Looks at the first unfinished message (by message id) matching the criteria
     * and takes it only if it is pending and due.
     *
     * @param skipFailed when {@code true}, {@code FAILED} messages do not count as unfinished
     * @return the taken message, or {@code null} if the first message cannot be taken now
     */
    @Nullable
    ProcessingLocalMessageDto takeFirstPending(LocalMessageStatelessCriteriaDto criteria, boolean skipFailed);

    /**
     * Writes new state columns to the row addressed by the criteria.
     *
     * @return number of rows updated; 0 when the version no longer matches
     */
    int updateStatus(LocalMessageStatusUpdateDto message);

    /**
     * Resets {@code PROCESSING} messages last updated before {@code threshold}
     * back to {@code PENDING}.
     *
     * @return number of rows reset
     */
    int resetZombie(LocalMessageStatelessCriteriaDto criteria, LocalDateTime threshold);
}
3.7. LocalMessageMapper.xml
XML
<mapper namespace="per.tenormis.hellomicroservice.utils.localmsg.mapper.LocalMessageMapper">
<!--
  Resolves the session variable @category for the including statement:
  it takes criteria.category and, when that is NULL, falls back to the column
  default of local_message_tbl.category read from information_schema.
-->
<sql id="CATEGORY_VARIABLE">
    <![CDATA[
    SET @category := #{criteria.category};
    SELECT c.COLUMN_DEFAULT
    INTO @category
    FROM information_schema.COLUMNS AS c
    WHERE @category IS NULL
      AND c.TABLE_SCHEMA = DATABASE()
      AND c.TABLE_NAME = 'local_message_tbl'
      AND c.COLUMN_NAME = 'category';
    ]]>
</sql>
<!--
  Inserts a new message in state PENDING. NULL category / maxRetryCount fall
  back to the column defaults; a NULL nextRetryTime means "now".
-->
<insert id="insert">
    INSERT INTO local_message_tbl
        (category, message_id, exchange, routing_key, message, state, max_retry_count, next_retry_time)
    VALUES
        (COALESCE(#{category}, DEFAULT(category)),
         #{messageId},
         #{exchange},
         #{routingKey},
         #{message},
         'PENDING',
         COALESCE(#{maxRetryCount}, DEFAULT(max_retry_count)),
         COALESCE(#{nextRetryTime}, NOW()));
</insert>
<!--
  Takes one specific message: locks it if it is PENDING (skipping it when
  another session holds the lock), moves it to PROCESSING and returns it.
  Returns no row when the message is not pending or is locked.

  @fetched is reset before the lookup because SELECT ... INTO leaves the
  variable unchanged when no row matches. User variables live as long as the
  (pooled) connection, so without the reset a value left by an earlier call
  would make the UPDATE below hit an unrelated message.
-->
<select id="takePending" resultType="per.tenormis.hellomicroservice.utils.localmsg.domain.ProcessingLocalMessageDto">
    <![CDATA[
    SET @category := #{category};
    SELECT c.COLUMN_DEFAULT
    INTO @category
    FROM information_schema.COLUMNS c
    WHERE @category IS NULL
      AND c.TABLE_SCHEMA = DATABASE()
      AND c.TABLE_NAME = 'local_message_tbl'
      AND c.COLUMN_NAME = 'category';

    SET @fetched := NULL;
    SELECT lm.message_id
    INTO @fetched
    FROM local_message_tbl lm
    WHERE lm.category = @category
      AND lm.message_id = #{messageId}
      AND lm.state = 'PENDING'
    FOR UPDATE SKIP LOCKED;

    UPDATE local_message_tbl lm
    SET lm.state       = 'PROCESSING',
        lm.version     = lm.version + 1,
        lm.update_time = NOW()
    WHERE lm.category = @category
      AND lm.message_id = @fetched;

    SELECT lm.message_id, lm.exchange, lm.routing_key, lm.message, lm.retry_count, lm.max_retry_count, lm.version
    FROM local_message_tbl lm
    WHERE lm.category = @category
      AND lm.message_id = @fetched;
    ]]>
</select>
<!--
  Takes up to batchSize messages that are PENDING and due
  (next_retry_time <= NOW()), in message_id order, skipping rows locked by
  other sessions. The chosen ids are staged in a temporary table, the rows are
  moved to PROCESSING, and their fresh content is returned.

  The temporary table is dropped up front as well as at the end: if an earlier
  run on the same pooled connection aborted before its final DROP, the table
  would still exist and CREATE TEMPORARY TABLE would fail.
  batchSize is bound as a parameter instead of being spliced into the SQL text.
-->
<select id="batchTakePending" resultType="per.tenormis.hellomicroservice.utils.localmsg.domain.ProcessingLocalMessageDto">
    <include refid="CATEGORY_VARIABLE"/>
    <![CDATA[
    DROP TEMPORARY TABLE IF EXISTS fetched;
    CREATE TEMPORARY TABLE fetched (
        message_id BINARY(16)
    );

    INSERT INTO fetched (message_id)
    SELECT lm.message_id
    FROM local_message_tbl lm
    WHERE lm.category = @category
    ]]>
    <if test="criteria.exchange != null and criteria.exchange != ''">
        AND lm.exchange = #{criteria.exchange}
    </if>
    <if test="criteria.routingKey != null and criteria.routingKey != ''">
        AND lm.routing_key = #{criteria.routingKey}
    </if>
    <![CDATA[
      AND lm.state = 'PENDING'
      AND lm.next_retry_time <= NOW()
    ORDER BY lm.message_id
    LIMIT #{batchSize}
    FOR UPDATE SKIP LOCKED;

    UPDATE local_message_tbl lm
    SET lm.state       = 'PROCESSING',
        lm.version     = lm.version + 1,
        lm.update_time = NOW()
    WHERE lm.category = @category
      AND lm.message_id IN (SELECT f.message_id FROM fetched f);

    SELECT lm.message_id, lm.exchange, lm.routing_key, lm.message, lm.retry_count, lm.max_retry_count, lm.version
    FROM local_message_tbl lm
    WHERE lm.category = @category
      AND lm.message_id IN (SELECT f.message_id FROM fetched f)
    ORDER BY message_id;

    DROP TEMPORARY TABLE fetched;
    ]]>
</select>
<!--
  Ordered consumption: locks the first unfinished message (lowest message_id
  in PENDING / PROCESSING, plus FAILED unless skipFailed is set) and takes it
  only if it is PENDING and due. When that first message is still being
  processed, failed, or not yet due, nothing is taken and no row is returned.

  Both variables are reset first because SELECT ... INTO leaves a variable
  unchanged when no row matches. User variables live as long as the (pooled)
  connection, so a value left by an earlier call would otherwise make the
  UPDATE below hit an unrelated message.
-->
<select id="takeFirstPending" resultType="per.tenormis.hellomicroservice.utils.localmsg.domain.ProcessingLocalMessageDto">
    <include refid="CATEGORY_VARIABLE"/>
    <![CDATA[
    SET @orderlyFirst := NULL, @fetched := NULL;

    SELECT lm.message_id
    INTO @orderlyFirst
    FROM local_message_tbl lm
    WHERE lm.category = @category
    ]]>
    <if test="criteria.exchange != null and criteria.exchange != ''">
        AND lm.exchange = #{criteria.exchange}
    </if>
    <if test="criteria.routingKey != null and criteria.routingKey != ''">
        AND lm.routing_key = #{criteria.routingKey}
    </if>
      AND lm.state IN ('PENDING', 'PROCESSING'<if test="!skipFailed">, 'FAILED'</if>)
    <![CDATA[
    ORDER BY lm.message_id
    LIMIT 1
    FOR UPDATE;

    SELECT lm.message_id
    INTO @fetched
    FROM local_message_tbl lm
    WHERE lm.category = @category
      AND lm.message_id = @orderlyFirst
      AND lm.state = 'PENDING'
      AND lm.next_retry_time <= NOW();

    UPDATE local_message_tbl lm
    SET lm.state       = 'PROCESSING',
        lm.version     = lm.version + 1,
        lm.update_time = NOW()
    WHERE lm.category = @category
      AND lm.message_id = @fetched;

    SELECT lm.message_id, lm.exchange, lm.routing_key, lm.message, lm.retry_count, lm.max_retry_count, lm.version
    FROM local_message_tbl lm
    WHERE lm.category = @category
      AND lm.message_id = @fetched;
    ]]>
</select>
<!--
  Writes the non-null fields of "update" to the row addressed by "criteria".
  update_time and version are always bumped. The row is only touched when its
  version still equals criteria.version (optimistic lock).
-->
<update id="updateStatus">
    UPDATE local_message_tbl
    SET
    <if test="update.state != null">
        state = #{update.state},
    </if>
    <if test="update.retryCount != null">
        retry_count = #{update.retryCount},
    </if>
    <if test="update.nextRetryTime != null">
        next_retry_time = #{update.nextRetryTime},
    </if>
    <if test="update.errMsg != null">
        err_msg = #{update.errMsg},
    </if>
        update_time = NOW(),
        version = version + 1
    WHERE category = COALESCE(#{criteria.category}, DEFAULT(category))
      AND message_id = #{criteria.messageId}
      AND version = #{criteria.version};
</update>
<!--
  Puts messages stuck in PROCESSING back to PENDING: every matching row whose
  update_time is older than "threshold" becomes immediately due again and gets
  a new version.
-->
<update id="resetZombie">
    UPDATE local_message_tbl
    SET state           = 'PENDING',
        next_retry_time = NOW(),
        update_time     = NOW(),
        version         = version + 1
    WHERE category = COALESCE(#{criteria.category}, DEFAULT(category))
    <if test="criteria.exchange != null and criteria.exchange != ''">
        AND exchange = #{criteria.exchange}
    </if>
    <if test="criteria.routingKey != null and criteria.routingKey != ''">
        AND routing_key = #{criteria.routingKey}
    </if>
      AND state = 'PROCESSING'
      AND update_time &lt; #{threshold};
</update>
</mapper>
3.8. LocalMessageTemplate.java
Java
/**
 * Entry point for working with the local message table: inserting messages
 * (optionally in the same transaction as business logic), taking messages for
 * processing, and recording the outcome.
 */
@Component
@RequiredArgsConstructor
@NonNullByDefault
public class LocalMessageTemplate {

    private final LocalMessageMapper messageMapper;
    private final PlatformTransactionManager transactionManager;

    /**
     * Inserts a message in state {@code PENDING}.
     *
     * @throws DuplicateKeyException if a message with the same key already exists
     */
    public void insert(LocalMessageInsertDto message) throws DuplicateKeyException {
        messageMapper.insert(message);
    }

    /** Runs the business logic and inserts the message in one transaction. */
    @Transactional
    public void insertAtomically(Runnable businessLogic, LocalMessageInsertDto message) throws DuplicateKeyException {
        businessLogic.run();
        insert(message);
    }

    /**
     * Runs the business logic and inserts the message in one transaction.
     * The message is built only after the business logic has run.
     */
    @Transactional
    public void insertAtomically(Runnable businessLogic, Supplier<@NonNull LocalMessageInsertDto> messageSupplier) throws DuplicateKeyException {
        businessLogic.run();
        insert(messageSupplier.get());
    }

    /**
     * Runs the business logic and inserts the message in one transaction.
     *
     * @return the value produced by the business logic
     */
    @Transactional
    public <T> @Nullable T insertAtomically(Supplier<T> businessLogic, LocalMessageInsertDto message) throws DuplicateKeyException {
        T result = businessLogic.get();
        insert(message);
        return result;
    }

    /**
     * Runs the business logic, builds the message from its result and inserts
     * it, all in one transaction.
     *
     * @return the value produced by the business logic
     */
    @Transactional
    public <T> @Nullable T insertAtomically(Supplier<T> businessLogic, Function<T, @NonNull LocalMessageInsertDto> messageFunc) throws DuplicateKeyException {
        T result = businessLogic.get();
        insert(messageFunc.apply(result));
        return result;
    }

    /** Takes one specific pending message; see {@link LocalMessageMapper#takePending}. */
    @Transactional
    public @Nullable ProcessingLocalMessageDto takePending(@Nullable String category, Tmid messageId) {
        return messageMapper.takePending(category, messageId);
    }

    /** Takes a batch of due pending messages; see {@link LocalMessageMapper#batchTakePending}. */
    @Transactional
    public List<ProcessingLocalMessageDto> batchTakePending(LocalMessageStatelessCriteriaDto criteria, int batchSize) {
        return messageMapper.batchTakePending(criteria, batchSize);
    }

    /** Takes the first unfinished message if it is pending and due; see {@link LocalMessageMapper#takeFirstPending}. */
    @Transactional
    public @Nullable ProcessingLocalMessageDto takeFirstPending(LocalMessageStatelessCriteriaDto criteria, boolean skipFailed) {
        return messageMapper.takeFirstPending(criteria, skipFailed);
    }

    /**
     * Marks a taken message as {@code PROCESSED}.
     *
     * @return number of rows updated; 0 when the message's version has changed since it was taken
     */
    public int markAsProcessed(@Nullable String category, ProcessingLocalMessageDto message) {
        return messageMapper.updateStatus(new LocalMessageStatusUpdateDto()
                .setUpdate(new LocalMessageStatusUpdateDto.Update().setState(LocalMessageState.PROCESSED))
                .setCriteria(getCriteria(category, message)));
    }

    /**
     * Records a failed attempt. When the retry budget is used up the message
     * becomes {@code FAILED}; otherwise it returns to {@code PENDING} with an
     * exponentially growing delay (2^retryCount seconds).
     *
     * @return number of rows updated; 0 when the message's version has changed since it was taken
     */
    public int markAsFailed(@Nullable String category, ProcessingLocalMessageDto message, String reason) {
        int newRetryCount = message.getRetryCount() + 1;
        if (newRetryCount >= message.getMaxRetryCount()) {
            return messageMapper.updateStatus(new LocalMessageStatusUpdateDto()
                    .setUpdate(new LocalMessageStatusUpdateDto.Update()
                            .setState(LocalMessageState.FAILED)
                            .setRetryCount(newRetryCount)
                            .setErrMsg(reason))
                    .setCriteria(getCriteria(category, message)));
        } else {
            // TODO Use functional interface to calculate 'nextRetryTime' instead
            LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds((long) Math.pow(2, newRetryCount));
            return messageMapper.updateStatus(new LocalMessageStatusUpdateDto()
                    .setUpdate(new LocalMessageStatusUpdateDto.Update()
                            .setState(LocalMessageState.PENDING)
                            .setRetryCount(newRetryCount)
                            .setNextRetryTime(nextRetryTime)
                            .setErrMsg(reason))
                    .setCriteria(getCriteria(category, message)));
        }
    }

    /**
     * Runs the business logic at most once for a taken message. The message is
     * marked {@code PROCESSED} and the business logic is executed in the same
     * transaction, so either both take effect or neither does.
     *
     * @return {@code false} if the message could not be marked (its version has
     *         changed), in which case the business logic is not run;
     *         {@code true} after a successful commit
     * @throws RuntimeException whatever the business logic throws, after the
     *         transaction was rolled back and the failure was recorded
     */
    public boolean processOnce(@Nullable String category, ProcessingLocalMessageDto message, Consumer<ProcessingLocalMessageDto> businessLogic) {
        TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
        try {
            if (markAsProcessed(category, message) == 0) {
                transactionManager.rollback(status);
                return false;
            }
        } catch (RuntimeException | Error e) {
            // The status write itself failed: close the transaction instead of leaking it.
            rollbackIfActive(status);
            throw e;
        }
        try {
            businessLogic.accept(message);
        } catch (RuntimeException | Error e) {
            rollbackIfActive(status);
            try {
                // Runs after the rollback, so the version read at take time is still current.
                markAsFailed(category, message, describe(e));
            } catch (RuntimeException markFailure) {
                // Keep the business failure as the primary exception.
                e.addSuppressed(markFailure);
            }
            throw e;
        }
        transactionManager.commit(status);
        return true;
    }

    /** Resets stuck {@code PROCESSING} messages; see {@link LocalMessageMapper#resetZombie}. */
    public int resetZombie(LocalMessageStatelessCriteriaDto criteria, LocalDateTime threshold) {
        return messageMapper.resetZombie(criteria, threshold);
    }

    /** Builds the row selector (category, id, expected version) for a taken message. */
    private static LocalMessageStatusUpdateDto.Criteria getCriteria(@Nullable String category, ProcessingLocalMessageDto message) {
        return new LocalMessageStatusUpdateDto.Criteria()
                .setCategory(category)
                .setMessageId(message.getMessageId())
                .setVersion(message.getVersion());
    }

    /** Rolls the transaction back unless it has already been completed. */
    private void rollbackIfActive(TransactionStatus status) {
        if (!status.isCompleted()) {
            transactionManager.rollback(status);
        }
    }

    /** Returns the throwable's message, or its string form when it has no message. */
    private static String describe(Throwable error) {
        String text = error.getMessage();
        return text != null ? text : error.toString();
    }
}
4. 使用示例
4.1. 生产端
4.1.1. 创建订单
Java
@PostMapping("create")
public void create(@NotNull @RequestParam("userId") Tmid userId, @NotNull String product, @NotNull BigDecimal price) {
localMessageTemplate.insertAtomically(() -> {
Tmid orderId = tmidGen.generate();
orderMapper.insert(new OrderInsertDto()
.setOrderId(orderId)
.setUserId(userId)
.setProductName(product));
return orderId;
}, orderId -> new LocalMessageInsertDto()
.setCategory("producer")
.setMessageId(tmidGen.generate())
.setRoutingKey("order-created")
.setMessage(jackson.writeValueAsString(new OrderCreatedMessageDto()
.setOrderId(orderId)
.setUserId(userId)
.setProduct(product)
.setPrice(price))));
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
4.1.2. 消息发送任务
Java
@Scheduled(fixedDelay = 1000)
public void localMessageProcessTask() {
for (ProcessingLocalMessageDto message : localMessageTemplate.batchTakePending(new LocalMessageStatelessCriteriaDto("producer"), 100)) {
try {
Message msg = MessageBuilder
.withBody(message.getMessage().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setMessageId(message.getMessageId().toString())
.build();
// caution: sample code, reliable delivery not enabled.
rabbitTemplate.send(message.getExchange(), message.getRoutingKey(), msg);
localMessageTemplate.markAsProcessed("producer", message);
} catch (Exception e) {
localMessageTemplate.markAsFailed("producer", message, e.getMessage());
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
4.2. 消费端
4.2.1. 接收消息
Java
@RabbitListener(queuesToDeclare = @Queue("order-created"))
public void orderCreated(Message message) {
// caution: sample code, reliable consumption not enabled.
MessageProperties msgProps = message.getMessageProperties();
Tmid messageId = new Tmid(msgProps.getMessageId());
localMessageTemplate.insert(new LocalMessageInsertDto()
.setCategory("consumer")
.setMessageId(messageId)
.setExchange(msgProps.getReceivedExchange())
.setRoutingKey(msgProps.getReceivedRoutingKey())
.setMessage(new String(message.getBody())));
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
4.2.2. 扣减库存任务
Java
/**
 * Every second, takes up to 100 pending "consumer" messages and processes
 * each one at most once.
 */
@Scheduled(fixedDelay = 1000)
public void localMessageProcessTask() {
    for (ProcessingLocalMessageDto message : localMessageTemplate.batchTakePending(new LocalMessageStatelessCriteriaDto("consumer"), 100)) {
        try {
            localMessageTemplate.processOnce("consumer", message, this::processLocalMessage);
        } catch (RuntimeException e) {
            // processOnce has already rolled back and recorded the failure for this
            // message. Keep going: the rest of the batch is already in PROCESSING and
            // would otherwise stay stuck until it is reset as a zombie.
        }
    }
}
private void processLocalMessage(ProcessingLocalMessageDto msg) {
OrderCreatedMessageDto orderCreated = Objects.requireNonNull(jackson.readValue(msg.getMessage(), OrderCreatedMessageDto.class));
userMapper.pay(orderCreated.getUserId(), orderCreated.getPrice());
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11