Redisson如何解决Redis分布式锁提前释放问题
在分布式场景下,相信你或多或少需要使用分布式锁来访问临界资源,或者控制耗时操作的并发性。
当然,实现分布式锁的方案也比较多,比如数据库、redis、zk 等等。本文主要结合一个线上案例,讲解 redis 分布式锁的相关实现。
一、问题描述:某天线上出现了数据重复处理问题,经排查后发现,竟然是单次处理时间较长,redis 分布式锁提前释放导致相同请求并发处理。
其实,这是一个锁续约的问题,对于一把分布式锁,我们需要考虑,设置锁多长时间过期、出现异常如何释放锁?
以上问题便是本文要讨论的主题。
二、原因分析:项目采用较简单的自定义 redis 分布式锁,为避免死锁定义默认过期时间 10s,如下:
override fun lock() { while (true) {
//尝试获取锁
if (tryLock()) {
return
}
try {
Thread.sleep(10)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
override fun tryLock(): Boolean {
val value = getUniqueSign() // 随机串
val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS)
if (flag != null flag) {
VALUE_lOCAL.set(value)
INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1)
return true
}
return false
}
缺乏对锁自动续期等实现。
三、解决方案: 1、思考:针对这种场景,可以考虑的是如何给锁自动续期-当业务没有执行结束的情况下,当然也可以自定义实现 比如开一个后台线程定时的给这些拿到锁的线程续期。
Redisson 也正是基于这种思路实现自动续期的分布式锁,各种异常情况也考虑的更加完善,综合考虑采用 Redisson 的分布式锁解决方案优化。
2、Redisson简单配置: @Configuration@EnableConfigurationProperties(RedissonProperties::class)
class RedissonConfig {
@Bean
fun redissonClient(redissonProperties: RedissonProperties): RedissonClient {
val config = Config()
val singleServerConfig = redissonProperties.singleServerConfig!!
config.useSingleServer().setAddress(singleServerConfig.address)
.setDatabase(singleServerConfig.database)
.setUsername(singleServerConfig.username)
.setPassword(singleServerConfig.password)
.setConnectionPoolSize(singleServerConfig.connectionPoolSize)
.setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize)
.setConnectTimeout(singleServerConfig.connectTimeout)
.setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout)
.setRetryInterval(singleServerConfig.retryInterval)
.setRetryAttempts(singleServerConfig.retryAttempts)
.setTimeout(singleServerConfig.timeout)
return Redisson.create(config)
}
}
@ConfigurationProperties(prefix = xxx.redisson )
class RedissonProperties {
var singleServerConfig: SingleServerConfig? = null
}
Redis 服务使用的腾讯云的哨兵模式架构,此架构对外开放一个代理地址访问,因此这里配置单机模式配置即可。
如果你是自己搭建的 redis 哨兵模式架构,需要按照文档配置相关必要参数
3、使用样例: @Autowired
lateinit var redissonClient: RedissonClient
fun xxx() {
val lock = redissonClient.getLock( mylock )
lock.lock()
try {
} finally {
lock.unlock()
}
}
使用方式和JDK提供的锁是不是很像?是不是很简单?
正是Redisson这类优秀的开源产品的出现,才让我们将更多的时间投入到业务开发中
四、源码分析下面来看看 Redisson 对常规分布式锁的实现,主要分析 RedissonLock
1、lock加锁操作 @Overridepublic void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
// 租约期限, 也就是expire时间, -1代表未设置 将使用系统默认的30s
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 尝试拿锁, 如果能拿到就直接返回
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture RedissonLockEntry future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
// 如果拿不到锁就尝试一直轮循, 直到成功获取锁或者异常终止
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
}
} finally {
unsubscribe(future, threadId);
}
}
1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private T RFuture Long tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture Long ttlRemainingFuture;
// 调用真正获取锁的操作
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) - {
if (e != null) {
return;
}
// lock acquired
// 这里是成功获取了锁, 尝试给锁续约
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 通过lua脚本真正执行加锁的操作
T RFuture T tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand T command) {
// 如果key不存在, 那正好, 直接set并设置过期时间
// 如果key存在, 就有两种情况需要考虑
// 同一线程获取重入锁,直接将field(也就是getLockName(threadId))对应的value值+1
// 不同线程竞争锁, 此次加锁失败, 并直接返回此key对应的过期时间
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
if (redis.call( exists , KEYS[1]) == 0) then +
redis.call( hincrby , KEYS[1], ARGV[2], 1); +
redis.call( pexpire , KEYS[1], ARGV[1]); +
return nil; +
end; +
if (redis.call( hexists , KEYS[1], ARGV[2]) == 1) then +
redis.call( hincrby , KEYS[1], ARGV[2], 1); +
redis.call( pexpire , KEYS[1], ARGV[1]); +
return nil; +
end; +
return redis.call( pttl , KEYS[1]); ,
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
1.2、续约
通过 scheduleExpirationRenewal 给锁续约
protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 续约操作
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 设置延迟任务task, 在时长internalLockLeaseTime/3之后执行, 定期给锁续期
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 真正执行续期命令操作
RFuture Boolean future = renewExpirationAsync(threadId);
future.onComplete((res, e) - {
if (e != null) {
log.error( Can t update lock + getRawName() + expiration , e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 这次续期之后, 继续schedule自己, 达到持续续期的效果
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
// 所谓续期, 就是将expire过期时间再延长
protected RFuture Boolean renewExpirationAsync(long threadId) {
// 如果key以及当前线程存在, 则延长expire时间, 并返回1代表成功;否则返回0代表失败
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
if (redis.call( hexists , KEYS[1], ARGV[2]) == 1) then +
redis.call( pexpire , KEYS[1], ARGV[1]); +
return 1; +
end; +
return 0; ,
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
}
}
public RFuture Void unlockAsync(long threadId) {
RPromise Void result = new RedissonPromise ();
// 执行解锁操作
RFuture Boolean future = unlockInnerAsync(threadId);
// 操作成功之后做的事
future.onComplete((opStatus, e) - {
// 取消续约task
cancelExpirationRenewal(threadId);
});
return result;
}
protected RFuture Boolean unlockInnerAsync(long threadId) {
// 如果key以及当前线程对应的记录已经不存在, 直接返回空
// 否在将field(也就是getLockName(threadId))对应的value减1
// 如果减去1之后值还大于0, 那么重新延长过期时间
// 如果减去之后值小于等于0, 那么直接删除key, 并发布订阅消息
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
if (redis.call( hexists , KEYS[1], ARGV[3]) == 0) then +
return nil; +
end; +
local counter = redis.call( hincrby , KEYS[1], ARGV[3], -1); +
if (counter 0) then +
redis.call( pexpire , KEYS[1], ARGV[2]); +
return 0; +
else +
redis.call( del , KEYS[1]); +
redis.call( publish , KEYS[2], ARGV[1]); +
return 1; +
end; +
return nil; ,
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
以上便是 redisson 客户端工具对 redis 分布式锁的加/解锁具体实现,主要解决了以下几个问题
1、死锁问题:设置过期时间
2、可重入问题:重入+1, 释放锁-1,当值=0时代表完全释放锁
3、续约问题:可解决锁提前释放问题
4、锁释放:谁加锁就由谁来释放
本文由一个线上问题做引子,通过 redis 分布式锁的常用实现方案,最终选定 redisson 的解决方案; 并分析 redisson 的具体实现细节
相关参考: Redisson官方文档 分布式锁和同步器 Redisson官方文档 配置方法 CSDN 如何使用Redis实现分布式锁?到此这篇关于Redisson如何解决Redis分布式锁提前释放问题的文章就介绍到这了,更多相关Redis分布式锁提前释放内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
我想要获取技术服务或软件
服务范围:MySQL、ORACLE、SQLSERVER、MongoDB、PostgreSQL 、程序问题
服务方式:远程服务、电话支持、现场服务,沟通指定方式服务
技术标签:数据恢复、安装配置、数据迁移、集群容灾、异常处理、其它问题
本站部分文章参考或来源于网络,如有侵权请联系站长。
数据库远程运维 Redisson如何解决Redis分布式锁提前释放问题
相关文章
- 发挥Redis分布式优势,实现极致性能(分布式redis)
- 开启Redis:让分布式数据应用更简单(redis应用场景)
- 使用Redis实现分布式锁(分布式锁redis)
- 实现使用Redis客户端实现高性能分布式缓存(redis的客户端)
- Redis实现分布式加锁的方法(怎么用redis加锁)
- 深入了解Redis企业集群的运行状况(查看redis集群情况)
- Redis内存容量查看一个实用的技术方法(查看redis内存容量)
- 泛微突破Redis储存瓶颈,实现数据畅通(泛微 redis)
- 构建高效可靠的Redis分布式集群(搭建redis分布式集群)
- 基于Web的Redis管理系统(web redis管理)
- Redis实现分布式锁,秒杀你的心愿(分布式锁 redis抢购)
- 使用分布式读写锁Redis实现数据安全(分布式读写锁redis)
- 使用分布式Redis实现极致命中(分布式redis怎么命中)
- 实现无缝分布式Redis同步(分布式redis同步)
- 如何快速安全关闭Redis密码(关闭redis密码)
- Redis缓存技术助力企业信息化提升(关于redis的作用)
- 揭秘Redis缓存查询技巧大全(怎么查redis缓存)
- 锁利用Redis打造分布式锁(基于redis的分布式)
- 启动Redis集群激活你的分布式缓存(启动redis集群命令)
- 据库Redis集群建立高可用安全的分布式数据库(redis集群数)
- 分布式主键管理之Redis篇(分布式主键redis)
- Redis集群安全删除函数的调用(redis集群删除函数)
- Redis集群 实现分布式更新的精准方案(redis集群分布式更新)
- Redis实现随机选取三张头像(redis随机取三个头像)
- 研究Redis实现的高性能分布式锁方法(redis锁方法)
- Redis让进程外缓存拥有神奇的威力(redis进程外缓存)
- 重要Redis如何设置访问IP(redis访问ip设置)
- 使用Redis实现快速获取所有用户信息(redis获取所有用户)
- Redis实现分布式锁,解决多线程安全问题(redis能做分布式锁)
- 利用Redis实现多线程自动过期(redis过期 多线程)
- 研究Redis缓存系统的组件功能(redis缓存相关组件)