zl程序教程

您现在的位置是:首页 >  其他

当前栏目

AQS同步组件-ReentrantLock、ReentrantReadWriteLock解析和用例

同步组件 解析 用例 ReentrantLock AQS ReentrantReadWriteLock
2023-06-13 09:13:24 时间

ReentrantLock原理

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁,而不会造成自己阻塞自己。重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞。 ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。 除此之外,该锁的还支持获取锁时的公平和非公平性选择。实际上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。 公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。 在Java里一共有两类锁, 一类是synchornized同步锁,还有一种是JUC里提供的锁Lock,Lock是个接口,其核心实现类就是ReentrantLock。 ReentrantLock主要利用CAS+AQS队列来实现 ,主要是采用自旋锁,循环调用CAS操作来实现加锁,避免了使线程进入内核态的阻塞状态。 CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现

ReentrantLock和synchronized区别

可重入性:两者的锁都是可重入的,差别不大,有线程进入锁,计数器自增1,等下降为0时才可以释放锁 锁的实现:synchronized是基于JVM实现,操作系统级别(用户很难见到,无法了解其实现),ReentrantLock是JDK实现的,可以查到到对应实现的源码。 公平锁:synchronized只能是非公平锁。而ReentrantLock可以实现公平锁和非公平锁两种。 超时设置:synchronized不能设置超时,而Lock可以设置超时。 中断等待:synchronized不能中断一个等待锁的线程,而Lock可以中断一个试图获取锁的线程。 性能区别:在最初的时候,二者的性能差别差很多,当synchronized引入了偏向锁、轻量级锁(自选锁)后,二者的性能差别不大,官方推荐synchronized(写法更容易、在优化时其实是借用了ReentrantLock的CAS技术,试图在用户态就把问题解决,避免进入内核态造成线程阻塞) 功能区别

  • 便利性:synchronized更便利,它是由编译器保证加锁与释放,不会产生死锁。ReentrantLock是需要手动释放锁,所以为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。
  • 锁的细粒度和灵活度,ReentrantLock优于synchronized ReentrantLock独有的功能
  • 可指定是公平锁还是非公平锁,所谓公平锁就是先等待的线程先获得锁
  • 提供了一个Condition类,可以分组唤醒需要唤醒的线程
  • 提供能够中断等待锁的线程的机制,lock.lockInterruptibly()

ReentrantLock 和synchronized应该怎么选?

从上边的介绍,看上去ReentrantLock不仅拥有synchronized的所有功能,而且有一些功能synchronized无法实现的特性。性能方面,ReentrantLock也不比synchronized差,那么到底我们要不要直接使用ReentrantLock放弃使用synchronized呢?答案是不要这样做。 J.U.C包中的锁定类是用于高级情况和高级用户的工具,除非说你对Lock的高级特性有特别清楚的了解以及有明确的需要,或这有明确的证据表明同步已经成为可伸缩性的瓶颈的时候,否则我们还是继续使用synchronized。相比较这些高级的锁定类,synchronized还是有一些优势的,比如synchronized不可能忘记释放锁。还有当JVM使用synchronized管理锁定请求和释放时,JVM在生成线程转储时能够包括锁定信息,这些信息对调试非常有价值,它们可以标识死锁以及其他异常行为的来源。在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者轮询锁。 ReentrantLock 还具有可伸缩性的好处,应当在高度争用的情况下使用它,但是请记住,大多数 synchronized 块几乎从来没有出现过争用,所以可以把高度争用放在一边。我建议用 synchronized 开发,直到确实证明 synchronized 不合适,而不要仅仅是假设如果使用 ReentrantLock “性能会更好”。总之,如果需要实现ReenTrantLock的三个独有功能时,就选择使用ReenTrantLock, 通常情况下synchronized就能够满足了,而且使用起来简单,由JVM管理,不会产生死锁。

源码分析

构造函数

