java.util.concurrent解析——ThreadPoolExecutor源码解析
任何一种语言、框架,线程都是非常重要的一部分。要想实现异步就需要通过异步线程,但是频繁地创建销毁线程会带来较大的性能开销,而线程池就是为解决这一问题而出现的。简单来说线程池有以下几大优势:
降低资源开销:通过复用已经创建的线程,降低线程频繁创建、销毁带来的资源开销和性能损耗 快速启动任务:通过复用已有线程,快速启动任务 易于管理:线程池可以统一管理、分配、调优和监控Java中的线程池是基于ThreadPoolExecutor实现的,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以ThreadPoolExecutor十分重要。要弄明白各种线程池策略,必须先弄明白ThreadPoolExecutor。
1 创建线程池首先来看下线程池的创建:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue Runnable workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize 0 || maximumPoolSize = 0 || maximumPoolSize corePoolSize || keepAliveTime 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }corePoolSize 核心线程池大小 maximumPoolSize 线程池最大容量大小 keepAliveTime 线程池空闲时,线程存活的时间 TimeUnit 时间单位 ThreadFactory 线程工厂 BlockingQueue任务队列 RejectedExecutionHandler 线程拒绝策略
ThreadPoolExecutor的基本流程如下:
当用户通过submit或者execute提交任务时,如果当前线程池中线程数小于corePoolSize,直接创建一个线程执行任务 如果当前线程数大于corePoolSize,则将任务加入到BlockingQueue中 如果BlockingQueue也满了,在小于MaxPoolSize的情况下创建线程执行任务 如果线程数大于等于MaxPoolSize,那么执行拒绝策略RejectedExecutionHandler 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 2 线程池状态ThreadPoolExecutor内部有多个状态,理解线程池内部状态对于理解线程池原理至关重要,所以接下来看下线程池的状态:
/* * runState是整个线程池的运行生命周期状态,有如下取值: * 1. RUNNING:可以新加线程,同时可以处理queue中的线程。 * 2. SHUTDOWN:不增加新线程,但是处理queue中的线程。 * 3. STOP 不增加新线程,同时不处理queue中的线程。 * 4. TIDYING 所有的线程都终止了(queue中),同时workerCount为0,那么此时进入TIDYING * 5. terminated()方法结束,变为TERMINATED * The runState provides the main lifecyle control, taking on values: * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Dont accept new tasks, but process queued tasks * STOP: Dont accept new tasks, dont process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * 状态的转化主要是: * RUNNING - SHUTDOWN(调用shutdown()) * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) - STOP(调用shutdownNow()) * On invocation of shutdownNow() * SHUTDOWN - TIDYING(queue和pool均empty) * When both queue and pool are empty * STOP - TIDYING(pool empty,此时queue已经为empty) * When pool is empty * TIDYING - TERMINATED(调用terminated()) * When the terminated() hook method has completed * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than youd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */
runState的存储也值得一说,它并不是用一个单独的int或者enum进行存储,而是和线程数workerCount共同保存到一个原子量ctl中:
//利用ctl来保证当前线程池的状态和当前的线程的数量。ps:低29位为线程池容量,高3位为线程状态。 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //设定偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; //确定最大的容量2^29-1 private static final int CAPACITY = (1 COUNT_BITS) - 1; //几个状态,用Integer的高三位表示 // runState is stored in the high-order bits //111 private static final int RUNNING = -1 COUNT_BITS; //000 private static final int SHUTDOWN = 0 COUNT_BITS; //001 private static final int STOP = 1 COUNT_BITS; //010 private static final int TIDYING = 2 COUNT_BITS; //011 private static final int TERMINATED = 3 COUNT_BITS; //获取线程池状态,取前三位 // Packing and unpacking ctl private static int runStateOf(int c) { return c ~CAPACITY; } //获取当前正在工作的worker,主要是取后面29位 private static int workerCountOf(int c) { return c CAPACITY; } //获取ctl private static int ctlOf(int rs, int wc) { return rs | wc; }通过调用runStateOf()方法获取当前线程池状态 通过调用workerCountOf()获取当前线程数 3 添加任务
向线程池添加任务一般通过execute或者submit方法添加,接下来通过execute方法介绍下添加任务的原理:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //当前的Worker的数量小于核心线程池大小时,新建一个Worker线程执行该任务。 if (workerCountOf(c) corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); //如果worker数量已经大于核心线程数,尝试将任务添加到任务队列中 if (isRunning(c) workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) remove(command))//recheck防止线程池状态的突变,如果突变,那么将reject线程,防止workQueue中增加新线程 reject(command); else if (workerCountOf(recheck) == 0)//上下两个操作都有addWorker的操作,但是如果在workQueue.offer的时候Worker变为0, //那么将没有Worker执行新的task,所以增加一个Worker. addWorker(null, false); //如果workQueue满了,那么这时候可能还没到线程池的maximum,所以尝试增加一个Worker else if (!addWorker(command, false)) reject(command);//如果Worker数量到达上限,那么就拒绝此线程 }
可以看到execute方法内部的核心逻辑在于添加工作线程addWorker方法,所以接下来看下addWorker:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. * rs!=Shutdown || fistTask!=null || workCount.isEmpty * 如果当前的线程池的状态 SHUTDOWN 那么拒绝Worker的add * 如果=SHUTDOWN,那么此时不能新加入不为null的Task,如果在WorkCount为empty的时候不能加入任何 * 类型的Worker, * 如果不为empty可以加入task为null的Worker,增加消费的Worker if (rs = SHUTDOWN ! (rs == SHUTDOWN firstTask == null ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果当前线程数已经超标,直接返回false if (wc = CAPACITY || wc = (core ? corePoolSize : maximumPoolSize)) return false; //如果线程数没有超标,则尝试通过CAS将workercount加一,如果成功直接跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; //如果失败,对状态进行double check,如果状态已改变则重试 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop //接下来开始真正创建新的线程 //创建一个新的worker线程 Worker w = new Worker(firstTask); Thread t = w.thread; //获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); * rs!=SHUTDOWN ||firstTask!=null * 同样检测当rs SHUTDOWN时直接拒绝减小Wc,同时Terminate,如果为SHUTDOWN同时firstTask不为null的时候也要Terminate if (t == null || (rs = SHUTDOWN ! (rs == SHUTDOWN firstTask == null))) { decrementWorkerCount(); tryTerminate(); return false; //将新建的worker线程加入到workers数组中 workers.add(w); int s = workers.size(); if (s largestPoolSize) largestPoolSize = s; } finally { mainLock.unlock(); //新建线程开始执行 t.start(); // It is possible (but unlikely) for a thread to have been // added to workers, but not yet started, during transition to // STOP, which could result in a rare missed interrupt, // because Thread.interrupt is not guaranteed to have any effect // on a non-yet-started Thread (see Thread#interrupt). //若此时线程池状态变为STOP,但当前线程并未interrupt,执行interrupt if (runStateOf(ctl.get()) == STOP ! t.isInterrupted()) t.interrupt(); return true; }
整个addWorker方法大致分为两大阶段:
workerCount++:此时并不创建真正的线程,而仅仅是通过CAS操作把workerCount加一 创建线程:创建worker线程,将其加入到workers队列中,并根据状态对线程进行不同操作 3.1 workerCount++workerCount++操作主要涉及上述代码中标号retry覆盖的代码,主要逻辑有以下三大部分:
如果当前的线程池的状态 SHUTDOWN 那么拒绝Worker的add 如果=SHUTDOWN,那么此时不能新加入不为null的Task,如果在WorkCount为empty的时候不能加入任何类型的Worker 如果不为empty可以加入task为null的Worker,增加消费的Worker
如果core为true,且当前worker数超过corePoolSize则不允许添加线程 如果core为fasle,且worker数超过maximumPoolSize则不允许添加线程
通过compareAndIncrementWorkerCount执行workerCount++操作,如果成功跳出循环;如果失败对当前状态进行doubleCheck,如果状态改变重新回到步骤1,如果状态不变重新回到步骤2 3.2 创建线程
创建线程的操作主要分为以下几个步骤:
创建一个worker线程实例 获取当前线程池锁进行互斥操作 对线程池状态再次进行判断。同样检测当rs SHUTDOWN时直接拒绝减小Wc,同时Terminate,如果为SHUTDOWN同时firstTask不为null的时候也要Terminate 将线程加入线程队列中,释放锁 若此时线程池状态变为STOP,但当前线程并未interrupt,执行interrupt 4 Worker在第3节中看到添加的线程是通过Worker实现的,所以接下来看下Worker这个类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable var2) { this.setState(-1); this.firstTask = var2; this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this); ...... public void run() { ThreadPoolExecutor.this.runWorker(this); }
可以看到Worker实现了Runnable接口,并在内部维护了一个线程变量,看到这里其实Worker的大致逻辑明显了,无非是维护一个线程实例,执行添加的runnable实例。
4.1runWorker在addWorker方法中,Worker实例创建好后会就会执行其thread变量的start方法,进而也就会执行Worker的run方法:
public void run() { ThreadPoolExecutor.this.runWorker(this); }
所以接下来看下ThreadPoolExecutor的runWorker方法:
final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; //标识线程是不是异常终止的 boolean completedAbruptly = true; try { //task不为null情况是初始化worker时,如果task为null,则去队列中取线程--- getTask() while (task != null || (task = getTask()) != null) { w.lock(); //获取woker的锁,防止线程被其他线程中断 clearInterruptsForTaskRun();//清楚所有中断标记 try { beforeExecute(w.thread, task);//线程开始执行之前执行此方法,可以实现Worker未执行退出,本类中未实现 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现 } finally { task = null;//运行过的task标null,方便GC w.completedTasks++; w.unlock(); completedAbruptly = false; } finally { //处理worker退出的逻辑 processWorkerExit(w, completedAbruptly); }
整个方法的逻辑比较简单:
task不为null情况是初始化worker时,如果task为null,则去队列中取线程--- getTask() 获取woker的锁,防止线程被其他线程中断 线程开始执行之前执行beforeExecute方法,可以实现Worker未执行退出,本类中未实现 线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现 处理worker退出的逻辑 4.2 getTask接下来再来看看runWorker中的getTask方法:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. //当前状态为 stop时,不处理workQueue中的任务,同时减小worker的数量所以返回null,如果为shutdown 同时workQueue已经empty了,同样减小worker数量并返回null if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc corePoolSize; if (wc = maximumPoolSize ! (timedOut timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
这段代码十分关键,首先看几个局部变量:
boolean timedOut = false;//主要是判断后面的poll是否要超时 boolean timed;//主要是标识着当前Worker超时是否要退出
wc corePoolSize时需要减小空闲的Worker数,那么timed为true,但是wc = corePoolSize时,不能减小核心线程数timed为false。
timedOut初始为false,如果timed为true那么使用poll取线程。如果正常返回,那么返回取到的task。如果超时,证明worker空闲,同时worker超过了corePoolSize,需要删除。返回r=null。则 timedOut = true。此时循环到wc = maximumPoolSize ! (timedOut timed)时,减小worker数,并返回null,导致worker退出。如果线程数 = corePoolSize,那么此时调用 workQueue.take(),没有线程获取到时将一直阻塞,直到获取到线程或者中断。
JUC:java.util.concurrent理解与使用示例 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
java.util.concurrent解析——AbstractQueuedSynchronizer综述 尽管JVM在并发上已经做了很多优化工作,如偏向锁、轻量级锁、自旋锁等等。但是基于`Synchronized` `wait` `notify`实现的同步机制还是无法满足日常开发中。原生同步机制在时间和空间上的开销也一直备受诟病。
相关文章
- Java学习之java高级特性
- java之内部类
- Java IO流框架图
- Java核心技术卷I基础知识1.2.9 高性能
- CSDN日报191016:Java纯干货分享:史上最全的JAVA工程师面试题汇总
- Java 内存溢出(java.lang.OutOfMemoryError)的常见情况和处理方式总结
- Java开发环境的搭建以及使用eclipse从头一步步创建java项目
- Java Design Demo -简单的队列-异步多任务队列(java android)
- java.lang.OutOfMemoryError: Java heap space
- java高级用法之:JNA中的回调
- 常见算法合集[java源码+持续更新中...]
- 十大基础排序算法[java源码+动静双图解析+性能分析]
- 【Java】生成图形验证码
- Java_类似java.lang.VerifyError: Expecting a stackmap frame at branch target 22 in method的解决方法
- java源码--HashMap扩容机制学习
- 『Java练习生的自我修养』java-se进阶⁴ • IO流概览
- 『Java练习生的自我修养』java-se进阶¹ • 初识多线程
- 《青花瓷》JAVA版:周杰伦告诉你怎么学Java
- 深入理解java虚拟机(十四)正确利用 JVM 的方法内联
- java面向对象编程——static和final