zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Redis学习笔记

2023-09-27 14:19:57 时间

image-20220101105817502

Linux 基础环境配置

  1. 配置gcc
# 检查是否安装
gcc --version
# 未安装的话
yum install centos-release-scl scl-utils-build
yum install -y devtoolset-8-toolchain
scl enable devtoolset-8 bash
# 再次测试
gcc --version
  1. 压缩编译redis
# 解压缩
tar -zxvf redis-6.2.6.tar.gz
# 编译
cd redis-6.2.6
make 
make install
  1. 后台启动
# 查看是否成功安装
cd /usr/local/bin
# 后台启动
redis-server /opt/redis-6.2.6/redis.conf
# 检验端口
ps -ef | grep redis
  1. 关闭
redis-cli shutdown
# 或
kill -9 pid
# 多实例关闭
redis-cli -p 6379 shutdown

Redis 基础知识

  1. 默认16个数据库,类似数组下标从0开始,初始默认使用0号
  2. 使用命令 select <dbid> 来切换数据库,如 select 8
  3. 统一密码管理,所有库使用同样的密码
  4. dbsize 查看当前数据库的 key 数量
  5. flushdb 清空当前库
  6. flushall 通杀全部库

五大数据类型

Redis基本数据结构及底层实现原理 - 知乎 (zhihu.com)

String

  • Redis最基本的类型,一个key对应一个value;
  • String 类型是二进制安全的,意味着Redis的 string 可以包含任何数据,比如 jpg 图片或者序列化对象;
  • value 最大值为 512M
常用命令
set key value   	# 设置键值,也可用与更改
get key						# 获取值
append key value  # 为 key 追加值
strlen key				# 获取 key 长度
setnx key value   # 只有 key 不存在时,才设置 key 值

incr key 					# 将 key 中存储的【数字字符串】+1,如果为空,则新值为1
decr key 					# 同上,值减一

mset key1 value1 key2 value2 ... # 同时设置一个或多个 key-value 对
mget key1 key2 ... # 同时获取多个 key-value对
msetnx key1 value1 key2 value2  # 设置多个值【有一个失败则都失败】

getrange key fromIndex toIndex  # 切割 value -> []
setrange key fromIndex  value   # 设置 key
setex key expireTime value  		# 设置 key-value 和过期时间
getset key value								# 设置了新值同时获得旧值
数据结构

String 的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。

image-20220101130921655

List

  • 单值多键;
  • Redis 列表是简单的字符串列表,按照插入顺序排序。可以头插或者尾插;
  • 它的底层实际上是个双向链表,对两端操作性能很高,通过索引下标操作性能会比较差
image-20220101132831023
常用命令
lpush/rpush  key value1 value2 ...  # 头插/尾插
lrange  key fromInded toIndex  # 范围 get,包括首位 [0,-1]查所有 可以使用负数,表示倒数
lpop/rpop  key 		   # 删除且获取值

rpoplpush list_from list_to # 移除列表的最后一个元素,并将该元素添加到另一个列表并返回。

lindex key index 		 # 按照索引下标获取元素
llen key 						 # 获得列表长度
linsert key before value newvalue  # 在 value 后面都插入 newvalue 插入值
lrem key count value		 # 从左到右删除 count 个指定 value  count=0移除所有  count<0取绝对值
lset key index value # 将列表 key 下标为 index 的值替换成 value
数据结构
  • List 的数据结构为快速链表 quickList
  • 首先在列表较少的情况下会使用一块连续的内存存储,这个结构是 ziplist,即压缩列表。redis7替换为了 listpack
  • 它将所有元素紧挨着一起存储,分配的是一块连续的内存。当数据量比较多时才会改成 quicklist。
  • 由于普通链表需要的附加值真空间太大,比较浪费空间。比如这个列表里存的只是 int 类型的数据,结构上还需要两个额外的指针 prev 和 next。

image-20220101135024719

Redis 将链表和 ziplist 结合起来组成了 quicklist。也就是将多个 ziplist 使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。

listpack虽然说是ziplist的改进版,但是整体思路与ziplist无太大差别,listpack的结构图如下

img

单独看这个图会感觉比较眼熟,这里再放下ziplist的结构图示:

img

会发现整体上看,listpack少了一些。其实相比较ziplist,listpack中的优化在于entry中。

Set

  • Set对外提供功能与list类似,特殊之处在于set是可以自动排重的。
  • Set是string类型的无序集合。他底层其实是一个 value 为 null 和 hash表,所以添加、删除、查找的复杂度都是O(1)
  • 一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变
