zl程序教程

您现在的位置是:首页 >  后端

当前栏目

一文详解java线程池 详解Java线程池的七个参数 详解池化技术 java如何选择核心线程数 详解Java线程池的拒绝策略

JAVA技术线程 如何 详解 参数 选择 策略
2023-09-14 09:07:25 时间

目录

引言

  • 我当时调用连薪商户-服务商接口,该接口的目的是:

    • 在客户发薪成功的回调接口中,去调用连薪服务商回单下载接口,但是连薪的付款成功和下载之间有3-5秒的延迟,这里睡眠了5秒之后,开启线程池去下载回单。

除了我自己在开发中使用到的场景,因而,线程池其他使用场景使用场景

线程池使用场景

加快请求响应(响应时间优先)

假如,用户在饿了么上查看某商家外卖,需要聚合商品库存、店家、价格、红包优惠等等信息返回给用户,接口逻辑涉及到聚合、级联等查询,从这个角度来看接口返回越快越好,那么就可以使用多线程方式,把聚合/级联查询等任务采用并行方式执行,从而缩短接口响应时间。

这种场景下使用线程池的目的就是为了缩短响应时间,往往不去设置队列去缓冲并发的请求,而是会适当调高corePoolSize和maxPoolSize去尽可能的创造线程来执行任务。

加快处理大任务(吞吐量优先)

假如,业务中台每10分钟就调用接口统计每个系统/项目的PV/UV等指标然后写入多个sheet页中返回,这种情况下往往也会使用多线程方式来并行统计。

和"时间优先"场景不同,这种场景的关注点不在于尽可能快的返回,而是关注利用有限的资源尽可能的在单位时间内处理更多的任务,即吞吐量优先。

这种场景下我们往往会设置队列来缓冲并发任务,并且设置合理的corePoolSize和maxPoolSize参数,这个时候如果设置了太大的corePoolSize和maxPoolSize,可能还会因为线程上下文频繁切换降低任务处理速度,从而导致吞吐量降低。

特殊说明

以上两种使用场景和JVM里的ParallelScavenge和CMS垃圾收集器有较大的类比性,ParallelScavenge垃圾收集器关注点在于达到可观的吞吐量,而CMS垃圾收集器重点关注尽可能缩短GC停顿时间。

线程池的池化技术

说到线程池,很自然想到数据库连接池,他们都用到池化技术。以最少的serverSocket来完成尽量多的socket请求,从而更大限度的发挥CPU的性能,提高吞吐量。

我们都知道不论是hibernate也好,Mybatis也好,其内部使用的jdbc。Jdbc内部是基于TCP协议的socket通信,socket通信有两处阻塞,连接阻塞和读写阻塞。因而在在项目启动时,便创建了核心长连接数[StandardSocketFactory],这样避免每次请求都建立连接的性能消耗,提高数据库访问的吞吐量。

线程池的创建

手动创建

线程池可以自动创建也可以手动创建,自动创建体现在Executors工具类中,常见的可以创建newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;,如下代码所示:

创建newFixedThreadPool线程池

使用的构造方式为new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()),设置了corePoolSize=maxPoolSize,keepAliveTime=0(此时该参数没作用),无界队列,任务可以无限放入。

当请求过多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致占用过多内存或直接导致OOM异常。

我们可以使用Executors.newFixedThreadPool(5);调用如下方法,请查看如下源码:

