RabbitMQ高级特性

1)分组消费

高级特性-分组消费

RabbitMQ可以对消费者进行分组,不同的group,都会消费到同样的一份message副本,而在同一个group中,只会有一个消费者消费到一个message。

想要实现分组消费,得依赖于SpringCloudStream微服务框架,不然只能自己封装原生API。

2)死信队列

死信队列是一种比较特殊的队列,当一条消息不能正常被消费时,就会转发给死信交换机,死信交换机会转发给专门补救消息的普通队列,而专门的消费者会监听该队列,收到消息后进行消费,是对于不能正常被消费的消息的挽救措施。

一般用比较多的场景 都是 通过死信队列+TTL 来达到 延迟队列的效果。

名称解释

  • 死信交换机

    • 绑定 死信交换机的队列就称之为死信队列
  • 死信队列

    • 当一条消息不能正常被消费时,就会转发给死信交换机
  • 补救队列

    • 这个补救队列是我自己起的名称,用于实现延迟队列的方案,死信队列的消息变成死信后会转发给死信交换机,死信交换机收到后再转发给补救队列,然后会有一个专门处理补救措施的消费者来监听该队列

1.产生死信的原因

  • 消息被消费者明确拒绝接收
  • 消息达到了TTL 存活时间后还没有被消费
    • TTL(Time-To-Live):存活时间,也可以理解为是过期时间,一旦超过了存活时间还没被消费,则认定为 死信
  • 消息达到了队列的最大长度限制而被丢弃

2.创建死信交换机

  • 死信交换机创建的过程和普通的交换机一样,没有什么比较特殊的地方,也可以当成是普通的交换机
    • 只是 在死信队列中死信交换机绑定了这个交换机,且这个交换机 会绑定一个 补救队列,当死信队列的消息变成死信后会转发给死信交换机,死信交换机会转发给 补救队列

image-20230326180848119

3.创建死信队列

  • 绑定 死信交换机的队列就称之为死信队列。

创建死信队列时,我们必须在Arguments 增加 死信交换机的信息,如下图:

image-20230326171154813

添加死信队列成功后,队列列表会有一个明确的DLX字段标识。

image-20230326171228637

3)死信队列+TTL实现 延迟队列(不推荐)

延迟队列一般用于 定时操作,给消息设置一个TTL 存活时间,在达到存活时间后进行消费。

案例一:

需求:客户下单后,30分钟内未支付则关闭订单,并回退库存,发送通知告知客户订单已取消。

实现思路:客户下单后,服务器向MQ发送一条消息(TTL30分钟),等待30分钟之后消息变成死信,进入死信交换机后,交换机会转发给指定的队列,服务器收到消息后进行 相应的逻辑操作。

案例二:

需求:客户开通一个月的VIP会员,需要在一个月后关闭该会员状态,并提醒客户会员已过期并进行续费。

实现思路:客户下单后,服务器向MQ发送一条消息(TTL30天),等待30天之后消息变成死信,进入死信交换机后,交换机会转发给指定的队列,服务器收到消息后进行 相应的逻辑操作。

注意:不推荐该方案,因为该方案会阻塞消息,具体可以看下面的章节,我有特地描述了一下这个问题。

接下来,我们模拟10秒后死信队列的消息变成死信后,转发给死信交换机,并且消费者监听 死信交换机的补救队列。

1.创建死信交换机

image-20230326180848119

值得注意的是,如果通过死信队列+TTL的形式来达到 延迟队列的效果,死信交换机的类型必须是fanout,其他类型都不会成功,我实测的.. 如果你有其他不同的看法,可以和我一起探讨。

2.创建死信队列

image-20230326171154813

3.创建补救队列

image-20230326182241462

补救队列只是个普通的队列,只是会和死信交换机进行绑定而已,当消息进入死信交换机后,会通过广播的形式发送给补救队列。

在死信交换机中绑定 补救队列

image-20230326182529186