常用命令
sadd key value1 value2  # 将一个或多个元素加入到 key 中,忽略已存在的
smembers key 						# 取出该集合的所有值
sismember key value  		# 判断集合 key 是否含有 value,返回0 - 1
scard key								# 返回该集合的元素个数
srem key value1 value2	# 删除集合中的某个元素
spop key								# 随机 pop 一个值
srandmember key n       # 随即从该集合中取出 n 个值,不会删除元素
smove src dst value 		# 把集合中一个值从一个集合移动到另外一个集合
sinter key1 key2        # 返回两个集合【交集】
sunion key1 key2        # 返回两个集合【并集】
sdiff  key1 key2        # 返回两个集合【差集】(key1-key2)
数据结构

intset & hashtable

Hash

  • Hash 是一个键值对集合,是一个 string 类型的 field - value 的映射表,特别适合与存储对象
  • 类似 Java里的 Map<String, Object>

用户ID为查找的 key,存储的 value 用户对象包含姓名、年龄、生日信息等,如果用普通的 key-value 结构来存储

image-20220101171506153
常用命令
hset  key   field value 		# 简单添加
hget  key   field  					# 简单获取
hmset key  f1 v1 f2 v2 ...  # 批量添加
hexists key1  field 				# 检查 field 是否存在
hkeys  key                  # 列出该 hash 集合的所有 field
hvals  key									# 列出该 hash 集合的所有 value
hincrby key field n 				# 为哈希表 key 中的域 field 值+n
hsetnx key field value      # 不存在 field - value时,添加值
数据结构

ziplist & hashtable

Zset

  • Redis有序集合zset与普通集合set非常相似,都没有重复元素
  • Zset 每个成员都关联了一个评分(score),这个评分(score)被用来按照从低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但评分可以重复。
  • 可以根据评分(score)或次序(position)来获取一个范围的元素
  • 访问有序集合的中间元素也是非常快的,因此能够使用有序集合作为一个没有重复成员的只能列表
常用命令
zadd key score1 value1 score2 value2 ...  # 将一个或多个元素添加到 key 中
zrange key fromIndex toIndex [withscores] # 返回下标在之间的元素,带WITHSCORES,可以让分数和值一起返回
zrangebyscore key min max [withscores] [limit offset count] # 返回有序集合 key 中,所有 min <= score <= max 的成员,返回结果按score从小到大排序
zrevrangebyscore key max min [withscores] [limit offset count] # 同上,改为从大到小排列
zincrby key n value   # key 中的 value 加上增量 n
zrem key value 				# 删除指定元素
zcount key min max    # 统计该集合,分数区间内的元素个数
zrank key value				# 返回该集合在集合中的排名,从0开始
数据结构

SortedSet是Redis提供的一个非常特别的数据结构,一方面它等价于Java的Map<String,Double>,可以给每一个元素 value 赋予一个权重 score,另一方名它又类似于TreeSet,内部的元素会按照权重 score 进行排序,可以得到每个元素的名次,还可以通过 score 的范围来获取元素的列表。所以它使用两个数据结构:

  • Hash,其作用是关联元素 value 和权重 score,保障元素 value 的唯一性,可以通过 value 找到相应的 score 值
  • 跳跃表,其目的在于给元素 value 排序,根据 score 的范围获取元素列表
image-20220101175130443

Redis6 新数据类型

Bitmaps

合理使用操作位能有效地提高内存使用率和开发效率

  1. Bitmaps 本身不是一种数据类型,实际上它就是字符串(key-value),但是它可以对字符串的位进行操作。
  2. Bitmaps 单独提供了一套命令,所以在Redis中使用 Bitmaps 和使用字符串的方法不太相同。可以把 Bitmaps 想象成一个以位为单温的数组,数组的每个单元只能存储0和1,数组的下标在Bitmaps中叫做偏移量。
image-20220102114912545

常用命令

setbit key offset value 		# 设置/清除Bitmaps中某个偏移量的值 0/1 from index 0
getbit key offset 					# 获取 Bitmaps 中某个偏移量的值
bitcount key [start end]		# 返回1的个数,区间单位是【字节】-> [],可以使用负数
bitop and/or/not/xor destkey [key...]   # 交、并、非、异或运算,结果->destkey

注:

  • 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字。
  • 在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞。
image-20220102114842913 image-20220102115534272 image-20220102115546582

Bitmaps与set对比

image-20220102122103729

HyperLogLog

image-20220102132232810

常用命令

pfadd key element1 element2 ...		# 添加指定元素到 HyperLogLog 中;基数变化返回1
pfcount key1 key2 ...							# 计算HLL的近似基数,多个key->HLL和
pfmerge	dst src1 src2 ...					# 将一个或多个HLL合并存储在另一个HLL中

