【Java并发编程】- 02 线程池总结
概述
常规new Thread
创建线程问题:
Thread线程
属于一个重量级的对象,通过new Thread
创建一个线程,首先它是一个Java对象
,需要分配堆空间资源;同时Thread
需要调用操作系统内核API
,在系统层面创建一个线程与此对应,操作系统还要为线程分配一系列资源。所以,创建线程的成本是很高的,应该避免频繁创建和销毁线程。- 另外,线程缺乏统一管理,可能无限制新建线程,相互之间竞争加剧,以及可能占用过多系统资源导致服务器宕机或
OOM
等。
如何去解决这个问题,就是采用经常使用到的资源池方案,比如数据库连接池等,将资源提前初始化后放入到池中进行管理,待需要使用时从池中获取一个空闲资源,使用完后再将资源放回到池中达到释放目的,这样其它任务就可以继续重复使用该资源,避免资源被不停创建、销毁。
由于Thread API
在接口设计上的问题,线程池和一般的资源池在使用上是有些差异的,比如连接池:从连接池获取可用连接 --> 使用连接执行任务 --> 将连接放入到连接池。如果我们从线程池中获取到一个Thread
对象,根本没法处理我们的任务,因为Thread
线程在启动之前要么重写run()
、要么传入Runnable
方式将任务和Thread
绑定在一起。所以,Java线程池
是没有提供申请线程
和释放线程
的方法,而是采用一种生产者/消费者模式去构建线程池执行机制。
Java
线程Thread
是被一对一映射到本地操作系统线程,即Java
启动时会创建一个本地操作系统线程,当Java
线程终止时,对应操作系统线程会被回收。
Executor体系
Java 5
之前,仅仅只能使用Thread
、Runnable
、ThreadLocal
、synchronized
等进行多线程开发,线程的使用及其简陋;Java 5
极大的改善并发编程,构建出了多线程开发API
的基础体系,这些类主要位于java.util.concurrent
包下,简称J.U.C
。
Executor
就是J.U.C
中比较重要的一块,用于构建多线程开发中使用最普遍的线程池:
Executor
【接口】:最顶层接口,该接口只定义了一个方法:void execute(Runnable command)
ExecutorService
【接口】:真正的线程池接口,继承Executor
接口,拓展了Callable
、Future
、关闭方法ScheduledExecutorService
【接口】:继承了ExecutorService
,增加了定时任务相关的方法ThreadPoolExecutor
【实现类】:ExecutorService
的默认实现ScheduledThreadPoolExecutor
【实现类】:继承了ThreadPoolExecutor
,并实现了ScheduledExecutorService
接口中相关定时任务的方法,可以认为ScheduledThreadPoolExecutor
是最丰富的实现类
ExecutorService
方法描述:
shutdown()
:优雅关闭线程池,之前提交的任务将被执行,包括当前正在执行的和等待队列中的任务,但是线程池不会再接收新任务,提交新任务会抛出异常shutdownNow()
:调用shutdownNow
方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断, 该方法会立刻返回,返回值为这时候队列里面被丢弃的任务列表。isShutdown()
:如果此线程池关闭,则返回trueisTerminated()
:如果关闭后所有任务都已完成,则返回trueawaitTermination()
:当线程调用awaitTermination
方法后,当前线程会被阻塞,直到线程池状态变为TERMINATED
才返回, 或者等待时间超时才返回、或当前线程被中断submit()
系列方法:public Future<?> submit(Runnable task)
:提交Runnable
任务到线程池,注意:Runnable
任务是没有返回值的,但是这里submit
方法会返回一个Future
对象,调用Future.get()
方法会阻塞,直到Runnable
任务执行完成,Future.get()
方法 才会返回null
public <T> Future<T> submit(Runnable task, T result)
:和submit(Runnable task)
一样,只是可以指定一个默认返回值,待Runnable
任务结束后,Future.get()
就可以获取到该返回值public <T> Future<T> submit(Callable<T> task)
:提交一个Callable
任务,该任务是带有返回结果的
invokeAll()
系列方法:invokeAll(Collection<? extends Callable<T>> tasks)
:执行给定任务集合,执行完毕后返回结果invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:执行给定任务集合,执行完毕或超时后,返回结果,其它任务终止,对终止任务执行Future.get()
方法获取结果是会抛出CancellationException
异常
invokeAny()
系列方法:参照invokeAll()
方法T invokeAny(Collection<? extends Callable<T>> tasks)
:执行给定的任务集合,任意一个执行成功则返回结果,其他任务终止T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:执行给定的任务集合,任意一个执行成功或超时,则返回结果,其他任务终止
注意:invoke
方法是阻塞方法,即提交的任务执行完或超时才会返回结果,而submit
系列方法是会立即返回Future
结果。区分一个方法是不是阻塞方法可以通过方法签名是否抛出InterruptedException
:
public Future<?> submit(Runnable task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
ScheduledExecutorService
创建并执行一个一次性任务,可以指定延迟时间:
schedule(Runnable command, long delay, TimeUnit unit)
schedule(Callable<V> callable, long delay, TimeUnit unit)
创建并执行一个周期性任务:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:周期是以两个任务开始时间间隔为周期,初始延迟时间后会触发第一次执行,执行过程中发生异常,那么任务就停止,一次任务执行时长超过周期时间,那下一次任务会等到该任务执行结束后,立即执行。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:周期是以上一个任务结束到下一个任务开始时间间隔为周期,指的是以固定的延时执行,initialDelay
初次延迟时间,delay
指的是上一次执行终止和下一次执行开始之间的延迟。
线程池状态
RUNNING
:接收新任务并处理排队任务;SHUTDOWN
:不接收新任务,但处理排队任务,调用shutdown()
会处于该状态;STOP
:不接收新任务,也不处理排队任务,并中断正在运行的任务,调用shutdownNow()
会处于该状态;TIDYING
:中文是整洁,理解了中文就容易理解这个状态:所有任务都已终止,workerCount
为零时,线程会转换到TIDYING
状态,并将运行terminate()
钩子方法;TERMINATED
:terminate()
运行完成;
参见ThreadPoolExecutor
:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态具体可参加线程池终止一节中源码分析。
线程池模型
线程池中主要有:
AtomicInteger ctl
:二进制高3位表示线程池状态,后29位记录工作线程数量,所以线程池所能够支持的最大线程数:2^29 - 1 = 536870911
;
线程池状态高3位表示:
RUNNING = -1 << COUNT_BITS;//111
SHUTDOWN = 0 << COUNT_BITS;//000
STOP = 1 << COUNT_BITS;//010
TIDYING = 2 << COUNT_BITS;//100
TERMINATED = 3 << COUNT_BITS;//110
- 工作线程
Worker
:线程池用于执行任务的工作线程是Worker
,每个Worker
都对应一个Thread
用于真正执行时的线程,Worker
会在提交任务时创建,并将提交的任务赋值给firstTask
,该任务会作为Worker
运行的第一个任务;后续执行的任务通过getTask
方法从等待队列中获取;
ThreadPoolExecutor
定义:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
核心参数:
corePoolSize
:核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务maximumPoolSize
:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量maxPoolSize
keepAliveTime
:线程空闲超时时间,超时的线程会被销毁回收,默认只有当工作线程数 > 核心线程数
&线程空闲超时
才会销毁线程;可以通过allowCoreThreadTimeOut(true)
设置:不管当前工作线程数
是否大于核心线程数
,只要线程空闲超时都会被回收;
等待队列workQueue
:
- 有界队列
ArrayBlockingQueue
:等待队列有固定大小; - 无界队列
LinkedBlockingQueue
:当有新任务到来,系统的线程数小于corePoolSize
时,则新建线程执行任务,当达到corePoolSize
后,就不会继续增加,若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待,并且等待队列是无界的,直到耗尽系统内存。因此,无界队列maximumPoolSize
参数是不起作用,线程池中最大的线程数只能是corePoolSize
数量; - 无缓冲区
SynchronousQueue
:参照Executors
类newCachedThreadPool()
方法创建的线程池
拒绝策略handler
:如若使用拒绝策略,等待队列一定要设置成有界队列才行;若等待队列已满,则在总线程数不大于maximumPoolSize
的前提下,创建新的线程;若线程数大于maximumPoolSize
,则执行拒绝策略
AbortPolicy
:拒绝任务,并且抛出异常CallerRunsPolicy
:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务DiscardOldestPolicy
:丢弃最老的一个请求,然后把当前任务重新加入到队列中DiscardPolicy
:丢弃无法处理的任务,没有异常信息- 如果需要自定义拒绝策略可以实现
RejectedExecutionHandler
接口
增减线程时机:
- 如果
工作线程数 < corePoolSize
,即使其它工作线程处于空闲状态,也会创建一个新线程来运行新任务; - 如果
工作线程数 >= corePoolSize
,但工作线程数 < maximumPoolSize
,则提交的任务会被放入到等待队列中; - 如果
等待队列已满
,并且工作线程数 < maxPoolSize
,则创建一个新线程来运行新任务,这个机制可能很多人都会搞错,这里就容易出现之前提交的任务还在等待队列中阻塞,但是新提交任务却被执行情况,千万注意; - 如果队列已满,并且
工作线程数 >= maxPoolSize
,这时提交的新任务采用拒绝策略处理; - 只有
等待队列
填满时才创建多于corePoolSize
的线程,所以如果你使用的是无界队列LinkedBlockingQueue
,由于等待队列
一直无法装满,那么工作线程数
也就不会超过corePoolSize
,即maximumPoolSize
无效; - 线程池有个线程超时机制,超时的线程会被销毁回收,详见
keepAliveTime
参数; shutdown
和shutdownNow
方法终止线程池时,最终会把所有的工作线程都销毁,具体参见后续源码分析;
Executors
类似于Collection
和Collections
关系,Executors
是Executor
的工具类,提供了一些创建线程池方法,主要如下:
ExecutorService newFixedThreadPool(int nThreads)
:创建一个固定大小、任务队列容量无界的线程池;ExecutorService newCachedThreadPool()
:创建一个核心线程数=0,最大线程数=Integer.MAX_VALUE
,线程空闲时间=60秒,等待队列是SynchronousQueue
类型,任务加入到线程池中,如果有空闲线程则使用空闲线程,如无则创建新线程执行。ExecutorService newSingleThreadExecutor()
:只有一个线程来执行无界任务队列的单一线程池,该线程池确保任务按照加入的顺序一个一个依次执行,当唯一的线程因任务异常终止时,将创建一个新的线程来继续执行后续的任务;ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
:能定时执行任务的线程池,线程池中核心线程数由参数指定,最大线程数=Integer.MAX_VALUE
;
FixedThreadPool
和SingleThreadExecutor
的Queue
是LinkedBlockingQueue
,因为固定线程,线程满的话需要等待队列去缓存任务;CachedThreadPool
使用的Queue
是SynchronousQueue
,不需要缓存任务,因为最大线程数没有限制;ScheduledThreadPool
来说,它使用的是延迟队列,DelayedWorkQueue
;JDK1.8
新加入了一种线程池:workStealingPool
Executors
工具类方式创建线程池不建议在生产开发中直接使用,因为创建出来的线程池比较粗糙,或多或少都存在一些问题,一般只在测试用例中使用,阿里开发手册中也是规定不能在代码中直接通过Executors
方式创建线程池。
CompletionService
ExecutorService
+ Future
在处理批量提交异步任务并获取结果时可能会存在一些问题,如下:
public void executorServiceTest() throws InterruptedException {
List<Future<Integer>> futures = new ArrayList<>();
for(int i=0;i<10;i++){
Future<Integer> future = executor.submit(new Task());
futures.add(future);
}
//Future.get获取处理结果时,按照顺序获取
for (int i = 0, size = futures.size(); i < size; i++) {
Future<Integer> f = futures.get(i);
try {
Integer ret = f.get();
log.info("ret:{}", ret);
} catch (InterruptedException | ExecutionException e) {
}
}
}
class Task implements Callable<Integer>{
private Random random = new Random();
@Override
public Integer call() throws Exception {
int v = random.nextInt(500);
TimeUnit.MICROSECONDS.sleep(v);
return v;
}
}
如上述案例:通过executor.submit()
批量提交10个任务
,然后获取这些任务的结果,只能顺序通过Future.get()
获取,因为无法知道哪些任务先完成,这就造成即使有些任务先完成,由于前面任务没有完成依然被阻塞。
CompletionService
对ExecutorService
进行包装,将结果集按照完成时间的顺序放入到阻塞队列中,获取结果时只需要从阻塞队列中获取即可,即实现:先完成的任务先获取结果。
CompletionService
逻辑如上图,其实现代码也很简单:
1、将任务封装成FutureTask
,然后提交到内部线程池中执行任务;
2、FutureTask#done()
方法会在任务被完成时进行回调的方法,CompletionService
就是重写该方法,在FutureTask
任务完成时添加到阻塞队列中去,这样就可以从阻塞队列中获取已完成的任务,见下:
protected void done() { completionQueue.add(task); }
Tips:
ExecutorCompletionService
类中使用LinkedBlockingQueue
无界队列存储结果集,所以,一定要及时去取结果,不然完成的任务结果不停的堆积到阻塞队列中,可能会撑爆内存空间。
总结
在生产开发中建议更多的使用线程池技术,除去性能方面考虑外,从软件设计上线程池比之前介绍的创建线程方式更好:
首先,线程池更好的体现了把任务单元与执行机制分离开的思想,开发者更多关注的是任务单元,只需要把需要执行的任务封装到Runnable
、Callable
中,通过submit()
、invoke()
等方式提交给线程池;而多线程如何创建、运行等执行机制是由Executor
框架提供,一般情况下对于开发者是不需要关心的。
另一点Thread
(JDK1.0
就存在)可能是由于出现的比较早,本身在接口设计上有那么点不是很完善的地方,比如我们的任务封装到Runnable
中需要和Thread
绑定完成后才能使用start
启动线程执行任务。执行完成这个线程就废弃了,而不能通过重新绑定Runnable
实现执行新任务,这就导致Thread
类接口设计上就将任务和执行很紧密的耦合在一起了,即使Runnable
、Callable
接口的出现在代码层面将任务逻辑代码部分剥离出来,但是在执行时依然需要先绑定才能启动线程,而不能做到像线程池在运行时有新任务随时提交。
相关文章
- JAVA三元运算符_java中三元运算符详解
- JAVA多线程面试题_java多线程的实现方式
- Java并发——线程同步Volatile与Synchronized详解
- 手机java程序_2020年最流行的Java开发技术
- java找不着符号_找不到符号:Java
- MySQL字段类型如何转为java_Java JDBC中,MySQL字段类型到JAVA类型的转换
- 【说站】java中并发和并行的概念
- java arraydeque poll,Java ArrayDeque「建议收藏」
- jdk1.7 hashmap扩容_Java并发实现原理:JDK源码剖析
- c 线程安全的单例模式-c多线程并发处理方式_Java多线程面试题:线程锁+线程池+线程同步等
- 【Java 并发编程】线程操作原子性问题 ( 问题业务场景分析 | 使用 synchronized 解决线程原子性问题 )
- 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )
- java并发编程(2):Java多线程-java.util.concurrent高级工具
- 开启Java之门:访问MySQL数据库(java访问mysql数据库)
- 学习Java编程,攻克Oracle难题(java学oracle)
- 使用Java实现Redis锁定的实现(redis锁定 java)
- java线程并发countdownlatch类使用示例
- Java并发编程示例(一):线程的创建和执行
- Java并发编程示例(九):本地线程变量的使用