4.发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 死信队列+TTL实现延迟队列
* @return
* @throws AmqpException
* @throws UnsupportedEncodingException
*/
@GetMapping(value="/deadSend")
public Object deadSend() throws AmqpException, UnsupportedEncodingException {

// 创建实体类
User user = new User();
user.setUsername("admin");
user.setPassword("123456");
user.setAge(12);

//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("dead-letter-queue",user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
return "message sended : "+user;
}

5.消费者监听消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.javaxing.springbootrabbitmq.consumer;

import com.javaxing.springbootrabbitmq.util.MyConstants;
import com.rabbitmq.client.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Component
public class DirectReceiver {
// 消费者监听补救队列,收到消息后就进行处理,如:关闭订单,告知客户订单已取消等
@RabbitListener(queues = "resurgence-queue")
public void resurgenceReceiver(String message){
System.out.println("resurgenceReceiver received message : "+ message);
}
}

6.阻塞消息的问题

当我们往消息队列发送第一条消息 TTL 120秒,随后在发送第二条消息 TTL 5秒,那么就会出现消息阻塞的问题。

问题:第一条消息TTL没有到期之前,第二条消息TTL就不会到期。

案例:

如果有10个用户开通了会员,到期时间都不一样,那么 第一条进入队列的消息如果不过期,后面的消息哪怕TTL很短也不会过期。

验证阻塞问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 死信队列+TTL实现延迟队列
* @return
* @throws AmqpException
* @throws UnsupportedEncodingException
*/
@GetMapping(value="/deadSend")
public Object deadSend(String ttl) throws AmqpException, UnsupportedEncodingException {

// 创建实体类
User user = new User();
user.setUsername("admin");
user.setPassword("123456");
user.setAge(12);
user.setCreateTime(DateUtil.now());
user.setTtl(ttl);

//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("dead-letter-queue",user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttl);
return message;
}
});

return "message sended : "+user +" send time:"+ DateUtil.now() +" ttl:"+ttl;
}

第一条消息 TTL 120秒后过期

image-20230327123618517

第二条消息 TTL 3秒后过期

image-20230327123628558

此时队列中,存在2条消息

image-20230327122847277

但是我们发现,30秒过去了,第二条消息的TTL依旧没有到期而变成死信。

image-20230327123756656

第一条消息TTL 120秒到期后,第二条消息TTL才会随着过期

image-20230327123852018

死信队列+TTL实现的延迟队列必须等前面的消息先到期后,才能进入下一条消息,这样就违背了我们做延迟队列的初衷。

4)官方插件实现延迟队列(推荐)

1.安装插件

插件github:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

实验3.8.8版本插件地址(国内服务器):https://files.javaxing.com/rabbitmq_delayed_message_exchange-3.8.0.ez

为了方便,这里提供了实验用的插件下载地址,如果你的版本不是3.8.8,可以去github下载。

注意:插件的版本必须与RabbitMQ服务端版本一致,否则插件会安装不上。

版本不一致的错误提示

插件版本不对的错误

下载插件并安装

image-20230326183506664

下载插件到:rabbitmq/plugins 目录

查找RabbitMQ插件目录

1
[root@S1 ~]# find / -iname "*rabbit*"
image-20230831120726551

安装命令