Geospatial

Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型,就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。

# 添加地理位置(经度,纬度,名称)
geoadd key longitude latitude member [longtitude latitude member ...]
# 获取指定地区的坐标值
geopos key member [member ...]
# 获取两个位置之间的直线距离,单位可选 m米(默认) km千米,mi英里,ft英尺
geodist key member1 member2 [m|km|ft|mi] 
# 以给定的经纬度为中心,找出某一半径内的元素
georadius key longitude latitude radius m|km|ft|mi 

image-20220102141906833

Redis 基础操作

Redis 键(Key)

keys *    	# 查看当前库所有key  keys *num 顺序选取
set key value   # 设置键值
exists key  # 判断某个 key 是否存在
type key  	#	判断某个 key 的类型
del key 		# 【直接删除】
unlink key 	# 【异步删除】只是先删除链接,之后再异步删除
expire key 10 # 设置 key 的过期时间,单位s
ttl key  		# 查看过期时间
select 			# 切换数据库
dbsize			# 查看当前数据库的 key 数量
flushdb 		# 清空当前库
flushall		# 通杀全部库

发布和订阅

image-20220102112807781 image-20220102112836409
redis-cli1 > subscribe channel1
redis-cli2 > publish channel1 message

Jedis 操作

import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.resps.Tuple;

import java.util.List;
import java.util.Set;

public class Test1 {
    Jedis jedis = new Jedis("123.60.108.20", 6379);

    {
        jedis.auth("1243");
        jedis.flushAll();
    }

    @Test
    public void test0() {
        // 连接测试
        String ping = jedis.ping();
        System.out.println(ping);
    }

    @Test
    public void test1() {
        Set<String> keys = jedis.keys("*");
        for (String key : keys) {
            System.out.println(key);
            System.out.println(jedis.type(key));
        }
    }

    @Test
    public void test2() {
        // List
        jedis.lpush("key1", "1", "2", "3", "5", "6");
        List<String> key1 = jedis.lrange("key1", 0, -1);
        for (String s : key1) {
            System.out.println(s);
        }
    }


    @Test
    public void test3() {
        // Set
        jedis.sadd("set1", "1,2", "3", "4");
        System.out.println(jedis.smembers("set1"));
    }

    @Test
    public void test4() {
        // Hash
        jedis.hset("h1", "name", "June");
        jedis.hset("h1", "age", "15");

        Set<String> h1 = jedis.hkeys("h1");
        for (String s : h1) {
            System.out.println(jedis.hget("h1", s));
        }
    }


    @Test
    public void test5() {
        // zset
        jedis.zadd("countries", 100d, "china");
        jedis.zadd("countries", 1000d, "us");
        jedis.zadd("countries", 666d, "uk");

        List<Tuple> countries = jedis.zrangeByScoreWithScores("countries", 0, 9999);
        for (Tuple country : countries) {
            System.out.println(country);
        }
    }
}

简单案例:验证码

  1. 输入手机号,点击发送后随机生成6位数字码,2分钟有效。
  2. 输入验证码,点击验证,返回或失败。
  3. 每个手机号每天只能输入3次。
import redis.clients.jedis.Jedis;

import java.util.Random;

public class VerificationCode {
    public static void main(String[] args) {
        new VerificationCode().verifyCode("45678", getNewCode());
    }

    public void verifyCode(String phone, String code) {
        Jedis jedis = new Jedis("123.60.108.20", 6379);
        jedis.auth("1243");

        String countKey = "VerificationCode" + phone + ":count";
        String codeKey = "VerificationCode" + phone + ":code";

        String count = jedis.get(countKey);
        if (count != null && Integer.parseInt(count) >= 3) {
            System.out.println("超过3次");
        } else {
            jedis.incr(countKey);
            jedis.expire(countKey, 24 * 60 * 60); // 一天只能发三次
            jedis.setex(codeKey, 2 * 60, code); // 验证码过期时间2min
        }

        jedis.close();
    }

    public static String getNewCode() {
        Random random = new Random();
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            int i1 = random.nextInt(10);
            s.append(i1);
        }
        return s.toString();
    }
}

Redis 整合 SpringBoot

  1. pom.xml 依赖
<!-- redis -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.6.0</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>2.12.1</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.11.4</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.11.4</version>
</dependency>
  1. application.properties 配置
#Redis服务器地址
spring.redis.host=123.60.108.20
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
  1. Redis 配置类
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
//key序列化方式
        template.setKeySerializer(redisSerializer);
