Appearance
消息的可靠投递和消费
1. MQ 原则
数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果 MQ 传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。
2. 丢失数据场景
丢数据一般分为两种,一种是 MQ 把消息丢了,一种就是消费时将消息丢了。下面从 RabbitMQ 和 Kafka 分别说一下,丢失数据的场景。
2.1. RabbitMQ
生产者弄丢了数据
生产者将数据发送到 RabbitMQ 的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
RabbitMQ 自己丢了数据
如果没有开启 RabbitMQ 的持久化,那么 RabbitMQ 一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算 RabbitMQ 挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,RabbitMQ 还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,RabbitMQ 就认为你已经消费过了,然后就丢了数据。
RabbitMQ 数据丢失示意图:

2.2. Kafka
生产者弄丢了数据
生产者没有设置相应的策略,发送过程中丢失数据。
Kafka 弄丢了数据
比较常见的一个场景,就是 Kafka 的某个 broker 宕机了,然后重新选举 partition 的 leader 时。如果此时 follower 还没来得及同步数据,leader 就挂了,然后某个 follower 成为了 leader,他就少了一部分数据。
消费者弄丢了数据
消费者消费到了这个数据,然后消费之自动提交了 offset,让 Kafka 知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
Kafka 数据丢失示意图:

3. 如何防止消息丢失
3.1. RabbitMQ
3.1.1. 生产者丢失消息
3.1.1.1. RabbitMQ 事务
可以选择使用 RabbitMQ 提供的事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。
Java
channel.txSelect();// 开启事物
try{
// 发送消息
}catch(Exection e){
channel.txRollback();// 回滚事物
// 重新提交
}1
2
3
4
5
6
7
2
3
4
5
6
7
缺点:RabbitMQ 事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
3.1.1.2. RabbitMQ Publish Confirm
可以开启 confirm 模式。在生产者哪里设置开启了 confirm 模式之后,每次写的消息都会分配一个唯一的 id,然后如何写入了 RabbitMQ 之中,RabbitMQ 会给你回传一个 ack 消息,告诉你这个消息发送 OK 了;如果 RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的 id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
Java
// 开启 confirm
channel.confirm();
// 发送成功回调
public void ack(String messageId){
}
// 发送失败回调
public void nack(String messageId){
// 重发该消息
}1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
3.1.1.3. 二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是 confirm 机制是异步的,发送消息之后可以接着发送下一个消息,然后 RabbitMQ 会回调告知成功与否。
一般在生产者这块避免丢失,都是用 confirm 机制。
3.1.2. RabbitMQ 自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化的,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里面的数据;
- 发送消息的时候讲消息的
deliveryMode设置为 2,这样消息就会被设为持久化方式,此时 RabbitMQ 就会将消息持久化到磁盘上;
必须要同时开启这两个才可以。
而且持久化可以跟生产的 confirm 机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者 ack,这样就算是在持久化之前 RabbitMQ 挂了,数据丢了,生产者收不到 ack 回调也会进行消息重发。
3.1.3. 消费者弄丢了数据
使用 RabbitMQ 提供的 ack 机制,首先关闭 RabbitMQ 的自动 ack,然后每次在确保处理完这个消息之后,在代码里手动调用 ack。这样就可以避免消息还没有处理完就 ack。
3.2. Kafka
3.2.1. 消费端弄丢了数据
关闭自动提交 offset,在自己处理完毕之后手动提交 offset,这样就不会丢失数据。
3.2.2. Kafka 弄丢了数据
一般要求设置 4 个参数来保证消息不丢失:
- 给 topic 设置
replication.factor参数:这个值必须大于 1,表示要求每个 partition 必须至少有 2 个副本; - 在 Kafka 服务端设置
min.isync.replicas参数:这个值必须大于 1,表示 要求一个 leader 至少感知到有至少一个 follower 在跟自己保持联系正常同步数据,这样才能保证 leader 挂了之后还有一个 follower; - 在生产者端设置
acks=all:表示要求每条每条数据,必须是写入所有 replica 副本之后,才能认为是写入成功了 - 在生产者端设置
retries=MAX(很大的一个值,表示无限重试):表示这个是要求一旦写入事变,就无限重试
3.2.3. 生产者弄丢了数据
如果按照上面设置了 ack=all,则一定不会丢失数据,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。