中小公司常见的缓存架构

我们在一些中小的公司经常会遇到以下的这种做法,如:查询到数据后 就缓存到redis。更新(添加)数据后 缓存到redis。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
	private final SysUserService userService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final static String USER_INFO = "user:info:";

/**
* 通过ID查询用户信息
* @param id ID
* @return 用户信息
*/
@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id);
if(EmptyUtils.isNotEmpty(userResult)){
data = JSON.parseObject(userResult, UserVO.class);
return R.ok(data);
}

data = userService.selectUserVoById(id);
if(EmptyUtils.isNotEmpty(data)){
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(),JSON.toJSONString(data));
}
return R.ok(data);
}

/**
* 添加用户
* @param userDto 用户信息
* @return success/false
*/
@SysLog("添加用户")
@PostMapping
@PreAuthorize("@pms.hasPermission('sys_user_add')")
public R user(@RequestBody UserDTO userDto) {
if(userService.saveUser(userDto)){
stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto));
}
return R.ok();
}

/**
* 更新用户信息
* @param userDto 用户信息
* @return R
*/
@SysLog("更新用户信息")
@PutMapping
@PreAuthorize("@pms.hasPermission('sys_user_edit')")
public R updateUser(@Valid @RequestBody UserDTO userDto) {
if(userService.updateUser(userDto)){
stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUpdateBy(), JSON.toJSONString(userDto));
}
return R.ok(userDto);
}

但是这种做法,如果是对于京东电商、淘宝等且流量巨大时,明显是不合理的。

因为电商的商品、sku都是非常多的,如京东商品都是以亿为单位的,如果每个商品、sku都缓存的话,需要大量的redis服务器。

优化方案:冷热分离

缓存数据冷热分离

  • 以京东商城为例,90%的商品都不会被经常访问,只有10%的商品才是热门、主推、经常被访问的商品,对于一些经常请求的商品、sku或其他数据,我们可以缓存到redis中。
  • 对于一些不经常访问的商品(数据),我们可以不缓存,节省redis服务器的内存空间,这就是冷热分离的基本概念。
  • 当然该模式,需要根据实际的场景和项目需求来分析,适用于一些高并发或大数据量的缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
	private final static String USER_INFO = "user:info:";
private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;

/**
* 通过ID查询用户信息
* @param id ID
* @return 用户信息
*/
@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id);
if(EmptyUtils.isNotEmpty(userResult)){
data = JSON.parseObject(userResult, UserVO.class);
// 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多
stringRedisTemplate.expire(USER_INFO+id,USER_CACHE_TIMEOUT, TimeUnit.SECONDS);
return R.ok(data);
}

data = userService.selectUserVoById(id);
if(EmptyUtils.isNotEmpty(data)){
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), JSON.toJSONString(data),USER_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return R.ok(data);
}

/**
* 添加用户
* @param userDto 用户信息
* @return success/false
*/
@SysLog("添加用户")
@PostMapping
@PreAuthorize("@pms.hasPermission('sys_user_add')")
public R user(@RequestBody UserDTO userDto) {
if(userService.saveUser(userDto)){
stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),USER_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return R.ok();
}

/**
* 更新用户信息
* @param userDto 用户信息
* @return R
*/
@SysLog("更新用户信息")
@PutMapping
@PreAuthorize("@pms.hasPermission('sys_user_edit')")
public R updateUser(@Valid @RequestBody UserDTO userDto) {
if(userService.updateUser(userDto)){
stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),USER_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return R.ok(userDto);
}

优化地方:

  1. 添加和更新数据时候,会存入到redis缓存中并设置超时时间,超时时间为一天(可自行调整)
  2. 查询数据时,如果redis中存在缓存,说明该数据经常被访问,我们就在该key的超时时间上 延长超时时间
    • 超时时间是一天,如果还存在缓存说明经常被访问,因为不经常访问的都已经过期被redis删除了

缓存击穿(缓存失效)

  • 由于redis大批量缓存在同时失效,导致大量请求同时击穿到了数据库,导致数据库瞬间压力过大甚至宕机。
  • 解决方案
    • 给缓存设置过期时间时,时间增加随机值,这样的话 每个缓存过期时间都不一样,就不会出现大批量缓存同时失效。

举2个例子:

1、当后台管理员批量上传1000个商品时,如果我们没有增加随机值,那么1000个商品的缓存过期时间都是一样的,就很可能存在缓存击穿的风险,因为1000个商品的缓存会同时都失效。那么如果大量的客户来访问时,请求就都会打到数据库。

2、如秒杀,后台管理人员上架秒杀商品时,是根据场次 如 14:00 、15:00 上架的,批量上架1000个商品,上架时 都会把热点商品提前存入缓存中,并设置缓存过期时间,就会存在缓存击穿风险,因为1000个商品的缓存会同时都失效。那么如果大量的客户来访问时,请求就都会打到数据库。

优化方案

结合上面的代码,我们来优化一下,给缓存的过期时间增加一个随机值,让每个key的过期时间都不一样,依次达到解决缓存击穿问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
	/**
* 更新用户信息
* @param userDto 用户信息
* @return R
*/
@SysLog("更新用户信息")
@PutMapping
@PreAuthorize("@pms.hasPermission('sys_user_edit')")
public R updateUser(@Valid @RequestBody UserDTO userDto) {
if(userService.updateUser(userDto)){
stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),createRandomExpTime(), TimeUnit.SECONDS);
}
return R.ok(userDto);
}

