RabbitMQ高级特性 1)分组消费
RabbitMQ可以对消费者进行分组,不同的group,都会消费到同样的一份message副本,而在同一个group中,只会有一个消费者消费到一个message。
想要实现分组消费,得依赖于SpringCloudStream微服务框架,不然只能自己封装原生API。
2)死信队列 死信队列是一种比较特殊的队列,当一条消息不能正常被消费时,就会转发给死信交换机,死信交换机会转发给专门补救消息的普通队列,而专门的消费者会监听该队列,收到消息后进行消费,是对于不能正常被消费的消息的挽救措施。
一般用比较多的场景 都是 通过死信队列+TTL 来达到 延迟队列的效果。
名称解释
死信交换机
死信队列
补救队列
这个补救队列是我自己起的名称,用于实现延迟队列的方案,死信队列的消息变成死信后会转发给死信交换机,死信交换机收到后再转发给补救队列,然后会有一个专门处理补救措施的消费者来监听该队列
1.产生死信的原因
消息被消费者明确拒绝接收
消息达到了TTL 存活时间后还没有被消费
TTL(Time-To-Live):存活时间,也可以理解为是过期时间,一旦超过了存活时间还没被消费,则认定为 死信
消息达到了队列的最大长度限制而被丢弃
2.创建死信交换机
死信交换机创建的过程和普通的交换机一样,没有什么比较特殊的地方,也可以当成是普通的交换机
只是 在死信队列中死信交换机绑定了这个交换机,且这个交换机 会绑定一个 补救队列,当死信队列的消息变成死信后会转发给死信交换机,死信交换机会转发给 补救队列
3.创建死信队列
创建死信队列时,我们必须在Arguments 增加 死信交换机的信息,如下图:
添加死信队列成功后,队列列表会有一个明确的DLX字段标识。
3)死信队列+TTL实现 延迟队列(不推荐) 延迟队列一般用于 定时操作,给消息设置一个TTL 存活时间,在达到存活时间后进行消费。
案例一:
需求:客户下单后,30分钟内未支付则关闭订单,并回退库存,发送通知告知客户订单已取消。
实现思路:客户下单后,服务器向MQ发送一条消息(TTL30分钟),等待30分钟之后消息变成死信,进入死信交换机后,交换机会转发给指定的队列,服务器收到消息后进行 相应的逻辑操作。
案例二:
需求:客户开通一个月的VIP会员,需要在一个月后关闭该会员状态,并提醒客户会员已过期并进行续费。
实现思路:客户下单后,服务器向MQ发送一条消息(TTL30天),等待30天之后消息变成死信,进入死信交换机后,交换机会转发给指定的队列,服务器收到消息后进行 相应的逻辑操作。
注意:不推荐该方案,因为该方案会阻塞消息,具体可以看下面的章节,我有特地描述了一下这个问题。
接下来,我们模拟10秒后死信队列的消息变成死信后,转发给死信交换机,并且消费者监听 死信交换机的补救队列。
1.创建死信交换机
值得注意的是,如果通过死信队列+TTL的形式来达到 延迟队列的效果,死信交换机的类型必须是fanout,其他类型都不会成功,我实测的.. 如果你有其他不同的看法,可以和我一起探讨。
2.创建死信队列
3.创建补救队列
补救队列只是个普通的队列,只是会和死信交换机进行绑定而已,当消息进入死信交换机后,会通过广播的形式发送给补救队列。
在死信交换机中绑定 补救队列
4.发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @GetMapping(value="/deadSend") public Object deadSend () throws AmqpException, UnsupportedEncodingException { User user = new User (); user.setUsername("admin" ); user.setPassword("123456" ); user.setAge(12 ); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter ()); rabbitTemplate.convertAndSend("dead-letter-queue" ,user, new MessagePostProcessor () { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000" ); return message; } }); return "message sended : " +user; }
5.消费者监听消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.javaxing.springbootrabbitmq.consumer;import com.javaxing.springbootrabbitmq.util.MyConstants;import com.rabbitmq.client.*;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.core.Message;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.HashMap;import java.util.Map;@Component public class DirectReceiver { @RabbitListener(queues = "resurgence-queue") public void resurgenceReceiver (String message) { System.out.println("resurgenceReceiver received message : " + message); } }
6.阻塞消息的问题 当我们往消息队列发送第一条消息 TTL 120秒,随后在发送第二条消息 TTL 5秒,那么就会出现消息阻塞的问题。
问题:第一条消息TTL没有到期之前,第二条消息TTL就不会到期。
案例:
如果有10个用户开通了会员,到期时间都不一样,那么 第一条进入队列的消息如果不过期,后面的消息哪怕TTL很短也不会过期。
验证阻塞问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @GetMapping(value="/deadSend") public Object deadSend (String ttl) throws AmqpException, UnsupportedEncodingException { User user = new User (); user.setUsername("admin" ); user.setPassword("123456" ); user.setAge(12 ); user.setCreateTime(DateUtil.now()); user.setTtl(ttl); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter ()); rabbitTemplate.convertAndSend("dead-letter-queue" ,user, new MessagePostProcessor () { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setExpiration(ttl); return message; } }); return "message sended : " +user +" send time:" + DateUtil.now() +" ttl:" +ttl; }
第一条消息 TTL 120秒后过期
第二条消息 TTL 3秒后过期
此时队列中,存在2条消息
但是我们发现,30秒过去了,第二条消息的TTL依旧没有到期而变成死信。
第一条消息TTL 120秒到期后,第二条消息TTL才会随着过期
死信队列+TTL实现的延迟队列必须等前面的消息先到期后,才能进入下一条消息,这样就违背了我们做延迟队列的初衷。
4)官方插件实现延迟队列(推荐) 1.安装插件 插件github:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
实验3.8.8版本插件地址(国内服务器):https://files.javaxing.com/rabbitmq_delayed_message_exchange-3.8.0.ez
为了方便,这里提供了实验用的插件下载地址,如果你的版本不是3.8.8,可以去github下载。
注意:插件的版本必须与RabbitMQ服务端版本一致,否则插件会安装不上。
版本不一致的错误提示
下载插件并安装
下载插件到:rabbitmq/plugins 目录
查找RabbitMQ插件目录
1 [root@S1 ~]# find / -iname "*rabbit*"
安装命令
1 [root@S1 plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
注意:
如果是集群的话,集群每个服务端都要安装插件
如果是高版本的rabbitMQ,如 3.8.8 不需要重启服务端也可以直接使用
如何验证插件安装成功?
当我们再次刷新Web管理界面后,添加新的交换机时,会出现新的选项x-delayed-message,就代表插件安装成功。
2.创建延迟交换机和延迟队列
延迟队列只是一个很普通的队列,只是和延迟交换机绑定了而已
创建延迟交换机时,需要指定 交换机的类型为:x-delayed-message
以下2种方式,代码和Web管理端 不论哪种创建都可以。
步骤分为三步:
创建死信交换机
创建普通的队列
把队列绑定到死信交换机中,根据实际情况是否要绑定routingKey
1.代码创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package com.javaxing.springbootrabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.Queue;import java.util.HashMap;@Configuration public class DelayedConfig { String DELAY_QUEUE = "delay.queue" ; String EXCHANGE = "delay.exchange" ; String ROUTE_KEY = "delay.routeKey" ; @Bean Queue delayQueue () { return new Queue (DELAY_QUEUE); } @Bean public CustomExchange delayExchange () { HashMap<String, Object> args = new HashMap <>(); args.put("x-delayed-type" ,"direct" ); return new CustomExchange (EXCHANGE,"x-delayed-message" ,true ,false ,args); } @Bean public Binding bindDelayExchange (@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") CustomExchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTE_KEY).noargs(); } }
1、创建延迟队列
2、创建延迟交换机
3、在延迟交换机里面绑定延迟队列
验证是否创建成功
2.Web管理端创建 创建延迟交换机
创建延迟队列
绑定关系
3.生产者发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @GetMapping(value="/delayedSend") public Object delayedSend () throws AmqpException, UnsupportedEncodingException { User user = new User (); user.setUsername("admin" ); user.setPassword("123456" ); user.setAge(12 ); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter ()); rabbitTemplate.convertAndSend("delay.exchange2" ,"order" ,user, message -> { message.getMessageProperties().setDelay(3000 ); return message; }); return "message sended : " +user; }
我们可以根据routingKey 来分配给不同的消费队列,如果不想使用routingKey,可以为空。那么交换机收到消息后就会转发给所有绑定的队列。
如订单服务端 则只需要监听 routingKey -> order,支付服务端 只监听 routingKey -> pay ,以此类推。
4.消费者消费消息 监听的队列 delay.queue,是直接监听 死信交换机所绑定的交换机队列。
生产者发送消息的话,是直接发送到 死信交换机去的:
如果消息指定了routingKey,那么交换机就会发送到匹配routingKey的队列。
如果消息不指定routingKey,交换机则会广播给所有队列。
1 2 3 4 @RabbitListener(queues = "delay.queue2") public void delayQueue (String message) { System.out.println("delayQueueReceiver received message : " + message+" received time:" + DateUtil.now()); }
5.验证是否会阻塞消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @GetMapping(value="/delayedSend") public Object delayedSend (Integer ttl) throws AmqpException, UnsupportedEncodingException { User user = new User (); user.setUsername("admin" ); user.setPassword("123456" ); user.setAge(12 ); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter ()); rabbitTemplate.convertAndSend("delay.exchange" ,"delay.routeKey" ,user, message -> { message.getMessageProperties().setDelay(ttl); return message; }); return "message sended : " +user +" send time:" + DateUtil.now() +" ttl:" +ttl; }
第一条消息TTL 120秒
第二条消息TTL 10秒
此时延迟交换机中有2条延迟消息
10秒后,消费者收到了第二条消息,120秒后收到了第一条消息,这样才符合我们的需求。
5)优先队列 默认情况下,所有队列中的消息优先级同一优先级(都是0),就会根据先进先出原则按顺序发送给消费者。
但是如果我们给队列中的某个消息设置了优先级后,RabbitMQ会把优先级高的消息先发送消息给消费者 ,而同一优先级消息还是根据先进先出原则。
注意:RabbitMQ 3.5.0之后才新增的功能,低于这个版本是没有优先级的,且优先级的概念是针对于消息的 。
1.开启优先级队列
这里我们对队列设置了一个最大优先级,范围建议 1- 255 ,类型是number类型。这个步骤是必须的,这里既设置了最大优先级,也代表开启优先级队列。
注意:这里一定要设置最大优先级,否则是不会开启优先级队列的,哪怕你发送消息时设置了优先级也毫无意义。
2.生产者发送消息
当我们开启了优先级队列后,就可以在发送消息时候设置优先级。
1 2 3 4 5 6 7 8 9 10 11 @GetMapping(value = "/prioritySend") public void prioritySend (Integer priority) { Message message = MessageBuilder.withBody("testMsg" .getBytes()).setPriority(priority).build(); rabbitTemplate.convertAndSend("priority-queue" , message); System.out.println("message sended success" + "queue name :" + "priority-queue" + "priority :" + priority); }
我们先关闭监听功能,然后发送4条消息到达队列后,让消息在队列中堆积起来。
默认情况下,所有消息优先级都一致,那么消费顺序应该是先进先出,但是这里我们设置了优先级,正确顺序应该是:4 3 2 1
3.消费者消费消息 1 2 3 4 5 6 7 8 @RabbitListener(queues = "priority-queue") public void priorityQueue1 (Message message, Channel channel) throws IOException { byte [] body = message.getBody(); MessageProperties messageProperties = message.getMessageProperties(); String content = new String (body, Charset.forName("UTF-8" )); System.out.println("来自交换机: 【" + messageProperties.getReceivedExchange() + "】, 队列:【" + messageProperties.getConsumerQueue()+ "】:\n内容: " + content); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); }
输出结果
确实如我们一开始想的一样,按照我们预设的消息优先级进行输出,同一个队列中 优先级高的消息,会先发送给消费者。
6)远程数据分发 远程数据分发插件:Federation Plugin
如果我们在北京和上海各有一个MQ集群机房,此时我们需要上海的机房可以同步北京机房的数据,我们就可以使用到 远程数据分发插件。
因为跨维度比较大,且集群的同步是双向的,对网络开销会比较大,所以不适合搭建集群,但是我们如果用了数据分发插件,则可以让 上游分发数据给下游。
1.启动插件 只需要在下游服务器上安装插件,安装成功后无需重启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 [root@S4 ~]# rabbitmq-plugins enable rabbitmq_federation Enabling plugins on node rabbit@S4 : rabbitmq_federation The following plugins have been configured: rabbitmq_federation rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@S4 ... The following plugins have been enabled: rabbitmq_federation started 1 plugins. [root@S4 ~]# rabbitmq-plugins enable rabbitmq_federation_management Enabling plugins on node rabbit@S4 : rabbitmq_federation_management The following plugins have been configured: rabbitmq_federation rabbitmq_federation_management rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@S4 ... The following plugins have been enabled: rabbitmq_federation_management started 1 plugins. [root@S4 ~]#
2.配置Upstream 配置上游服务器信息,这样的话数据就会从上游服务主动同步到下游服务中。
name:名字可以随意定义
uri:amqp://admin:123456@S1:5672/,如果我们在Upstream中已经指定了Virtual host,则在URI后面不需要在携带虚拟主机,直接写amqp://admin:123456@S1:5672
3.配置Federation策略
Pattern:^fed_* ,匹配以fed_开头,任意字符结尾的交换机或队列的消息
Definition:federation-upstream-set 指定配合接收上游的数据集合,federation-upstream 上游数据,all代表全部接收
4.测试 当我们配置完Upstream和Federation策略后,就可以进入Federation Status菜单查看 分发状态。
在远程服务RabbitMQ服务中,可以看到 插件为我们自动生成的对应生成的Federation交换机。
7)懒队列 默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。
但是如果消息堆积太多,内存不够用,就会内存溢出,为此我们可以引入 懒队列的概念,牺牲硬盘IO从而减少内存占用。
懒队列的设计目标是为了支持非常长的队列(数百万级别),它是作为大数据量堆积的消息队列的优化手段。
核心概念:懒队列会尽可能将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。
启用懒队列只需要在创建队列时指定参数即可:
8)消息分片存储插件 Sharding Plugin 大量数据堆积的长队列会降低rabbitMQ性能,为此我们上面引入了 懒队列来解决,但这并不能治本,所以我们可以使用 消息分片的方式来解决长队列。
在rabbitMQ中也提供了一种类似于 分库分表的方案,可以把一个队列的消息分片存储在多个队列中,并提供负载均衡的读与写功能,从而降低长队列大量数据堆积的问题。
好处:
提高队列的吞吐量,提供消费性能
解决数据堆积的长队列问题
1.安装插件 1 [root@S1 ~]# rabbitmq-plugins enable rabbitmq_sharding
2.配置Sharding策略
查看策略:
3.新增分片交换机
添加后的交换机状态:
交换机详情:
查看队列列表:
当我们添加分片交换机后,他会根据环境给我们创建相应的分片队列,我们刚才指定的是分成3个队列,但是由于我们是集群环境(3个节点),他会为每个集群的节点都分片3个队列,总共9个队列。
4.发送10W条消息到该交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @GetMapping(value = "/shardingSend") public Object shardingSend () throws AmqpException, UnsupportedEncodingException { User user = new User (); user.setUsername("admin" ); user.setPassword("123456" ); user.setAge(12 ); for (int i = 0 ; i < 100000 ; i++) { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter ()); rabbitTemplate.convertAndSend("sharding_exchange" ,"sharding" , user); } return "message sended : " + user; }
为什么都分发到了同一个队列中,这个由 插件自行决定的,包括在消费消息时,会有一套算法来决定 消息哪个队列的消息。
5.消费消息 消费规则:会根据哪个分片队列的消费者客户端少,就先消费哪个队列 的消息。
消费消息的时候,我需要指定一个伪队列:sharding_exchange(分片交换机的名称) ,这个队列并没有真实存在,但是他能通过这个伪队列找到其他分片队列进行消费。
1 2 3 4 5 6 7 8 @RabbitListener(queues = "sharding_exchange") public void priorityQueue1 (Message message, Channel channel) throws IOException { byte [] body = message.getBody(); MessageProperties messageProperties = message.getMessageProperties(); String content = new String (body, Charset.forName("UTF-8" )); System.out.println("来自交换机: 【" + messageProperties.getReceivedExchange() + "】, 队列:【" + messageProperties.getConsumerQueue()+ "】:\n内容: " + content); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); }
6.注意事项 1、Sharding分片存储插件适合于那些对于消息延迟要求不严格 ,以及对消费顺序没有任何要求 的的场景。
2、尽量使用伪队列消费,不要去消费单独的分片队列。