//value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
}
  1. 测试
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping
    public String testRedis() {
        //设置值到redis
        redisTemplate.opsForValue().set("name","lucy");
        //从redis获取值
        String name = (String)redisTemplate.opsForValue().get("name");
        return name;
    }
}

Redis 事务

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis 事务的主要作用就是串联多个命令防止别的命令插队。

Multi、Exec、discard

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。组队的过程中可以通过discard来放弃组队。

image-20220102191747448 image-20220102192037349

事务错误处理

image-20220102192143438

事务冲突

image-20220102215431742

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁。

乐观锁

乐观锁(Optimistic Lock),顾名思义,乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多度的应用类型,这样可以提高吞吐量。Redis就是利用这种 check-and-set 机制实现事务的。

Redis 加锁机制

WATCH key [ key ... ]

在执行 multi 之前,先执行 watch key1 key2 … ,可以监视一个或多个 key,如果在事务执行之前这些 key 被其它命令所改动,那么事务将会被打断。

unwatch

取消监视。如果在执行了WATCH 命令之后,EXEC命令DISCARD命令先被执行了,那么就不需要UNWATCH了。

Redis 事务三特性

  • 单独的隔离操作
    • 事务中的所有命令都会被序列化、按顺序执行。事务在执行过程中,不会被其他客户端发来的命令打断。
  • 没有隔离级别的概念
    • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  • 不保证原子性
    • 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

Redis 秒杀实现

并发测试工具

yum install httpd-tools

测试模板

ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://124.223.14.18:8080/Seckill/doseckill

Java 代码

public static boolean doSecKill(String uid, String prodid) throws IOException {
        // 判断非空
        if (uid == null || prodid == null) {
            return false;
        }
        // 连接 redis
//        Jedis jedis = new Jedis("123.60.108.20", 6379);
        JedisPool jedisPool = JedisPoolUtil.getJedisPoolInstance();
        Jedis jedis = jedisPool.getResource();

        // 拼接 key
        String kcKey = "sk:" + prodid + ":qt";
        String userKey = "sk:" + prodid + ":user";
        // 监视库存
        jedis.watch(kcKey);

        String kc = jedis.get(kcKey);
        if(kc == null){
            System.out.println("秒杀未开始");
            jedis.close();
            return false;
        }
        if(jedis.sismember(userKey,uid)){
            System.out.println("已经秒杀成功,不能重复秒杀");
            jedis.close();
            return false;
        }
        if (Integer.parseInt(kc)<=0){
            System.out.println("秒杀已经结束");
            jedis.close();
            return false;
        };
        Transaction multi = jedis.multi();
        // 组队:搭配watch
        multi.decr(kcKey);        // 库存--
        multi.sadd(userKey,uid);  // 成功用户添加至set
        List<Object> result = multi.exec();
        // 获取执行结果
        if (result == null|| result.size()==0){
            System.out.println("秒杀失败..");
            jedis.close();
            return false;
        }

        jedis.close();
        return true;
    }
public class JedisPoolUtil {
   private static volatile JedisPool jedisPool = null;

   private JedisPoolUtil() {
   }

   public static JedisPool getJedisPoolInstance() {
      if (null == jedisPool) {
         synchronized (JedisPoolUtil.class) {
            if (null == jedisPool) {
               JedisPoolConfig poolConfig = new JedisPoolConfig();
               poolConfig.setMaxTotal(200);
               poolConfig.setMaxIdle(32);
               poolConfig.setMaxWaitMillis(100*1000);
               poolConfig.setBlockWhenExhausted(true);
               poolConfig.setTestOnBorrow(true);  // ping  PONG
             
               jedisPool = new JedisPool(poolConfig, "123.60.108.20", 6379, 60000 );
            }
         }
      }
      return jedisPool;
   }

   public static void release(JedisPool jedisPool, Jedis jedis) {
      if (null != jedis) {
         jedisPool.returnResource(jedis);
      }
   }

}

最终实现