// 给超时时间增加 随机时间
public Integer createRandomExpTime(){
return USER_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}

缓存穿透

  • 查询一个不存在的数据,导致redis缓存和数据库都不会命中。
    • 如果此时高并发访问(或黑客CC攻击),大量请求会直接穿透到数据库,导致数据库处理卡顿甚至宕机。

缓存击穿

如图所示:此时客户端查询一条记录,我们在nginx中本地缓存查询不到,则会在jvm 内存缓存查询,如果内存缓存查询不到就会去redis中查询,redis还查不到,就会直接 去数据库中查询,如果数据库都查不到 这就叫 缓存穿透,查询一个不存在的数据,导致缓存和数据库都不会命中。

缓存穿透的风险

以京东为例,如果黑客想要通过缓存穿透进行压力攻击的话,只需要找到不存在的数据即可。

如:https://item.jd.com/100024707737.html ,这是一个商品链接,假设我们在100024707737 后面随机增加几个值,然后通过DDOS 或 压测软件疯狂的去访问,大量的请求会去查询一个不存在的商品,redis肯定查询不到数据,所以就会去查询数据库,直接穿透到数据库,数据库的瞬间压力过大,很可能会宕机。

以京东这个商品网站为例,因为京东这个是生成了静态页面,所以如果我们输入不存在的页面,在nginx肯定就已经被重定向了。当然上面这个只是举一个例子,核心本质 就是 查询一些不存在的数据,导致缓存层(redis)查询不到后 继而去查询存储层(数据库),这就是缓存穿透。

希望大家能举一反三,反复斟酌,解决在生产环境中可能遇到的缓存穿透问题。

造成缓存穿透的基本原因

  1. 自身业务出现BUG或数据出问题(根据墨菲定律,脏数据一定会出问题)
  2. 恶意攻击或爬虫造成大量空命中

穿透解决方案

解决缓存穿透问题主要有2个方案:

  1. 缓存空对象
  2. 布隆过滤器

缓存空对象

为了安全考虑,如果第一次从数据库中查询不到,就会在redis中创建一个空值,并设置过期时间(1分钟,具体自己调整)。

这样的话,下次在来请求该数据,会直接从缓存直接返回,避免再次请求数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final static String USER_INFO = "user:info:";
private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;
private final static String EMPTY_CACHE = "{}";

/**
* 通过ID查询用户信息
* @param id ID
* @return 用户信息
*/
@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id);
if(EmptyUtils.isNotEmpty(userResult)){
// 判断是否为空值
if(EMPTY_CACHE.equals(userResult)){
// 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。
stringRedisTemplate.expire(USER_INFO+id,60, TimeUnit.SECONDS);
return null;
}

data = JSON.parseObject(userResult, UserVO.class);
// 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多
stringRedisTemplate.expire(USER_INFO+id,createRandomExpTime(), TimeUnit.SECONDS);
return R.ok(data);
}

data = userService.selectUserVoById(id);
if(EmptyUtils.isNotEmpty(data)){
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), JSON.toJSONString(data),createRandomExpTime(), TimeUnit.SECONDS);
}else{
// 查询不到数据就存储一个空json字符串
stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), EMPTY_CACHE,60, TimeUnit.SECONDS);
}
return R.ok(data);
}

优化细节:

  1. 如果数据库查询不到数据,就创建一个空的json字符串,过期时间为60秒(别搞太长)
    • 如果不设置过期时间的话或时间设置太长,黑客并发攻击时,redis内存空间会存在大量的空字符串,占用宝贵的内存空间
  2. 在redis中如果拿到了数据,判断数据是否是空的json字符串{},如果是的话就延长该key的过期时间并返回null

布隆过滤器

对于恶意攻击导致的缓存穿透,除了缓存空对象,还可以引入 布隆过滤器,布隆过滤器会先对请求进行一次过滤,对于不存在的数据会直接返回错误,不会往后面执行。

布隆过滤器根据内部hash算法可以判断出 查询的key是否存在,他认为存在就可能存在,认为不存在就肯定不存在。

布隆过滤器

布隆过滤器就是一个大型的位数组和几个不一样的无偏 hash 函数。所谓无偏就是能够把元素的 hash 值算得比较均匀。

添加key

当我们向布隆过滤中添加 key时,会根据key 使用多个hash函数进行哈希运算,得出一个整数索引值,然后根据数组的总长度对索引值进行取模运算,得出数组存放位置,每个hash函数的位置都不同,在把每个存放的位置都置为1 。

查询key

当我们向布隆过滤器中查询key是否存在时,会跟添加时候一样计算出位置,并且确认 所有位置是否都为1 ,如果有一个为0,key就不存在,如果都为1 也并不一定代表存在,只是极有可能,但是我们认为他存在 给他放行。

布隆过滤器适合对 数据命中率要求不高,且较为固定、实时性低的应用场景,缓存空间占用很少,但是维护比较麻烦。

注意:布隆过滤器不能删除数据,如果要删除得重新初始化数据。

可以用redisson实现布隆过滤器,引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>

示例伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.redisson;