/**
* @param 线程数量
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/ 
public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>());
 }

/**
 * @param nThreads 线程数量
 * @param threadFactory 创建线程的工厂类
 * @throws NullPointerException if threadFactory is null
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory);
  }

创建newSingleThreadExecutor线程池

使用的构造方式为new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0),基本同newFixedThreadPool,但是将线程数设置为了1,单线程,弊端和newFixedThreadPool一致。

我们可以使用Executors.newSingleThreadExecutor();调用如下方法。

//源码
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

创建newCachedThreadPool线程池

使用的构造方式为new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()),corePoolSize=0,maxPoolSize为很大的数,同步移交队列,也就是说不维护常驻线程(核心线程),每次来请求直接创建新线程来处理任务,也不使用队列缓冲,会自动回收多余线程,由于将maxPoolSize设置成Integer.MAX_VALUE,当请求很多时就可能创建过多的线程,导致资源耗尽OOM

我们可以使用Executors.newCachedThreadPool();调用如下方法。

//源码
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

创建newScheduledThreadPool线程池

使用的构造方式为new ThreadPoolExecutor(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()),支持定时周期性执行,注意一下使用的是延迟队列,弊端同newCachedThreadPool一致

我们可以使用Executors.newScheduledThreadPool();调用如下方法。

/**
 * @param corePoolSize 核心线程数量
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
 * @param corePoolSize 核心线程数量
 * @param threadFactory 创建线程的工厂类
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

手动创建(推荐)

原生ThreadPoolExecutor创建线程池

因为使用Executors工具类创建的线程池有隐患,那如何使用才能避免这个隐患呢?对症下药,建立自己的线程工厂类,灵活设置关键参数。

手动创建体现在可以灵活设置线程池的各个参数,体现在代码中即ThreadPoolExecutor类构造器上各个实参的不同:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

使用guava包中的ThreadFactoryBuilder工厂类来构造线程池

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
 
private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());

使用guava的ThreadFactory工厂类还可以指定线程组名称,这对于后期定位错误时也是很有帮助的:

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();

ThreadPoolExecutor中重要的几个参数详解

corePoolSize

核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务。

corePoolSize计算方式

获取核心连接数:一般是电脑或服务器的cpu数量。

如果是Java代码,可以通过Runtime.getRuntime().availableProcessors()获取。

maximumPoolSize

最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)。

maximumPoolSize获取方式

这里的n指的corePoolSize的值

  • CPU密集型:n+1
  • I/O密集型:2n+1

keepAliveTime

非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程);
unit:keepAliveTime的时间单位。

workQueue

用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中。

SynchronousQueue(同步移交队列)

队列不作为任务的缓冲方式,可以简单理解为队列长度为零

LinkedBlockingQueue(无界队列)

队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOM

ArrayBlockintQueue(有界队列)

队列长度受限,当队列满了就需要创建多余的线程来执行任务

threadFactory

创建线程的工厂类,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建。

handler

线程池无法继续接收任务(队列已满且线程数达到maximunPoolSize)时的拒绝策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy。

拒绝策略

AbortPolicy策略

这种策略是,直接抛出一个异常;比如上面演示的抛出RejectedExecutionException异常;

DiscardPolicy策略

这种策略,会默默的把新来的这个任务给丢弃;我们不会得到通知;

DiscardOldestPolicy策略

这种策略,会把队列中存在时间最久的那个任务给丢弃掉,以便给新来的任务腾位置;

CallerRunsPolicy策略

这种策略下,因为线程池已经无法接纳新的任务了,那么谁提交的这个任务,谁就去跑这个业务;

比如,主线程向线程池提交了一个任务,线程池已经不能接纳这个任务了,那么此时就会让这个提交任务的主线程去执行这个任务;

这种策略有两点好处:

  1. 这种策略,避免了业务损失;

  2. 可以让任务提交的速度降低下来,比如主线程提交的任务被打回来后,主线程就必须执行完这个被打回来的任务后,才能够向线程池提交下一个任务,而这就相当于给了线程池一个缓冲的时间;

创建线程池的流程图

在这里插入图片描述
假如现有一个线程池,corePoolSize=10,maxPoolSize=20,队列长度为100,那么当任务过来会先创建10个核心线程数,接下来进来的任务会进入到队列中直到队列满了,会创建额外的线程来执行任务(最多20个线程),这个时候如果再来任务就会执行拒绝策略。

beforeExecute和afterExecute

在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute。

 protected void beforeExecute(Thread var1, Runnable var2) {
 }
 
 protected void afterExecute(Runnable var1, Throwable var2) {
 }

这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。

这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP,参考下图:

在这里插入图片描述

阿里巴巴开发规范之线程池

FixedThreadPool和SigleThreadExecutor中之所以用LinkedBlockingQueue无界队列,是因为设置了corePoolSize=maxPoolSize,线程数无法动态扩展,于是就设置了无界阻塞队列来应对不可知的任务量。

而CachedThreadPool则使用的是SynchronousQueue同步移交队列,为什么使用这个队列呢?因为CachedThreadPool设置了corePoolSize=0,maxPoolSize=Integer.MAX_VALUE,来一个任务就创建一个线程来执行任务,用不到队列来存储任务。

SchduledThreadPool用的是延迟队列DelayedWorkQueue。在实际项目开发中也是推荐使用手动创建线程池的方式,而不用默认方式,关于这点在《阿里巴巴开发规范》中是这样描述的:

在这里插入图片描述

关闭线程池

shutdownNow()

立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表。

shutdown()

平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略

isTerminated()

当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true

线程池实现线程复用的原理

  1. 线程池里执行的是任务,核心逻辑在ThreadPoolExecutor类的execute方法中,同时ThreadPoolExecutor中维护了HashSet workers;

  2. addWorker()方法来创建线程执行任务,如果是核心线程的任务,会赋值给Worker的firstTask属性;

  3. Worker实现了Runnable,本质上也是任务,核心在run()方法里;

  4. run()方法的执行核心runWorker(),自旋拿任务while (task != null || (task = getTask()) != null)),task是核心线程Worker的firstTask或者getTask();

  5. getTask()的核心逻辑:

    • 若当前工作线程数量大于核心线程数->说明此线程是非核心工作线程,通过poll()拿任务,未拿到任务即getTask()返回null,然后会在processWorkerExit(w, completedAbruptly)方法释放掉这个非核心工作线程的引用。

    • 若当前工作线程数量小于核心线程数->说明此时线程是核心工作线程,通过take()拿任务。

    • take()方式取任务,如果队列中没有任务了会调用await()阻塞当前线程,直到新任务到来,所以核心工作线程不会被回收; 当执行execute方法里的workQueue.offer(command)时会调用Condition.singal()方法唤醒一个之前阻塞的线程,这样核心线程即可复用

在这里插入图片描述

在这里插入图片描述

Springboot中使用线程池

配置线程池

配置guava

因为如下使用的是guava.jar,因而,需要引入guava.jar的maven配置,如下代码所示:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

创建线程池

springboot可以说是非常流行了,下面说说如何在springboot中优雅的使用线程池,如下代码所示:

package com.example.demo.thredpool;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.*;

/**
 * @author zs
 * @datetime 2022/8/3 11:23
 * @desc 使用spring boot配置线程池
 */
