zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

2023-06-13 09:17:56 时间

文章目录

一、线程池执行任务细节分析


线程池执行细节分析 :

核心线程数

10

, 最大小成熟

20

, 非核心线程数

10

, 非核心线程空闲存活时间

60

秒 , 阻塞队列大小

10

个 ;

当有 Runnable 任务进入线程池后 ;

先查看 " 核心线程 " , 如果没有核心线程 , 先 创建核心线程 ;

如果有核心线程 , 则 查看核心线程是否有空闲的 ;

如果有空闲的核心线程 , 直接将该任务分配给该空闲核心线程 ;

如果没有空闲核心线程 , 则 查看核心线程数有没有满 ;

如果核心线程没有满 , 则 创建一个核心线程 , 然后执行该任务 ;

如果核心线程满了 , 将该任务放入 " 阻塞队列 " 中 , 查看阻塞队列是否已满 ;

如果阻塞队列没有满 , 直接 将任务放入阻塞队列中 ;

如果阻塞队列满了 , 则 查看是否能创建 " 非核心线程 " ;

如果能创建非核心线程 , 则 创建非核心线程 , 并执行该任务 ;

如果不能创建非核心线程 , 则 执行 " 拒绝策略 " ;

二、线程池执行 execute 源码分析


查看传入的 Runnable 任务是否为空 , 如果为空 , 就报异常 ;

        if (command == null)
            throw new NullPointerException();

获取当前线程池的状态 , 根据不同的状态 , 执行不同的操作 ;

        /*
         * 进行以下三个步骤处理:
         *
         * 1. 如果当前运行的线程 , 小于核心线程数 , 那么创建一个新的核心线程 , 
         * 将传入的任务作为该线程的第一个任务 . 
         * 调用 addWorker 方法 , 会原子性检查运行状态和任务数量 ; 
         * 如果在不应该添加线程的情况下执行添加线程操作 , 就会发出错误警报 ; 
         * 如果该方法返回 false , 说明当前不能添加线程 , 此时就不要执行添加线程的操作了 ; 
         *
         * 2. 如果任务被成功放入 线程池任务 队列 , 不管我们此时是否应该添加线程 , 都需要进行双重验证 ;
         * 双重验证 : 添加到任务队列时验证一次 , 添加到线程执行时验证一次 ; 
         * 可能存在这种情况 , 在上次验证线程运行状态之后 , 有可能该线程就立刻被销毁了 ;
         * 也可能存在进入该方法后 , 线程池被销毁的情况 ; 
         * 因此我们反复验证线程状态 , 如果需要在线程停止时回滚队列 , 如果没有线程就创建新线程 ;
         *
         * 3. 如果不能将任务放入队列中 , 尝试创建一个新线程 ; 
         * 如果创建线程失败 , 说明当前线程池关闭 , 或者线程池中线程饱和 , 此时拒绝执行该任务 ; 
         */
        int c = ctl.get();

上述 AtomicInteger ctl 线程池状态是很关键的原子变量 , 该原子变量中同时包含了线程池的线程数量 , 该值是一个组合的数值 ; 该 int 值

4

字节

32

位 , 前

3

位是线程池的状态位 , 剩下的

29

位是线程数 ;

	/**
	 * 主池控制状态ctl是一个原子整数
	 * 两个概念领域
	 * workerCount,指示有效线程数
	 * 运行状态,指示是否运行、关闭等
	 * 
	 * 为了将它们打包成一个整数,我们将workerCount限制为
	 * (2^29)-1(约5亿)个线程,而不是(2^31)-1(2
	 * 10亿)否则可代表。如果这曾经是一个问题
	 * 将来,变量可以更改为原子长度,
	 * 下面的移位/遮罩常数已调整。但在需要之前
	 * 因此,此代码使用int更快更简单。
	 * 
	 * workerCount是已注册的工人数
	 * 允许启动,不允许停止。该值可能是
	 * 与活动线程的实际数量暂时不同,
	 * 例如,ThreadFactory在以下情况下无法创建线程:
	 * 当退出线程仍在执行时
	 * 终止前的簿记。用户可见池大小为
	 * 报告为工作集的当前大小。
	 * 
	 * 运行状态提供主要的生命周期控制,具有以下值:
	 * 
	 * 正在运行:接受新任务和处理排队的任务
	 * 关机:不接受新任务,但处理排队的任务
	 * 停止:不接受新任务,不处理排队的任务,
	 * 并中断正在进行的任务
	 * 整理:所有任务都已终止,workerCount为零,
	 * 正在转换为状态整理的线程
	 * 将运行终止的()钩子方法
	 * 终止:终止()已完成
	 * 
	 * 这些值之间的数字顺序很重要,以允许
	 * 有序比较。运行状态随时间单调增加
	 * 时间,但不需要击中每个状态。这些转变是:
	 * 
	 * 运行->关机
	 * 在调用shutdown()时,可能隐式地在finalize()中
	 * (运行或关闭)->停止
	 * 在调用shutdownNow()时
	 * 关机->整理
	 * 当队列和池都为空时
	 * 停止->整理
	 * 当池为空时
	 * 清理->终止
	 * 当终止的()钩子方法完成时
	 * 
	 * 等待终止()的线程将在
	 * 国家终止。
	 * 
	 * 检测从关闭到清理的过渡较少
	 * 比您希望的简单,因为队列可能会
	 * 非空后为空,关机状态下为空,但
	 * 只有在看到它是空的之后,我们才能终止
	 * workerCount为0(有时需要重新检查——请参阅
	 * 下)。
	 */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

简单的机翻了下 , 如果查看详细的英文注释 , 查看 libcore/ojluni/src/main/java/java/util/concurrent/ThreadPoolExecutor.java 源码 ;

线程池的状态如下 , 有

5

种状态 ;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

判断当前的工作线程数 workerCountOf(c) 是否小于核心线程数 corePoolSize ;

如果小于 , 则添加核心线程 addWorker(command, true) ;

这里注意 , 来了新任务后 , 不是先将任务放入阻塞队列 , 而是检查核心线程 , 先尝试将核心线程部署满 ;

        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

判断当前的线程池状态 isRunning(c) 是否正在执行处于 RUNNING 状态 , 如果当前线程池处于 RUNNING 状态 , 说明所有的核心线程都满了 , 则将任务队列放入阻塞队列中 workQueue.offer(command) ;

如果可以入队 , 重新检查状态 , 如果必要 回滚排队 ! isRunning(recheck) && remove(command) , 重新检查状态通过后 , addWorker(null, false) 将任务添加如阻塞队列中 ;

入队失败 , 尝试添加非核心线程 !addWorker(command, false) , 如果非核心线程也失败 , 则执行拒绝策略 reject(command) ;

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);