import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonBloomFilter {

public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
//构造Redisson
RedissonClient redisson = Redisson.create(config);

RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小
bloomFilter.tryInit(100000000L,0.03);
//将chenguanxi插入到布隆过滤器中
bloomFilter.add("chenguanxi");

//判断下面号码是否在布隆过滤器中
System.out.println(bloomFilter.contains("zhangbaizhi"));//false
System.out.println(bloomFilter.contains("wangbaoqiang"));//false
System.out.println(bloomFilter.contains("chenguanxi"));//true
}
}

使用布隆过滤器需要把所有数据提前放入布隆过滤器,并且在增加数据时也要往布隆过滤器里放,布隆过滤器缓存过滤伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//初始化布隆过滤器
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%
bloomFilter.tryInit(100000000L,0.03);

//把所有数据存入布隆过滤器
void init(){
    for (String key: keys) {
        bloomFilter.put(key);
    }
}

String get(String key) {
    // 从布隆过滤器这一级缓存判断下key是否存在
    Boolean exist = bloomFilter.contains(key);
    if(!exist){
        return "";
    }
    // 从缓存中获取数据
    String cacheValue = cache.get(key);
    // 缓存为空
    if (StringUtils.isBlank(cacheValue)) {
        // 从存储中获取
        String storageValue = storage.get(key);
        cache.set(key, storageValue);
        // 如果存储数据为空, 需要设置一个过期时间(300秒)
        if (storageValue == null) {
            cache.expire(key, 60 * 5);
        }
        return storageValue;
    } else {
        // 缓存非空
        return cacheValue;
    }
}

缓存雪崩

  • 缓存雪崩指的是缓存层(redis)宕机了,请求会直接打到存储层(数据库),之前有缓存层可以有效的保护存储层请求不会直接请求到存储层,但是如果由于某种原因导致 缓存层挂了,如:高并发请求,那么存储层也很可能跟着一起挂掉,而请求数据的服务器随着线程池越来越多,也会内存溢出,如雪崩一样 所有环上的节点都出现问题

由于一般单机redis节点的QPS只有十万+,如果突然出现了爆发性的并发请求。

  1. 很可能会直接把redis节点打挂掉,如果redis节点挂了,请求就会打到数据库来,数据库也跟着挂掉,那么所有服务器和中间件都会和雪崩一样,全部瘫痪,所有的请求都会直接超时
  2. 即便redis节点没有挂掉,也会处于一个 hang的状态(暂停服务,挂起)
  3. 由于客户端的请求都超时了,前面的客户端(服务器)的请求就会阻塞在线程池里面,随着请求越来越多,客户端的线程池满了,内存也会跟着满,就会出现 OOM 内存泄漏溢出的情况。

如:微博大V 出了花边新闻,他的粉丝会一拥而入,可能会有几十万甚至几百万爆发性增长的访问量,redis集群很可能会撑不住,一旦redis撑不住,请求会直接打到数据库来,数据库很可能也会跟着挂掉。

一般情况都是 秒杀、或者 特大热点数据,才会出现这种 缓存雪崩的情况。

解决方案

解决缓存雪崩的方式有很多,有以下几点:

  1. 保证缓存层的高可用,如Redis Sentinel或Redis Cluster
  2. 使用 后端限流熔断并降级的组件,如Sentinel或Hystrix限流降级组件
  3. 提前做高并发测试,为避免以后突发情况,需要提前演练 高并发的场景,做一些基础的预案
  4. 双缓存架构

双缓存架构

我们可以在redis缓存之前,在增加一层 jvm的缓存,redis缓存单机的QPS在十万左右,而jvm的缓存qps则最少是百万级的,但是jvm的缓存有一个缺点:内存有限,存储的数据有限。

因为内存有限,我们需要对缓存进行一定的管理,定时删除一些 不是很热点的数据,只保存最热门的数据(访问的人并发自然高),为此我们可以通过 jvm的缓存框架来实现,如:Guava Cache

双缓存架构(热门预热的)

在一些大厂里面,如 微博,都是有专门做 缓存预热的系统,如 特别热门的数据,会提前的存储在jvm缓存中。

如图所示,java服务端会订阅预热系统的通道,当预热系统检测到有可能存在高并发大流量的热门数据,就会 发送通知给java服务端,java服务端就会把数据 通过jvm缓存框架 存储在缓存中。

jvm的QPS是非常强的,纯粹的访问内存缓存,效率比redis高了好几倍,随着服务器的集群部署,能抗的并发也是非常高的。

当然,jvm内存缓存只会存储一些可能产生 高并发的热门数据,对于一些不是很热门或冷门数据,如果内存缓存中不存在,还是会从redis乃至数据库中去查询。

JVM缓存代码示意图

  • 判断jvm内存缓存是否存在数据,如果查询到内存缓存中有数据就直接返回,如果不存在则 代表是不热门或冷门数据,走redis或数据库查询。

冷门数据突发性热点重建缓存

  • 当一个冷门的数据(比如:商品,没有建立缓存),突然变成热门数据,被大量的并发访问,进行重建缓存,数据库瞬间压力压力暴增。

案例1:

冷门数据突发性情况

秒杀场景是非常常见的,如 抖音直播时,主播大V推销一款冷门商品,几万人乃至几十万人同时访问该商品,几十万的并发突然同一时间请求接口,由于冷门商品之前没有建立redis缓存,这些请求都会打到数据库去,数据库如果扛不住会直接宕机。

