zl程序教程

您现在的位置是:首页 >  工具

当前栏目

【Redisson】三.可重入锁-watchdog维持加锁源码

源码 加锁 重入 维持 Redisson
2023-09-27 14:25:05 时间

前言

  本篇主要介绍基于Redisson实现的分布式锁,获取锁之后,通过watchdog机制异步的,定时的,递归的判断是否存活,从而进行锁续期

Watchdog机制

  在使用Redisson分布式的锁的过程中,如果客户端的请求线程获取锁之后,由于当前任务执行时间较长,线程任务没执行完毕,但又超过了线程占有这把锁的时间(初始时间是30s)

那么就需要watchdog机制对当前线程锁持有得到这把锁进行续期,从而保证任务执行完毕

源码分析

  通过以下代码,可以看到,当前线程执行lock方法时,会执行tryAcquireAsync方法,在该方法中,执行lua脚本判断加锁是否存在,并且给到锁对应的有效期,异步返回Future对象

  同时,当Future对象获取到值,且线程已经获取到锁之后(怎么判断当前线程已经获取到锁,见备注),scheduleExpirationRenewal()方法 会执行看门狗锁续期的逻辑

  注意:

    1.这里源码版本是3.6,获取Future对象的值方式可能有区别,原有的版本使用的监听器的方式,这里使用的是JDK8的并发的API

    2.如果当前线程一直持有锁没有释放,则后台的watchdog线程,默认每隔10s会将持有锁的线程的锁的有效期续期至默认的30s;

  

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //返回当前加锁的Key的剩余时间的Future对象
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            //自定义的锁的占有时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //使用默认的锁占有时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        //这里当Future对象获取到异步线程的执行结果时,调用下面方法执行,注意,这里watchdog续期的使用的也是异步线程
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            if (ttlRemaining == null) {
                //这里为什么ttlRemaining为null表示当前线程获取到锁,是因为上述tryLockInnerAsync中执行的lua脚本判断锁对象对应的Key已经不存在或已过期
                if (leaseTime != -1) {
                    //自定义的锁的占有时间
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    //默认情况下会,会执行锁续期的逻辑
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }

 

   scheduleExpirationRenewal 续期的逻辑如下

   进一步调用 renewExpiration()方法

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        //递归调用锁续期的方法,直到线程任务执行结束(也就是在Redisson维护的Map中找不到对应的Key)
                        renewExpiration();
                    } else {
                        //取消锁续期的逻辑
                        cancelExpirationRenewal(null);
                    }
                });
            }
            //这里是通过续期时间/3,例如,30s续期时间,则每隔10s执行一次续期的逻辑,将锁的存活时间延长至 internalLockLeaseTime默认的30s,
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }

   锁续期的lua脚本

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                //判断加锁key的值Map中,当前线程对应值是否为1,如果为1,表示当前线程还是持有锁的,持有锁,就要进行续期
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        //将加锁key对应的Map中,对应加锁线程的值重新设置为 internalLockLeaseTime 30秒
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }