zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java AbstractQueuedSynchronizer(AQS)

JAVA AQS
2023-09-27 14:23:43 时间

AbstractQueuedSynchronizer 为 java.util.concurrent.locks 包下的一个抽象类,简称 AQS抽象队列同步器)。

并发包(JUC)中的 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask 等,底层都是基于 AQS 来实现的。

 

一、使用

1.AQS 采用了模板模式

自定义同步器时需要重写下面几个方法。

// 该线程是否正在独占资源(是否在独占模式下被线程占用)。只有用到 condition 才需要去实现它。
boolean isHeldExclusively();

// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
boolean tryAcquire(int arg);

// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
// 成功后,等待中的其他线程此时将有机会获取到资源。
boolean tryRelease(int arg);

// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
int tryAcquireShared(int arg);

// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
boolean tryReleaseShared(int);

默认每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。

AQS 类中的其它方法都是 final 修饰,无法被其它类使用。

2.AQS 对资源的共享方式

一般自定义同步器,要么是独占(tryAcquire-tryRelease)方式,要么是共享(tryAcquireShared-tryReleaseShared)方式。(AQS 也支持同时实现,如 ReentrantReadWriteLock)

  • Exclusive(独占):只有一个线程能执行,如 ReentrantLock。独占又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

  • Share(共享):多个线程可同时执行,如 Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock。

3.自定义同步器(独占式)

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyMutex {
    private final Sync sync = new Sync();

    // 当前状态为 0 时获取锁,然后进行 CAS 设置同步状态
    // 未获取到当前线程则会进入同步队列等待
    public void lock() {
        sync.acquire(1);
    }

    // 释放锁,将状态设置为 0
    public void unlock() {
        sync.release(1);
    }

    // 是否处于被当前线程占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // 首先判断状态是否=0,如果状态=0,就将 status 设置为 1
            if (compareAndSetState(0, 1)) {
                // 将当前线程赋值给独占模式的 onwer
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 判断当前是获得资源的线程
            if (Thread.currentThread() != getExclusiveOwnerThread() || getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 没有线程拥有这个锁
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread() && getState() == 1;
        }
    }
}
View Code