单机架构解决方案

  • 在单机架构下,我们可以通过 同步锁 + 双重检测 来解决。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private final SysUserService userService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final static String USER_INFO = "user:info:";
private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;
private final static String EMPTY_CACHE = "{}";

/**
* 通过ID查询用户信息
*
* @param id ID
* @return 用户信息
*/
@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
UserVO userResult = getUserHotCacheInfo(id);
if(userResult != null){
return R.ok(userResult);
}

synchronized (this) {
userResult = getUserHotCacheInfo(id);
if(userResult != null){
return R.ok(userResult);
}

data = userService.selectUserVoById(id);
if (EmptyUtils.isNotEmpty(data)) {
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS);
} else {
// 查询不到数据就存储一个空json字符串
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS);
}
}
return R.ok(data);
}

public UserVO getUserHotCacheInfo(Long id) {
UserVO data = null;
String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id);
if (EmptyUtils.isNotEmpty(userResult)) {
// 判断是否为空值
if (EMPTY_CACHE.equals(userResult)) {
// 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。
stringRedisTemplate.expire(USER_INFO + id, 60, TimeUnit.SECONDS);
return new UserVO();
}

data = JSON.parseObject(userResult, UserVO.class);
// 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多
stringRedisTemplate.expire(USER_INFO + id, createRandomExpTime(), TimeUnit.SECONDS);

}
return data;
}

缺点:

  • 一旦使用了同步锁锁住了代码,此时访问代码块的所有线程都会串行执行,即便访问的冷门数据不是同一个,也要 乖乖的排队。
    • 举例:2个大V在直播,推荐的两个冷门商品都不同,如果使用了同步锁,那么在大量请求访问这两个商品时,请求进入接口后都得变成串行化执行,非常影响效率,性能也得不到体现。
      • 如:此时有两个商品:商品 101、商品102 ,商品101 进入接口后进入同步锁,此时102 也进入了接口后会阻塞线程,等待商品101的线程锁释放。

最终解决方案

  • 通过 redisson的分布式锁来解决 冷门数据突发性热点重建缓存的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private final static String USER_INFO = "user:info:";
private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;
private final static String EMPTY_CACHE = "{}";
private final static String LOCK_USER_HOT_CACHE_CREATE_PREFIX = "lock:user:hot_cache_create:";
@Autowired
private Redisson redisson;

/**
* 通过ID查询用户信息
*
* @param id ID
* @return 用户信息
*/
@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
UserVO userResult = getUserHotCacheInfo(id);
if (userResult != null) {
return R.ok(userResult);
}

// 创建 redisson分布式锁
RLock hotCacheCreateLock = redisson.getLock(LOCK_USER_HOT_CACHE_CREATE_PREFIX + id);
// 加锁
hotCacheCreateLock.lock();
try {
userResult = getUserHotCacheInfo(id);
if (userResult != null) {
return R.ok(userResult);
}

data = userService.selectUserVoById(id);
if (EmptyUtils.isNotEmpty(data)) {
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS);
} else {
// 查询不到数据就存储一个空json字符串
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS);
}
}finally {
// 释放锁
hotCacheCreateLock.unlock();
}
return R.ok(data);
}

public UserVO getUserHotCacheInfo(Long id) {
UserVO data = null;
String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id);
if (EmptyUtils.isNotEmpty(userResult)) {
// 判断是否为空值
if (EMPTY_CACHE.equals(userResult)) {
// 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。
stringRedisTemplate.expire(USER_INFO + id, 60, TimeUnit.SECONDS);
return new UserVO();
}

data = JSON.parseObject(userResult, UserVO.class);
// 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多
stringRedisTemplate.expire(USER_INFO + id, createRandomExpTime(), TimeUnit.SECONDS);
}
return data;
}

代码执行流程

image-2image-3

执行流程:

  1. 判断redis中是否有该用户数据(getUserHotCacheInfo)
    1. getUserHotCacheInfo方法
      1. 如果没有数据,则返回null,如果redis缓存保存的是 空的json字符串{},这是之前防止缓存穿透的优化代码,我们就返回空的实体类,空的实体类和null还是有区别的。
    2. 返回的userResult,有三种可能性
      1. 空的实体类,UserVO data = new UserVO()
      2. redis中有该用户的数据,直接返回
      3. redis中没有该用户的数据,继续往下执行去数据库中查询数据
  2. 创建分布式锁,加锁,抢到锁的线程继续往下执行,没抢到锁的线程阻塞等待,然后while自循环尝试加锁
  3. 抢到锁的线程,在分布式锁里面,再次执行getUserHotCacheInfo方法,进行二次检查
  4. 返回的userResult,有三种可能性,和上面一样,如果userResult返回null,则继续往下执行
  5. 根据用户id查询数据库
    1. 如果数据库中查询到数据,就加入redis缓存并返回给客户
    2. 如果数据库中查询不到数据,就创建一个空的 json字符串,防止缓存穿透,设置过期时间 60秒
  6. 因为我们加了 try catch finally,最终结果不论如何,都会执行 释放锁的命令
  7. 当持有锁的线程释放锁后,唤醒等待的线程,线程再次去争夺锁,抢到锁后 进入锁内的方法 会再次进行二次检查,此时redis中肯定已经有对应的缓存了,就不会再走数据库查询,会直接返回

