zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Redis---事务篇

2023-03-14 22:35:30 时间

Redis6


Redis的事务定义

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队


Multi、Exec、discard命令

从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

组队的过程中可以通过discard来放弃组队


案例1:组队成功,提交成功


案例2:组队阶段报错,提交失败

提交失败,一条命令都不会执行


案例3:组队成功,提交有成功有失败情况


事务的错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。


事务冲突的问题

例子

我有10000元

一个请求想给金额减8000

一个请求想给金额减5000

一个请求想给金额减1000


解决办法

悲观锁

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上

锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读

锁,写锁等,都是在做操作之前先上锁。


乐观锁

**乐观锁(**Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时

候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐

量。Redis就是利用这种check-and-set机制实现事务的


乐观锁在Redis中的应用

WATCH key [key …] 命令

在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

如果两个终端同时对当前key进行修改,一个终端修改完后,对应key的版本号更新,那么另一个终端修改完之后,需要比对当前key的版本是否已经更新,如果更新了,那么当前终端的操作就失效了,即事务被打断了

演示: a终端先执行exec提交事务指令,b终端后执行

a终端:

b终端:


unwatch 命令

取消 WATCH 命令对所有 key 的监视

如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行UNWATCH 了


Redis事务三特性

单独的隔离操作

事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

没有隔离级别的概念

队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行

不保证原子性

事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚


Redis命令大全

Redis 命令参考


秒杀案例

设置商品的id为0101,库存10件

秒杀页面:

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
  <title>秒杀页面</title>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
  <!-- 引入 layui.css -->
  <link rel="stylesheet" href="//unpkg.com/layui@2.6.8/dist/css/layui.css"/>

</head>
<body>

<div class="layui-carousel" id="test1">
  <div carousel-item>
    <div><img src="ms1.png" width="1696px" height="280" ></div>
    <div><img src="ms2.png" width="1696px" height="280" ></div>
  </div>
</div>
<div style="text-align:center">
  <button type="button" class="layui-btn layui-btn-lg" id="msBtn">秒杀</button>
</div>


<script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script>
<!-- 引入 layui.js -->
<script src="//unpkg.com/layui@2.6.8/dist/layui.js"/>
<!-- 条目中可以是任意内容,如:<img src=""> -->
<script src="/static/build/layui.js"></script>
<script>
  layui.use(['layer', 'form'], function(){
    var layer = layui.layer
            ,form = layui.form;
  });

  layui.use('carousel', function(){
    var carousel = layui.carousel;
    //建造实例
    carousel.render({
      elem: '#test1'
      ,width: '100%' //设置容器宽度
      ,arrow: 'always' //始终显示箭头
      //,anim: 'fade' //切换动画方式
    });
  });
  //按钮每次点击一下,那么都会发送请求到/ms
    $("#msBtn").click(function (){
      $.ajax(
              {
               url:"ms",
                type:"get",
              }
      )
    });
</script>
</body>

controller层代码:

@RestController
@RequestMapping("/ms")
public class RedisTestController
{
    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping
    public String testRedis()
    {
         //每次点击秒杀按钮生成一个随机的四位数字的用户id
            String uid = UUID.randomUUID().toString().substring(0, 3);
      doSecKill("0101",uid);
         return "";
    }
    //秒杀过程: pid秒杀商品的id , uid秒杀商品的用户的id
    public boolean doSecKill(String pid,String  uid)
    {
    //1.uid和pid的非空判断
        if(uid==null||pid==null)
            return false;
        //2.拼接key
        //库存key
        String kcKey="sk:"+pid+":qt";
        //秒杀成功的用户id
        String userKey="sk:"+uid+":qt";
        //4.获取库存,如果库存为空,则秒杀还未开始
        Integer kc= (Integer) redisTemplate.opsForValue().get(kcKey);
        if(kc==null)
        {
            System.out.println("秒杀还未开始,敬请期待...");
            return false;
        }
        //5.判断用户是否重复秒杀---判断set集合中是否已经存在对应的用户Id
       if(redisTemplate.opsForSet().isMember(userKey, uid))
       {
           System.out.println("已经秒杀成功了,不能重复秒杀");
           return false;
       }
     //6.如果商品的数量小于1,那么秒杀结束
        if(kc<1)
        {
            System.out.println("秒杀结束");
            return false;
        }
     //7.秒杀过程: 库存减去1,加入秒杀成功的用户id
        redisTemplate.opsForValue().decrement(kcKey);
        redisTemplate.opsForSet().add(userKey,uid);
        System.out.println("秒杀成功");
        return true;
    }
}

秒杀并发模拟

使用工具ab模拟测试

联网:yum install httpd-tools


ab -help :可以查看ab工具的使用说明

vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。

内容:prodid=0101&

如果不加注解,那么参数的名字需要和请求体中参数名字相同,才会完成赋值操作,因为spring底层就是通过名字匹配的,要不然就是别名匹配

-n :当前的请求次数

-c :当前的并发次数

-n 2000 -c 200 :2000个请求中有200个并发操作,即有200个操作在同一时刻发生

-T :提交的数据类型,类型设置为 :'application/x-www-form-urlencoded’

-p postfile :用post方式提交,需要把参数放到一个文件中


最终要执行的命令:

ab -n 1000 -c 100 -p ~/postfile -T application/x-www-form-urlencoded http://172.28.36.243:8080/ms

记住localhost要用主机ip地址替换


测试前设置存储为10

高并发问题出现:


超卖和超时问题解决

连接超时,通过连接池解决

连接池

节省每次连接redis服务带来的消耗,把连接好的实例反复利用。

通过参数管理连接的行为

如果没有连接池,每一次的存取都需要新建一个连接,使用完后再断开,如果是频繁访问的场景,那也太不划算了。有了连接池,就相当于有了一个“大池子”,池子里的连接都是通着的。你如果想连接,自己去池子里找,自助连接。


连接池参数

MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。

maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;

MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛 JedisConnectionException;

testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;

java代码配置:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Created by silivar on 2018/11/3.
 * 连接池
 */
public class RedisUtil {
    private RedisUtil() {
    }

    private static String ip = "localhost";
    private static int port = 6379;
    //向redis要连接池的超时时间
    private static int timeout = 10000;
    //进入redis的密码
    //private static String auth = "root";
    private static JedisPool pool = null;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        //最大连接数,默认是1万
        config.setMaxTotal(1024);
        //最大空闲实例数
        config.setMaxIdle(200);
        //等连接池给连接的最大时间,毫秒,设成-1表示永远不超时
        config.setMaxWaitMillis(10000);
        //borrow一个实例的时候,是否提前进行validate操作
        config.setTestOnBorrow(true);

        pool = new JedisPool(config, ip, port, timeout);
    }

    //得到redis连接
    public synchronized static Jedis getJedis() {
        if (pool != null) {
            return pool.getResource();
        } else {
            return null;
        }
    }

    //关闭redis连接
    public static void close(final Jedis redis) {
        if (redis != null) {
            redis.close();
        }
    }
}