@Configuration
public class ThreadPoolConfig {

  @Bean(value = "threadPoolInstance")
  public ExecutorService createThreadPoolInstance() {
    // 通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称
    ThreadFactory threadFactory =
        new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
    ExecutorService threadPool =
        new ThreadPoolExecutor(
            10,
            16,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            threadFactory,
            new ThreadPoolExecutor.AbortPolicy());
    return threadPool;
  }
}

使用线程池

  //通过name=threadPoolInstance引用线程池实例
  @Resource(name = "threadPoolInstance")
  private ExecutorService executorService;
 
  @Override
  public void spikeConsumer() {
    //TODO
    executorService.execute(new Runnable() {
    @Override
    public void run() {
      //TODO
     }});
  }

Callable和Runnable

Runnable和Callable都可以理解为任务,里面封装这任务的具体逻辑,用于提交给线程池执行。

区别在于Runnable任务执行没有返回值,且Runnable任务逻辑中不能通过throws抛出cheched异常(但是可以try catch),而Callable可以获取到任务的执行结果返回值且抛出checked异常。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
 
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Future和FutureTask

Future接口用来表示执行异步任务的结果存储器,当一个任务的执行时间过长就可以采用这种方式。

把任务提交给子线程去处理,主线程不用同步等待,当向线程池提交了一个Callable或Runnable任务时就会返回Future,用Future可以获取任务执行的返回结果。Future的主要方法包括:

  • get()方法:返回任务的执行结果,若任务还未执行完,则会一直阻塞直到完成为止,如果执行过程中发生异常,则抛出异常,但是主线程是感知不到并且不受影响的,除非调用get()方法进行获取结果则会抛出ExecutionException异常;

  • get(long timeout, TimeUnit unit):在指定时间内返回任务的执行结果,超时未返回会抛出TimeoutException,这个时候需要显式的取消任务;
    cancel(boolean mayInterruptIfRunning):取消任务,boolean类型入参表示如果任务正在运行中是否强制中断;

  • isDone():判断任务是否执行完毕,执行完毕不代表任务一定成功执行,比如任务执行失但也执行完毕、任务被中断了也执行完毕都会返回true,它仅仅表示一种状态说后面任务不会再执行了;
    isCancelled():判断任务是否被取消;

下面来实际演示Future和FutureTask的用法:知识库页面发布接口逻辑:

@Override
public ResultResponse publish(PageIndex page) {
    ResultResponse<Boolean> result = new ResultResponse<>();
    ResultResponse<PageIndex> recordResult = getPageRecord(page.getPageId());
    if (!recordResult.isSuccess()) {
        return recordResult;
    }
    PageIndex record = recordResult.getData();
    String currentUser = RequestContext.getUser().getUserCode();
    // 生成html文件,后续导出pdf要用
    FutureTask htmlFileTask = new FutureTask<>(new CreateHtmlFileTask(page.setName(record.getName())));
    // 同步更新页面所属知识库的最后更新人及更新时间
    FutureTask wikiUpdateTask = new FutureTask<>(new WikiUpdateTask(record.getWikiId(), currentUser));
    // 生成页面历史版本记录
    FutureTask pageVersionTask = new FutureTask<>(new GeneratePageVersionTask(new PageVersion(record.getPageId(), record.getWikiId(), page.getHtmlContent(), page.getContent(), currentUser, new Date(), null, null)));
    // 更新页面最新信息
    FutureTask pageUpdateTask = new FutureTask<>(new PageUpdateTask(page));
    List<FutureTask<String>> futureTaskList = ImmutableList.of(htmlFileTask, wikiUpdateTask, pageVersionTask, pageUpdateTask);
    // 任务提交
    futureTaskList.forEach(task -> executorService.submit(task));
    try {
        for (FutureTask<String> task : futureTaskList) {
            // 阻塞式等待任务执行结果
            String taskResult = task.get();
            logger.info("publish taskResult={}", taskResult);
        }
    } catch (Exception e) {
        logger.error("发布接口异常:{}", e, e.getMessage());
        e.printStackTrace();
    }
    // 释放锁
    pageLockService.unlock(page.getPageId(), currentUser);
    return result.data(true);
}

class CreateHtmlFileTask implements Callable<String> {
    private PageIndex pageIndex;

    public CreateHtmlFileTask(PageIndex pageIndex) {
        this.pageIndex = pageIndex;
    }

    @Override
    public String call() throws Exception {
        return generateHtmlFile(pageIndex);
    }
}

线程池的优化

实现优先使用运行线程及调整线程数大小的线程池

当前在JDK中默认使用的线程池 ThreadPoolExecutor,在具体使用场景中,有以下几个缺点:

  1. core线程一般不会timeOut

  2. 新任务提交时,如果工作线程数小于 coreSize,会自动先创建线程,即使当前工作线程已经空闲,这样会造成空闲线程浪费

  3. 设置的maxSize参数只有在队列满之后,才会生效,而默认情况下容器队列会很大(比如1000)

如一个coreSize为10,maxSize为100,队列长度为1000的线程池,在运行一段时间之后的效果会是以下2个效果:

  1. 系统空闲时,线程池中始终保持10个线程不变,有一部分线程在执行任务,另一部分线程一直wait中(即使设置allowCoreThreadTimeOut)

  2. 系统繁忙时,线程池中线程仍然为10个,但队列中有还没有执行的任务(不超过1000),存在任务堆积现象

将描述一下简单版本的线程池,参考于 Tomcat ThreadPoolExecutor, 实现以下3个目标:

  1. 新任务提交时,如果有空闲线程,直接让空闲线程执行任务,而非创建新线程

  2. 如果coreSize满了,并且线程数没有超过maxSize,则优先创建线程,而不是放入队列

  3. 其它规则与ThreadPoolExecutor一致,如 timeOut机制

首先看一下ThreadPoolExecutor的执行逻辑, 其基本逻辑如下

  1. 如果线程数小于coreSize,直接创建新线程并执行(coreSize逻辑)
  2. 尝试放入队列
  3. 放入队列失败,则尝试创建新线程(maxSize逻辑)

而执行线程的任务执行逻辑,就是不断地从队列里面获取任务并执行,换言之,即如果有执行线程,直接往队列里面放任务,执行线程就会被通知到并直接执行任务。

空闲线程优先

空闲线程优先在基本逻辑中,即如果线程数小于coreSize,但如果有空闲线程,就取消创建线程的逻辑. 在有空闲线程的情况下,直接将任务放入队列中,即达到任务执行的目的。

这里的逻辑即是直接调整默认的ThreadPoolExecutor逻辑,通过重载 execute(Runnable) 方法达到效果. 具体代码如下所示:

public void execute(Runnable command) {
    //此处优先处理有活跃线程的情况,避免在<coreSize时,直接创建线程
    if(getActiveCount() < getPoolSize()) {
        if(pool1.offer(command)) {
            return;
        }
    }
    super.execute(command);
}

coreSize满了优先创建线程

从之前的逻辑来看,如果放入队列失败,则尝试创建新线程。

在这个时候,相应的coreSize肯定已经满了。

那么,只需要处理一下逻辑,将其offer调整为false,即可以实现相应的目的。

这里的逻辑,即是重新定义一个BlockingDeque,重载相应的offer方法,相应的参考如下:

public boolean offer(Runnable o) {
    //这里的parent为ThreadPoolExecutor的引用
    int poolSize = parent.getPoolSize();
    int maxPoolSize = parent.getMaximumPoolSize();
    //还没到最大值,先创建线程
    if(poolSize < maxPoolSize) {
        return false;
    }
    //默认逻辑
    return super.offer(o);
}

即判定当前线程池中线程数如果小于最大线程数,即直接返回false,达到放入队列失败的效果。

总结

按照以上的调整,只需要通过继承自默认的ThreadPoolExecutor和默认的BlockingQueue(如LinkedBlockingDeque),重载2个主要的方法 ThreadPoolExecutor#execute 和 LinkedBlockingDeque#offer 即达到调整的目的。

缺点在于,实现后的类,在定义时,需要互相引用,因为相应的逻辑中需要互相调用相应的方法,以处理逻辑。此外,ThreadPoolExecutor的相应方法 getXXX 方法,在调用时都为通过加锁式实现,以精确返回数据,这里在多线程环境中可能会存在一些性能上的考虑。

在Tomcat默认的worker线程池中,即是采用以上的逻辑来达到工作线程的调整逻辑。因此在spring boot tomcat embedded中,通过参数 server.tomcat.max-thread 和 min-thread 即是通过优先调整线程来达到控制工作线程的目的。 相应的处理类为 org.apache.tomcat.util.threads 下的 ThreadPoolExecutor 和 TaskQueue

基于spring体系的业务中正确地关闭线程池

在业务代码中,特别是基于spring体系的代码中,均会使用线程池进行一些操作,比如异步处理消息,定时任务,以及一些需要与当前业务分离开的操作等。

常规情况下,使用spring体系的TaskExecutor或者是自己定义ExecutorService,均可以正常地完成相应的操作。不论是定义一个spring bean,或者是使用 static Thread工具类均是能满足条件。

但是,如果需要正常地关闭spring容器时,这些线程池就不一定能够按照预期地关闭了。结果就是,当使用代码 context.close() 时,期望进程会正常地退出,但实际上进程并不会退出掉.原因就在于这些线程池中还在运行的线程。

本文描述了在基于spring boot的项目中,如何正确地配置线程池,以保证线程池能够正确的在整个spring容器周期内运行,并且在容器正常关闭时能够一并退出掉。

  • 定义bean时添加destroyMethod方法或相应生命周期方法

  • 设置线程池中线程为daemon

  • 为每个线程池正确地命名及使用ThreadFactory

  • 丢弃不再需要的周期性任务

  • 监听ContextClosedEvent,触发额外操作

定义Bean时的Lifecycle定义

将线程池定义为spring bean,为保证在容器关闭时,线程池一并关闭,可以为其定义相应的关闭方法。以下为特定的bean的关闭方法:

ThreadPoolTaskExecutor#destroy

ThreadPoolTaskExecutor#destroy

ScheduledThreadPoolExecutor#shutdown

针对 ConcurrentTaskExecutor 这种并没有实现关闭方法的bean,则需要保证其所使用 executor 能够被正常地关闭

在spring 体系,让一个bean可以接收Lifecycle管理,有多种方法,如下所示:

@Bean(destroyMethod) //定义时

@PreDestroy //bean方法

DisposableBean  //接口

AutoCloseable   //接口

Lifecycle   //接口
//有方法名为 shutdown

设置线程池中线程为Daemon

一般情况下,关闭线程池后,线程池会自行将其中的线程结束掉.但针对一些自己伪装或直接new Thread()的这种线程,则仍会阻塞进程关闭。

按照,java进程关闭判定方法,当只存在Daemon线程时,进程才会正常关闭。因此,这里建议这些非主要线程均设置为 daemon,即不会阻塞进程关闭.