缓存与数据库双写不一致

  • 在高并发场景下,多个线程同时操作数据库和缓存,一定会存在并发安全问题。

导致缓存与数据库双写不一致的情况非常多,这里只举2个例子,更多的是在实际项目中思考,如何规避掉不一致的情况。

值得注意的是,该情况一般只会出现在 大型的互联网项目、并发较高的场景中或存在多线程同时操作数据库和缓存的场景。

在一般的常规项目中,并发的几率很少,很少会遇到缓存双写不一致的情况,所以不用考虑这个双写不一致的问题,应当根据实际的规模和业务场景分析,看是否要使用 分布式锁来解决缓存不一致的问题。

案例一

执行流程图

如图所示,这里有2个线程,thread 1,2 (下面都这样叫),他们各自的行为操作:

  1. thread1 写入数据库 stock=10后更新缓存。
  2. thread2去修改数据库库存,stock = 6 后 更新缓存。

问题来了,如thread1 写入数据库 stock = 10后更新缓存,在更新缓存的过程中,thread2也开始写数据库(stock = 6), 并比thread1 更早的更新缓存,此时stock = 6,但是当thread1 执行完了之后 又会把stock 改成10 。

那么此时,缓存中实际上是10,而数据库是6,这就造成了 缓存与数据库双写不一致的情况。

案例二

image-2

如图所示,这里有2个线程,thread 2,3 (下面都这样叫),他们各自的行为操作:

  1. thread2去修改数据库库存,stock = 6 后 更新缓存。
  2. thread3 查询缓存,缓存是空的,在去查询数据库,取得stock后 会更新到缓存中 ,更新stock = 10。

thread3去查询缓存,发现是空的,在去查询数据库stock=10后准备更新到缓存中,刚好thread2 写入数据库 改变库存 stock =6,并且删除了缓存,并且thread2执行比thread3 更早结束,那么此时 缓存中是没有该记录的,当thread3执行更新缓存结束后,结果还是stock = 6,那么 此时的 缓存中的stock是 10,数据库是6,这又造成了 双写不一致。

解决方案

  1. 对于一些并发机率很小的数据,如 个人的订单数据,购物车,用户数据等,就不需要考虑双写不一致的问题,因为很少会出现,即便出现了 缓存不一致,我们通过对缓存设置超时时间,也可以解决。
  2. 就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对于缓存的要求。
  3. 对于要求强一致性的数据,如 库存,商品价格等,我们就可以通过 分布式读写锁 来解决 不一致的问题。
  4. 也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是对系统侵入性比较大
    1. 解决方案

互联网大部分场景都是读多写少 的场景,加入了缓存确实能提高查询的一个效率,如果是读多写多的场景,又必须保证数据的一致性,就需要谨慎使用缓存,存入缓存的数据应该是对实时性和一致性要求不高的数据。

注意:引入了分布式锁势必就会降低并发的性能,应当谨慎使用。

分布式锁

分布式锁

在高并发场景下,多个线程同时操作数据库和缓存,一定会存在并发安全问题,我们可以通过 分布式锁来解决。

如:

thread3 在查询数据库之前加锁,直到 更新完缓存后 在释放锁。

thread2 在操作数据库之前加锁,操作完数据库并删除缓存后 释放锁。

核心思想:当某个线程在操作一个商品的数据时,通过分布式锁确保不会有其他线程介入进来同时操作,只有该线程执行结束后,才允许其他线程来操作该商品。

以下代码,我们通过操作user,来模拟操作商品。 把 user当成 product 商品来看就可以了。

查询用户:

优化思路:

  1. 在查询数据库之前,为了确保现场安全,不让其他线程来操作这个user(或看成商品信息),我们对该商品信息的key进行加锁
  2. 操作完该商品的数据库和缓存后,在释放锁

分布式锁执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
	private final static String USER_INFO = "user:info:";
private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;
private final static String EMPTY_CACHE = "{}";
private final static String LOCK_USER_HOT_CACHE_CREATE_PREFIX = "lock:user:hot_cache_create:";

private final static String LOCK_USER_UPDATE_PREFIX = "lock:user:update:";
@Autowired
private Redisson redisson;

@GetMapping("/{id}")
public R user(@PathVariable Long id) {
UserVO data = null;
// 判断redis中是否有该用户数据
UserVO userResult = getUserHotCacheInfo(id);
if (userResult != null) {
return R.ok(userResult);
}

// 创建 redisson分布式锁
RLock hotCacheCreateLock = redisson.getLock(LOCK_USER_HOT_CACHE_CREATE_PREFIX + id);
// 加锁
hotCacheCreateLock.lock();
try {
userResult = getUserHotCacheInfo(id);
if (userResult != null) {
return R.ok(userResult);
}


// 查询数据库之前,先进行加锁
RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + id);
userUpdateLock.lock();

try {
data = userService.selectUserVoById(id);
if (EmptyUtils.isNotEmpty(data)) {
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS);
} else {
// 查询不到数据就存储一个空json字符串
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS);
}
}finally {
userUpdateLock.unlock();
}
}finally {
hotCacheCreateLock.unlock();
}


return R.ok(data);
}

更新用户信息:

优化思路:

  1. 更新用户信息之前,我们要先加锁,确保其他线程不会在我们更新的过程中来修改用户数据库和redis缓存,造成我们的线程安全问题
  2. 操作结束后 释放锁

更新用户信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 更新用户信息
*
* @param userDto 用户信息
* @return R
*/
@SysLog("更新用户信息")
@PutMapping
@PreAuthorize("@pms.hasPermission('sys_user_edit')")
public R updateUser(@Valid @RequestBody UserDTO userDto) {
// 更新用户之前,先进行加锁
RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + userDto.getUserId());
userUpdateLock.lock();
try {
if (userService.updateUser(userDto)) {
stringRedisTemplate.opsForValue().set(USER_INFO + userDto.getUserId(), JSON.toJSONString(userDto), createRandomExpTime(), TimeUnit.SECONDS);
}
}finally {
userUpdateLock.unlock();
}
return R.ok(userDto);
}

什么场景下使用分布式锁来解决

值得注意的是,该情况一般只会出现在 大型的互联网项目、并发较高的场景中或存在多线程同时操作数据库和缓存的场景。

在一般的常规项目中,很少会遇到缓存双写不一致的情况,所以应当根据实际的规模和业务场景分析,看是否要使用 分布式锁来解决缓存不一致的问题。

关于代码复杂度的问题

image-1

大部分用户请求该接口,都是为了查询用户信息,而用户信息在redis中有存储的,所以 很少会走到下面的复杂逻辑,即便走到了下面的逻辑,也就第一次来查询的时候,redis中不存在数据的情况才会出现,比起 直接出现了缓存穿透和双写不一致的风险来说,是可以接受的。

可以把用户看成商品,这里我没有去做商品的原因,是刚好没有商品的demo,换言之,大部分用户请求该接口都是为了查询该商品的信息,而redis中如果有商品信息,都会在第一步就返回了。如果没有,也就会走下面的逻辑,但仅仅也只有一次。

分布式读写锁(推荐)

除了分布式锁外,还可以通过 分布式读写锁来解决缓存与数据库双写不一致的问题,通过 读写锁来解决双写不一致的问题,效率会比 普通的分布式锁效率高。

读写锁,分为 读锁和写锁,写锁和写锁互斥,读写和写锁互斥,读锁和读锁 不互斥(相当于无锁),一般在操作或查询数据库之前使用,避免线程相互之间干扰造成的线程安全问题。

写读:写锁和读锁同时竞争一个锁,假设 A线程抢到了这个写锁,那么B线程的读锁就会阻塞线程等待A线程释放锁。

写写:写锁和写锁同时竞争一个锁,假设 A线程抢到了这个写锁,那么B写锁就会阻塞线程等待 B线程释放锁。

读读:多个线程同时访问读锁时,相当于无锁,不需要阻塞,可以并发同时执行。

注意:写锁和读锁的key 要一致,才能进行互斥,保证并发下的线程安全。

实现原理

分布式读写锁实现流程

1、读读(并发执行)

当线程创建读写锁方法,根据读取的锁key名,发现锁已经被别的线程持有了,通过model字段类型 判断当前的锁 角色,锁的model 是读锁,线程执行的方法也是读锁,就是读读,那么就会实现 重入锁,并且 线程并行执行,不会进入阻塞状态。

2、写读

当线程创建读写锁方法,发现锁已经被别的线程持有了,model 类型是 写锁,且当前线程的方法是创建读锁,那么线程就会进入阻塞状态,等待 持有写锁的线程释放锁。

3、写写

当线程创建读写锁方法,发现锁已经被别的线程持有了,model 类型是 写锁,且当前线程的方法是创建写锁,那么线程就会进入阻塞状态,等待 持有写锁的线程释放锁。

代码实现

读锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 // 创建读写锁对象
RReadWriteLock userUpdateLock = redisson.getReadWriteLock(LOCK_USER_UPDATE_PREFIX + id);
// 创建读锁
RLock readLock = userUpdateLock.readLock();
// 加锁
readLock.lock();

try {
data = userService.selectUserVoById(id);
if (EmptyUtils.isNotEmpty(data)) {
// 不为空,代表查询到了用户数据,我们就进行缓存
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS);
} else {
// 查询不到数据就存储一个空json字符串
stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS);
}
}finally {
readLock.unlock();;
}

写锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 更新用户信息
*
* @param userDto 用户信息
* @return R
*/
@SysLog("更新用户信息")
@PutMapping
@PreAuthorize("@pms.hasPermission('sys_user_edit')")
public R updateUser(@Valid @RequestBody UserDTO userDto) {
// 更新用户之前,先进行加锁
// RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + userDto.getUserId());
// userUpdateLock.lock();

// 创建读写锁对象
RReadWriteLock userUpdateLock = redisson.getReadWriteLock(LOCK_USER_UPDATE_PREFIX + id);
// 创建写锁
RLock writeLock = userUpdateLock.writeLock();
// 加锁
writeLock.lock();

try {
if (userService.updateUser(userDto)) {
stringRedisTemplate.opsForValue().set(USER_INFO + userDto.getUserId(), JSON.toJSONString(userDto), createRandomExpTime(), TimeUnit.SECONDS);
}
}finally {
// 释放锁
writeLock.unlock();
}
return R.ok(userDto);
}

什么情况下使用读写锁

一般都是用于解决双写不一致的情况下使用,那什么时候使用 读锁,什么时候使用 写锁?

这里的读和写,指的是 对数据库的读和写,而并非redis,对数据库修改操作时,使用写锁。对数据库只是读取,但是会更新到缓存,则使用 读锁。

读写锁Lua源码

实现的方法和之前一样,只是多了一个model 来区分是 读锁还是写锁。

读写锁lua源码

串行转并行

  • 分布式锁在部分场景下也是可以进行优化的,比如把串行变成并行执行。

如下图,我们使用了 tryLock,该方法会在指定的时间内尝试加锁,如果时间过了后还没加锁成功,就会直接返回false。

如,我们设定了1秒,但是查询业务1秒返回是最基本的。这个时间可以根据业务场景自行调控。

优缺点

缺点:可能存在bug,如果其他线程业务执行超过了 指定时间(如 1秒),没抢到锁的线程就会直接返回false,就会出现BUG。

优点:能大幅度的提升性能,之前的分布式锁,抢到锁的线程需要释放锁,没抢到锁的线程阻塞等待唤醒,唤醒后继续抢锁,抢锁后业务执行结束再释放锁。

如果使用了tryLock 这里的场景,抢到锁的线程业务执行结束会释放锁,但是没抢到锁的线程 不会阻塞等待,而是在指定时间内不停的重试加锁,直到时间结束,与之相比,少了 阻塞和 线程之间抢锁 解锁的等待时间。

具体业务场景具体分析,不能一概而论,也没有完美的方案,作为一名架构师,应当 权利利弊,对技术有一个正确的认知,并在合理的场景下使用合理的方案。

image-1

开发规范与性能优化


键值设计

1. key名设计

  • (1)【建议】: 可读性和可管理性

以业务名(或数据库名)为前缀(防止key冲突),用冒号分隔,比如业务名:表名:id

​ trade:order:1

  • (2)【建议】:简洁性

保证语义的前提下,控制key的长度,当key较多时,内存占用也不容忽视,例如:

​ user:{uid}:friends:messages:{mid} 简化为 u:{uid}:fr:m:{mid}

  • (3)【强制】:不要包含特殊字符

反例:包含空格、换行、单双引号以及其他转义字符

2. value设计

  • (1)【强制】:拒绝bigkey(防止网卡流量、慢查询)

在Redis中,一个字符串最大512MB,一个二级数据结构(例如hash、list、set、zset)可以存储大约40亿个(2^32-1)个元素,但实际中如果下面两种情况,我就会认为它是bigkey。

  1. 字符串类型:它的big体现在单个value值很大,一般认为超过10KB就是bigkey。
  2. 非字符串类型:哈希、列表、集合、有序集合,它们的big体现在元素个数太多。

一般来说,string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000。

反例:一个包含200万个元素的list。

非字符串的bigkey,不要使用del删除,使用hscan、sscan、zscan方式渐进式删除,同时要注意防止bigkey过期时间自动删除问题(例如一个200万的zset设置1小时过期,会触发del操作,造成阻塞)

ibgkey的危害

1.导致redis阻塞

2.网络拥塞

bigkey也就意味着每次获取要产生的网络流量较大,假设一个bigkey为1MB,客户端每秒访问量为1000,那么每秒产生1000MB的流量,对于普通的千兆网卡(按照字节算是128MB/s)的服务器来说简直是灭顶之灾,而且一般服务器会采用单机多实例的方式来部署,也就是说一个bigkey可能会对其他实例也造成影响,其后果不堪设想。

3.过期删除

有个bigkey,它安分守己(只执行简单的命令,例如hget、lpop、zscore等),但它设置了过期时间,当它过期后,会被删除,如果没有使用Redis 4.0的过期异步删除(lazyfree-lazy-expire yes),就会存在阻塞Redis的可能性。

bigkey的产生

一般来说,bigkey的产生都是由于程序设计不当,或者对于数据规模预料不清楚造成的,来看几个例子:

(1) 社交类:粉丝列表,如果某些明星或者大v不精心设计下,必是bigkey。

(2) 统计类:例如按天存储某项功能或者网站的用户集合,除非没几个人用,否则必是bigkey。

(3) 缓存类:将数据从数据库load出来序列化放到Redis里,这个方式非常常用,但有两个地方需要注意,第一,是不是有必要把所有字段都缓存;第二,有没有相关关联的数据,有的同学为了图方便把相关数据都存一个key下,产生bigkey。

如何优化bigkey

big list: list1、list2、…listN

big hash:可以讲数据分段存储,比如一个大的key,假设存了1百万的用户数据,可以拆分成200个key,每个key下面存放5000个用户数据

  1. 如果bigkey不可避免,也要思考一下要不要每次把所有元素都取出来(例如有时候仅仅需要hmget,而不是hgetall),删除也是一样,尽量使用优雅的方式来处理。
  • (2)【推荐】:选择适合的数据类型。

例如:实体类型(要合理控制和使用数据结构,但也要注意节省内存和性能之间的平衡)

反例:

​ set user:1:name tom set user:1:age 19 set user:1:favor football

正例:

​ hmset user:1 name tom age 19 favor football

3.【推荐】:控制key的生命周期,redis不是垃圾桶。

建议使用expire设置过期时间(条件允许可以打散过期时间,防止集中过期)。

