Redis分布式锁的使用和实现原理详解
2.新建一个Spring Boot项目,在pom里面引入相关的依赖。
dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-web /artifactId /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-data-redis /artifactId /dependency
3.接下来,在application.yml配置redis属性和指定应用的端口号:
server: port: 8090 spring: redis: host: 192.168.0.60 port: 6379
4.新建一个Controller类,扣减库存第一版代码:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Objects; @RestController public class StockController { private static final Logger logger = LoggerFactory.getLogger(StockController.class); @Resource private StringRedisTemplate stringRedisTemplate; @RequestMapping("/reduceStock") public String reduceStock() { // 从redis中获取库存数量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock 0) { // 减库存 int restStock = stock - 1; // 剩余库存再重新设置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣减成功,剩余库存:{}", restStock); } else { logger.info("库存不足,扣减失败。"); return "success"; }
上面第一版的代码存在什么问题:超卖。假如多个线程同时调用获取库存数量的代码,那么每个线程拿到的都是100,判断库存都大于0,都可以执行减库存的操作。假如两个线程都做减库存更新缓存,那么缓存的库存变成99,但实际上,应该是减掉2个库存。
那么很多人的第一个想法是加synchronized同步代码块,因为获取数量和减库存不是原子性操作,有多个线程来执行代码的时候,只允许一个线程执行代码块里的代码。那么改完的第二版的代码如下:
@RequestMapping("/reduceStock") public String reduceStock() { synchronized (this) { // 从redis中获取库存数量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock 0) { // 减库存 int restStock = stock - 1; // 剩余库存再重新设置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣减成功,剩余库存:{}", restStock); } else { logger.info("库存不足,扣减失败。"); return "success"; }
但使用synchronize存在的问题,就是只能保证单机环境运行时没有问题的。但现在的软件公司里,基本上都是集群架构,是多实例,前面使用Nginx做负载均衡,大概架构如下:
Nginx分发请求,把请求发送到不同的Tomcat容器,而synchronize只能保证一个应用是没有问题的。
那么代码改进第三版,就是引入redis分布式锁,具体代码如下:
@RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; try { boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1"); if (!result) { return "errorCode"; // 从redis中获取库存数量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock 0) { // 减库存 int restStock = stock - 1; // 剩余库存再重新设置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣减成功,剩余库存:{}", restStock); } else { logger.info("库存不足,扣减失败。"); } finally { stringRedisTemplate.delete(lockKey) return "success"; }
如果有一个线程拿到锁,那么其他的线程就会等待。一定要记得在finally里面把使用完的锁要删除掉。否则一旦抛出异常,只有一个线程会一直持有锁,其他线程没有机会获取。
但如果在执行if (stock 0) {代码块里的代码,因为宕机或重启没有执行完,也会一直持有锁,所以,这里需要把锁加一个超时时间:
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1"); stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
但如果上面两行代码在中间执行出问题了,设置超时时间的代码还没执行,也会出现锁不能释放的问题。好在有对应的方法:就是把上面两行代码设置成一个原子操作:
// 这里默认设置超时时间为10秒 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
到此为止,如果并发量不是很大的话,基本上是没有问题的。
但是,如果请求的并发量很大,就会出现新的问题:有种比较特殊的情况,第一个线程执行了15秒,但是执行到10秒钟的时候,锁已经失效释放了,那么在高并发场景下,第二个线程发现锁已经失效,那么它就可以拿到这把锁进行加锁,
假设第二个线程执行需要8秒,它执行到5秒钟后,此时第一个线程已经执行完了,执行完那一刻,进行了删除key的操作,但是此时的锁是第二个线程加的,这样第一个线程把第二个线程加的锁删掉了。
那意味着第三个线程又可以拿到锁,第三个线程执行了3秒钟,此时第二个线程执行完毕,那么第二个线程把第三个线程的锁又删除了。导致锁失效。
那么解决的思路就是,我自己加的锁,不要被别人删掉。那么可以为每个进来的请求生成一个唯一的id,作为分布式锁的值,然后在释放时,判断一下当前线程的id,是不是和缓存里的id是否相等。
@RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; String id = UUID.randomUUID().toString(); try { // 这里默认设置超时时间为30秒 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS); if (!result) { return "errorCode"; // 从redis中获取库存数量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock 0) { // 减库存 int restStock = stock - 1; // 剩余库存再重新设置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣减成功,剩余库存:{}", restStock); } else { logger.info("库存不足,扣减失败。"); } finally { if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) { stringRedisTemplate.delete(lockKey); return "success"; }
到此为止,一个比较完善的锁就实现了,可以应付大部分场景。
当然,上面的代码还有一个问题,就是一个线程执行时间超过了过期时间,后面的代码还没有执行完,锁就已经删除了,还是会有些bug存在。解决的方法是给锁续命的操作。
在当前主线程获取到锁以后,可以fork出一个线程,执行Timer定时器操作,假如默认超时时间为30秒,那么定时器每隔10秒去看下这把锁还是否存在,存在就说明这个锁里的逻辑还没有执行完,那么就可以把当前主线程的超时时间重新设置为30秒;如果不存在,就直接结束掉。
但是上面的逻辑,在高并发场景下,实现比较完善还是比较困难的。好在现在已经有比较成熟的框架,那就是Redisson。官方地址https://redisson.org。
下面用Redisson来实现分布式锁。
首先引入依赖包:
dependency groupId org.redisson /groupId artifactId redisson /artifactId version 3.6.5 /version /dependency
配置类:
@Configuration public class RedissonConfig { @Bean public Redisson redisson() { // 单机模式 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0); return (Redisson) Redisson.create(config); }
接下来用redisson重写上面的减库存操作:
@Resource private Redisson redisson; @RequestMapping("/reduceStock") public String reduceStock() { String lockKey = "stockKey"; RLock redissonLock = redisson.getLock(lockKey); try { // 加锁,锁续命 redissonLock.lock(); // 从redis中获取库存数量 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount"))); if (stock 0) { // 减库存 int restStock = stock - 1; // 剩余库存再重新设置到redis中 stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock)); logger.info("扣减成功,剩余库存:{}", restStock); } else { logger.info("库存不足,扣减失败。"); } finally { redissonLock.unlock(); return "success"; }
其实就是三个步骤:获取锁,加锁,释放锁。
先简单看下Redisson的实现原理:
这里先说一下Redis很多操作使用Lua脚本来实现原子性操作,关于Lua语法,可以去网上找下相关教程。
使用Lua脚本的好处有:
1.减少网络开销,多个命令可以使用一次请求完成;
2.实现了原子性操作,Redis会把Lua脚本作为一个整体去执行;
3.实现事务,Redis自带的事务功能有限,而Lua脚本实现了事务的常规操作,而且还支持回滚。
但是Lua实际上不会使用很多,如果Lua脚本执行时间过长,因为Redis是单线程,因此会导致堵塞。
最后,说下Redisson分布式锁的代码实现,
找到上面的redissonLock.lock();
lock方法点进去,一直点到RedissonLock类里面的lockInterruptibly方法:
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 获取线程id long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; RFuture RedissonLockEntry future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; // waiting for message if (ttl = 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } finally { unsubscribe(future, threadId); // get(lockAsync(leaseTime, unit)); }
重点看下tryAcquire方法,把线程id作为一个参数传递进来,在这个方法里面,找到tryLockInnerAsync方法点进去,
T RFuture T tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand T command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call("exists", KEYS[1]) == 0) then " + "redis.call("hset", KEYS[1], ARGV[2], 1); " + "redis.call("pexpire", KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then " + "redis.call("hincrby", KEYS[1], ARGV[2], 1); " + "redis.call("pexpire", KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call("pttl", KEYS[1]);", Collections. Object singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
这里就是一堆Lua脚本,先看第一个if命令,先去判断 KEYS[1](就是对应的锁key的名字),如果不存在,在hashmap里,设置一个属性为线程id,值为1,再把map的过期时间设置为internalLockLeaseTime,这个值默认是30秒,
上面的操作对应的命令是:
hset keyname id:thread 1 pexpire keyname 30
然后返回nil,相当于null,那程序return了。
另外,Redisson还支持重入锁,那第二个if就是执行重入锁的操作,会判断锁是否存在,并且传入的线程id是否是当前线程的id,若果是,支持重复加锁进行自增操作;
如果是其他线程调用lock方法,上面两个if判断不会走,会返回锁剩余过期时间。
接着返回到tryAcquireAsync方法里面往下看:
实际上是加了一个监听器,在监听器里面有个很重要的方法scheduleExpirationRenewal,一看这个名字就能大概猜出是什么功能,
里面有个定时任务的轮询,
private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 判断传递进来的线程id是否是我们之前主线程设置的id,如果是,则增加续命,增加30秒。 RFuture Boolean future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then " + "redis.call("pexpire", KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections. Object singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener Boolean () { @Override public void operationComplete(Future Boolean future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can"t update lock " + getName() + " expiration", future.cause()); return; if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); }
接着推迟10秒钟(internalLockLeaseTime / 3),再执行续命操作逻辑。
到最后,再回到lockInterruptibly方法,如果ttl 为null,说明加锁成功了,就返回null,那如果其他线程的话,就会返回剩余过期时间,那么就会进入到while死循环里,一直尝试加锁,调用tryAcquire方法,在琐失效以后,再会尝试获取加锁。
到此为止,分析完毕。
总结
到此这篇关于Redis分布式锁的使用和实现原理的文章就介绍到这了,更多相关Redis分布式锁的使用和原理内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
我想要获取技术服务或软件
服务范围:MySQL、ORACLE、SQLSERVER、MongoDB、PostgreSQL 、程序问题
服务方式:远程服务、电话支持、现场服务,沟通指定方式服务
技术标签:数据恢复、安装配置、数据迁移、集群容灾、异常处理、其它问题
本站部分文章参考或来源于网络,如有侵权请联系站长。
数据库远程运维 Redis分布式锁的使用和实现原理详解
相关文章
- 极速体验:Redis查询加速(redis查询速度)
- 实现分布式锁的原理:Redis技术实践(redis分布式锁的原理)
- 深入浅出Redis:正则表达式篇(redis正则表达式)
- 如何跨距离连接Redis服务器(怎么远程链接redis)
- 缓存深入浅出怎样利用Redis实现分布式缓存(怎么使用redis分布式)
- 安全稳定,实现梦想Redis稳定版来袭(稳定版redis)
- 程序员深入学习Redis系统(程序员redis系统)
- 百度突破瓶颈Redis分布式系统招募(百度社招redis分布式)
- Redis单机部署也能实现SS没分布式(ss没分布式redis)
- 实现分布式session共享基于SSM框架的Redis分布式Session共享实现(ssm 结合redis)
- 优雅删除Redis集合连接赛跑(删除redis集合)
- 以Redis为基础实现分布式锁(创建redis 分布式锁)
- 使用Redis解决分布式事务问题(分布式事务用redis)
- 分布式Redis的过期事件处理(分布式redis过期事件)
- 分布式环境下Redis过期监控实践(分布式redis过期监听)
- 如何关闭Redis的保护模式(关闭redis的保护模式)
- 的分布式缓存实现Redis风格的分布式缓存技术(如何实现类似redis)
- 分布式架构下的Redis部分数据同步(多个redis 部分同步)
- Redis基于Raft的分布式实现(基于raft的redis)
- 灵活运用Redis,聪明获取项目值(在项目中用redis取值)
- Redis开启Redis之路安装好Redis如何连接(安装好redis怎么连接)
- 余胜军带你快速掌握Redis(余胜军redis课程)
- 强大的Redis集群助力分布式系统(redis集群与分布式)
- Redis为分布式系统解锁(redis 释放分布式锁)
- Redis探索分布式之路(redis通向分布式)
- 处理Redis连接池中读超时处理之道(redis连接池读超时)
- Redis实现自动处理订单过期(redis订单过期实现)
- 解析Redis获取键的数量的简单方法(redis获取键的数量)