public class SecKill_redisByScript {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SecKill_redisByScript.class);

    static String secKillScript = "local userid=KEYS[1];\r\n" +
            "local prodid=KEYS[2];\r\n" +
            "local qtkey='sk:'..prodid..\":qt\";\r\n" +
            "local usersKey='sk:'..prodid..\":usr\";\r\n" +
            "local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" +
            "if tonumber(userExists)==1 then \r\n" +
            "   return 2;\r\n" +
            "end\r\n" +
            "local num= redis.call(\"get\" ,qtkey);\r\n" +
            "if tonumber(num)<=0 then \r\n" +
            "   return 0;\r\n" +
            "else \r\n" +
            "   redis.call(\"decr\",qtkey);\r\n" +
            "   redis.call(\"sadd\",usersKey,userid);\r\n" +
            "end\r\n" +
            "return 1";

    static String secKillScript2 =
            "local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
                    " return 1";

    public static boolean doSecKill(String uid, String prodid) throws IOException {
        JedisPool jedispool = JedisPoolUtil.getJedisPoolInstance();
        Jedis jedis = jedispool.getResource();

        String sha1 = jedis.scriptLoad(secKillScript);

        Object result = jedis.evalsha(sha1, 2, uid, prodid);

        String reString = String.valueOf(result);
        if ("0".equals(reString)) {
            System.err.println("已抢空!!");
        } else if ("1".equals(reString)) {
            System.out.println("抢购成功!!!!");
        } else if ("2".equals(reString)) {
            System.err.println("该用户已抢过!!");
        } else {
            System.err.println("抢购异常!!");
        }
        jedis.close();
        return true;
    }
}

关于 LUA 脚本

image-20220103162037180 image-20220103162050366

一些问题

image-20220103162224978

Redis 持久化

Redis 提供了2个不同形式的持久化方式。

  • RDB(Redis DataBase)
  • AOF(Append Of File)

RDB

[Redis主从复制原理总结 - 老虎死了还有狼 - 博客园 (cnblogs.com)](https://www.cnblogs.com/daofaziran/p/10978628.html#:~:text=Redis增量复制是指Slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程。,增量复制的过程主要是主服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接收并执行收到的写命令。 主从刚刚连接的时候,进行全量同步;全同步结束后,进行增量同步。)

  • Redis使用fork函数复制一份当前进程(父进程)的副本(子进程);

  • 父进程继续接收并处理客户端发来的命令,而子进程开始将内存中的数据写入硬盘中的临时文件;

  • 当子进程写入完所有数据后会用该临时文件替换旧的RDB文件,至此一次快照操作完成。

  • 在执行fork的时候操作系统(类Unix操作系统)会使用写时复制(copy-on-write)策略,即fork函数发生的一刻父子进程共享同一内存数据,当父进程要更改其中某片数据时(如执行一个写命令 ),操作系统会将该片数据复制一份以保证子进程的数据不受影响,所以新的RDB文件存储的是执行fork一刻的内存数据。

    Redis在进行快照的过程中不会修改RDB文件,只有快照结束后才会将旧的文件替换成新的,也就是说任何时候RDB文件都是完整的。这使得我们可以通过定时备份RDB文件来实 现Redis数据库备份。RDB文件是经过压缩(可以配置rdbcompression参数以禁用压缩节省CPU占用)的二进制格式,所以占用的空间会小于内存中的数据大小,更加利于传输。

    除了自动快照,还可以手动发送SAVE或BGSAVE命令让Redis执行快照,两个命令的区别在于,前者是由主进程进行快照操作,会阻塞住其他请求,后者会通过fork子进程进行快照操作。 Redis启动后会读取RDB快照文件,将数据从硬盘载入到内存。根据数据量大小与结构和服务器性能不同,这个时间也不同。通常将一个记录一千万个字符串类型键、大小为1GB的快照文件载入到内 存中需要花费20~30秒钟。 通过RDB方式实现持久化,一旦Redis异常退出,就会丢失最后一次快照以后更改的所有数据。这就需要开发者根据具体的应用场合,通过组合设置自动快照条件的方式来将可能发生的数据损失控制在能够接受的范围。如果数据很重要以至于无法承受任何损失,则可以考虑使用AOF方式进行持久化。

配置以下三项即可开启 RDB

image-20220103190307246 image-20220103190549903
  • save :默认如下配置
save 900 1:表示900 秒内如果至少有 1 个 key 的值变化,则保存
save 300 10:表示300 秒内如果至少有 10 个 key 的值变化,则保存
save 60 10000:表示60 秒内如果至少有 10000 个 key 的值变化,则保存

可以通过lastsave命令获取最后一次成功执行快照的时间

执行flushall命令,也会产生dump.rdb文件,但里面是空的,无意义

  • stop-writes-on-bgsave-error:默认yes,当启用了 RDB 且最后一次后台保存数据失败,Redis 是否停止接收数据。
  • rdbcompression:默认yes。对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果yes,redis 会采用LZF算法进行压缩。会产生10%的性能损耗。

触发方式

  1. save/bgsave 命令主动触发
  2. 配置 save 由redis自动触发
  3. 其他触发方式
    1. 主从复制时,自动生成 RDB 文件
    2. Redis中的 debug reload 提供debug级别的重启(不清空内存),此时自动生成 RDB 文件
    3. shutdown 自动生成 RDB文件
image-20220103190238146

动态停止 RDB:config set save " "

优势

  • 适合大规模数据恢复
  • 对数据完整性和一致性要求不高更适合使用
  • 节省磁盘空间
  • 恢复速度块

劣势

  • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑
  • 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。
  • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。
image-20220103193419660

AOF

以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

持久化流程

  1. 客户端请求写命令会被append到AOF缓冲区中;
  2. AOF缓冲区根据AOF持久化策略[ always, everysec, no ]将 sync 同步到磁盘的AOF文件中;
  3. AOF文件大小超过重写策略或手动重写时,会对AOF文件 rewrite 重写,压缩 AOF文件容量;
  4. Redis 服务重启时,会重新 load 加载 AOF 文件中的写操作来恢复数据
image-20220103193801017

AOF 默认关闭,可以在redis.conf中配置文件名称,默认为 appendonly.aof。AOF文件的保存路径,同RDB的路径一致。

AOF 启动/修复/恢复

  • AOF的备份机制和性能虽然和RDB不同,但是备份和恢复的操作同RDB一样,都是拷贝文件,需要恢复时再拷贝到 Redis 工作目录下,启动系统即加载。
  • 正常启动/恢复
    • 修改默认的 appendonly no 改为 yes
    • 将有数据的 aof 文件复制一份保存到对应目录( config get dir
    • 恢复:重启 redis 然后重新加载
  • 异常恢复
    • 修改默认的 appendonly no -> yes
    • 如遇到 AOF 文件损坏,通过 /usr/local/bin/redis-check-aof --fix appendonly.aof 进行恢复
    • 备份被写坏的 AOF 文件
    • 恢复:重启 redis,然后重新加载

AOF 同步频率

appendfsync always:始终同步,每次 Redis 的写入都会立刻计入日志;性能较差但数据完整性较好

appendfsync everysec:每秒同步,存在数据丢失风险

appendfsync no:redis 不主动进行同步,把同步时机交给操作系统

Rewrite 重写

AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集。可以使用命令bgrewriteaof主动重写

AOF文件持续增长而过大时,会 fork 出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。

no-appendfsync-on-rewrite:如果 yes,那么 aof 文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能);如果 no,还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)