/**
     * 初始化的时候默认给了一个不公平锁
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * 也可以加参数来初始化指定使用公平锁还是不公平锁
     *
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

常用方法

void  lock()   //加锁 
void  unlock()  //释放锁
tryLock() //仅在调用时锁定未被另一个线程保持的情况下才获取锁定。
tryLock(long timeout, TimeUnit unit) //如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
boolean isHeldByCurrentThread();   // 当前线程是否保持锁定
boolean isLocked()  // 是否存在任意线程持有锁资源
void lockInterruptbly()  // 如果当前线程未被中断,则获取锁定;如果已中断,则抛出异常(InterruptedException)
int getHoldCount()   // 查询当前线程保持此锁定的个数,即调用lock()方法的次数
int getQueueLength()   // 返回正等待获取此锁定的预估线程数
int getWaitQueueLength(Condition condition)  // 返回与此锁定相关的约定condition的线程预估数
boolean hasQueuedThread(Thread thread)  // 当前线程是否在等待获取锁资源
boolean hasQueuedThreads()  // 是否有线程在等待获取锁资源
boolean hasWaiters(Condition condition)  // 是否存在指定Condition的线程正在等待锁资源
boolean isFair()   // 是否使用的是公平锁

公平锁与非公平锁

static final class FairSync extends Sync {
    final void lock() { // 1 注意对比公平锁和非公平锁的这个方法
        acquire(1);
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 2 和非公平锁相比,这里多了一个判断:是否有线程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

static final class NonfairSync extends Sync {
    final void lock() {
      //  1 和公平锁相比,这里会直接先进行一次CAS,如果当前正好没有线程持有锁,
      // 如果成功获取锁就直接返回了,就不用像公平锁那样一定要进行后续判断
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
 
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 2 这里没有对阻塞队列进行判断
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以看到公平锁和非公平锁的两个区别: (1) 线程在获取锁调用lock()时,非公平锁首先会进行一次CAS尝试抢锁,如果此时没有线程持有锁或者正好此刻有线程执行完释放了锁(state == 0),那么如果CAS成功则直接占用锁返回。 (2) 如果非公平锁在上一步获取锁失败了,那么就会进入nonfairTryAcquire(int acquires),在该方法里,如果state的值为0,表示当前没有线程占用锁或者刚好有线程释放了锁,那么就会CAS抢锁,如果抢成功了,就直接返回了,不管是不是有其他线程早就到了在阻塞队列中等待锁了。而公平锁在这里抢到锁了,会判断阻塞队列是不是空的,毕竟要公平就要讲先来后到,如果发现阻塞队列不为空,表示队列中早有其他线程在等待了,那么公平锁情况下线程会乖乖排到阻塞队列的末尾。 如果非公平锁 (1)(2) 都失败了,那么剩下的过程就和非公平锁一样了。 (3) 从(1)(2) 可以看出,非公平锁可能导致线程饥饿,但是非公平锁的效率要高。

使用案例

lock()、unlock()

@Slf4j
@ThreadSafe
public class ReentrantLockExample {

    /**
     * 请求总数
     */
    public static int clientTotal = 5000;

    public static int count = 0;
    /**
     * 定义一个可重入锁
     */
    final static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        //创建线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //计数器  (把请求计数)
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);

        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    add();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                //计数器减1
                countDownLatch.countDown();
            });
        }
        //当所有请求结束
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        //操作前加锁
        lock.lock();
        try {
            count++;
        } catch (Exception e) {

        } finally {
            //操作后在finally中关闭锁,确保锁成功释放,避免死锁
            lock.unlock();
        }
    }
}

多次输出结果都是5000,证明线程安全。

19:43:01.841 [main] INFO com.zjq.concurrency.lock.ReentrantLockExample - count:5000

new ReentrantLock(true)公平锁

	/**
     * 定义一个公平锁 ReentrantLock(true)参数为true,表明实现公平锁机制
     */
    final static ReentrantLock lock = new ReentrantLock(true);

    public static void main(String[] args) throws Exception {
        new Thread(() -> testLock(), "线程壹号").start();
        new Thread(() -> testLock(), "线程贰号").start();
        new Thread(() -> testLock(), "线程叁号").start();
    }

    private static void testLock() {
        for (int i = 0; i < 2; i++) {
            //操作前加锁
            lock.lock();
            try {
                log.info("{}获取了锁", Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //操作后在finally中关闭锁,确保锁成功释放,避免死锁
                lock.unlock();
            }
        }
    }

输出结果,两次执行顺序完全一样。

new ReentrantLock()非公平锁

    /**
     * 定义一个非公平锁,new一个ReentrantLock的时候参数默认为false,可以不用指定为false
     */
    final static ReentrantLock lock = new ReentrantLock();

非公平锁输出没有规律,随机的获取,谁运气好,cpu时间片轮到哪个线程,哪个线程就能获取锁

lockInterruptbly()响应中断

 /**
     * 定义一个非公平锁,new一个ReentrantLock的时候参数默认为false,可以不用指定为false
     */
    final static ReentrantLock lock1 = new ReentrantLock();
    final static ReentrantLock lock2 = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        Thread thread1 = new Thread(new ThreadTest(lock1, lock2), "线程壹号");
        Thread thread2 = new Thread(new ThreadTest(lock2, lock1), "线程贰号");
        thread1.start();
        thread2.start();
        thread1.interrupt();
    }


    static class ThreadTest implements Runnable {

        Lock lock111;
        Lock lock222;

        public ThreadTest(Lock lock111, Lock lock222) {
            this.lock111 = lock111;
            this.lock222 = lock222;
        }

        @Override
        public void run() {
            try {
                //如果当前线程未被中断,则获取锁定;如果已中断,则抛出异常
                lock111.lockInterruptibly();
                TimeUnit.MILLISECONDS.sleep(50);
                lock222.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock111.unlock();
                lock222.unlock();
                log.info("{}获取到了锁", Thread.currentThread().getName());
            }

        }
    }

我们定义了两个锁lock1和lock2。然后使用两个线程thread1和thread2构造死锁场景。正常情况下,这两个线程相互等待获取资源而处于死循环状态。但是我们此时thread1中断,另外一个线程就可以获取资源,正常地执行了。 输出结果:

tryLock()、tryLock(long timeout, TimeUnit unit)

/**
     * 定义一个非公平锁,new一个ReentrantLock的时候参数默认为false,可以不用指定为false
     */
    final static ReentrantLock lock1 = new ReentrantLock();
    final static ReentrantLock lock2 = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        Thread thread1 = new Thread(new ThreadTest(lock1,lock2),"线程壹号");
        Thread thread2 = new Thread(new ThreadTest(lock2,lock1),"线程贰号");
        thread1.start();
        thread2.start();
    }


    static class ThreadTest implements Runnable{

        Lock lock111;
        Lock lock222;

        public ThreadTest(Lock lock111, Lock lock222) {
            this.lock111 = lock111;
            this.lock222 = lock222;
        }

        @Override
        public void run() {
            try {
                while (!lock1.tryLock()) {
                    TimeUnit.MILLISECONDS.sleep(50);
                    log.info("{}尝试获取锁",Thread.currentThread().getName());
                }
                while (!lock2.tryLock()) {
                    TimeUnit.MILLISECONDS.sleep(50);
                    log.info("{}尝试获取锁",Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock111.unlock();
                lock222.unlock();
                log.info("{}获取到了锁",Thread.currentThread().getName());
            }

        }
    }

输出结果:

22:31:13.316 [线程壹号] INFO com.zjq.lock.ReentrantLockTryLockExample - 线程壹号获取到了锁
22:31:13.325 [线程贰号] INFO com.zjq.lock.ReentrantLockTryLockExample - 线程贰号尝试获取锁
22:31:13.325 [线程贰号] INFO com.zjq.lock.ReentrantLockTryLockExample - 线程贰号获取到了锁

Condition的使用

Condition可以非常灵活的操作线程的唤醒,下面是一个线程等待与唤醒的例子,其中用1234序号标出了日志输出顺序

 /**
     * 请求总数
     */
    public static int clientTotal = 5000;

    public static int count = 0;
    /**
     * 定义一个可重入锁
     */
    final static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
            //创建一个可重入锁
            ReentrantLock reentrantLock = new ReentrantLock();
            //创建condition
            Condition condition = reentrantLock.newCondition();
            //线程1
            new Thread(() -> {
                try {
                    reentrantLock.lock();
                    log.info("wait signal"); // 1
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("get signal"); // 4
                reentrantLock.unlock();
            }).start();
            //线程2
            new Thread(() -> {
                reentrantLock.lock();
                log.info("get lock"); // 2
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //发送信号
                condition.signalAll();
                log.info("send signal"); // 3
                reentrantLock.unlock();
            }).start();
    }

输出结果:

输出讲解:

  1. 线程1调用了reentrantLock.lock(),线程进入AQS等待队列,输出1号log
  2. 接着调用了awiat方法,线程从AQS队列中移除,锁释放,直接加入condition的等待队列中
  3. 线程2因为线程1释放了锁,拿到了锁,输出2号log
  4. 线程2执行condition.signalAll()发送信号,输出3号log
  5. condition队列中线程1的节点接收到信号,从condition队列中拿出来放入到了AQS的等待队列,这时线程1并没有被唤醒。
  6. 线程2调用unlock释放锁,因为AQS队列中只有线程1,因此AQS释放锁按照从头到尾的顺序,唤醒线程1
  7. 线程1继续执行,输出4号log,并进行unlock操作。

ReentrantReadWriteLock读写锁

在没有任何读写锁的时候才可以取得写入锁(悲观读取,容易写线程饥饿),也就是说如果一直存在读操作,那么写锁一直在等待没有读的情况出现,这样我的写锁就永远也获取不到,就会造成等待获取写锁的线程饥饿。平时使用的场景并不多。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写,使得并发性相比一般的排他锁有了很大提升。

 private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        //读操作 加入写锁
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        //读操作 加入写锁
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        //写操作 加入写锁
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    class Data {

    }

票据锁:StempedLock

它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。 乐观读: 如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施。

//定义一个StampedLock
private final static StampedLock lock = new StampedLock();

//操作前加锁
long stamp = lock.writeLock();
try {
    count++;
} catch (Exception e) {

} finally {
    //操作后在finally中关闭锁,确保锁成功释放,避免死锁
    lock.unlock(stamp);
}

源码分析

class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看乐观读锁案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
            double currentX = x, currentY = y;  //将两个字段读入本地局部变量
            if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                try {
                    currentX = x; // 将两个字段读入本地局部变量
                    currentY = y; // 将两个字段读入本地局部变量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲观读锁案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                    if (ws != 0L) { //这是确认转为写锁是否成功
                        stamp = ws; //如果成功 替换票据
                        x = newX; //进行状态改变
                        y = newY;  //进行状态改变
                        break;
                    } else { //如果不能成功转换为写锁
                        sl.unlockRead(stamp);  //我们显式释放读锁
                        stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                    }
                }
            } finally {
                sl.unlock(stamp); //释放读锁或写锁
            }
        }
    }