Java ThreadPoolExecutor线程池原理详解编程语言
《Java 线程池理解》简要梳理java线程池ThreadPoolExecutor的使用和主要流程,这篇在此基础上继续学习ThreadPoolExecutor的原理,顺带巩固AQS和CAS的知识。
ThreadPoolExecutor构造函数public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue Runnable workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);ThreadPoolExecutor主要参数 corePoolSize 核心线程数量 maximumPoolSize 最大线程数量 unit 超时时间单位 ( TimeUnit.SECONDS 等) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了 workQueue 任务阻塞队列,用来缓存待执行的任务 ThreadFactory 负责创建Thread线程 RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略 ThreadPoolExecutor执行流程
在分析ThreadPoolExecutor之前,先学习AbstractQueueSynchronizer,简称AQS,他是实现ReentrantLock、CountDownLatch的重要部分,ThreadPoolExecutor 也使用了AQS实现非重入锁机制。可以参考《ReentrantLock原理从开始到放弃》
简要说一下AQS的实现原理以及优势
AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息;
队列的头部结点表示:该结点对应的线程已经处于执行状态,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
AQS实现了一个FIFO的队列,让线程根据规则排列阻塞和争夺资源。
AQS提供了独占方式和共享方式,对应独占锁和共享锁。
AQS提供模板方法需要具体子类实现功能 isHeldExclusively():该线程是否正在独占资源 tryAcquire(int):独占方式下尝试去获取资源 tryRelease(int):独占方式下尝试释放资源 tryAcquireShared(int):共享方式下,尝试获取资源。返回值表示剩余资源,负数表示失败 tryReleaseShared(int):共享方式以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法。
支持独占锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively
支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。
AQS获取、修改state状态 getState() 获取状态(资源) setState() 设置状态(资源) compareAndSetState() CAS设置状态(资源)state可以在不同的场景下表达不同的意义,例如在ReentrantLock表示重入次数,CountDownLatch表示当前计数。
以ReentrantLock为例子:
这里只抽取了ReentrantLock 的公平锁部分核心代码看下:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 线程调用lock acquire(1); // 调用AQS的acquire()内部会顺序获取 // 具体的尝试获取锁方法,state protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 获取当前状态 // 当且state为0时候才可以表示获取锁 if (c == 0) { // hasQueuedPredecessors 表示是否有其他线程排队, // 如果没人排队就不必入队,直接CAS尝试去获取锁 if (!hasQueuedPredecessors() compareAndSetState(0, acquires)) { // 获取锁成功,设置当前线程独占 setExclusiveOwnerThread(current); return true; // 当前线程可重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; return false;线程池状态
线程池每一时刻都会处于以下一种状态。
RUNNING: 可接收新的任务,当前有任务正在排队、执行
SHUTDOWN: 不接收新的任务,当前有任务正在排队、执行
STOP: 不接收新的任务,不允许任务执行、排队,中断正在执行的任务
TIDYING: 所有任务都被终止,无执行任务线程,会执行terminated()
TERMINATED: terminated() 方法执行完成
// 线程池状态 + 数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 COUNT_BITS; private static final int SHUTDOWN = 0 COUNT_BITS; private static final int STOP = 1 COUNT_BITS; private static final int TIDYING = 2 COUNT_BITS; private static final int TERMINATED = 3 COUNT_BITS;
RUNNING SHUTDOWN STOP TIDYING TERMINATED
ctl有32位bit,其中前3位表示线程状态,后面39位表示线程数量,所以线程数最大为2的39次方-1。
提供几个方法:
// 获取线程池状态 private static int runStateOf(int c) { return c ~CAPACITY; } // 获取线程数 private static int workerCountOf(int c) { return c CAPACITY; } // 利用位运算集合线程池状态和数量,计算ctl private static int ctlOf(int rs, int wc) { return rs | wc; }worker
利用worker实现非可重入锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; // 执行的第一个任务,后续任务从blockQueue获取 Worker(Runnable firstTask) { // 设置为-1,不允许被线程获取锁,后续到runWorker()内部会解除这个状态 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 由定义的ThreadFactory创建线程 this.thread = getThreadFactory().newThread(this); /** Delegates main run loop to outer runWorker. */ public void run() { // 后续详细分析 runWorker(this); // 0表示无锁,非0表示锁占用状态 protected boolean isHeldExclusively() { return getState() != 0; // 尝试获取资源 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; return false; // 尝试释放资源 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; // 直接acquire,实现非重入锁,对比下ReentrantLock public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
对比下ReentrantLock和Worker的tryAcquire(),Worker中并没有允许同个线程多次获取锁,也没有做过多优化。
runWorker()再看下runWorker()是我们创建的线程死循环执行取任务、执行任务的过程,简单的看下具体流程。
final void runWorker(Worker w) { // 运行在创建的子线程中 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 解锁状态,对应上面 setState(-1); w.unlock(); // allow interrupts // 表示任务执行是否被中断了 boolean completedAbruptly = true; try { // getTask()会从blockQueue取任务,控制线程数量等操作 while (task != null || (task = getTask()) != null) { // 锁住worker w.lock(); // 这部分是跟线程池状态有关的判断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted()) wt.interrupt(); try { // 执行任务前,留给子类的空实现 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行任务后,留给子类的空实现 afterExecute(task, thrown); } finally { task = null; // 完成任务数+1 w.completedTasks++; w.unlock(); // 未被中断 completedAbruptly = false; } finally { // 线程结束操作 processWorkerExit(w, completedAbruptly);getTask() 获取任务(返回null代表结束当前线程)
getTask()负责取出要执行的任务,如果返回null就会走到线程结束的操作
private Runnable getTask() { boolean timedOut = false; //超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检测线程池状态 // Check if queue empty only if necessary. if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; // 当前线程数 int wc = workerCountOf(c); // 若阻塞超时的话,是否允许回收掉当前线程 boolean timed = allowCoreThreadTimeOut || wc corePoolSize; // 当前线程数超过最大线程数,或者,超时了且允许被回收 if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; // 从阻塞队列取任务,或者队列为空被阻塞 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //获取到任务 timedOut = true;// 超时 } catch (InterruptedException retry) { timedOut = false;
getTask()内部是一个for语句循环,
阻塞队列不为空,能取到任务直接返回task; 阻塞队列为空,阻塞一段时间超时,如果允许核心线程超时被回收,或者当前线程数 核心线程数 或者收,则返回null,让当前线程被结束掉; 走到这里的留下的核心线程,只能在死循环 + 阻塞队列阻塞等待任务的到来。 processWorkerExit() 结束线程processWorkerExit()执行线程结束操作,处理了控制线程数、回收线程资源的操作
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 线程被中断了 if (completedAbruptly) // If abrupt, then workerCount wasnt adjusted // 将work数减一 decrementWorkerCount(); // 因为要操作全局变量,利用ReentrantLock锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 总的任务完成数 completedTaskCount += w.completedTasks; // 移除当前worker workers.remove(w); } finally { mainLock.unlock(); tryTerminate(); int c = ctl.get(); // 如果在没有停止状态 if (runStateLessThan(c, STOP)) { // 不是因为异常中断 if (!completedAbruptly) { // allowCoreThreadTimeOut表示是否允许核心线程超时回收,如果允许就可以不添加worker int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 ! workQueue.isEmpty()) min = 1; // worker数量超过core thread数量 if (workerCountOf(c) = min) return; // replacement not needed // 若因为异常中断,或者当前线程数 coreThread数量,添加一个worker线程 addWorker(null, false);
addWorker() 内部主要做两个操作:
校验当前线程池状态 添加一个worker线程private boolean addWorker(Runnable firstTask, boolean core) { w = new Worker(firstTask); final Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); workers.add(w); mainLock.unlock(); t.start();
到达这里,我们知道Worker是利用AQS实现一个非可重入锁,但是为什么需要一个非可重入锁呢?
每一个Worker在各自的子线程中运行,取任务前lock,在任务结束之后unlock,一旦获取了独占锁,表示当前线程正在执行任务中。
可是也并没有其他线程争夺Worker,争夺的应该是task,而task利用阻塞队列实现多线程竞争了。
不是很理解为什么需要非可重入锁?
后面发现,使用独占锁来表示线程是否正在执行任务,Worker的线程获取了独占锁就说明它在执行任务,不能被中断。
execute流程execute流程分成三步:
如果当前worker线程少于核心线程数,则addWorker()添加一个线程执行。 如果worker线程数等于核心线程数,则将任务加入到阻塞队列workQueue 阻塞队列满了且未等到最大线程数之前,增加一个worker线程 否则,执行拒绝策略public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); if (isRunning(c) workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); else if (!addWorker(command, false)) reject(command);总结
线程池会分核心线程和非核心线程吗?
不会,ThreadPoolThread是控制线程数来保证核心线程数量
线程池在哪里控制数量
利用阻塞队列阻塞,若超时了并且允许回收,则当前线程会被结束,若不允许结束,则当前线程会死循环并且一段时间阻塞的等待任务。
worker为什么需要不可重入锁
Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断
参考https://www.cnblogs.com/liuzhihu/p/8177371.html
相关文章
- eclipse怎么导入java文件_Eclipse如何导入JAVA工程?如何将项目导入Eclipse中?
- java单例模式——详解JAVA单例模式及8种实现方式
- Java:详解Java中的异常(Error与Exception)[通俗易懂]
- java se与java 的区别_java se与java的区别是什么
- java后端开发需要学什么_从事Java后端开发,要学习哪些知识和技能?[通俗易懂]
- MySQL字段类型如何转为java_Java JDBC中,MySQL字段类型到JAVA类型的转换
- java 随机数算法_Java随机数算法原理与实现方法实例详解
- Java线程池Executor详解
- Java的Mavan项目实践
- java 异或加密_Java异或技操作给任意的文件加密原理及使用详解
- java线程和进程(一)
- Java学习笔记(线程的几种状态)
- Java线程优先级示例详解编程语言
- Java之创建对象>3.Enforce the singleton property with a private constructor or an enum type详解编程语言
- 深入源码分析Java线程池的实现原理详解编程语言
- Java停止(终止)线程详解版
- 管理Java使用Redis实现数据过期管理(redisjava过期)
- Java操作Redis管理实现可靠过期策略(redisjava过期)
- Linux下Java应用打包实践(linux java打包)
- Java程序员的MySQL数据库之旅(java操作mysql数据库)
- Java编程从Oracle中读取数据(java读取oracle)
- Java之oracle知多少(java的oracle)