下面是springboot整合redis的配置:

#Redis服务器地址
spring.redis.host=192.168.112.128
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0

【Redis】连接池的使用


超卖问题

利用乐观锁淘汰用户,解决超卖问题

如果不是整合springboot,而是使用原生的redis,那么增加乐观锁后的代码如下:

//增加乐观锁
jedis.watch(qtkey);
 
//3.判断库存
String qtkeystr = jedis.get(qtkey);
if(qtkeystr==null || "".equals(qtkeystr.trim())) {
System.out.println("未初始化库存");
jedis.close();
return false ;
}
 
int qt = Integer.parseInt(qtkeystr);
if(qt<=0) {
System.err.println("已经秒光");
jedis.close();
return false;
}
 
//增加事务
Transaction multi = jedis.multi();
 
//4.减少库存
//jedis.decr(qtkey);
multi.decr(qtkey);
 
//5.加人
//jedis.sadd(usrkey, uid);
multi.sadd(usrkey, uid);
 
//执行事务
List<Object> list = multi.exec();
 
//判断事务提交是否失败
if(list==null || list.size()==0) {
System.out.println("秒杀失败");
jedis.close();
return false;
}
System.err.println("秒杀成功");
jedis.close();

整合springboot后的代码:

@RestController
@RequestMapping("/ms")
public class RedisTestController
{
    @Autowired
    private RedisTemplate redisTemplate;

