zl程序教程

您现在的位置是:首页 >  移动开发

当前栏目

【Android 异步操作】线程池 ( 线程池作用 | 线程池种类 | 线程池工作机制 | 线程池任务调度源码解析 )

Android源码线程异步 操作 解析 作用 机制
2023-06-13 09:17:44 时间

文章目录

一、线程池作用


线程池作用 :

① 避免创建线程 : 避免每次使用线程时 , 都需要 创建线程对象 ;

② 统一管理 : 统一管理线程 , 重用存在的线程 , 减少线程对象创建 , 销毁的开销 ;

③ 控制并发 : 可 控制线程的最大并发数 , 提高资源使用效率 , 避免资源竞争导致堵塞 ;

二、线程池种类


线程池种类 :

① newCachedThreadPool : 可缓存线程池 , 如果 线程池线程个数已满 , 回收空闲线程 , 如果没有空闲线程 , 此时会创建新线程 ;

② newFixedThreadPool : 创建固定大小线程池 , 可设置并发数 , 如果并发数已满 , 后续任务会 在 等待队列 中等待可用线程 ;

③ newScheduledThreadPool : 创建固定大小线程池 , 其支持 周期性任务 ;

④ newSingleThreadExecutor : 创建 单线程 线程池 , 该线程池中 只有一个线程 , 所有的任务按照指定的优先级顺序执行 , 如 FIFO 先入先出 ( 先到的先执行 , 后到的后执行 ) , LIFO 后入先出 ( 后到的先执行 ) ;

三、线程池工作机制


线程池线程相关概念:

  • 线程数 : 线程池的 有 最大线程数 MaxSzie , 核心线程数 CoreSize , 非核心线程数就是 MaxSize - CoreSize ;
  • 示例 : 最大线程数 ( MaxSize ) 是 8 个 , 有 3 个核心线程 ( CoreSize ) , 5 个非核心线程 ;
  • 非核心线程 : 闲置超过一定时间 , 就会被回收 ;

线程池任务调度 : 线程池中维护了一个任务队列 , 线程池启动后 , 会不停的从任务队列中取出任务 , 如果有新任务 , 执行如下操作 ;

如果 线程数 小于核心线程数 ( CoreSize ) , 那么创建核心线程 , 执行上述任务 ;

如果 线程数 大于核心线程数 ( CoreSize ) , 小于最大线程数 ( MaxSize ) , 那么创建非核心线程 , 执行上述任务 ;

如果 线程数 超过 最大线程数 ( MaxSize )

  • 如果 任务队列没满 , 则将任务放入任务队列 ;
  • 如果 任务队列满了 , 则抛出异常 ; 这里一般情况下需要手动处理这种情况 , 任务拒绝后 , 处理善后 ;

四、线程池任务调度源码解析


在 AsyncTask.java 中 , 在静态代码块中 , 自己 自定义创建了线程池 , 没有使用上述四种线程池 ;

创建线程池时传入的参数 :

  • CORE_POOL_SIZE : 核心线程数
  • MAXIMUM_POOL_SIZE : 最大线程数
  • KEEP_ALIVE_SECONDS : 闲置时间 , 非核心线程一旦闲置超过一定时间 , 就会被回收
  • TimeUnit.SECONDS : 闲置时间单位 , 秒
  • sPoolWorkQueue : 线程队列 , 任务队列
  • sThreadFactory : 线程工厂 , 用于生产线程
public abstract class AsyncTask<Params, Progress, Result> {
    static {
        /**
         * 自定义的线程池 : 
         * CORE_POOL_SIZE : 核心线程数 
         * MAXIMUM_POOL_SIZE : 最大线程数 
         * KEEP_ALIVE_SECONDS : 闲置时间 , 非核心线程一旦闲置超过一定时间 , 就会被回收
         * TimeUnit.SECONDS : 闲置时间单位 , 秒
         * sPoolWorkQueue : 线程队列 , 任务队列 
         * sThreadFactory : 线程工厂 , 用于生产线程 
         */  
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    
    private static class SerialExecutor implements Executor {
        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
            	// 线程池执行任务 
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
}

在 AsyncTask 中 , 调用 ThreadPoolExecutor THREAD_POOL_EXECUTOR 线程池的 void execute(Runnable command) 方法 , 执行线程池任务 ;

在 execute 方法中, 需要执行以下三个步骤 :

  1. 如果当前 运行线程数小于核心线程数 , 尝试 启动新线程执行该任务, 该任务是线程的第一个任务.调用 addWorker 方法会检查运行状态, 和线程运行个数, 避免在不应该添加线程时执行错误操作.
  2. 如果 任务成功加入队列, 需要 双重检查 ( 进入该方法后, 线程池可能关闭 ), 在进入该方法后, 是否添加了一个线程, 或者线程池是否关闭. 因此, 我们应该再次检查运行状态, 如果需要, 将任务放回队列中, 或者启动一个新线程.
  3. 如果 不能将任务入队, 尽量添加一个新线程. 如果添加失败, 此时线程池可能关闭, 或者运行线程数等于最大线程数, 需要拒绝该任务.
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 在将来的某个时间执行给定的任务. 
     * 该任务可能在一个新线程中执行, 也可能在当前线程池中已存在的线程中执行.
     * 
     * 如果任务不能被提交执行, 或该线程池失效, 或该线程池线程个数由于超过最大线程数,
     * 任务被 RejectedExecutionHandler 处理. 
     *
     * @param command 向线程池中提交的任务 
     * @throws RejectedExecutionException 如果任务不能被接受, 抛出该异常
     * @throws NullPointerException 如果任务为空, 抛出该异常 
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 三个步骤:
         *
         * 1. 如果当前运行线程数小于核心线程数 , 尝试启动新线程执行该任务, 该任务是线程的第一个任务.
         * 调用 addWorker 方法会检查运行状态, 和线程运行个数, 避免在不应该添加线程时执行错误操作.
         *
         * 2. 如果任务成功加入队列, 需要双重检查 ( 进入该方法后, 线程池可能关闭 ), 
         * 在进入该方法后, 是否添加了一个线程, 或者线程池是否关闭.
         * 因此, 我们应该再次检查运行状态, 如果需要, 将任务放回队列中, 或者启动一个新线程.
         *
         * 3. 如果不能将任务入队, 尽量添加一个新线程. 
         * 如果添加失败, 此时线程池可能关闭, 或者运行线程数等于最大线程数, 需要拒绝该任务.
         */
        int c = ctl.get();
        // 当前运行的线程数 小于 核心线程数 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
  		// 确保处于运行状态, 然后将任务添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果不处于运行状态, 从队列中移除
            if (! isRunning(recheck) && remove(command))
                reject(command);	// 拒绝任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 尝试添加任务 
        else if (!addWorker(command, false))
            reject(command);
    }
}