zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

玩转Mysql系列 - 第26篇:聊聊mysql如何实现分布式锁?

mysql分布式分布式 实现 如何 系列 玩转 26
2023-09-27 14:26:04 时间

Mysql系列的目标是:通过这个系列从入门到全面掌握一个高级开发所需要的全部技能。

欢迎大家加我微信itsoku一起交流java、算法、数据库相关技术。

这是Mysql系列第26篇。

本篇我们使用mysql实现一个分布式锁。

分布式锁的功能

  1. 分布式锁使用者位于不同的机器中,锁获取成功之后,才可以对共享资源进行操作

  2. 锁具有重入的功能:即一个使用者可以多次获取某个锁

  3. 获取锁有超时的功能:即在指定的时间内去尝试获取锁,超过了超时时间,如果还未获取成功,则返回获取失败

  4. 能够自动容错,比如:A机器获取锁lock1之后,在释放锁lock1之前,A机器挂了,导致锁lock1未释放,结果会lock1一直被A机器占有着,遇到这种情况时,分布式锁要能够自动解决,可以这么做:持有锁的时候可以加个持有超时时间,超过了这个时间还未释放的,其他机器将有机会获取锁

预备技能:乐观锁

通常我们修改表中一条数据过程如下:

  1. t1:select获取记录R1
  2. t2:对R1进行编辑
  3. t3:update R1

我们来看一下上面的过程存在的问题:

如果A、B两个线程同时执行到t1,他们俩看到的R1的数据一样,然后都对R1进行编辑,然后去执行t3,最终2个线程都会更新成功,后面一个线程会把前面一个线程update的结果给覆盖掉,这就是并发修改数据存在的问题。

我们可以在表中新增一个版本号,每次更新数据时候将版本号作为条件,并且每次更新时候版本号+1,过程优化一下,如下:

  1. t1:打开事务start transaction
  2. t2:select获取记录R1,声明变量v=R1.version
  3. t3:对R1进行编辑
  4. t4:执行更新操作
  5.     update R1 set version = version + 1 where user_id=#user_id# and version = #v#;
  6. t5:t4中的update会返回影响的行数,我们将其记录在count中,然后根据count来判断提交还是回滚
  7.     if(count==1){
  8.         //提交事务
  9.         commit;
  10.     }else{
  11.         //回滚事务
  12.         rollback;
  13.     }

上面重点在于步骤t4,当多个线程同时执行到t1,他们看到的R1是一样的,但是当他们执行到t4的时候,数据库会对update的这行记录加锁,确保并发情况下排队执行,所以只有第一个的update会返回1,其他的update结果会返回0,然后后面会判断count是否为1,进而对事务进行提交或者回滚。可以通过count的值知道修改数据是否成功了。

上面这种方式就乐观锁。我们可以通过乐观锁的方式确保数据并发修改过程中的正确性。

使用mysql实现分布式锁

建表