Thread.setDaemon(true)

不过更方便的是使用ThreadFactory,其在 newThread 时,可以直接操作定义出来的Thread对象(如后面所示)

正确命名Thread

在使用线程池时,一般会接受 ThreadFactory 对象,来控制如何创建thread。

在java自带的ExecutorService时,如果没有设置此参数,则会使用默认的 DefaultThreadFactory。

效果就是,你会在 线程栈列表中,看到一堆的 pool-x-thread-y,在实际使用 jstack时,根本看不清这些线程每个所属的组,以及具体作用。

这里建议使用 guava 工具类 ThreadFactoryBuilder 来构建,如下代码参考所示

ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("定义任务-%d").build()

此代码,一是定义所有创建出来的线程为 daemon,此外相应的线程name均为 指定前缀开始。在线程栈中可以很方便地查找和定位. 此外,在判断影响进程退出时,也可以很方便地判断出是否是相关的线程池存在问题(查看daemon属性及name)。

丢弃不再可用周期性任务

一般情况下,使用 java 自带的 ScheduledThreadPoolExecutor, 调用 scheduleAtFixedRate 及 scheduleWithFixedDelay 均会将任务设置为周期性的(period)。在线程池关闭时,这些任务均可以直接被丢弃掉(默认情况下). 但如果使用 schedule 添加远期的任务时,线程池则会因为其不是 周期性任务而不会关闭所对应的线程

如 spring 体系中 TriggerTask(包括CronTask), 来进行定时调度的任务,其最终均是通过 schedule 来实现调度,并在单个任务完成之后,再次 schedule 下一次任务的方式来执行。这种方式会被认为并不是 period. 因此,使用此调度方式时,尽管容器关闭时,执行了 shutdown 方法,但相应底层的 ScheduledExecutorService 仍然不会成功关闭掉(尽管所有的状态均已经设置完)。最终效果就是,会看到一个已经处于shutdown状态的线程池,但线程仍然在运行(状态为 wait 任务)的情况。

为解决此方法,java 提供一个额外的设置参数 executeExistingDelayedTasksAfterShutdown, 此值默认为true,即 shutdown 之后,仍然执行。可以通过在定义线程池时将其设置为 false,即线程池关闭之后,不再运行这些延时任务。

监听ContextClosedEvent事件

针对在业务中自己构建的bean时,可以通过上面的方式进行相应的控制,如果是三方的代码。则需要在容器关闭时通过触发额外的操作,来显示地进行三方的代码。如,在代码中获取到三方的对象,主动调用其的close方法。

惟一需要作的就是监听到容器关闭事件,然后在回调代码中进行相应的操作。因此,只需要定义beqn,实现以下接口即可

CallbackObj implements ApplicationListener<ContextClosedEvent> {
    public void onApplicationEvent(ContextClosedEvent event) {}
}

当此对象声明为bean时,当容器关闭时,会触发相应的事件,并回调起相应的操作.

总结

上面的几个方面均是从仔细控制线程池的创建和销毁来进行描述。

基本思路即是从对象管理,生命周期以及代码控制多个方面来处理.在实际项目中,业务使用方只需要使用相应的组件即可,但组件提供方需要正确地提供,才能保证基础架构的完整性。

java web项目中慎用Executors以及非守护线程

最近研究embeded tomcat,特别是关于tomcat启动和关闭的模块。

通过查看相应的源代码, 我们知道tomcat的关闭是通过往相应的关闭端口发送指定的关闭指令来达到关闭tomcat的目的。

但是有的时候,通过shutdown.bat或shutdown.sh却不能有效地关闭tomcat,网上也有很多人提出这个问题。通过相关资料,最后问题出现线程上。

首先看java虚拟机退出的条件,如下所示:

  • a,调用了 Runtime 类的 exit 方法,并且安全管理器允许退出操作发生。

  • b,非守护线程的所有线程都已停止运行,无论是通过从对 run 方法的调用中返回,还是通过抛出一个传播到 run 方法之外的异常。

如上所示,第一条是通过exit退出,第二条指出了一个正常程序退出的条件,就是所有的非守护线程都已经停止运行。我们看相应embed tomcat的启动代码,如下所示:

tomcat.start();
tomcat.getServer().await();

最后一条有效的运行命令即是await,通过调用shutdown命令时,这个await就会成功的返回。