命令使用

1.【推荐】 O(N)命令关注N的数量

例如hgetall、lrange、smembers、zrange、sinter等并非不能使用,但是需要明确N的值。有遍历的需求可以使用hscan、sscan、zscan代替。

2.【推荐】:禁用命令

禁止线上使用keys、flushall、flushdb等,通过redis的rename机制禁掉命令,或者使用scan的方式渐进式处理。

3.【推荐】合理使用select

redis的多数据库较弱,使用数字进行区分,很多客户端支持较差,同时多业务用多数据库实际还是单线程处理,会有干扰。

4.【推荐】使用批量操作提高效率

​ 原生命令:例如mget、mset。 非原生命令:可以使用pipeline提高效率。

但要注意控制一次批量操作的元素个数(例如500以内,实际也和元素字节数有关)。

注意两者不同:

  1. 原生命令是原子操作,pipeline是非原子操作。
  2. pipeline可以打包不同的命令,原生命令做不到
  3. pipeline需要客户端和服务端同时支持。

5.【建议】Redis事务功能较弱,不建议过多使用,可以用lua替代

客户端使用

  1. 避免多个应用使用同一个redis实例
    • 对业务进行拆分,不同业务尽量使用不同的redis实例
  2. 使用连接池可以有效控制连接,提高效率

连接池

使用带有连接池的数据库,可以有效控制连接,同时提高效率,标准使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(5);
jedisPoolConfig.setMaxIdle(2);
jedisPoolConfig.setTestOnBorrow(true);

JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6379, 3000, null);

Jedis jedis = null;
try {
jedis = jedisPool.getResource();
//具体的命令
jedis.executeCommand()
} catch (Exception e) {
logger.error("op key {} error: " + e.getMessage(), key, e);
} finally {
//注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。
if (jedis != null)
jedis.close();
}

连接池参数含义:

序号 参数名 含义 默认值 使用建议
1 maxTotal 资源池中最大连接数 8 设置建议见下面
2 maxIdle 资源池允许最大空闲的连接数 8 设置建议见下面
3 minIdle 资源池确保最少空闲的连接数 0 设置建议见下面
4 blockWhenExhausted 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效 true 建议使用默认值
5 maxWaitMillis 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒) -1:表示永不超时 不建议使用默认值
6 testOnBorrow 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。
7 testOnReturn 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。
8 jmxEnabled 是否开启jmx监控,可用于监控 true 建议开启,但应用本身也要开启

优化建议:

1)maxTotal:最大连接数,早期的版本叫maxActive

实际上这个是一个很难回答的问题,考虑的因素比较多:

  • 业务希望Redis并发量

  • 客户端执行命令时间

  • Redis资源:例如 nodes(例如应用个数) * maxTotal 是不能超过redis的最大连接数maxclients(默认10000个)。

  • 资源开销:例如虽然希望控制空闲连接(连接池此刻可马上使用的连接),但是不希望因为连接池的频繁释放创建连接造成不必靠开销。

以一个例子说明,假设:

  • 一次命令时间(borrow|return resource + Jedis执行命令(含网络) )的平均耗时约为1ms,一个连接的QPS大约是1000
  • 业务期望的QPS是50000

那么理论上需要的资源池大小是50000 / 1000 = 50个。但事实上这是个理论值,还要考虑到要比理论值预留一些资源,通常来讲maxTotal可以比理论值大一些。

但这个值不是越大越好,一方面连接太多占用客户端和服务端资源,另一方面对于Redis这种高QPS的服务器,一个大命令的阻塞即使设置再大资源池仍然会无济于事。

2)maxIdle和minIdle

maxIdle实际上才是业务需要的最大连接数,maxTotal是为了给出余量,所以maxIdle不要设置过小,否则会有new Jedis(新连接)开销。

连接池的最佳性能是maxTotal = maxIdle,这样就避免连接池伸缩带来的性能干扰。但是如果并发量不大或者maxTotal设置过高,会导致不必要的连接资源浪费。一般推荐maxIdle可以设置为按上面的业务期望QPS计算出来的理论连接数,maxTotal可以再放大一倍。

minIdle(最小空闲连接数),与其说是最小空闲连接数,不如说是”至少需要保持的空闲连接数“,在使用连接的过程中,如果连接数超过了minIdle,那么继续建立连接,如果超过了maxIdle,当超过的连接执行完业务后会慢慢被移出连接池释放掉。

如果系统启动完马上就会有很多的请求过来,那么可以给redis连接池做预热,比如快速的创建一些redis连接,执行简单命令,类似ping(),快速的将连接池里的空闲连接提升到minIdle的数量。

连接池预热示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
List<Jedis> minIdleJedisList = new ArrayList<Jedis>(jedisPoolConfig.getMinIdle());

for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {
Jedis jedis = null;
try {
jedis = pool.getResource();
minIdleJedisList.add(jedis);
jedis.ping();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
//注意,这里不能马上close将连接还回连接池,否则最后连接池里只会建立1个连接。。
//jedis.close();
}
}
//统一将预热的连接还回连接池
for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) {
Jedis jedis = null;
try {
jedis = minIdleJedisList.get(i);
//将连接归还回连接池
jedis.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
}
}

总之,要根据实际系统的QPS和调用redis客户端的规模整体评估每个节点所使用的连接池大小。