zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

分布式锁----浅析redis实现

Redis分布式分布式 实现 ---- 浅析
2023-09-27 14:20:56 时间

引言
大概两个月前小伙伴问我有没有基于redis实现过分布式锁,之前看redis的时候知道有一个RedLock算法可以实现分布式锁,我接触的分布式项目要么是github上开源学习的,要么是小伙伴们公司项目我们一起讨论问题涉及的,我自己公司的项目中没有实践分布式锁的地方也就没有仔细研究,向小伙伴推荐使用的是redisson实现的就是RedLock算法;当然有能力的还可以自己根据redis作者的RedLock算法描述去实现

插曲
关于RedLock算法的安全性有位大牛 Martin Kleppmann 产生了分歧 How to do distributed locking ;当然Redis作者 antirez 也做出了回应 Is Redlock safe?;当然这是神仙"打架",我们从中学习大牛分析的问题,从而规避即可。

浅析
加锁
redisson通过lua脚本来实现加锁和释放锁,使用lua脚本可以保证原子性

KEYS[1] 就是我们自己定义的 锁名
ARGV[2] 就是生成的锁id UUID+线程id
ARGV[1] 就是生存时间
if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
锁对应的value+1 熟悉AQS锁就会知道 这是锁重入
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);
假设 现在线程a,b来请求锁,a先请求到,自定义锁名叫做MY_TEST_LOCK;
b来请求锁时,发现MY_TEST_LOCK 这个锁可以已经存在了,走第二个if;
如果不存在 将key 锁id 超时时间 设置到redis中,返回null表示获取到了锁
第二if判断这个锁名+锁id有没有存在, 如果存在 说明是重入了 就把value加1
返回null 表示获取到了锁
如果不存在返回MY_TEST_LOCK 这个锁的剩余时间,代码中b线程会while循环,
不停的尝试加锁
释放锁
释放锁
KEYS[1] 锁名 例如:MY_TEST_LOCK
KEYS[2] 通道名 当释放锁时发现锁不在redis中时使用
ARGV[1] 锁id
ARGV[2] 锁剩余时间
ARGV[3] 锁重入的值
如果锁不存在 说明已经释放过了 发布redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
如果锁对应得value 和redis中value不对应,说明该线程没有持有锁,不能释放
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
锁对应的value -1 也就是释放锁
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
如果锁value还是大于0 说明有重入情况 不删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
否则删除 发布redis消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;";
例子
这里只是很浅的说一下怎么用,然后解释一下源码里是怎么while循环获取锁的

哨兵模式
Config config = new Config();
config.useSentinelServers().addSentinelAddress(
"redis://172.29.3.245:26378","redis://172.29.3.245:26379", "redis://172.29.3.245:26380")
.setMasterName("mymaster")
.setPassword("a123456").setDatabase(0);

集群模式
Config config = new Config();
config.useClusterServers().addNodeAddress(
"redis://172.29.3.245:6375","redis://172.29.3.245:6376", "redis://172.29.3.245:6377",
"redis://172.29.3.245:6378","redis://172.29.3.245:6379", "redis://172.29.3.245:6380")
.setPassword("a123456").setScanInterval(5000);

单redis模式
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://127.0.0.1:6380")
.setTimeout(4000 * 10)
.setIdleConnectionTimeout(1000 * 60 * 10);

获取锁
public static final String MY_TEST_LOCK_NAME = "MY_TEST_LOCK";
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock(USER_LOCK_NAME);
boolean getLock = false;
try {
getLock = lock.tryLock(10, 5, TimeUnit.SECONDS);
if (getLock){
获取到锁后执行代码
System.out.println(Thread.currentThread().getName()+"线程 锁住");
}
} catch (InterruptedException e) {
//todo 处理异常
e.printStackTrace();
} finally {
lock.unlock();
}

单redis版 获取代码
/**
* waitTime 获取可以等待的时间
* leaseTime 过了这个时间之后 redis这个锁自动消失
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
先获取一下锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
如果获取到了锁 返回值就是null
if (ttl == null) {
return true;
}

time -= (System.currentTimeMillis() - current);
if (time <= 0) {
time <= 0表示超时了
acquireFailed(threadId);
return false;
}

current = System.currentTimeMillis();
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}

try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
循环获取锁
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

多节点版获取锁 RedissonMultiLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// try {
// return tryLockAsync(waitTime, leaseTime, unit).get();
// } catch (ExecutionException e) {
// throw new IllegalStateException(e);
// }
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}

long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);

需要多少个redis 加锁成功 限制(N/2 + 1)
int failedLocksLimit = failedLocksLimit();
加锁成功集合
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}

加锁成功 加入到成功集合
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
失败判断成功节点是否达到了要求
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}

if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}

if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}

if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}

for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly(http://www.my516.com);
}
}

return true;
}
zookeeper实现分布式锁
在于小伙伴讨论Redis实现分布式锁的同时,我们在万能的github上发现了另一种zookeeper实现分布式锁的方式
zookeeper只是听过,没有用过,这里简单说下区别:
redis 分布式锁,需要自己不断去尝试获取锁,比较消耗性能,但是效率高
zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小,但是健壮性强
另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