我们创建一个分布式锁表,如下

  1. DROP DATABASE IF EXISTS javacode2018;
  2. CREATE DATABASE javacode2018;
  3. USE javacode2018;
  4. DROP TABLE IF EXISTS t_lock;
  5. create table t_lock(
  6.   lock_key varchar(32PRIMARY KEY NOT NULL COMMENT '锁唯一标志',
  7.   request_id varchar(64NOT NULL DEFAULT '' COMMENT '用来标识请求对象的',
  8.   lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数',
  9.   timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间',
  10.   version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1'
  11. )COMMENT '锁信息表';

分布式锁工具类:

  1. package com.itsoku.sql;
  2. import lombok.Builder;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.junit.Test;
  7. import java.sql.*;
  8. import java.util.Objects;
  9. import java.util.UUID;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12.  * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
  13.  * 喜欢的请关注公众号:路人甲Java
  14.  */
  15. @Slf4j
  16. public class LockUtils {
  17.     //将requestid保存在该变量中
  18.     static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
  19.     /**
  20.      * 获取当前线程requestid
  21.      *
  22.      * @return
  23.      */
  24.     public static String getRequestId() {
  25.         String requestId = requestIdTL.get();
  26.         if (requestId == null || "".equals(requestId)) {
  27.             requestId = UUID.randomUUID().toString();
  28.             requestIdTL.set(requestId);
  29.         }
  30.         log.info("requestId:{}", requestId);
  31.         return requestId;
  32.     }
  33.     /**
  34.      * 获取锁
  35.      *
  36.      * @param lock_key        锁key
  37.      * @param locktimeout(毫秒) 持有锁的有效时间,防止死锁
  38.      * @param gettimeout(毫秒)  获取锁的超时时间,这个时间内获取不到将重试
  39.      * @return
  40.      */
  41.     public static boolean lock(String lock_key, long locktimeout, int gettimeout) throws Exception {
  42.         log.info("start");
  43.         boolean lockResult = false;
  44.         String request_id = getRequestId();
  45.         long starttime = System.currentTimeMillis();
  46.         while (true) {
  47.             LockModel lockModel = LockUtils.get(lock_key);
  48.             if (Objects.isNull(lockModel)) {
  49.                 //插入一条记录,重新尝试获取锁
  50.                 LockUtils.insert(LockModel.builder().lock_key(lock_key).request_id("").lock_count(0).timeout(0L).version(0).build());
  51.             } else {
  52.                 String reqid = lockModel.getRequest_id();
  53.                 //如果reqid为空字符,表示锁未被占用
  54.                 if ("".equals(reqid)) {
  55.                     lockModel.setRequest_id(request_id);
  56.                     lockModel.setLock_count(1);
  57.                     lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
  58.                     if (LockUtils.update(lockModel) == 1) {
  59.                         lockResult = true;
  60.                         break;
  61.                     }
  62.                 } else if (request_id.equals(reqid)) {
  63.                     //如果request_id和表中request_id一样表示锁被当前线程持有者,此时需要加重入锁
  64.                     lockModel.setTimeout(System.currentTimeMillis() + locktimeout);
  65.                     lockModel.setLock_count(lockModel.getLock_count() + 1);
  66.                     if (LockUtils.update(lockModel) == 1) {
  67.                         lockResult = true;
  68.                         break;
  69.                     }
  70.                 } else {
  71.                     //锁不是自己的,并且已经超时了,则重置锁,继续重试
  72.                     if (lockModel.getTimeout() < System.currentTimeMillis()) {
  73.                         LockUtils.resetLock(lockModel);
  74.                     } else {
  75.                         //如果未超时,休眠100毫秒,继续重试
  76.                         if (starttime + gettimeout > System.currentTimeMillis()) {
  77.                             TimeUnit.MILLISECONDS.sleep(100);
  78.                         } else {
  79.                             break;
  80.                         }
  81.                     }
  82.                 }
  83.             }
  84.         }
  85.         log.info("end");
  86.         return lockResult;
  87.     }
  88.     /**
  89.      * 释放锁
  90.      *
  91.      * @param lock_key
  92.      * @throws Exception
  93.      */
  94.     public static void unlock(String lock_key) throws Exception {
  95.         //获取当前线程requestId
  96.         String requestId = getRequestId();
  97.         LockModel lockModel = LockUtils.get(lock_key);
  98.         //当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
  99.         if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
  100.             if (lockModel.getLock_count() == 1) {
  101.                 //重置锁
  102.                 resetLock(lockModel);
  103.             } else {
  104.                 lockModel.setLock_count(lockModel.getLock_count() - 1);
  105.                 LockUtils.update(lockModel);
  106.             }
  107.         }
  108.     }
  109.     /**
  110.      * 重置锁
  111.      *
  112.      * @param lockModel
  113.      * @return
  114.      * @throws Exception
  115.      */
  116.     public static int resetLock(LockModel lockModel) throws Exception {
  117.         lockModel.setRequest_id("");
  118.         lockModel.setLock_count(0);
  119.         lockModel.setTimeout(0L);
  120.         return LockUtils.update(lockModel);
  121.     }
  122.     /**
  123.      * 更新lockModel信息,内部采用乐观锁来更新
  124.      *
  125.      * @param lockModel
  126.      * @return
  127.      * @throws Exception
  128.      */
  129.     public static int update(LockModel lockModel) throws Exception {
  130.         return exec(conn -> {
  131.             String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
  132.             PreparedStatement ps = conn.prepareStatement(sql);
  133.             int colIndex = 1;
  134.             ps.setString(colIndex++, lockModel.getRequest_id());
  135.             ps.setInt(colIndex++, lockModel.getLock_count());
  136.             ps.setLong(colIndex++, lockModel.getTimeout());
  137.             ps.setString(colIndex++, lockModel.getLock_key());
  138.             ps.setInt(colIndex++, lockModel.getVersion());
  139.             return ps.executeUpdate();
  140.         });
  141.     }
  142.     public static LockModel get(String lock_key) throws Exception {
  143.         return exec(conn -> {
  144.             String sql = "select * from t_lock t WHERE t.lock_key=?";
  145.             PreparedStatement ps = conn.prepareStatement(sql);
  146.             int colIndex = 1;
  147.             ps.setString(colIndex++, lock_key);
  148.             ResultSet rs = ps.executeQuery();
  149.             if (rs.next()) {
  150.                 return LockModel.builder().
  151.                         lock_key(lock_key).
  152.                         request_id(rs.getString("request_id")).
  153.                         lock_count(rs.getInt("lock_count")).
  154.                         timeout(rs.getLong("timeout")).
  155.                         version(rs.getInt("version")).build();
  156.             }
  157.             return null;
  158.         });
  159.     }
  160.     public static int insert(LockModel lockModel) throws Exception {
  161.         return exec(conn -> {
  162.             String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
  163.             PreparedStatement ps = conn.prepareStatement(sql);
  164.             int colIndex = 1;
  165.             ps.setString(colIndex++, lockModel.getLock_key());
  166.             ps.setString(colIndex++, lockModel.getRequest_id());
  167.             ps.setInt(colIndex++, lockModel.getLock_count());
  168.             ps.setLong(colIndex++, lockModel.getTimeout());
  169.             ps.setInt(colIndex++, lockModel.getVersion());
  170.             return ps.executeUpdate();
  171.         });
  172.     }
  173.     public static <T> T exec(SqlExec<T> sqlExec) throws Exception {
  174.         Connection conn = getConn();
  175.         try {
  176.             return sqlExec.exec(conn);
  177.         } finally {
  178.             closeConn(conn);
  179.         }
  180.     }
  181.     @FunctionalInterface
  182.     public interface SqlExec<T> {
  183.         T exec(Connection conn) throws Exception;
  184.     }
  185.     @Getter
  186.     @Setter
  187.     @Builder
  188.     public static class LockModel {
  189.         private String lock_key;
  190.         private String request_id;
  191.         private Integer lock_count;
  192.         private Long timeout;
  193.         private Integer version;
  194.     }
  195.     private static final String url = "jdbc:mysql://localhost:3306/javacode2018?useSSL=false";        //数据库地址
  196.     private static final String username = "root";        //数据库用户名
  197.     private static final String password = "root123";        //数据库密码
  198.     private static final String driver = "com.mysql.jdbc.Driver";        //mysql驱动
  199.     /**
  200.      * 连接数据库
  201.      *
  202.      * @return
  203.      */
  204.     public static Connection getConn() {
  205.         Connection conn = null;
  206.         try {
  207.             Class.forName(driver);  //加载数据库驱动
  208.             try {
  209.                 conn = DriverManager.getConnection(url, username, password);  //连接数据库
  210.             } catch (SQLException e) {
  211.                 e.printStackTrace();
  212.             }
  213.         } catch (ClassNotFoundException e) {
  214.             e.printStackTrace();
  215.         }
  216.         return conn;
  217.     }
  218.     /**
  219.      * 关闭数据库链接
  220.      *
  221.      * @return
  222.      */
  223.     public static void closeConn(Connection conn) {
  224.         if (conn != null) {
  225.             try {
  226.                 conn.close();  //关闭数据库链接
  227.             } catch (SQLException e) {
  228.                 e.printStackTrace();
  229.             }
  230.         }
  231.     }
  232. }

上面代码中实现了文章开头列的分布式锁的所有功能,大家可以认真研究下获取锁的方法:lock,释放锁的方法:unlock

测试用例

  1. package com.itsoku.sql;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.junit.Test;
  4. import static com.itsoku.sql.LockUtils.lock;
  5. import static com.itsoku.sql.LockUtils.unlock;
  6. /**
  7.  * 工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活!
  8.  * 喜欢的请关注公众号:路人甲Java
  9.  */
  10. @Slf4j
  11. public class LockUtilsTest {
  12.     //测试重复获取和重复释放
  13.     @Test
  14.     public void test1() throws Exception {
  15.         String lock_key = "key1";
  16.         for (int i = 0; i < 10; i++) {
  17.             lock(lock_key, 10000L, 1000);
  18.         }
  19.         for (int i = 0; i < 9; i++) {
  20.             unlock(lock_key);
  21.         }
  22.     }
  23.     //获取之后不释放,超时之后被thread1获取
  24.     @Test
  25.     public void test2() throws Exception {
  26.         String lock_key = "key2";
  27.         lock(lock_key, 5000L, 1000);
  28.         Thread thread1 = new Thread(() -> {
  29.             try {
  30.                 try {
  31.                     lock(lock_key, 5000L, 7000);
  32.                 } finally {
  33.                     unlock(lock_key);
  34.                 }
  35.             } catch (Exception e) {
  36.                 e.printStackTrace();
  37.             }
  38.         });
  39.         thread1.setName("thread1");
  40.         thread1.start();
  41.         thread1.join();
  42.     }
  43. }

test1方法测试了重入锁的效果。

test2测试了主线程获取锁之后一直未释放,持有锁超时之后被thread1获取到了。

留给大家一个问题

上面分布式锁还需要考虑一个问题:比如A机会获取了key1的锁,并设置持有锁的超时时间为10秒,但是获取锁之后,执行了一段业务操作,业务操作耗时超过10秒了,此时机器B去获取锁时可以获取成功的,此时会导致A、B两个机器都获取锁成功了,都在执行业务操作,这种情况应该怎么处理?大家可以思考一下然后留言,我们一起讨论一下。

更多优质文章

mysql系列大概有20多篇,喜欢的请关注一下,欢迎大家加我微信itsoku或者留言交流mysql相关技术!

路人甲java

640?wx_fmt=png

▲长按图片识别二维码关注

来源:https://itsoku.blog.csdn.net/article/details/102714050

路人甲java:工作10年的前阿里P7分享Java、算法、数据库方面的技术干货!坚信用技术改变命运,让家人过上更体面的生活