java.util.concurrent解析——AbstractQueuedSynchronizer队列管理
上一篇博客中,我们提到AQS的队列管理是基于CLH锁队列实现的,所以首先我们来看下CLH锁队列。
1 CLH锁队列CLH锁队列本质上是一个基于链表的FIFO自旋锁队列,队列中的每一个节点实质上是一个自旋锁:在阻塞时不断循环读取状态变量,当前驱节点释放同步对象使用权后,跳出循环,执行同步代码。其基本结构如下:
队列中每一个节点有两个成员:
节点状态变量 前驱指针:predhead,tail并不是实际节点,只是为了表示队列的首尾,被称为dumb node。
在如此结构之下,其enqueue操作逻辑如下:
do { pred = tail; } while(!tail.compareAndSet(pred, node));
其lock操作如下:
public void lock() { final Node node = new Node(); node.locked = true; // 一个CAS操作即可将当前线程对应的节点加入到队列中, // 并且同时获得了前继节点的引用,然后就是等待前继释放锁 Node pred = this.tail.getAndSet(node); this.prev.set(pred); while (pred.locked) {// 进入自旋 }
可以看到其自旋逻辑。
而其dequeue操做更加简单:
head = node;
从面的操作,可以看到CLH锁队列有如下优势:
队列的入列、出列操作原子性完成,无需加锁,高效 判断当前队列等待是否为空同样简单,只需检查head是否为tail即可 每个节点独立维护其状态变量,避免了集中状态管理的内存竞争 2 AQS进程队列AQS进程队列相比于CLH锁队列主要做了两处修改:
每个节点新增一个next指针。由于AQS队列中的进程不仅有自旋等待,还包括阻塞等待的情况。阻塞等待的队列需要其他队列主动唤醒。这就要求队列中某个节点出列时需要显式告知其后继节点,因而需要加入next指针 节点状态变量status由一个bit替换成一个int。这主要是由于AQS下的状态更加复杂首先来看下AQS队列节点的基本结构:
static final class Node { // 表明节点是否以共享模式等待的标记 static final Node SHARED = new Node(); // 表明节点是否以独占模式等待的标记 static final Node EXCLUSIVE = null; // 表明线程已被取消 static final int CANCELLED = 1; // 表明后续节点的线程需要unparking static final int SIGNAL = -1; // 表明线程正在等待一个条件 static final int CONDITION = -2; // 表明下一次acquireShared应该无条件传播 static final int PROPAGATE = -3; * 状态字段,只能取下面的值: * SIGNAL(-1): 这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点 * 必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须 * 首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。 * CANCELLED(1): 这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是, * 这种状态的线程从不再次阻塞。 * CONDITION(-2): 这个结点当前在一个条件队列上。它将不会用于sync队列的结点, * 直到被转移,在那时,结点的状态将被设为0. * 这个值在这里的使用与其他字段的使用没有关系,仅仅是简化结构。 * PROPAGATE(-3): releaseShared应该传递给其他结点。这是在doReleaseShared里设置 * (仅仅是头结点)以确保传递继续,即使其他操作有干涉。 * 0: 非以上任何值。 * 值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要 * 检查特定的值,只需要(检查)符号。 * 对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION(-2)。 * 通过CAS操作修改(或者,当允许时,用无条件volatile写。) volatile int waitStatus; * 连接到当前结点/线程依赖的用来检查等待状态的前驱结点。 * 在进入队列时赋值,只在出队列时置为空(为了GC考虑)。 * 根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出, * 因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。 * 一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。 volatile Node prev; * 连接到当前结点/线程释放时解除阻塞的后续结点。 * 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。 * 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?), * 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空, * 我们可以从tail向前遍历进行双重检查。 * 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。 volatile Node next; * 列队在这个结点的线程,在构造时初始化,用完后置空。 volatile Thread thread; * 连接到下一个在条件上等待的结点或是特殊的值SHARED。 * 因为条件队列只在独占模式下持有时访问,我们只需要一个简单的链表队列来持有在条件上等待的结点。 * 他们然后被转移到队列去re-acquire。 * 因为条件只能是独占的,我们通过用一个特殊的值来表明共享模式 来节省一个字段。 Node nextWaiter; Node() { // Used to establish initial head or SHARED marker Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
接下来我们就来看下其主要操作的主要逻辑。
3 enqueue由于AQS队列节点包括pred和next两个指针,无法通过一次原子操作更新两个指针。所以添加结点到队列的操作最重要的是要保证:即使添加的CAS操作失败了,也不能影响队列结点现有的连接关系。
对于新加结点:
在CAS之前指向它的预期前驱 CAS成功之后再更新预期前驱的后继指针。在步骤1成功之后、步骤2完成之前,其他线程通过结点的 “next” 连接可能看到“尾结点”(即代码里的 pred)的 “next” 为空,但其实队列里已经加入新的结点,这也是为什么通过 “next” 连接遍历队列时碰到后继为空的,必须从原子地更新的 “tail” 结点向后遍历。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试enq的快速路径;失败后回退到完整的enq。 Node pred = tail; if (pred != null) { // 把新结点的前驱指向pred,必须在下面的CAS完成之前设置, // 这样确保一旦CAS成功后,从tail向后遍历是ok的。 node.prev = pred;// 步骤 1 if (compareAndSetTail(pred, node)) { //CAS // 新节点成功进入队列 // 前驱结点的next字段指向新结点,建立完整的连接。 pred.next = node; // 步骤 2 return node; enq(node); return node; private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 队列是空,必须初始化。 if (compareAndSetHead(new Node())) // 原子地设置头结点 tail = head; // 尾结点也指向头结点 } else { node.prev = t; if (compareAndSetTail(t, node)) { // 步骤 1 t.next = node; // 步骤 2 // 在把新结点设置为tail后才能更新前驱的next字段,这样,即使CAS失败了也不会影响原来的连接关系。 return t; }4 acquire
acquire方法不提供绝对公平的保证,因为现在在加入队列之前先进行tryAcquire操作,如果这个线程先于头结点锁定,那么头结点就只能继续等待了。这种情形称为闯入。
这个acquire之所以先尝试获取是为了在无竞争的情况下提高性能,并可以实现非公平的获取。如果要保证绝对的公平性,则可以在子类实现的tryAcquire方法里判断当前线程是否是头结点,是则尝试获取,不是则直接返回false。
// 以独占模式获取 public final void acquire(int arg) { if (!tryAcquire(arg) // 首先尝试获取 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失败后加入等待队列,再从队列里再次尝试获取;成功获取后才返回, // 返回的boolean表示线程是否曾经被中断。 // 在acquireQueued方法里,线程可能被反复park、unpark,直到获取锁。 selfInterrupt(); // 重新设置中断状态位,是否执行取决于acquireQueued的返回值 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 线程是否曾被中断是由这个变量记录的。 for (;;) { // 死循环,用于acquire失败后重试 final Node p = node.predecessor(); if (p == head tryAcquire(arg)) {// 前驱是头结点,继续尝试获取 setHead(node); p.next = null; // help GC failed = false; return interrupted; // 检测是否需要等待,如果需要,则park当前线程 // 只有前驱在等待时才进入等待,否则继续重试 if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) // 线程进入等待,需要其他线程来唤醒这个线程以继续执行 interrupted = true; // 只要线程在等待过程中被中断过一次就会记录下来 } finally { if (failed) // acquire失败,取消acquire cancelAcquire(node); * 这个方法是信号控制的核心。检查和更新没有成功获取的结点的状态。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前驱结点也在等待,说明这是一个稳定的等待状态。 return true ; if (ws 0) { // 前驱结点已取消,向前遍历直到找到一个非取消结点。 do { node.prev = pred = pred.prev; } while (pred.waitStatus // 把找到的结点的后继指向node,那么当前pred与node之间的已取消结点就不再被引用了,可以被垃圾回收。 pred.next = node; } else { // 前驱的状态必是 0 或 PROPAGATE之一。表明需要一个信号,但不park先。 // 调用者需要重试来确保它在park之前没法获取。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); return false; private final boolean parkAndCheckInterrupt() { // park当前执行线程, 其他线程unpark这个线程后继续执行 LockSupport.park( this); return Thread.interrupted(); }5 release
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null h.waitStatus != 0) unparkSuccessor(h); return true; return false; private void unparkSuccessor(Node node) { * 如果status是负的(比如,可能需要信号)尝试清除预期的信号。 * 如果这失败了或status被其他等待线程修改也是没关系的。 int ws = node.waitStatus; if (ws 0) compareAndSetWaitStatus(node, ws, 0); * 准备unpark的线程在后继里持有,一般就是下一个结点。 * 但如果被取消或是空,从tail向后遍历来找到实际的非取消后继。 Node s = node.next; if (s == null || s.waitStatus 0) { // 没有直接后继或直接后继不需要通知 s = null; // 从tail向后遍历,查找需要通知的结点 for (Node t = tail; t != null t != node; t = t.prev) // 找到一个后不跳出循环是为了找到最老的需要通知的结点。 if (t.waitStatus = 0) s = t; if (s != null) // 结点不为null,唤醒后继的等待线程 LockSupport.unpark(s.thread); }
JUC:java.util.concurrent理解与使用示例 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
java.util.concurrent解析——AbstractQueuedSynchronizer综述 尽管JVM在并发上已经做了很多优化工作,如偏向锁、轻量级锁、自旋锁等等。但是基于`Synchronized` `wait` `notify`实现的同步机制还是无法满足日常开发中。原生同步机制在时间和空间上的开销也一直备受诟病。
java.util.concurrent解析——ThreadPoolExecutor源码解析 任何一种语言、框架,线程都是非常重要的一部分。要想实现异步就需要通过异步线程,但是频繁地创建销毁线程会带来较大的性能开销,而线程池就是为解决这一问题而出现的
相关文章
- java虚拟机学习-JVM内存管理:深入Java内存区域与OOM(3)
- 跟我学Java多线程——线程池与堵塞队列
- Java 自定义序列化、反序列化
- 2014面试总结--java、数据 方向
- CSDN日报191016:Java纯干货分享:史上最全的JAVA工程师面试题汇总
- Java面试--TCP和UDP
- JAVA文件类工具
- Java/JSP/JS Debug笔记
- Tomcat 奔溃:java.lang.OutOfMemoryError: Java heap space
- 基于Java实现生产者与消费者算法模拟【100010232】
- java 线程池线程忙碌且阻塞队列也满了时给一个拒接的详细报告
- Java Design Demo -简单的队列-异步多任务队列(java android)
- java高级用法之:在JNA中将本地方法映射到JAVA代码中
- java安全编码指南之:输入注入injection
- 你不知道的java对象序列化的秘密
- 华为OD机试 - 连续字母长度(Java) | 机试题+算法思路+考点+代码解析 【2023】
- 【JAVA】泛型——对数据类型转换和数据存取方法的逐步优化过程
- Eclipse迅速执行:Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
- Java开发 - 消息队列前瞻
- Java开发 - 消息队列之Kafka初体验
- 从零学Java(19)之 if else分支结构详解,小AD要搞对面心态!
- Java并发问题--乐观锁与悲观锁以及乐观锁的一种实现方式-CAS
- Java hutool/java 常用方法