zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java ThreadPoolExecutor线程池原理详解编程语言

JAVA线程原理编程语言 详解 ThreadPoolExecutor
2023-06-13 09:11:51 时间
前言

《Java 线程池理解》简要梳理java线程池ThreadPoolExecutor的使用和主要流程,这篇在此基础上继续学习ThreadPoolExecutor的原理,顺带巩固AQS和CAS的知识。

ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize, 

 int maximumPoolSize, 

 long keepAliveTime, 

 TimeUnit unit, 

 BlockingQueue Runnable workQueue, 

 ThreadFactory threadFactory, 

 RejectedExecutionHandler handler); 

ThreadPoolExecutor主要参数 corePoolSize 核心线程数量 maximumPoolSize 最大线程数量 unit 超时时间单位 ( TimeUnit.SECONDS 等) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了 workQueue 任务阻塞队列,用来缓存待执行的任务 ThreadFactory 负责创建Thread线程 RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略 ThreadPoolExecutor执行流程

执行流程

AQS的使用

在分析ThreadPoolExecutor之前,先学习AbstractQueueSynchronizer,简称AQS,他是实现ReentrantLock、CountDownLatch的重要部分,ThreadPoolExecutor 也使用了AQS实现非重入锁机制。可以参考《ReentrantLock原理从开始到放弃》

简要说一下AQS的实现原理以及优势

AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息;

队列的头部结点表示:该结点对应的线程已经处于执行状态,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
AQS
AQS实现了一个FIFO的队列,让线程根据规则排列阻塞和争夺资源。

AQS提供了独占方式和共享方式,对应独占锁和共享锁。

AQS提供模板方法需要具体子类实现功能 isHeldExclusively():该线程是否正在独占资源 tryAcquire(int):独占方式下尝试去获取资源 tryRelease(int):独占方式下尝试释放资源 tryAcquireShared(int):共享方式下,尝试获取资源。返回值表示剩余资源,负数表示失败 tryReleaseShared(int):共享方式

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法。

支持独占锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively

支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。

AQS获取、修改state状态 getState() 获取状态(资源) setState() 设置状态(资源) compareAndSetState() CAS设置状态(资源)

state可以在不同的场景下表达不同的意义,例如在ReentrantLock表示重入次数,CountDownLatch表示当前计数。

以ReentrantLock为例子:

ReentrantLock AQS
这里只抽取了ReentrantLock 的公平锁部分核心代码看下:

 static final class FairSync extends Sync {

 private static final long serialVersionUID = -3000897897090466540L; 

 final void lock() {

 // 线程调用lock 

 acquire(1); // 调用AQS的acquire()内部会顺序获取 

 // 具体的尝试获取锁方法,state 

 protected final boolean tryAcquire(int acquires) {

 final Thread current = Thread.currentThread(); 

 int c = getState(); // 获取当前状态 

 // 当且state为0时候才可以表示获取锁 

 if (c == 0) {

 // hasQueuedPredecessors 表示是否有其他线程排队, 

 // 如果没人排队就不必入队,直接CAS尝试去获取锁 

 if (!hasQueuedPredecessors() 

 compareAndSetState(0, acquires)) {

 // 获取锁成功,设置当前线程独占 

 setExclusiveOwnerThread(current); 

 return true; 

 // 当前线程可重入 

 else if (current == getExclusiveOwnerThread()) {

 int nextc = c + acquires; 

 if (nextc 0) 

 throw new Error("Maximum lock count exceeded"); 

 setState(nextc); 

 return true; 

 return false; 

线程池状态

线程池每一时刻都会处于以下一种状态。

RUNNING: 可接收新的任务,当前有任务正在排队、执行
SHUTDOWN: 不接收新的任务,当前有任务正在排队、执行
STOP: 不接收新的任务,不允许任务执行、排队,中断正在执行的任务
TIDYING: 所有任务都被终止,无执行任务线程,会执行terminated()
TERMINATED: terminated() 方法执行完成

// 线程池状态 + 数量 

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 

private static final int COUNT_BITS = Integer.SIZE - 3; 

private static final int CAPACITY = (1 COUNT_BITS) - 1; 

// runState is stored in the high-order bits 

private static final int RUNNING = -1 COUNT_BITS; 

private static final int SHUTDOWN = 0 COUNT_BITS; 

private static final int STOP = 1 COUNT_BITS; 

private static final int TIDYING = 2 COUNT_BITS; 

private static final int TERMINATED = 3 COUNT_BITS; 

RUNNING SHUTDOWN STOP TIDYING TERMINATED

ctl有32位bit,其中前3位表示线程状态,后面39位表示线程数量,所以线程数最大为2的39次方-1。

提供几个方法:

// 获取线程池状态 

private static int runStateOf(int c) {

 return c ~CAPACITY; } 

// 获取线程数 

private static int workerCountOf(int c) {

 return c CAPACITY; } 

// 利用位运算集合线程池状态和数量,计算ctl 

private static int ctlOf(int rs, int wc) {

 return rs | wc; } 

worker

利用worker实现非可重入锁

private final class Worker extends AbstractQueuedSynchronizer implements Runnable 

 /** Thread this worker is running in. Null if factory fails. */ 

 final Thread thread; 

 /** Initial task to run. Possibly null. */ 

 Runnable firstTask; 

 /** Per-thread task counter */ 

 volatile long completedTasks; 

 // 执行的第一个任务,后续任务从blockQueue获取 

 Worker(Runnable firstTask) {

 // 设置为-1,不允许被线程获取锁,后续到runWorker()内部会解除这个状态 

 setState(-1); // inhibit interrupts until runWorker 

 this.firstTask = firstTask; 

 // 由定义的ThreadFactory创建线程 

 this.thread = getThreadFactory().newThread(this); 

 /** Delegates main run loop to outer runWorker. */ 

 public void run() {

 // 后续详细分析 

 runWorker(this); 

 // 0表示无锁,非0表示锁占用状态 

 protected boolean isHeldExclusively() {

 return getState() != 0; 

 // 尝试获取资源 

 protected boolean tryAcquire(int unused) {

 if (compareAndSetState(0, 1)) {

 setExclusiveOwnerThread(Thread.currentThread()); 

 return true; 

 return false; 

 // 尝试释放资源 

 protected boolean tryRelease(int unused) {

 setExclusiveOwnerThread(null); 

 setState(0); 

 return true; 

 // 直接acquire,实现非重入锁,对比下ReentrantLock 

 public void lock() {

 acquire(1); } 

 public boolean tryLock() {

 return tryAcquire(1); } 

 public void unlock() {

 release(1); } 

 public boolean isLocked() {

 return isHeldExclusively(); } 

对比下ReentrantLock和Worker的tryAcquire(),Worker中并没有允许同个线程多次获取锁,也没有做过多优化。

runWorker()

再看下runWorker()是我们创建的线程死循环执行取任务、执行任务的过程,简单的看下具体流程。

final void runWorker(Worker w) {

 // 运行在创建的子线程中 

 Thread wt = Thread.currentThread(); 

 Runnable task = w.firstTask; 

 w.firstTask = null; 

 // 解锁状态,对应上面 setState(-1); 

 w.unlock(); // allow interrupts 

 // 表示任务执行是否被中断了 

 boolean completedAbruptly = true; 

 try {

 // getTask()会从blockQueue取任务,控制线程数量等操作 

 while (task != null || (task = getTask()) != null) {

 // 锁住worker 

 w.lock(); 

 // 这部分是跟线程池状态有关的判断 

 if ((runStateAtLeast(ctl.get(), STOP) || 

 (Thread.interrupted() 

 runStateAtLeast(ctl.get(), STOP))) 

 !wt.isInterrupted()) 

 wt.interrupt(); 

 try {

 // 执行任务前,留给子类的空实现 

 beforeExecute(wt, task); 

 Throwable thrown = null; 

 try {

 // 执行任务 

 task.run(); 

 } catch (RuntimeException x) {

 thrown = x; throw x; 

 } catch (Error x) {

 thrown = x; throw x; 

 } catch (Throwable x) {

 thrown = x; throw new Error(x); 

 } finally {

 // 执行任务后,留给子类的空实现 

 afterExecute(task, thrown); 

 } finally {

 task = null; 

 // 完成任务数+1 

 w.completedTasks++; 

 w.unlock(); 

 // 未被中断 

 completedAbruptly = false; 

 } finally {

 // 线程结束操作 

 processWorkerExit(w, completedAbruptly); 

getTask() 获取任务(返回null代表结束当前线程)

getTask()负责取出要执行的任务,如果返回null就会走到线程结束的操作

private Runnable getTask() {

 boolean timedOut = false; //超时 

 for (;;) {

 int c = ctl.get(); 

 int rs = runStateOf(c); 

 // 检测线程池状态 

 // Check if queue empty only if necessary. 

 if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) {

 decrementWorkerCount(); 

 return null; 

 // 当前线程数 

 int wc = workerCountOf(c); 

 // 若阻塞超时的话,是否允许回收掉当前线程 

 boolean timed = allowCoreThreadTimeOut || wc corePoolSize; 

 // 当前线程数超过最大线程数,或者,超时了且允许被回收 

 if ((wc maximumPoolSize || (timed timedOut)) 

 (wc 1 || workQueue.isEmpty())) {

 if (compareAndDecrementWorkerCount(c)) 

 return null; 

 continue; 

 // 从阻塞队列取任务,或者队列为空被阻塞 

 try {

 Runnable r = timed ? 

 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 

 workQueue.take(); 

 if (r != null) 

 return r; //获取到任务 

 timedOut = true;// 超时 

 } catch (InterruptedException retry) {

 timedOut = false; 

getTask()内部是一个for语句循环,

阻塞队列不为空,能取到任务直接返回task; 阻塞队列为空,阻塞一段时间超时,如果允许核心线程超时被回收,或者当前线程数 核心线程数 或者收,则返回null,让当前线程被结束掉; 走到这里的留下的核心线程,只能在死循环 + 阻塞队列阻塞等待任务的到来。 processWorkerExit() 结束线程

processWorkerExit()执行线程结束操作,处理了控制线程数、回收线程资源的操作

 private void processWorkerExit(Worker w, boolean completedAbruptly) {

 // 线程被中断了 

 if (completedAbruptly) // If abrupt, then workerCount wasnt adjusted 

 // 将work数减一 

 decrementWorkerCount(); 

 // 因为要操作全局变量,利用ReentrantLock锁 

 final ReentrantLock mainLock = this.mainLock; 

 mainLock.lock(); 

 try {

 // 总的任务完成数 

 completedTaskCount += w.completedTasks; 

 // 移除当前worker 

 workers.remove(w); 

 } finally {

 mainLock.unlock(); 

 tryTerminate(); 

 int c = ctl.get(); 

 // 如果在没有停止状态 

 if (runStateLessThan(c, STOP)) {

 // 不是因为异常中断 

 if (!completedAbruptly) {

 // allowCoreThreadTimeOut表示是否允许核心线程超时回收,如果允许就可以不添加worker 

 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 

 if (min == 0 ! workQueue.isEmpty()) 

 min = 1; 

 // worker数量超过core thread数量 

 if (workerCountOf(c) = min) 

 return; // replacement not needed 

 // 若因为异常中断,或者当前线程数 coreThread数量,添加一个worker线程 

 addWorker(null, false); 

addWorker() 内部主要做两个操作:

校验当前线程池状态 添加一个worker线程
private boolean addWorker(Runnable firstTask, boolean core) {

 w = new Worker(firstTask); 

 final Thread t = w.thread; 

 final ReentrantLock mainLock = this.mainLock; 

 mainLock.lock(); 

 workers.add(w); 

 mainLock.unlock(); 

 t.start(); 

到达这里,我们知道Worker是利用AQS实现一个非可重入锁,但是为什么需要一个非可重入锁呢?

每一个Worker在各自的子线程中运行,取任务前lock,在任务结束之后unlock,一旦获取了独占锁,表示当前线程正在执行任务中。

可是也并没有其他线程争夺Worker,争夺的应该是task,而task利用阻塞队列实现多线程竞争了。

不是很理解为什么需要非可重入锁?

后面发现,使用独占锁来表示线程是否正在执行任务,Worker的线程获取了独占锁就说明它在执行任务,不能被中断。

execute流程

execute流程分成三步:

如果当前worker线程少于核心线程数,则addWorker()添加一个线程执行。 如果worker线程数等于核心线程数,则将任务加入到阻塞队列workQueue 阻塞队列满了且未等到最大线程数之前,增加一个worker线程 否则,执行拒绝策略
public void execute(Runnable command) {

 if (command == null) 

 throw new NullPointerException(); 

 int c = ctl.get(); 

 if (workerCountOf(c) corePoolSize) {

 if (addWorker(command, true)) 

 return; 

 c = ctl.get(); 

 if (isRunning(c) workQueue.offer(command)) {

 int recheck = ctl.get(); 

 if (! isRunning(recheck) remove(command)) 

 reject(command); 

 else if (workerCountOf(recheck) == 0) 

 addWorker(null, false); 

 else if (!addWorker(command, false)) 

 reject(command); 

总结

线程池会分核心线程和非核心线程吗?

不会,ThreadPoolThread是控制线程数来保证核心线程数量


线程池在哪里控制数量

利用阻塞队列阻塞,若超时了并且允许回收,则当前线程会被结束,若不允许结束,则当前线程会死循环并且一段时间阻塞的等待任务。


worker为什么需要不可重入锁

Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断

参考https://www.cnblogs.com/liuzhihu/p/8177371.html