Java多线程:条件变量
2023-03-07 09:44:27 时间
一、概览
条件变量将因不同条件而无法推进的线程分别阻塞在不同的条件队列上,可以精细控制线程同步,降低惊群效应。
1 类图
2 数据结构
//ExclusiveNode、SharedNode、ConditionNode都继承此类
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
}
//条件队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}
线程执行await后就会进入条件队列,等被唤醒时重新进入同步队列。
二、signal流程
signal会唤醒条件队列上的首个线程,而signalAll会唤醒全部线程,唤醒流程如下:
- 拿到firstWaiter,取消COND标志,并将node从条件队列上移除;
- 将node转入到同步队列,并调用LockSupport唤醒线程;
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}
//AQS
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {//取消COND状态
enqueue(first);//转入同步队列
if (!all)
break;
}
first = next;
}
}
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
三、await流程
await流程如下:
- 创建ConditionNode,并保存AQS的status值,然后释放掉锁,再将ConditionNode加入条件队列;
- 进入while循环,ForkJoinPool.managedBlock(node)最终会调用LockSupport.park阻塞线程;
- 当本线程被signal唤醒时,node已加入到同步队列,canReacquire返回true,跳出循环;
- 再次调用AQS.acquire获取锁,以原来的savedState设置AQS的status。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);//加入条件队列
LockSupport.setCurrentBlocker(this); // for back-compatibility,将AQS对象设置到thread中
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {//如果被唤醒进入同步队列后就可以跳出循环
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);//阻塞线程,最终会调用LockSupport.park()
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
//被唤醒
LockSupport.setCurrentBlocker(null);
node.clearStatus();//
//lock.lock()方法:acquire(null, arg, false, false, false, 0L);
//重新获取锁时已原来的savedState
acquire(node, savedState, false, false, false, 0L);//重新获取锁,此时该节点已经进入了同步队列,有可能直接tryAcquire成功跳出循环,也可能需要两次循环修改node.status为WAITING、park。
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
enableWait方法需要保存线程此时现场状态用于将来恢复,加入条件队列并释放锁。
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {//Sync 判断是否是持有锁的线程
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);//设置标志
//加入条件队列
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
//缓存状态用于恢复
int savedState = getState();
if (release(savedState))//AQS.release释放锁
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
相关文章
- JAVA Review-应用程序开发-JAVA反射机制(一)
- Java网络模型、JVM内存模型、Java的GC
- Java教程06—Java中的运算符与优先级
- 相册java源码_电子相册java源码
- Java基础教程 从C/C++到Java(四)
- java 注解 官方文档_Java 注解指导手册(下)
- Java学习笔记22:Java实现模拟用户登录
- java日期函数_java常用日期函数总结
- java设置超时_Java:如何实现代码超时功能?
- java基础-------初识java和配置环境变量
- java误区: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
- java数值计算_Java数值计算包 Commons Math
- Java|Java中int的取值范围是多少
- 从‘零’学JAVA(一)——简析Hello Java
- Java容器(四):HashMap(Java 7)的实现原理
- Java总结篇系列:Java多线程(一)
- Java基础15:深入剖析Java枚举类
- java中调用matlab_从java中调用matlab
- 用java编写邮箱_使用java实现发送邮箱
- Java编程思想重点笔记(Java开发必看)