1
[root@S1 plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

注意:

  1. 如果是集群的话,集群每个服务端都要安装插件
  2. 如果是高版本的rabbitMQ,如 3.8.8 不需要重启服务端也可以直接使用

如何验证插件安装成功?

当我们再次刷新Web管理界面后,添加新的交换机时,会出现新的选项x-delayed-message,就代表插件安装成功。

image-20230326184804742

2.创建延迟交换机和延迟队列

  • 延迟队列只是一个很普通的队列,只是和延迟交换机绑定了而已
  • 创建延迟交换机时,需要指定 交换机的类型为:x-delayed-message

以下2种方式,代码和Web管理端 不论哪种创建都可以。

步骤分为三步:

  1. 创建死信交换机
  2. 创建普通的队列
  3. 把队列绑定到死信交换机中,根据实际情况是否要绑定routingKey

1.代码创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.javaxing.springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;

import java.util.HashMap;

/**
* 延迟队列
*/
@Configuration
public class DelayedConfig {

/**
* 延迟队列
*/
String DELAY_QUEUE ="delay.queue";

/**
* 延迟交换机
*/
String EXCHANGE = "delay.exchange";

/**
* 路由Key
*/
String ROUTE_KEY = "delay.routeKey";

/**
* 延迟队列,一个很普通的队列,只是和延迟交换机绑定了而已
* @return
*/
@Bean
Queue delayQueue() {
return new Queue(DELAY_QUEUE);
}

/**
* 延迟交换机
* @return
*/
@Bean
public CustomExchange delayExchange(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type","direct");
return new CustomExchange(EXCHANGE,"x-delayed-message",true,false,args);
}

/**
* 延迟队列绑定交换机
* @param delayQueue 延迟队列
* @param delayExchange 交换机
* @return Binding
*/
@Bean
public Binding bindDelayExchange(@Qualifier("delayQueue") Queue delayQueue,
@Qualifier("delayExchange") CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTE_KEY).noargs();
}
}

1、创建延迟队列

2、创建延迟交换机

3、在延迟交换机里面绑定延迟队列

验证是否创建成功

image-20230326195153536

image-20230326195254238

2.Web管理端创建

创建延迟交换机

image-20230326200207293

创建延迟队列

image-20230326195806743

绑定关系

image-20230326200318673

3.生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 插件实现延迟队列
* @return
* @throws AmqpException
* @throws UnsupportedEncodingException
*/
@GetMapping(value="/delayedSend")
public Object delayedSend() throws AmqpException, UnsupportedEncodingException {

// 创建实体类
User user = new User();
user.setUsername("admin");
user.setPassword("123456");
user.setAge(12);

//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 死信交换机 路由key
rabbitTemplate.convertAndSend("delay.exchange2","order",user, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setDelay(3000);
return message;
});

return "message sended : "+user;
}

我们可以根据routingKey 来分配给不同的消费队列,如果不想使用routingKey,可以为空。那么交换机收到消息后就会转发给所有绑定的队列。

如订单服务端 则只需要监听 routingKey -> order,支付服务端 只监听 routingKey -> pay ,以此类推。

image-20230326202033172

4.消费者消费消息

监听的队列 delay.queue,是直接监听 死信交换机所绑定的交换机队列。

生产者发送消息的话,是直接发送到 死信交换机去的:

  1. 如果消息指定了routingKey,那么交换机就会发送到匹配routingKey的队列。

  2. 如果消息不指定routingKey,交换机则会广播给所有队列。

1
2
3
4
@RabbitListener(queues = "delay.queue2")
public void delayQueue(String message){
System.out.println("delayQueueReceiver received message : "+ message+" received time:" + DateUtil.now());
}
image-20230326201617737

5.验证是否会阻塞消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 插件实现延迟队列
* @return
* @throws AmqpException
* @throws UnsupportedEncodingException
*/
@GetMapping(value="/delayedSend")
public Object delayedSend(Integer ttl) throws AmqpException, UnsupportedEncodingException {

// 创建实体类
User user = new User();
user.setUsername("admin");
user.setPassword("123456");
user.setAge(12);

//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("delay.exchange","delay.routeKey",user, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setDelay(ttl);
return message;
});

return "message sended : "+user +" send time:"+ DateUtil.now() +" ttl:"+ttl;
}

第一条消息TTL 120秒

image-20230327131050601

第二条消息TTL 10秒

image-20230327131113346

此时延迟交换机中有2条延迟消息

image-20230327131103162

10秒后,消费者收到了第二条消息,120秒后收到了第一条消息,这样才符合我们的需求。

image-20230327131314757

5)优先队列

