
java.util.concurrent explained: ThreadPoolExecutor source code walkthrough

2023-09-11 14:19:42

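Threads are an essential part of any language or framework. Asynchronous work needs asynchronous threads, but creating and destroying threads frequently carries a significant performance cost, and thread pools exist to solve exactly this problem. In short, a thread pool has these advantages: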

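- Lower resource overhead: reusing threads that already exist avoids the cost of repeatedly creating and destroying them.
- Faster task start-up: a task can start on an existing thread instead of waiting for a new one to be created.
- Easier management: threads can be managed, allocated, tuned, and monitored in one place.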

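Java's thread pools are built on ThreadPoolExecutor: most of the pool policies we obtain as ExecutorService instances (for example through the Executors factory methods) are ThreadPoolExecutor configurations, which makes ThreadPoolExecutor central. To understand those policies, you first have to understand ThreadPoolExecutor.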

1 Creating a thread pool

First, let's look at how a thread pool is created:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Validate the pool sizes and the keep-alive time
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // keepAliveTime is stored internally in nanoseconds
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
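- corePoolSize: the core pool size
- maximumPoolSize: the maximum number of threads the pool may hold
- keepAliveTime: how long an idle thread above the core size is kept alive before it is terminated
- unit (TimeUnit): the time unit of keepAliveTime
- workQueue (BlockingQueue): the queue holding tasks that are waiting to run
- threadFactory (ThreadFactory): the factory used to create new threads
- handler (RejectedExecutionHandler): the rejection policy applied when a task cannot be accepted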
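These parameters, together with the submission flow described next, can be exercised with a small pool. This is a minimal sketch; the class name PoolFlowDemo and its run() helper are hypothetical, and every task blocks on a latch so that the pool's threads stay busy while the tasks are submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolFlowDemo {

    /** Returns {poolSize, queueSize, rejected ? 1 : 0} after four submissions. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize = 1, maximumPoolSize = 2, a queue holding one task,
        // and AbortPolicy, which throws RejectedExecutionException.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // Each task blocks until released, so no worker ever becomes free.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        boolean rejected = false;
        try {
            pool.execute(blocker); // 1st: fewer than corePoolSize threads -> new core thread
            pool.execute(blocker); // 2nd: core threads busy -> queued
            pool.execute(blocker); // 3rd: queue full, below max -> new non-core thread
            try {
                pool.execute(blocker); // 4th: queue full and max reached -> rejected
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected ? 1 : 0};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + (r[2] == 1));
    }
}
```

With this configuration the four submissions land on a core thread, the queue, a non-core thread, and the rejection handler in that order, so main should print poolSize=2 queued=1 rejected=true.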


The basic flow of ThreadPoolExecutor is as follows:

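- When a task is submitted via submit or execute and the pool has fewer than corePoolSize threads, a new thread is created to run it.
- If the pool already has corePoolSize or more threads, the task is put into the BlockingQueue.
- If the BlockingQueue is also full and the pool has fewer than maximumPoolSize threads, a new thread is created to run the task.
- If the pool already has maximumPoolSize threads, the RejectedExecutionHandler rejection policy is applied.
- When the pool has more than corePoolSize threads, idle threads beyond the core size are shut down once they have been idle for keepAliveTime.

2 Thread pool states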

ThreadPoolExecutor has several internal states, and understanding them is essential to understanding how the pool works, so let's look at them next:

/*
 * runState is the lifecycle state of the whole pool.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:    Accept new tasks and process queued tasks
 *   SHUTDOWN:   Don't accept new tasks, but process queued tasks
 *   STOP:       Don't accept new tasks, don't process queued tasks,
 *               and interrupt in-progress tasks
 *   TIDYING:    All tasks have terminated, workerCount is zero,
 *               the thread transitioning to state TIDYING
 *               will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN (shutdown() called)
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP (shutdownNow() called)
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING (queue and pool both empty)
 *    When both queue and pool are empty
 * STOP -> TIDYING (pool empty; the queue is already empty at this point)
 *    When pool is empty
 * TIDYING -> TERMINATED (terminated() has run)
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less
 * straightforward than you'd like because the queue may become
 * empty after non-empty and vice versa during SHUTDOWN state, but
 * we can only terminate if, after seeing that it is empty, we see
 * that workerCount is 0 (which sometimes entails a recheck -- see
 * below).
 */
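These states are private to the class, but their effect can be observed through the public API. The following sketch (LifecycleDemo is a hypothetical name) blocks the pool's only thread, queues three more tasks, and then calls shutdownNow(): in STOP the queued tasks are not run but drained and returned, the running task is interrupted, and awaitTermination() returns once the pool reaches TERMINATED.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LifecycleDemo {

    /** Returns {tasks returned by shutdownNow, isShutdown, terminated} as ints. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);

        // The only worker blocks here until shutdownNow() interrupts it.
        pool.execute(() -> {
            started.countDown();
            try {
                never.await();
            } catch (InterruptedException e) {
                // STOP interrupts in-progress tasks; just let the task end.
            }
        });
        // These three wait in the queue behind the blocked worker.
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }

        try {
            started.await();
            // RUNNING -> STOP: queued tasks are drained and returned, not run.
            List<Runnable> notRun = pool.shutdownNow();
            // STOP -> TIDYING -> TERMINATED once the last worker exits.
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {
                notRun.size(),
                pool.isShutdown() ? 1 : 0,
                (terminated && pool.isTerminated()) ? 1 : 0
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("not run: " + r[0] + ", terminated: " + (r[2] == 1));
    }
}
```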

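How runState is stored is also worth noting: it is not kept in a separate int or enum, but packed together with the thread count, workerCount, into a single atomic integer, ctl: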

// ctl holds both the pool's run state and its current worker count:
// the low 29 bits store workerCount, the high 3 bits store runState.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of bits used for the worker count
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum worker count: 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits (the top 3 bits are shown on the right)
private static final int RUNNING    = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
private static final int STOP       =  1 << COUNT_BITS; // 001
private static final int TIDYING    =  2 << COUNT_BITS; // 010
private static final int TERMINATED =  3 << COUNT_BITS; // 011

// Packing and unpacking ctl
// Run state: keep only the high 3 bits
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Worker count: keep only the low 29 bits
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine a run state and a worker count into one ctl value
private static int ctlOf(int rs, int wc) { return rs | wc; }
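Because these constants and helpers are private in the JDK, the packing can be checked with a standalone copy. The class below is a hypothetical illustration that mirrors the code above; it also shows why a single comparison such as c < SHUTDOWN is enough to test for RUNNING: RUNNING is the only negative state.

```java
final class CtlPacking {
    // Standalone copy of the constants above, for illustration only.
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 2^29 - 1

    static final int RUNNING    = -1 << COUNT_BITS;        // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;        // 000
    static final int STOP       =  1 << COUNT_BITS;        // 001
    static final int TIDYING    =  2 << COUNT_BITS;        // 010
    static final int TERMINATED =  3 << COUNT_BITS;        // 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                     // RUNNING with 5 workers
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(workerCountOf(c));          // 5
        // RUNNING is the only negative state, so "c < SHUTDOWN" means running
        System.out.println(c < SHUTDOWN);              // true
        // The states compare in lifecycle order
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP
                && STOP < TIDYING && TIDYING < TERMINATED); // true
    }
}
```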
- runStateOf() extracts the current pool state.
- workerCountOf() extracts the current number of threads.

3 Adding tasks

Tasks are normally added to a pool with execute or submit. Let's use execute to see how adding a task works:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Fewer Workers than corePoolSize: start a new Worker to run this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The Worker count has reached corePoolSize (or addWorker failed): try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Re-check in case the pool stopped running meanwhile: if so, take the
        // task back out of the queue and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If every Worker died while the task was being queued, nothing would
        // run it, so add a Worker with no initial task
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full, but the pool may still be below maximumPoolSize,
    // so try to add a non-core Worker
    else if (!addWorker(command, false))
        reject(command); // The Worker count has hit its limit: reject the task
}
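The workerCountOf(recheck) == 0 branch is easy to overlook, but it is what keeps a pool with corePoolSize 0 working: execute() never takes the first branch, the task goes straight into the queue, and the recheck then starts a Worker with no first task. A minimal sketch (ZeroCoreDemo is a hypothetical name):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ZeroCoreDemo {

    /** Returns true if the task ran even though corePoolSize is 0. */
    static boolean taskRuns() {
        // corePoolSize = 0: "workerCountOf(c) < corePoolSize" is never true,
        // so the task is offered to the queue; the recheck then sees zero
        // Workers and calls addWorker(null, false).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch ran = new CountDownLatch(1);
        try {
            pool.execute(ran::countDown);
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + taskRuns());
    }
}
```

With an unbounded LinkedBlockingQueue, offer() never fails while the pool is running, so the last branch of execute() is never reached here; in this configuration the only Worker ever created comes from the zero-worker recheck.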

The core logic of execute lies in addWorker, which adds a worker thread, so let's look at addWorker next:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * Proceed only if rs == RUNNING, or if rs == SHUTDOWN, firstTask == null
         * and the queue is non-empty:
         * - if the state is beyond SHUTDOWN, refuse to add a Worker;
         * - if the state is SHUTDOWN, a Worker with a non-null task may not be
         *   added, and once the queue is empty no Worker may be added at all;
         * - while the queue is non-empty, a Worker with a null task may still be
         *   added to help consume the remaining tasks.
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // The thread count has already reached its limit: give up
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Otherwise try to CAS workerCount up by one; on success leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS failed: re-check the state and restart the outer loop if it changed
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // Now actually create the new thread: build a Worker (and its Thread)
    Worker w = new Worker(firstTask);
    Thread t = w.thread;

    // Acquire the main lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();
        int rs = runStateOf(c);

        // Proceed only if rs == RUNNING, or rs == SHUTDOWN and firstTask == null;
        // otherwise undo the workerCount increment and try to terminate
        if (t == null ||
            (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null))) {
            decrementWorkerCount();
            tryTerminate();
            return false;
        }

        // Add the new Worker to the workers set
        workers.add(w);

        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
    } finally {
        mainLock.unlock();
    }

    // Start the new thread
    t.start();
    // It is possible (but unlikely) for a thread to have been
    // added to workers, but not yet started, during transition to
    // STOP, which could result in a rare missed interrupt,
    // because Thread.interrupt is not guaranteed to have any effect
    // on a non-yet-started Thread (see Thread#interrupt).
    // If the pool has moved to STOP but this thread is not interrupted, interrupt it
    if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
        t.interrupt();

    return true;
}
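The first phase of addWorker is a standard "check the bound, then CAS, retry on contention" loop. The sketch below isolates that pattern on a plain AtomicInteger; BoundedCounter and race() are hypothetical names, and unlike the real code there are no run-state bits to re-check, so a failed CAS simply repeats the bound check.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of reserving a slot with a bounded CAS loop. */
final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int max;

    BoundedCounter(int max) { this.max = max; }

    /** Returns true if a slot was reserved, false if the bound was reached. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= max)
                return false;                 // like "wc >= maximumPoolSize": give up
            if (count.compareAndSet(c, c + 1))
                return true;                  // like "break retry": slot reserved
            // CAS lost a race with another thread: re-read and retry
        }
    }

    /** Undo a reservation, as decrementWorkerCount() does when thread creation fails. */
    void decrement() { count.decrementAndGet(); }

    int get() { return count.get(); }

    /** Starts `threads` threads that each try once; returns how many succeeded. */
    static int race(int max, int threads) {
        BoundedCounter counter = new BoundedCounter(max);
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (counter.tryIncrement())
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(race(10, 100)); // 10: the bound is never exceeded
    }
}
```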

addWorker can be divided into two broad phases:

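- workerCount++: no thread is created yet; workerCount is simply incremented with a CAS.
- Thread creation: a Worker (and its thread) is created, added to the workers set, and handled according to the pool state.

3.1 workerCount++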

The workerCount++ phase is the code under the retry label above. Its logic has three parts:


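1. If the pool state is beyond SHUTDOWN (STOP or later), no Worker may be added. If the state is SHUTDOWN, no Worker with a non-null task may be added, and once the work queue is empty no Worker of any kind may be added; while the queue is non-empty, a Worker with a null task may still be added to help consume the remaining tasks.
2. If core is true and the Worker count has reached corePoolSize, no thread is added; if core is false and the Worker count has reached maximumPoolSize, no thread is added.
3. compareAndIncrementWorkerCount performs workerCount++. On success the loop exits. On failure the state is checked again: if the run state has changed, go back to step 1; otherwise go back to step 2.

3.2 Creating the thread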

Creating the thread involves the following steps:

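1. Create a Worker instance, which also creates its thread.
2. Acquire the pool's mainLock for mutual exclusion.
3. Check the pool state again: if the state is beyond SHUTDOWN, or it is SHUTDOWN and firstTask is non-null (or the ThreadFactory failed to create a thread), decrement workerCount, call tryTerminate, and give up.
4. Add the Worker to the workers set and release the lock.
5. Start the thread; if the pool has meanwhile moved to STOP and the thread has not been interrupted, interrupt it.

4 Worker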

Section 3 showed that threads are added in the form of Worker objects, so let's look at the Worker class:

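private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;

    // The thread this Worker runs in (null if the ThreadFactory failed)
    final Thread thread;
    // The initial task to run; may be null
    Runnable firstTask;
    // Number of tasks this Worker has completed
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // State -1 inhibits interrupts until runWorker starts
        this.setState(-1);
        this.firstTask = firstTask;
        // The ThreadFactory creates a thread whose Runnable is this Worker
        this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
    }

    ......

    // Delegates the main loop to the enclosing pool's runWorker
    public void run() {
        ThreadPoolExecutor.this.runWorker(this);
    }
}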

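Worker implements Runnable and holds a thread field. Its overall job is now clear: it owns one thread and uses it to run the submitted Runnable tasks. It also extends AbstractQueuedSynchronizer so that it can act as a simple lock around each task it runs.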
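Worker's use of AQS is small: it implements a non-reentrant exclusive lock, so even the thread holding the lock cannot acquire it again, and interruptIdleWorkers() can use tryLock() to tell idle Workers from busy ones. The sketch below is modeled on that idea rather than copied from the JDK; NonReentrantLock is a hypothetical name. In JDK 8 the Worker constructor also sets the state to -1 so that the Worker cannot be locked or interrupted until runWorker starts; this sketch simply starts unlocked at 0.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant exclusive lock in the style of Worker. */
final class NonReentrantLock extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    @Override
    protected boolean tryAcquire(int unused) {
        // Only the 0 -> 1 transition succeeds, so a second acquire by the
        // owning thread fails: the lock is deliberately not reentrant.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }            // blocks until acquired
    boolean tryLock()  { return tryAcquire(1); }  // never blocks
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }

    public static void main(String[] args) {
        NonReentrantLock lock = new NonReentrantLock();
        System.out.println(lock.tryLock()); // true
        System.out.println(lock.tryLock()); // false: not reentrant, even for the owner
        lock.unlock();
        System.out.println(lock.tryLock()); // true
    }
}
```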

4.1 runWorker

In addWorker, once the Worker instance is created, the start method of its thread is called, which in turn executes Worker's run method:

public void run() {
    ThreadPoolExecutor.this.runWorker(this);
}

So let's look at ThreadPoolExecutor's runWorker method:

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Whether the Worker ended because of an exception
    boolean completedAbruptly = true;
    try {
        // A non-null task is the Worker's firstTask; otherwise fetch one from the queue with getTask()
        while (task != null || (task = getTask()) != null) {
            // Lock the Worker so that other threads do not interrupt it while it runs a task
            w.lock();
            clearInterruptsForTaskRun(); // Clear any leftover interrupt flag before running the task
            try {
                // Hook called before the task runs; empty in this class. If it
                // throws, the task is not run and the Worker exits.
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook called after the task, with any exception it threw; empty in this class
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; // Drop the reference to the finished task to help GC
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Handle the Worker's exit
        processWorkerExit(w, completedAbruptly);
    }
}
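beforeExecute and afterExecute are protected hooks that a subclass can override. A minimal sketch, assuming a hypothetical subclass named RecordingPool, records what runWorker passes to them for one normal task and one task that throws; with a single Worker the two tasks run in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical subclass that records the arguments runWorker passes to the hooks. */
final class RecordingPool extends ThreadPoolExecutor {
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());
    final CountDownLatch afterCalls;

    RecordingPool(int expectedTasks) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.afterCalls = new CountDownLatch(expectedTasks);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is what task.run() threw, or null if it completed normally
        events.add(t == null ? "after:ok" : "after:" + t.getClass().getSimpleName());
        afterCalls.countDown();
    }

    /** Runs one normal and one failing task; returns the recorded events. */
    static List<String> demo() {
        RecordingPool pool = new RecordingPool(2);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            pool.afterCalls.await(5, TimeUnit.SECONDS);
            synchronized (pool.events) {
                return new ArrayList<String>(pool.events);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

main should print [before, after:ok, before, after:IllegalStateException]. The failing task's exception is still rethrown after afterExecute, so that Worker dies (the default uncaught-exception handler prints a stack trace) and processWorkerExit may start a replacement.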

The logic of runWorker as a whole is fairly simple:

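- A non-null task is the Worker's initial firstTask; when task is null, the Worker fetches the next task from the queue via getTask().
- The Worker's lock is acquired so that other threads do not interrupt it while the task runs.
- beforeExecute is called before the task runs; it is empty in ThreadPoolExecutor and can be overridden, and if it throws, the task is not run and the Worker exits.
- afterExecute is called after the task runs; it is also empty here and can be overridden, for example to handle an exception the task threw.
- Finally, processWorkerExit handles the Worker's exit.

4.2 getTask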

Next, let's look at getTask, which runWorker calls:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // In STOP, queued tasks are not processed: decrement the worker count and
        // return null. In SHUTDOWN with an empty workQueue, do the same.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // Keep this Worker unless the pool is over its maximum, or the
            // previous timed poll() found nothing
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            // Timed Workers wait at most keepAliveTime; the others block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

This code is critical. Start with a few of its local variables:

boolean timedOut = false; // whether the previous poll() timed out

boolean timed; // whether this Worker is subject to culling, i.e. whether it should exit after timing out

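When wc > corePoolSize, idle Workers should be culled, so timed is true; when wc <= corePoolSize, core threads must not be culled, so timed is false (unless allowCoreThreadTimeOut is set).
timedOut starts as false. If timed is true, the task is fetched with poll. If poll returns a task, that task is returned. If poll times out, the Worker was idle while the pool held more than corePoolSize threads, so it should be removed: r is null and timedOut becomes true. On the next pass through the loop, the condition wc <= maximumPoolSize && !(timedOut && timed) no longer holds, so the worker count is decremented and null is returned, which makes the Worker exit. If timed is false (the thread count is at or below corePoolSize), workQueue.take() is called instead, which blocks until a task is available or the thread is interrupted.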
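The effect of timed and timedOut can be watched from outside through getPoolSize(). The sketch below (KeepAliveDemo and waitForPoolSize are hypothetical names) bursts the pool to maximumPoolSize using a SynchronousQueue, lets the extra Workers idle past keepAliveTime, and finally turns on allowCoreThreadTimeOut so that even the core Worker's timed poll() expires. Because it depends on timing, it polls with a generous deadline instead of sleeping for a fixed time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {

    /** Polls getPoolSize() until it equals expected or timeoutMs elapses. */
    static void waitForPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (pool.getPoolSize() != expected && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns {size after burst, size after idling, size after core time-out}. */
    static int[] run() {
        // core 1, max 3, keepAliveTime 50 ms. A SynchronousQueue holds no tasks,
        // so each task that finds every Worker busy gets a new non-core Worker.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 3; i++)
                pool.execute(blocker);          // 1 core + 2 non-core Workers
            int afterBurst = pool.getPoolSize();

            release.countDown();
            // wc > corePoolSize, so timed == true: an idle non-core Worker's
            // poll(keepAliveTime) returns null and the Worker exits.
            waitForPoolSize(pool, 1, 5000);
            int afterIdle = pool.getPoolSize();

            // allowCoreThreadTimeOut makes timed == true for the core Worker too.
            pool.allowCoreThreadTimeOut(true);
            waitForPoolSize(pool, 0, 5000);
            int afterCoreTimeout = pool.getPoolSize();

            return new int[] {afterBurst, afterIdle, afterCoreTimeout};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

On a normally loaded machine the three sizes should be 3, 1 and 0: the core Worker survives the first idle period because it blocks in take(), and only allowCoreThreadTimeOut switches it to a timed poll().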

