RabbitMQ如何保障消息不丢失

这个问题是所有MQ的共性问题,所有MQ都会面临 数据丢失的情况,但是不同的MQ产品有不同的解决方案,这里只描述 RabbitMQ。

1)可能丢失消息的环节

RabbitMQ如何保证消息不丢失

1.生产者发送消息

生产者在发送消息给MQ时,可能因为网络问题等情况,导致 消息发送失败,那么我们可以通过 生产者确认机制,来确保消息成功到达RabbitMQ。

生产者确认机制分为 同步确认和异步确认。

同步确认

​ 同步确认主要是通过在生产者端使用Channel.waitForConfirmsOrDie()指定一个等待确认的完成时间。

image-20230328173855932

异步确认

异步确认机制则是通过channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)在生产者端注入两个回调确认函数。

channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);

这里注册了2个监听器,发送者在发送完消息后,就会执行第一个监听器callback1,然后等服务端发过来的反馈后,再执行第二个监听器callback2。

sequenceNumber的生成方式需要通过channel的序列获取。int sequenceNumber = channel.getNextPublishSeqNo();

image-20230325183240833

如果我们要在第二个监听器的回调方法里面做校验,确保消息是否发送成功,我们需要自行记录消息ID到Map中,并在回调函数中 进行匹配,如果发送失败则考虑 管理员警报还是补发。

2.集群节点丢失消息

因为默认的集群模式 ,消息只会存在各自的节点上,节点之间并不会主动进行消息同步,所以当节点宕机后,就会存在丢失消息的可能性。

为此,我们可以把模式改成 镜像模式,这样的话数据就会在每个节点之间同步,确保集群消息的可靠性。

3.刷盘(存盘)问题

当消息进入MQ后会先保存到内存的page cache缓存区,然后再由操作系统写入到硬盘中。但是如果在写入硬盘的过程中服务器宕机了,就会出现消息丢失的情况。

为了解决这个问题,Classic队列就一定要设置成持久化队列,这样消息会直接存入到硬盘,丢失消息的概率就不高。

4.消费者消费数据时丢失数据

消费者消费消息时,是可以指定 是否要自动应答还是 手动应答,我们可以给他切换成手动应答,如果业务逻辑处理失败了,则可以考虑是否要通知管理员,还是继续尝试,但是按照我的经验来看,如果第一次处理失败了,再次去处理也还是会失败。

所以我的建议是 做一个try cache ,如果业务处理处了问题,就通知管理员,并且把数据发送到其他的队列进行保存,但是依旧告知MQ处理成功,否则很可能会一直不停的处理失败,陷入死循环。

在生产环境中一旦陷入死循环,日志文件就会变得非常大,随后就导致服务器硬盘溢出,并且会阻塞线程,导致后续的消息无法被正常消费。

如何保证消息幂等性

1)为什么会产生消息幂等性问题

因为RabbitMQ有一个重试机制,如果消费者消费的过程中抛出异常,或者不向MQ返回响应,MQ就会进行无限次的重试,确保消息的可靠性。

但是正因为这个重试机制,导致可能出现多个消费者收到同样的消息。

举个例子:客户在下单后,服务器发送给了MQ一条订单信息,库存服务器(消费者)收到后,在处理的过程中抛了异常,但是没有进行数据回滚,库存已经减1了,而MQ没有收到正确响应就进行重试,库存服务器再次收到这条消息又进行库存操作,以此往复…

2)解决方案

1.设置重试次数

我们可以在SpringBoot集成rabbitMQ时,通过一条命令 设置RabbitMQ 重试次数,避免重试次数太多导致的幂等性问题。

1
spring.rabbitmq.listener.simple.retry=1

image-20230328185336452

2.设置消息唯一标识

在生产者发送消息时,在消息上面指定一个全局唯一的Message ID,消费者收到消息后判断该Message ID是否已经消费过,如果没有消费过的话就进行消费,消费成功后把Message ID存入到redis中,消费过的话就返回正确应答。

1
2
3
4
5
6
7
8
9
10
11
/**
* 优先级队列测试
*
* @return
*/
@GetMapping(value = "/prioritySend")
public void prioritySend(Integer priority,String msg) {
Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(priority).setMessageId(UUID.randomUUID().toString()).build();
rabbitTemplate.convertAndSend("priority-queue", message);
System.out.println("message sended success" + "queue name :" + "priority-queue" + "priority :" + priority);
}

消费者接收时,通过获取消息ID并在redis中查询

1
2
3
4
5
6
7
8
@RabbitListener(queues = "sharding_exchange")
public void priorityQueue1(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
// 判断message id 在redis中是否已经消费了
messageProperties.getMessageId();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}

3.业务代码自行判断

在做业务操作之前我们需要先判断消息对应的状态,如订单支付状态,订单发货状态等。

例如:我们通过获取订单ID去数据库或缓存中查询是否已经成功支付,或做了某些相应操作,如果做了就不进行业务操作,但是返回正确响应,如果没有做就继续往下执行业务逻辑。

如何保证消息的顺序

在rabbitMQ中很难保证消息的顺序,如果要保证消息的顺序的话就会影响到MQ的吞吐量,这是一个取舍的问题。

只有先进先出的队列,并且缩小消费者消费次数,才可以保证 消息的顺序。

注意:这里的顺序指的并不是MQ所有队列消息的顺序,而是局部的有序。

举个例子:

​ 客户下单后,服务器需要进行 计算优惠券、支付系统、库存、通知快递发货等操作,这种必须保证顺序有序,否则就会出现问题。

1)解决方案 - 单队列单消费模式

一组希望有序的消息,我们只发送到一个队列中,并且保证一个消费者进行消费,并且保证一次只读取一条消息。

因为队列先进先出的原则,可以确保消息在队列中百分百是有序的,但是消费者一次都是获取很多数据然后进行处理的,在消费者这段就会出现 乱序的情况,所以我们要确保只有一个消费者处理,并且消费者一次只能获取一条消息。

2)配置参数

1
spring.rabbitmq.listener.simple.prefetch=1
image-20230328190601120

该参数主要作用是确保每次消费者只会获取一条消息进行消费

如何避免数据堆积

由于RabbitMQ消息大量堆积会导致性能下降,所以RabbitMQ新推出了Quorum和Stream队列来解决这个问题。

除此之外,我们也可以通过其他的手段来限制消息堆积,手段都是来自于消费者这边的优化,因为你很难去优化生产者,更不可能去限流。

1)消费者优化手段

1.设置单次消费数量和线程

对于单个消费者,我们可以提高单次消费数量以及消费现场,来提高消费者的处理并发能力。

1
2
3
4
# 单次推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5

2.增加消费者数量

当消费端的服务器出现了故障导致大量消息堆积的话,就要立即增加消费端服务器,来加速消费掉积压的消息。

RabbitMQ的备份与恢复

我们可以通过RabbitMQ web管理端自带的导入导出功能,来完成数据备份。

但是值得注意的是,这种方式 只能备份、恢复 元数据(队列、交换机、虚拟机等不包含具体消息)。

1)元数据备份恢复

image-20230328211554473

2)消息数据备份和恢复

实际上并不建议来备份和恢复数据,会造成不可预料的后果,如 数据重复消费等问题。

备份

但是如果我们实在需要备份消息数据,可以通过 备份以下目录即可。

RabbitMQ有一个data目录会保存该节点的所有消息,目录地址:/var/lib/rabbitmq/mnesia

恢复

如果需要恢复的话,只需要将整个文件夹复制到新的服务中即可。