redis中AOF的no-appendfsync-on-rewrite参数详解 - Arbitrary233 - 博客园 (cnblogs.com)

何时重写?

Redis 会记录上次重写时的 AOF 大小,默认大小是上次 rewrite 后大小的一倍且文件大于 64M 时触发

重写虽然可以节省大量磁盘空间,减少恢复时间。但是每次重写还是有一定负担的,因此可以设定 Redis 要满足一定条件才会进行重写。

auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)

auto-aof-rewrite-min-size:设置重写的基准值,最小64M,达到这个值开始重写。

例如:文件达到70M开始重写,下降到50M,下次则会在100MB时进行重写。

系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为 base_size,如果Redis的AOF当前大小 >= base_size + base_size * 100%(默认)且当前大小 >= 64MB(默认)情况下,Redis会对 AOF 进行重写

重写流程

  1. bgrewriteaof触发重写,判断当前是否有 bgsave 或 bgrewriteaof 在运行,如果有,则等待该命令结束后再继续执行。
  2. 主进程 fork 出子进程执行重写操作,保证主进程不会阻塞。
  3. 子进程遍历 redis 内存中数据到临时文件,客户端的写请求同时写入 aof_buf 缓冲区和 aof_rewrite_buf 重写缓冲区保证原 AOF 文件完整以及新 AOF 文件生成期间的新的数据修改动作不会丢失。
  4. 子进程写完新的 AOF 文件后,向主进程发信号,父进程更新统计信息。主进程把 aof_rewrite_buf 中的数据写入到新的 AOF 文件。
  5. 使用新的 AOF 文件覆盖旧的 AOF 文件,完成AOF重写。
image-20220103200018563

优势

  • 备份机制更稳健,丢失数据概率低。
  • 可读的日志文本,通过操作 AOF 文件,可以处理误操作。

劣势

  • 比起 RDB 占用更多的磁盘空间
  • 恢复备份速度慢
  • 每次读写都同步的话,有一定的性能压力
  • 存在个别 Bug,造成不能恢复问题
image-20220103200246800

使用建议

  • 官方推荐两个都启用;
  • 如果对数据不敏感,可以选单独用RDB;
  • 不建议单独用 AOF,因为可能会出现Bug;
  • 如果只是做纯内存缓存,可以都不用