测试,用 30 个线程,每个线程对 i 自加 10000 次,同步正常的话,最终结果应为 300000

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo {
    private static volatile int count;
    private static MyMutex mutex = new MyMutex();
    private static CyclicBarrier barrier = new CyclicBarrier(31);
    private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(30, 30, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(30));

    public static void main(String[] args) throws Exception {
        // 未加锁
        for (int i = 0; i < 30; i++) {
            // 向线程池提交任务
            threadPool.execute(() -> {
                try {
                    for (int j = 0; j < 10000; j++) {
                        count++;
                    }
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        barrier.await();
        System.out.println("未加锁,count=" + count);

        // 重置
        barrier.reset();
        count = 0;

        // 加锁
        for (int i = 0; i < 30; i++) {
            // 向线程池提交任务
            threadPool.execute(() -> {
                try {
                    for (int j = 0; j < 10000; j++) {
                        mutex.lock();
                        count++;
                        mutex.unlock();
                    }
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        barrier.await();
        System.out.println("加锁后,count=" + count);

        threadPool.shutdown();
    }
}
View Code

 

二、源码

AQS 内部实现主要是状态变量 state 和一个 FIFO(先进先出)队列来完成(这个内置的同步队列称为 CLH 队列,其实就是个双端双向链表)。

1.状态变量 state

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    // The current owner of exclusive mode synchronization
    private transient Thread exclusiveOwnerThread;

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    // 等待队列头部节点
    private transient volatile Node head;
    // 等待队列尾部节点
    private transient volatile Node tail;
    // 状态,大于等于 1 阻塞线程。类似当前窗口已有人在办理业务,排队的其它人需要等待
    private volatile int state;

    // CAS 方式更新
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

2.CLH 队列,由 AQS 中的一个静态内部类 Node 实现

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    static final class Node {
        static final Node SHARED = new Node(); // 标记:表示节点正在 共享 模式中等待
        static final Node EXCLUSIVE = null; // 标记:表示节点正在 独占 模式下等待

        static final int CANCELLED = 1; // 线程被取消
        static final int SIGNAL = -1; // 后继线程需要唤醒
        static final int CONDITION = -2; // 等待 Condition 唤醒
        static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
        volatile int waitStatus; // 当前节点在队列中的状态,初始为 0,状态是上面的几种

        volatile Node prev; // 当前结点的前驱节点,当节点加入同步队列时被设置(尾部添加)
        volatile Node next; // 当前结点的后继节点
        volatile Thread thread; // 与当前结点关联的线程

        /**
         * 既可以作为同步队列节点使用,也可以作为 Condition 的等待队列节点使用。
         * 在作为同步队列节点使用时,nextWaiter 可能有两个值:EXCLUSIVE、SHARED 标识当前节点是独占模式还是共享模式。
         * 在作为 Condition 的等待队列节点使用时,nextWaiter 保存后继节点。
         */
        Node nextWaiter;

        final boolean isShared() { // 是否在共享模式下等待
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException { // 返回前驱节点
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // 用于建立初始队列头或 共享 标记
        }

图示

两者关系

3.使用 ReentrantLock 类来看 AQS 的运行流程

以为公平锁为例

ReentrantLock lock = new ReentrantLock();

new Thread(() -> {
    try {
        Thread.sleep(1234);
        lock.lock();
        System.out.println("AAA");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}, "A").start();

new Thread(() -> {
    try {
        Thread.sleep(1234);
        lock.lock();
        System.out.println("BBB");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}, "B").start();

new Thread(() -> {
    try {
        Thread.sleep(1234);
        lock.lock();
        System.out.println("CCC");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}, "C").start();
View Code

第一个线程获取锁(可以获取到锁)

public class ReentrantLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.lock();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
    }

    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                // 会进入这里,初始状态变量默认为 0
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

这时重点看第二个线程获取锁(第一个线程没有释放锁,第二个线程获取不到锁),这里比较繁琐,分步来说

public class ReentrantLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.lock();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
    }

    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 会进入这里,状态变量已被第一个线程修改
                acquire(1);
        }
    }

继续看 acquire(1),其中会再次尝试获取锁

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final void acquire(int arg) {
        // 先看 tryAcquire,尝试获取锁
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

public class ReentrantLock implements Lock, java.io.Serializable { static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 第一个线程已经释放锁 if (compareAndSetState(0, acquires)) { // 第二个线程获取到锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程是否等于已获取到锁的线程 // 已获取到锁的线程又获取到锁,可重入锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // 再次修改状态变量 return true; } return false; // 再次尝试获取锁失败 } }

继续看尝试获取锁失败的情况,这时就会形成队列

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final void acquire(int arg) {
        // 尝试获取锁失败,tryAcquire 返回 false,取反为 true,会继续执行 addWaiter
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    private Node addWaiter(Node mode) {
        // 将当前线程包装为 Node 节点,以 EXCLUSIVE(独占) 模式
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) { // 看队列尾节点是否不为 null,这时队列都还没又,自然为 null
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); // 建立队列
        return node; // 返回当前线程包装而成的节点
    }

    private Node enq(final Node node) {
        for (; ; ) { // 自旋
            Node t = tail;
            if (t == null) { // 尾节点是否为 null,当然是
                if (compareAndSetHead(new Node())) // 注意这里 new 了一个空 Node 为队列的头节点(哨兵节点)
                    tail = head; // 让尾节点等于头节点,这时已形成了队列(只有一个节点,还是空 Node)
            } else { // 自旋第二次进入这里,把当前节点插入队列
                node.prev = t; // 当前节点上一个指向尾节点
                if (compareAndSetTail(t, node)) { // 尾节点没有被改变,就把当前节点设置为尾节点
                    t.next = node; // 尾节点的写一个指向当前节点,这时就已经成功将当前节点插入到队列末尾
                    return t; // 返回之前的尾节点,结束自旋
                }
            }
        }
    }

这时的队列情况

继续看加入队列之后的操作,可以猜想到肯定会阻塞线程

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final void acquire(int arg) {
        // addWaiter(Node.EXCLUSIVE) 返回当前线程包装而成的节点,继续执行 acquireQueued
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 标记是否成功拿到锁
        try {
            boolean interrupted = false; // 标记等待过程中是否被中断过
            for (; ; ) {
                // 获取当前 Node 的前驱 Node,为 null 会抛异常(这时队列里有两个 Node,这里获取到的显然是头节点)
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 不等于头节点或获取锁失败,这里是头节点,所以是获取锁失败
                // 如果自己可以休息了,就通过 park() 进入 waiting 状态,直到被 unpark()。
                // 如果在不可中断的情况下被中断了,那么会从 park() 中醒过来,发现拿不到资源,从而继续进入 park() 等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true; // 如果等待过程中被中断过,哪怕只有一次,就将 interrupted 标记为 true
            }
        } finally {
            if (failed)
                // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
                cancelAcquire(node);
        }
    }

//
static final int CANCELLED = 1; // 线程被取消 // static final int SIGNAL = -1; // 后继线程需要唤醒 // static final int CONDITION = -2; // 等待 Condition 唤醒 // static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点状态,头节点创建时默认为 0 if (ws == Node.SIGNAL) return true; // 第二此会走这里,表示后继节点可以被阻塞了 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 第一次会进入这里,将头节点的值设为 -1 } return false; } private final boolean parkAndCheckInterrupt() { // shouldParkAfterFailedAcquire 把前驱节点状态设置好后,开始阻塞当前节点 LockSupport.park(this); return Thread.interrupted(); // 返回在被阻塞的这段时间内线程是否被中断过 }

https://www.cnblogs.com/jhxxb/p/10864125.html

https://www.cnblogs.com/jhxxb/p/11527735.html

这时的队列状态,头节点已标记后继结点需要唤醒,后继节点已被阻塞

继续再回到第一个线程执行完毕,这时需要唤醒后继节点,并进行出队操作

public class ReentrantLock implements Lock, java.io.Serializable {
    public void unlock() {
        sync.release(1);
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 当前状态变量减一
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 若同一线程多次获取到锁,这里就不等于 0
            free = true;
            setExclusiveOwnerThread(null); // 置空
        }
        setState(c); // 设置状态变量
        return free;
    }

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 当前状态变量已为 0
            Node h = head;
            if (h != null && h.waitStatus != 0) // 在后继节点阻塞时已经设置头节点状态了
                unparkSuccessor(h); // 唤醒 h 的后继节点
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); // 设置头节点状态为 0,因为要唤醒后续节点了

        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // s 若有后继节点,s.waitStatus 会为 -1
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 唤醒后继节点
    }

已执行完的头节点进行出队操作

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final void acquire(int arg) {
        // addWaiter(Node.EXCLUSIVE) 返回当前线程包装而成的节点,继续执行 acquireQueued
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 标记是否成功拿到锁
        try {
            boolean interrupted = false; // 标记等待过程中是否被中断过
            for (; ; ) {
                // 获取当前 Node 的前驱 Node,为 null 会抛异常(这时队列里有两个 Node,这里获取到的显然是头节点)
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // 等于头节点,再次尝试获取锁
                    // 获取锁成功
                    setHead(node); // 头节点显然已执行完毕,需要移除,然后将当前节点设置为头节点
                    p.next = null; // 让已执行完的头节点不可达,help GC
                    failed = false; // 成功获取资源
                    return interrupted; // 返回等待过程中是否被中断过
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
                cancelAcquire(node);
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

队列状态

到这里,正常情况下的获取锁与释放锁的流程就进行完了

 


https://mp.weixin.qq.com/s/PAn5oTlvVmjMepmCRdBnkQ

https://mp.weixin.qq.com/s/uOIuvC6ZjpdQyEtrHK6Ocw

https://www.cnblogs.com/waterystone/p/4920797.html

https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

https://blog.csdn.net/pange1991/article/details/80930394

https://coderbee.net/index.php/concurrent/20131115/577/comment-page-1

https://segmentfault.com/a/1190000015562787

https://www.javadoop.com/post/AbstractQueuedSynchronizer