默认情况下,所有队列中的消息优先级同一优先级(都是0),就会根据先进先出原则按顺序发送给消费者。

但是如果我们给队列中的某个消息设置了优先级后,RabbitMQ会把优先级高的消息先发送消息给消费者,而同一优先级消息还是根据先进先出原则。

注意:RabbitMQ 3.5.0之后才新增的功能,低于这个版本是没有优先级的,且优先级的概念是针对于消息的

1.开启优先级队列

image-20230327203448624

这里我们对队列设置了一个最大优先级,范围建议 1- 255 ,类型是number类型。这个步骤是必须的,这里既设置了最大优先级,也代表开启优先级队列。

注意:这里一定要设置最大优先级,否则是不会开启优先级队列的,哪怕你发送消息时设置了优先级也毫无意义。

2.生产者发送消息

  • 当我们开启了优先级队列后,就可以在发送消息时候设置优先级。
1
2
3
4
5
6
7
8
9
10
11
/**
* 优先级队列测试
*
* @return
*/
@GetMapping(value = "/prioritySend")
public void prioritySend(Integer priority) {
Message message = MessageBuilder.withBody("testMsg".getBytes()).setPriority(priority).build();
rabbitTemplate.convertAndSend("priority-queue", message);
System.out.println("message sended success" + "queue name :" + "priority-queue" + "priority :" + priority);
}

我们先关闭监听功能,然后发送4条消息到达队列后,让消息在队列中堆积起来。

1
2
3
4
http://localhost:8080/prioritySend?priority=1&msg=第一条消息:priority-1		 第一条
http://localhost:8080/prioritySend?priority=2&msg=第二条消息:priority-2 第二条
http://localhost:8080/prioritySend?priority=5&msg=第三条消息:priority-5 第三条
http://localhost:8080/prioritySend?priority=10&msg=第四条消息:priority-10 第四条
image-20230327203121425

image-20230327202757835

默认情况下,所有消息优先级都一致,那么消费顺序应该是先进先出,但是这里我们设置了优先级,正确顺序应该是:4 3 2 1

3.消费者消费消息

1
2
3
4
5
6
7
8
@RabbitListener(queues = "priority-queue")
public void priorityQueue1(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
String content = new String(body, Charset.forName("UTF-8"));
System.out.println("来自交换机: 【" + messageProperties.getReceivedExchange() + "】, 队列:【" + messageProperties.getConsumerQueue()+ "】:\n内容: " + content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}

输出结果

image-20230327204042491

确实如我们一开始想的一样,按照我们预设的消息优先级进行输出,同一个队列中 优先级高的消息,会先发送给消费者。

6)远程数据分发

远程数据分发插件:Federation Plugin

image-20230327211845824

如果我们在北京和上海各有一个MQ集群机房,此时我们需要上海的机房可以同步北京机房的数据,我们就可以使用到 远程数据分发插件。

因为跨维度比较大,且集群的同步是双向的,对网络开销会比较大,所以不适合搭建集群,但是我们如果用了数据分发插件,则可以让 上游分发数据给下游。

1.启动插件

只需要在下游服务器上安装插件,安装成功后无需重启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[root@S4 ~]# rabbitmq-plugins enable rabbitmq_federation
Enabling plugins on node rabbit@S4:
rabbitmq_federation
The following plugins have been configured:
rabbitmq_federation
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@S4...
The following plugins have been enabled:
rabbitmq_federation

started 1 plugins.
[root@S4 ~]# rabbitmq-plugins enable rabbitmq_federation_management
Enabling plugins on node rabbit@S4:
rabbitmq_federation_management
The following plugins have been configured:
rabbitmq_federation
rabbitmq_federation_management
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@S4...
The following plugins have been enabled:
rabbitmq_federation_management

started 1 plugins.
[root@S4 ~]#

2.配置Upstream

配置上游服务器信息,这样的话数据就会从上游服务主动同步到下游服务中。

image-20230328130657306