按照常理来说,整个程序即会成功的完成。但是程序有时候并没有成功的结束,原因就在于程序中还存在着非守护进程。

对于tomcat来说,tomcat程序中开启的所有进程都是守护进程,所以tomcat自身可以保证程序的正常结束。当await结束时,tomcat所就正常的结束了,包括相应的监听端口等,都已经成功结束。然而,由于项目程序中仍然还有其它线程在运行,所以导致java虚拟机并没有成功的退出。

在我们的项目中,很多时候都运用到了线程。比如,异步调用等。不过,幸运的是,这些线程往往都是守护线程,原因就在于tomcat在运行我们的项目时,对于每一个请求,tomcat是使用了守护线程来进行相应的请求调用,这个保证在以下代码:

//AprEndpoint
// Start poller threads
pollers = new Poller[pollerThreadCount];
for (int i = 0; i < pollerThreadCount; i++) {
    pollers[i] = new Poller(false);
    pollers[i].init();
    Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
    pollerThread.setPriority(threadPriority);
    pollerThread.setDaemon(true);
    pollerThread.start();

}

所以,一般情况下,在我们的项目代码中使用new Thread建立的线程都是守护线程,原因就是新建线程默认上使用建立线程时的当前线程所处的守护状态。

tomcat的请求处理线程为守护线程,所以我们一般情况下建立的线程也是守护线程。然而,Executors除外。

使用Executors建立后台线程并执行一些多线程操作时,Executors会使用相对应的threadFactory来对runnable建立新的thread,所以使用默认的threadFactory时就会出问题。默认的ThreadFactory强制性的将新创建的线程设置为非守护状态,如下所示:

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

所以,一般情况下,我们使用executors创建多线程时,就会使用默认的threadFactory(即调用只有一个参数的工厂方法),而创建出来的线程就是非守护的。

而相应的程序就永远不会退出,如采用Executors创建定时调度任务时,这个调试任务永远不会退出。解决的办法就是重写相对应的threadFactory,如下所示:

new ThreadFactory() {
   public Thread newThread(Runnable r) {
       Thread s = Executors.defaultThreadFactory().newThread(r);
       s.setDaemon(true);
       return s;
   }
}

同理,对于java web项目中的线程程序,一定要记住将相应的线程标记为守护线程(尽管它默认就是守护的)。而对于使用Executors,一定要记住传递相应的threadFactory实现,以重写相应的newThread方法,将线程标记为守护线程。

以上的结论对于普通的java项目同样有效,为了正常的结束相应的程序,一定要正确的使用相应的线程,以避免java程序不能退出的问题。

总结

线程池也是池化一种,也有连接池的特性——以最少的线程,完成更多的任务。

如果客户端每个任务过来,就去创建一个线程去处理,那么势必会创建很多线程。

然而,java分为用户线程和内核线程,new Thread()是创建用户线程,thread.start()方法是创建内核线程。java通过start0()方法通过jvm地第三方语言去创建内核线程,因而,java创建线程的代价很高。如果处理不当,还是造成CPU飙升,因而,考虑到这个问题,便有了线程池,复用已有的线程。

当一个任务请求过来,首先使用workContOf方法,判断当前线程池中的线程数是否小于核心线程数,如果小于核心线程数,便去创建新的线程。将当前任务置为新线程的第一个任务,于是去调用runTask方法,在该方法中执行「firstTask.run()」,任务执行完毕后,gc回收当前任务。

如果队列中的任务已满,仍有新的任务进来。则判断线程数量是否达到最大线程的数量,如果没有达到最大线程的数量,那么就去创建一个新的线程,将当前任务置为新线程的第一个任务,于是去调用runTask方法,在该方法中执行「firstTask.run()」,任务执行完毕后,gc回收当前任务。

如果线程数量达到线程池中所能创建的最大线程的数量,便使用线程池地饱和拒绝策略。

策略名称策略目的
abortPolicy默认策略,丢弃任务,并抛出拒绝异常。
discardOldestPolicy丢弃队列里最近的一个任务并执行当前任务。
discardPolicy不处理,丢弃掉。
runCallersPolicy只用调用者所在的线程来执行任务。

如果队列中的任务数小于队列数,而线程在指定时间内是空闲的,那么当前线程便被回收,但核心线程不会被回收,以便下次复用。

参看文档

https://blog.csdn.net/fanrenxiang/article/details/79855992