性能建议

因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1这条规则。 如果使用AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。代价,一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。默认超过原大小100%大小时重写可以改到适当的数值。

Redis 主从复制

读(Slave)写(Master)分离、性能扩展、容灾恢复

复制原理

  • Slave 启动成功连接到 master 后会发送一个 sync 命令
  • Master 接到命令后启动后台的存盘进程,同时收集所有接收到的用于修改数据集的命令,在后台进程执行完毕之后,master 将传送整个数据文件到 slave,以完成一次完全同步
  • 全量赋值:slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master 继续将新的所有收集到的修改命令依次传给 slave,完成同步
  • 但是只要是重新连接 master,全量复制将自动被执行

常用三招

一主二仆

############## 一主二从 #################
# 1. 配置统一 redis.conf
daemonize yes
appendonly no
# 2. 配置单个redis 配置文件
mkdir /redisconfs
cp /opt/redis-6.2.6/redis.conf /redisconfs/

vim /redisconfs/redis6379.conf
i
include /redisconfs/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb

vim /redisconfs/redis6380.conf
i
include /redisconfs/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb

vim /redisconfs/redis6381.conf
i
include /redisconfs/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb

# 3. 启动三台服务器
redis-server /redisconfs/redis6379.conf
redis-server /redisconfs/redis6380.conf
redis-server /redisconfs/redis6381.conf

# 4. 查看启动情况
ps -ef | grep redis

# 5. 进入redis查看主从
redis-cli -p 6379
info replication

# 6. 设置从机
redis-cli -p 6380
slaveof 127.0.0.1 6379

redis-cli -p 6381
slaveof 127.0.0.1 6379

注意:

  1. 主服务器挂掉后,从机不会上位,等待主机的重连
  2. 从机挂掉后,再次重连需要重新指定 master

薪火相传

上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。

反客为主

当一个 master 宕机后,后面的 slave 可以立刻提升为 master,其后面的 slave 不用做任何修改

slaveof no one 从 -> 主

哨兵模式

Redis Sentinel是一个分布式系统,Sentinel运行在有许多Sentinel进程互相合作的环境下,它本身就是这样被设计的。有许多Sentinel进程互相合作的优点如下:

  1. 当多个Sentinel同意一个master不再可用的时候,就执行故障检测。这明显降低了错误概率。
  2. 即使并非全部的Sentinel都在工作,Sentinel也可以正常工作,这种特性,让系统非常的健康
vi /redisconfs/sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 1
redis-sentinel /redisconfs/sentinel.conf

复制延迟

由于所有的写操作都是现在 Master 上操作,所以从 Master 同步到 Slave 及其有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问题更加严重。

故障恢复

image-20220105130333189

优先级在redis.conf中默认:slave-priority 100(新版本中叫 replica-priority),值越小优先级越高

偏移量是指获得原主机数据最全的

每个redis实例启动后都会随机生成一个40位的runid

Java 实现

private static JedisSentinelPool jedisSentinelPool=null;

public static  Jedis getJedisFromSentinel(){
if(jedisSentinelPool==null){
            Set<String> sentinelSet=new HashSet<>();
            sentinelSet.add("192.168.11.103:26379");

            JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(10); //最大可用连接数
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong

jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
return jedisSentinelPool.getResource();
        }else{
return jedisSentinelPool.getResource();
        }
}

Redis 集群

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

简单搭建Demo

# 1. 删除之前的数据文件 rdb
rm -rf  /redisconfs/ dump63*
# 2.设置6个节点的配置文件,仅仅是端口号不同

include /redisconfs/redis.conf
pidfile "/var/run/redis_6379.pid"
port 6379
dbfilename "dump6379.rdb"
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000
# :%s/6379/63..  vim中可以替换

# 3.依次启动6个服务器
redis-server /redisconfs/redis6379.conf
redis-server /redisconfs/redis6380.conf
redis-server /redisconfs/redis6381.conf
redis-server /redisconfs/redis6382.conf
redis-server /redisconfs/redis6383.conf
redis-server /redisconfs/redis6384.conf

# 4.合成集群,注意ip和端口
redis-cli --cluster create --cluster-replicas 1 124.223.14.18:6379 124.223.14.18:6380 124.223.14.18:6381 124.223.14.18:6382 124.223.14.18:6383 124.223.14.18:6384

# 5.登录查看,任意一个都可以登录到集群
redis-cli -c -p 6379

集群命令

# 多值set必须使用以下形式 {} 组
mset k1{user} v1 k2{user1} v2
# 返回 count 个slot槽中的值
cluster getkeysinslot slot count