name:名字可以随意定义

uri:amqp://admin:123456@S1:5672/,如果我们在Upstream中已经指定了Virtual host,则在URI后面不需要在携带虚拟主机,直接写amqp://admin:123456@S1:5672

3.配置Federation策略

image-20230328114814153

Pattern:^fed_* ,匹配以fed_开头,任意字符结尾的交换机或队列的消息

Definition:federation-upstream-set 指定配合接收上游的数据集合,federation-upstream 上游数据,all代表全部接收

4.测试

当我们配置完Upstream和Federation策略后,就可以进入Federation Status菜单查看 分发状态。

status

在远程服务RabbitMQ服务中,可以看到 插件为我们自动生成的对应生成的Federation交换机。

exchange

7)懒队列

默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。

但是如果消息堆积太多,内存不够用,就会内存溢出,为此我们可以引入 懒队列的概念,牺牲硬盘IO从而减少内存占用。

懒队列的设计目标是为了支持非常长的队列(数百万级别),它是作为大数据量堆积的消息队列的优化手段。

核心概念:懒队列会尽可能将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。

启用懒队列只需要在创建队列时指定参数即可:

image-20230328132659510

8)消息分片存储插件 Sharding Plugin

大量数据堆积的长队列会降低rabbitMQ性能,为此我们上面引入了 懒队列来解决,但这并不能治本,所以我们可以使用 消息分片的方式来解决长队列。

在rabbitMQ中也提供了一种类似于 分库分表的方案,可以把一个队列的消息分片存储在多个队列中,并提供负载均衡的读与写功能,从而降低长队列大量数据堆积的问题。

好处:

  1. 提高队列的吞吐量,提供消费性能
  2. 解决数据堆积的长队列问题

1.安装插件

1
[root@S1 ~]# rabbitmq-plugins enable rabbitmq_sharding

2.配置Sharding策略

image-20230328140444913

查看策略:

image-20230328140541516

3.新增分片交换机

image-20230328140710152

添加后的交换机状态:

image-20230328141002460

交换机详情:

image-20230328141109706

查看队列列表:

image-20230328141300062

当我们添加分片交换机后,他会根据环境给我们创建相应的分片队列,我们刚才指定的是分成3个队列,但是由于我们是集群环境(3个节点),他会为每个集群的节点都分片3个队列,总共9个队列。

4.发送10W条消息到该交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 消息分片
*
* @return
* @throws AmqpException
* @throws UnsupportedEncodingException
*/
@GetMapping(value = "/shardingSend")
public Object shardingSend() throws AmqpException, UnsupportedEncodingException {

// 创建实体类
User user = new User();
user.setUsername("admin");
user.setPassword("123456");
user.setAge(12);

//设置消息转换器,如json
for (int i = 0; i < 100000; i++) {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("sharding_exchange","sharding", user);
}
return "message sended : " + user;
}

image-20230328142156299

为什么都分发到了同一个队列中,这个由 插件自行决定的,包括在消费消息时,会有一套算法来决定 消息哪个队列的消息。

5.消费消息

消费规则:会根据哪个分片队列的消费者客户端少,就先消费哪个队列的消息。

消费消息的时候,我需要指定一个伪队列:sharding_exchange(分片交换机的名称) ,这个队列并没有真实存在,但是他能通过这个伪队列找到其他分片队列进行消费。

1
2
3
4
5
6
7
8
@RabbitListener(queues = "sharding_exchange")
public void priorityQueue1(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
String content = new String(body, Charset.forName("UTF-8"));
System.out.println("来自交换机: 【" + messageProperties.getReceivedExchange() + "】, 队列:【" + messageProperties.getConsumerQueue()+ "】:\n内容: " + content);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}

6.注意事项

1、Sharding分片存储插件适合于那些对于消息延迟要求不严格,以及对消费顺序没有任何要求的的场景。

2、尽量使用伪队列消费,不要去消费单独的分片队列。