    @PostMapping
    public String testRedis(String prodid)
    {
         //每次点击秒杀按钮生成一个随机的四位数字的用户id
            String uid = UUID.randomUUID().toString().substring(0, 3);
      doSecKill(prodid,uid);
         return "";
    }
    //秒杀过程: pid秒杀商品的id , uid秒杀商品的用户的id
    public boolean doSecKill(String pid,String  uid)
    {
    //1.uid和pid的非空判断
        if(uid==null||pid==null)
            return false;
        //2.拼接key
        //库存key
        String kcKey="sk:"+pid+":qt";
        //秒杀成功的用户id
        String userKey="sk:"+uid+":qt";
        //4.获取库存,如果库存为空,则秒杀还未开始
        Integer kc= (Integer) redisTemplate.opsForValue().get(kcKey);
        if(kc==null)
        {
            System.out.println("秒杀还未开始,敬请期待...");
            return false;
        }
        //5.判断用户是否重复秒杀---判断set集合中是否已经存在对应的用户Id
        if(redisTemplate.opsForSet().isMember(userKey, uid))
        {
            System.out.println("已经秒杀成功了,不能重复秒杀");
            return false;
        }
     //6.如果商品的数量小于1,那么秒杀结束
        if(kc<1)
        {
            System.out.println("秒杀结束");
            return false;
        }
     //7.秒杀过程: 库存减去1,加入秒杀成功的用户id
        //手动开启事务
        redisTemplate.setEnableTransactionSupport(true);
        //增加乐观锁:通过乐观锁机制
        redisTemplate.watch(kcKey);
        //增加事务
        redisTemplate.multi();
        //减库存
        redisTemplate.opsForValue().decrement(kcKey);
        //加人
        redisTemplate.opsForSet().add(userKey,uid);
        //执行事务
        List exec = redisTemplate.exec();
        //判断事务是否提交失败
        if(exec==null||exec.size()==0)
        {
            System.out.println("秒杀失败");
        }
        System.out.println("秒杀成功");
        return true;
    }
}

另一种写法:

@RestController
@RequestMapping("/ms")
public class RedisTestController
{
    @Autowired
    private RedisTemplate redisTemplate;

    @PostMapping
    public String testRedis(String prodid)
    {
         //每次点击秒杀按钮生成一个随机的四位数字的用户id
            String uid = UUID.randomUUID().toString().substring(0, 3);
      doSecKill(prodid,uid);
         return "";
    }
    //秒杀过程: pid秒杀商品的id , uid秒杀商品的用户的id
    @Transactional
    public boolean doSecKill(String pid,String  uid)
    {
        //手动开启事务
        redisTemplate.setEnableTransactionSupport(true);
    //1.uid和pid的非空判断
        if(uid==null||pid==null)
            return false;
        //2.拼接key
        //库存key
        String kcKey="sk:"+pid+":qt";
        //秒杀成功的用户id
        String userKey="sk:"+uid+":qt";
        //4.获取库存,如果库存为空,则秒杀还未开始
        Integer kc= (Integer) redisTemplate.opsForValue().get(kcKey);
        if(kc==null)
        {
            System.out.println("秒杀还未开始,敬请期待...");
            return false;
        }
        //5.判断用户是否重复秒杀---判断set集合中是否已经存在对应的用户Id
        if(redisTemplate.opsForSet().isMember(userKey, uid))
        {
            System.out.println("已经秒杀成功了,不能重复秒杀");
            return false;
        }
     //6.如果商品的数量小于1,那么秒杀结束
        if(kc<1)
        {
            System.out.println("秒杀结束");
            return false;
        }
     //7.秒杀过程: 库存减去1,加入秒杀成功的用户id
        List<Object> execute = (List<Object>)redisTemplate.execute(new SessionCallback<List<Object>>() {
            public List<Object> execute(RedisOperations operations) throws DataAccessException {
                //增加乐观锁:通过乐观锁机制
                operations.watch(kcKey);
                //增加事务
                operations.multi();
                //减库存
                operations.opsForValue().decrement(kcKey);
                //加人
                operations.opsForSet().add(userKey, uid);
                //执行事务
                return operations.exec();
            }
        });
        //判断事务是否提交失败
        if(execute==null||execute.size()==0)
        {
            System.out.println("秒杀失败");
        }
        System.out.println("秒杀成功");
        return true;
    }
}

关于RedisTemplate的ERR EXEC without MULTI错误

RedisTemplate默认是不开启事务支持的,而且在执行exec方法时,会重新创建一个连接对象(或者从当前线程的ThreadLocal中拿到上一次绑定的连接)。所以,我们在不开启事务的情况下,自己在外面执行的multi方法时完全不会生效的(因为连接对象都换了)

手动开启事务:

redisTemplate.setEnableTransactionSupport(true);

关于RedisTemplate的ERR EXEC without MULTI错误


已经秒光,可是还有库存

已经秒光,可是还有库存。原因,就是乐观锁导致很多请求都失败。先点的没秒到,后点的可能秒到了。

当500人同时请求时,一个人秒杀到之后,版本号更新,那么剩余人的操作会因为版本号的更新而失效


LUA脚本

Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言

很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。 这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂

Lua


LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

利用lua脚本淘汰用户,解决超卖问题。

redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。


解决库存依赖问题,LUA脚本

加事务-乐观锁(解决超卖),但出现遗留库存和连接超时

连接池解决超时问题

local userid=KEYS[1]; 
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr'; 
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then 
  return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then 
  return 0; 
else 
  redis.call("decr",qtkey);
  redis.call("sadd",usersKey,userid);
end
return 1;

SpringBoot中使用redis事务

SpringBoot中使用redis事务