注意事项

  1. 尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上
  2. 主节点下线,从节点会替补上去,而主节点再次上线会变成从节点。
  3. 集群的主从节点都挂掉,是否继续服务? cluster-require-full-coverage yes->挂掉 no->继续服务

Java集群Jedis

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.31.211",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

集群利弊

  • 实现扩容
  • 分摊压力
  • 无中心配置相对简单

  • 多键操作不支持
  • 多键的Redis事务是不被支持的。lua脚本不被支持
  • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

Redis 应用问题

缓存穿透

问题描述

key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

image-20220105160516791

解决方案

  1. 对空值缓存:如果一个查询返回的数据为空(不管数据是否存在),都对其进行缓存,设置空结果过期时间很短,最长不超过5分钟
  2. 设置白名单:使用 bitmaps 类型定义一个可以访问的白名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在 bitmaps 里面,则进行拦截,不允许访问。
  3. 采用布隆过滤器:它是一个很长的二进制向量和一些列随机映射函数(哈希函数),可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远超过一般的算法,缺点是有一定的误识率和删除困难。将所有可能存在的数据哈希到一个足够大的 bitmaps 中,一个一定不存在的数据会被这个 bitmaps 拦截点,从而避免了对底层存储系统的查询压力。
  4. 进行实时监控:当发现Redis的命中率开始急剧降低,需要排查访问对象和访问的数据,和运维配合,设置黑名单限制服务

缓存击穿

问题描述

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

image-20220105160309360

解决方案

  1. 预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到 redis 里,加大热门数据 key 时长
  2. 实时调整:现场监控哪些数据热门,实时调整 key 过期时长
  3. 使用锁
    • 缓存失效时(判断拿出来的值为空),不是立刻去 load db
    • 先使用缓存工具的某些带成功操作返回值的操作(比如 Redis 的 SETNX)去 set 一个 mutex key
    • 当操作返回成功时,再进行 load db操作,并回设缓存,最后删除 mutex key
    • 当操作返回失败,证明有线程在 load db,当前线程睡眠一段时间后再重试整个 get 缓存的方法
image-20220105200916960

缓存雪崩

问题描述

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key

image-20220105201023607

正常访问

image-20220105201211998

缓存失效瞬间

解决方案

  1. 构建多级缓存架构:nginx 缓存 + redis 缓存 + 其他缓存(ehcache 等)
  2. 使用锁或者队列:用加锁或队列的方式保证不会有大量得线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
  3. 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存
  4. 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如 1-5分钟随机,这样每一个缓存的过期时间重复概率就会变低,很难引发集体缓存失效事件。

分布式锁

问题描述

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁的主流解决方案:

  1. 基于数据库实现分布式锁;
  2. 基于缓存(Redis等);
  3. 基于 Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:redis最高
  2. 可靠性:zookeeper 最高

解决方案:Redis 实现分布式锁

image-20220105203231257
set key value nx ex n
屏幕截图 2022-01-05 205552
@GetMapping("testLockLua")
public void testLockLua() {
    //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId = "25"; // 访问skuId 为25号的商品 100008348542
    String locKey = "lock:" + skuId; // 锁住的是每个商品的数据

    // 3 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

    // 第一种: lock 与过期时间中间不写任何的代码。
    // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
    // 如果true
    if (lock) {
        // 执行的业务逻辑开始
        // 获取缓存中的num 数据
        Object value = redisTemplate.opsForValue().get("num");
        // 如果是空直接返回
        if (StringUtils.isEmpty(value)) {
            return;
        }
        // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
        int num = Integer.parseInt(value + "");
        // 使num 每次+1 放入缓存
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        /*使用lua脚本来锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    } else {
        // 其他线程等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之后,调用方法。
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Redis6 新功能

ACL

Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。

在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过rename 来调整高危命令比如 flushdbkeys *shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :

(1)接入权限:用户名和密码

(2)可以执行的命令

(3)可以操作的 KEY

参考官网:https://redis.io/topics/acl

命令

acl list				# 显示用户权限列表
image-20220105211013192
acl cat 				# 查看添加权限指令类别,加参数类型名可以查看具体命令
acl whoami			# 查看当前用户
aclsetuser			# 创建和编辑用户ACL
image-20220105211236710
acl setuser username		# 创建新用户
image-20220105212247697
auth username password # 切换用户

IO 多线程

IO多线程其实指客户端交互部分的网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。

image-20220105212524008
# 多线程IO默认关闭,开启:
io-threads-do-reads yes
io-threads 4