zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

2023-06-13 09:17:56 时间

文章目录

一、线程池阻塞队列


线程池阻塞队列是线程池创建的第

5

个参数 : BlockingQueue<Runnable> workQueue ;

    public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                              int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                              long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                              TimeUnit unit, 					// 空闲时间单位
                              BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 ★
                              ThreadFactory threadFactory, 		// 创建线程的工厂类
                              RejectedExecutionHandler handler) // 拒绝策略

线程池阻塞队列 : 线程池中的阻塞队列 , 同一时刻 , 只能有

1

个线程访问队列 , 执行任务 入队 / 出队 操作 ; 队列都是 FIFO 先进先出 ;

  • 阻塞队列相关概念 :
    • 大小边界 :
      • 有界 : 阻塞队列 大小有限制 , 不是无限大的 ;
      • 无界 : 阻塞队列 理论上无限大 , 比如设置成 Integer.MAX_VALUE ;
    • 队列已满 : 只能出队 , 不能入队 ; 入队操作需阻塞等待 ;
    • 队列为空 : 只能入队 , 不能出队 ; 出队操作需要等待 ;
  • ArrayBlockingQueue : 有界阻塞队列 , 需要 指定阻塞队列大小 ;
  • LinkedBlockingQueue : 无界阻塞队列 , 基于链表的阻塞队列 ;
    • Executors.newCachedThreadPool()Executors.newFixedThreadPool(10) 方法创建的线程池 , 使用的是该阻塞队列 ;
  • SynchronousQueue : 队列 不存储元素 , 后一个 Runnable 任务入队 , 必须等到前一个任务执行完毕才可以 , 否则会一直阻塞等待 ;
    • Executors.newCachedThreadPool() 方法创建的线程池 , 使用的是该阻塞队列 ;
  • PriorityBlockingQueue : 有优先级的阻塞队列 ;

阻塞队列吞吐量 : SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue ;

二、拒绝策略


线程池拒绝策略是线程池创建的第

7

个参数 : RejectedExecutionHandler handler ;

    public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                              int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                              long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                              TimeUnit unit, 					// 空闲时间单位
                              BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 
                              ThreadFactory threadFactory, 		// 创建线程的工厂类
                              RejectedExecutionHandler handler) // 拒绝策略 ★

线程池拒绝策略 : 如果核心线程 , 非核心线程都在执行任务 , 阻塞队列是有界的 , 也满了 , 此时线程池如果再添加任务 , 就会触发如下拒绝策略 ;

  • DiscardPolicy : 丢弃任务 ;
  • DiscardOldestPolicy : 丢弃队头的最旧的任务 ;
  • AbortPolicy : 抛出异常 , 这也是默认方式 ;
  • CallerRunsPolicy : 调用者自行处理 ;

线程池默认的拒绝策略是 抛出异常 方式 ;

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

三、使用 ThreadPoolExecutor 自定义线程池参数


创建

1

个线程池 , 核心线程数是

2

, 最大线程数是

3

, 则非核心线程 0 ~ 1 个 , 非核心线程最大空闲存活时间 60 秒 , 阻塞队列最大存放 10 个元素 , 拒绝策略设置为抛出异常方式 , 如果阻塞队列装满 , 再次尝试执行新任务时 , 会抛出异常 ;

代码示例 :

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                2,                          // 核心线程数 2
                3,                      // 最大线程数 3, 非核心线程 0 ~ 1 个
                60,                        // 非核心线程最大空闲存活时间 60 秒
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),   // 阻塞队列, 最大存放 10 个元素
                Executors.defaultThreadFactory(),       // 线程工厂
                new ThreadPoolExecutor.AbortPolicy()    // 决绝策略, 如果执行任务失败, 抛出异常
        );
        for (int i = 0; i < 20; i ++) {
            executorService.execute(new Task(i));
        }
    }

    static class Task implements Runnable {
        /**
         * 记录线程的索引 0 ~ 99
         */
        private int i = 0;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println("线程 ID : " + Thread.currentThread().getName() + " , 线程索引 : " + i);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果 : 这里线程最大执行到了

12

, 也就是从

0

开始计数 , 执行了

13

个任务 , 其中

3

个线程池各自执行一个任务 , 阻塞队列存放

10

个任务 , 再次尝试将第

14

个任务放入阻塞队列时 , 报出 java.util.concurrent.RejectedExecutionException 异常 , 但是队列中的

10

个任务也正常执行完毕 ;

线程 ID : pool-1-thread-2 , 线程索引 : 1
线程 ID : pool-1-thread-3 , 线程索引 : 12
线程 ID : pool-1-thread-1 , 线程索引 : 0
Exception in thread "main" java.util.concurrent.RejectedExecutionException: 
Task Main$Task@5cad8086 rejected from java.util.concurrent.ThreadPoolExecutor@6e0be858
[Running, pool size = 3, active threads = 3, queued tasks = 10, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at Main.main(Main.java:16)
线程 ID : pool-1-thread-3 , 线程索引 : 2
线程 ID : pool-1-thread-1 , 线程索引 : 4
线程 ID : pool-1-thread-2 , 线程索引 : 3
线程 ID : pool-1-thread-1 , 线程索引 : 5
线程 ID : pool-1-thread-2 , 线程索引 : 7
线程 ID : pool-1-thread-3 , 线程索引 : 6
线程 ID : pool-1-thread-1 , 线程索引 : 9
线程 ID : pool-1-thread-2 , 线程索引 : 8
线程 ID : pool-1-thread-3 , 线程索引 : 10
线程 ID : pool-1-thread-2 , 线程索引 : 11