Java多线程:条件变量
2023-06-13 09:18:40 时间
一、概览
条件变量将因不同条件而无法推进的线程分别阻塞在不同的条件队列上,可以精细控制线程同步,降低惊群效应。
1 类图
2 数据结构
//ExclusiveNode、SharedNode、ConditionNode都继承此类
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
}
//条件队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}
线程执行await后就会进入条件队列,等被唤醒时重新进入同步队列。
二、signal流程
signal会唤醒条件队列上的首个线程,而signalAll会唤醒全部线程,唤醒流程如下:
- 拿到firstWaiter,取消COND标志,并将node从条件队列上移除;
- 将node转入到同步队列,并调用LockSupport唤醒线程;
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}
//AQS
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {//取消COND状态
enqueue(first);//转入同步队列
if (!all)
break;
}
first = next;
}
}
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
三、await流程
await流程如下:
- 创建ConditionNode,并保存AQS的status值,然后释放掉锁,再将ConditionNode加入条件队列;
- 进入while循环,ForkJoinPool.managedBlock(node)最终会调用LockSupport.park阻塞线程;
- 当本线程被signal唤醒时,node已加入到同步队列,canReacquire返回true,跳出循环;
- 再次调用AQS.acquire获取锁,以原来的savedState设置AQS的status。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);//加入条件队列
LockSupport.setCurrentBlocker(this); // for back-compatibility,将AQS对象设置到thread中
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {//如果被唤醒进入同步队列后就可以跳出循环
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);//阻塞线程,最终会调用LockSupport.park()
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
//被唤醒
LockSupport.setCurrentBlocker(null);
node.clearStatus();//
//lock.lock()方法:acquire(null, arg, false, false, false, 0L);
//重新获取锁时已原来的savedState
acquire(node, savedState, false, false, false, 0L);//重新获取锁,此时该节点已经进入了同步队列,有可能直接tryAcquire成功跳出循环,也可能需要两次循环修改node.status为WAITING、park。
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
enableWait方法需要保存线程此时现场状态用于将来恢复,加入条件队列并释放锁。
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {//Sync 判断是否是持有锁的线程
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);//设置标志
//加入条件队列
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
//缓存状态用于恢复
int savedState = getState();
if (release(savedState))//AQS.release释放锁
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
相关文章
- Java多线程详解_java支持多线程
- java static 变量存在哪_Java中的静态方法和静态变量存储在哪里?
- java 彻底删除文件_如何删除java文件「建议收藏」
- java switch用法_Java switch语句
- java启动器_JAVA基础:Java 启动器如何查找类
- java中static关键字的作用_Java:Java中static关键字作用
- 物业管理系统源码java_Java小区物业管理系统 源码报告下载
- JAVA数据库连接池_java与数据库的连接怎么实现
- 二叉树前序遍历Java「建议收藏」
- java softreference_Java引用总结–StrongReference、SoftReference、WeakReference、PhantomReference…[通俗易懂]
- java——继承遇到构造方法
- 深入java多线程与高并发:JMH与Disruptor,确定能学会?
- java工程师需要掌握的技能_java软件工程师需要学什么
- idea打开工程无法运行java程序_如何运行一个java程序
- 【Groovy】Groovy 脚本调用 ( Java 类中调用 Groovy 脚本 )
- java并发编程(2):Java多线程-java.util.concurrent高级工具
- Java 多线程并发编程之 Synchronized 关键字详解编程语言
- java学习笔记06–正则表达式详解编程语言
- Java性能监控工具:VisualVM详解编程语言
- Oracle 视图 USER_JAVA_DERIVATIONS 官方解释,作用,如何使用详细说明
- java基于redis事务的秒杀实现详解编程语言
- Java多线程(七):线程休眠详解编程语言
- java多线程编程之连续打印abc的几种解法详解编程语言
- 系统命令Java实现Linux系统命令调用的探究(java调用linux)
- 策略Java中使用Redis制定过期策略(redisjava过期)
- 删除使用Java操作Redis实现自动过期删除(redisjava过期)
- 策略优化Redis Java应用的过期策略(redisjava过期)
- 处理Java实现Redis数据过期策略.(redisjava过期)
- 教程:在Linux系统下安装Java(linux下java安装)
- 使用Java连接MySQL数据库的具体操作方法(java连接mysql代码)
- 从Java应用程序中实现Oracle配置连接(java配置oracle)
- Java迭代Oracle实现数据库更高性能(java迭代oracle)
- 异常Java程序捕获Oracle异常从失败中学习(java捕获oracle)
- 本使用Oracle Java 进行升级新版本带来新体验(oracle java版)
- JAVA多线程和并发基础面试问答(翻译)