zl程序教程

您现在的位置是:首页 >  移动开发

当前栏目

【Android 异步操作】线程池 ( 线程池使用示例 | 自定义线程池使用流程 | 自定义任务拒绝处理策略 | 完整代码示例 )

Android流程线程异步代码 使用 处理 操作
2023-06-13 09:17:48 时间

文章目录

在博客 【Android 异步操作】线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTask 使用线程池示例 ) 中 , 简单介绍了 线程池 , 以及 Java 提供的四个基本线程池 , 线程池的 基本工作机制 , 如核心线程 , 非核心线程 等 ;

在博客 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 ) 中 , 讲解 线程池 ThreadPoolExecutor 的 execute 方法时 , 有两个重要的核心方法 ;

两个核心的操作 :

  • 添加任务 : addWorker(command, true) , 第二个参数为 true 是添加核心线程任务 , 第二个参数为 false 是添加非核心线程任务 ;
  • 拒绝任务 : reject(command)

在博客 【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 ) 介绍了 addWorker 添加任务 , reject 拒绝任务 的源码细节 ;

在博客 【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask ) 中介绍了 工作者 Worker 的工作流程 ;

本博客中简单介绍线程池的使用示例

一、自定义线程池使用流程


1 . 定义线程工厂 : 该线程工厂用于 创建线程池中的线程 ;

    /**
     * 线程工厂
     * 用于创建线程
     */
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

2 . 线程池任务队列 : 指定 BlockingQueue<Runnable> 类型的线程池队列 , 同时指定队列大小 ;

    /**
     * 线程池任务队列
     * 最多可以容纳 128 个可执行的任务
     */
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

3 . 初始化线程池 : 调用 ThreadPoolExecutor 的 构造函数 初始化线程池 , 并对线程池进行配置 , 配置内容包括如下内容 :

  • 核心线程数
  • 最大线程数
  • 非核心线程最大限制时间
  • 闲置时间的时间单位
  • 线程池任务队列
  • 线程创建工厂
        /*
            在静态代码块中初始化线程池
            在构造函数中对线程池进行配置 , 配置内容包括 :
            核心线程数
            最大线程数
            非核心线程最大限制时间
            闲置时间的时间单位
            线程池任务队列
            线程创建工厂
         */
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;

4 . 自定义任务拒绝处理策略 : 处理任务队列已满 , 拒绝任务的情况 ;

        THREAD_POOL_EXECUTOR.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                // 自定义任务被拒绝后的处理策略
                System.out.println("任务被拒绝");
            }
        });

5 . 执行任务 : 调用线程池的 execute 方法执行任务 ;

            THREAD_POOL_EXECUTOR.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Thread.sleep(1_000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
            );

二、自定义任务拒绝处理策略


如果执行的任务时 , 当前的线程池任务队列已满 , 此时就会拒绝任务 , 并抛出 RejectedExecutionException 异常 ;

报错信息如下 :

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task kim.hsl.threadpool.ThreadPool$2@1f32e575 rejected from java.util.concurrent.ThreadPoolExecutor@279f2327[Running, pool size = 17, active threads = 17, queued tasks = 128, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at kim.hsl.threadpool.ThreadPool.main(ThreadPool.java:90)

解决方案 : 为线程池设置 RejectedExecutionHandler , 该处理器需要开发者自定义 , 实现 RejectedExecutionHandler 接口 , 并实现其 rejectedExecution 方法 ;

        THREAD_POOL_EXECUTOR.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                // 自定义任务被拒绝后的处理策略
                System.out.println("任务被拒绝");
            }
        });

三、完整代码示例


package kim.hsl.threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool {

    /*
        自定义线程池使用示例
        自己配置线程池的各种参数

        模仿 AsyncTask 使用线程池
        部分代码从 AsyncTask 类中拷贝过来
     */

    /**
     * 获取当前的 CPU 核数
     */
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    /**
     * 线程池核心线程数
     * 线程池中最少 2 个线程 , 最多 4 个线程 ,
     * 最好是选择 CPU 核数 - 1 个 , 避免后台任务使 CPU 性能饱和
     */
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));

    /**
     * 最大线程数
     */
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

    /**
     * 非核心线程最大闲置时间
     */
    private static final int KEEP_ALIVE_SECONDS = 30;

    /**
     * 线程工厂
     * 用于创建线程
     */
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    /**
     * 线程池任务队列
     * 最多可以容纳 128 个可执行的任务
     */
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    /**
     * 并行执行任务的线程池执行者
     */
    public static final ThreadPoolExecutor THREAD_POOL_EXECUTOR;

    static {
        /*
            在静态代码块中初始化线程池
            在构造函数中对线程池进行配置 , 配置内容包括 :
            核心线程数
            最大线程数
            非核心线程最大限制时间
            闲置时间的时间单位
            线程池任务队列
            线程创建工厂
         */
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

    public static void main(String[] args) {

        THREAD_POOL_EXECUTOR.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                // 自定义任务被拒绝后的处理策略
                System.out.println("任务被拒绝");
            }
        });

        /*
            线程池中只有 128 个任务队列 , 一次性写入 150 个
            任务队列满了以后, 会拒绝任务
            报错信息如下 :
            Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task kim.hsl.threadpool.ThreadPool$2@1f32e575 rejected from java.util.concurrent.ThreadPoolExecutor@279f2327[Running, pool size = 17, active threads = 17, queued tasks = 128, completed tasks = 0]
            at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
            at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
            at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
            at kim.hsl.threadpool.ThreadPool.main(ThreadPool.java:90)

         */
        for(int i = 0; i < 150; i ++ ){
            THREAD_POOL_EXECUTOR.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Thread.sleep(1_000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
            );
        }

    }
    
}

运行结果 :

---- IntelliJ IDEA coverage runner ---- 
sampling ...
include patterns:
kim\.hsl\.threadpool\..*
exclude patterns:
任务被拒绝
任务被拒绝
任务被拒绝
任务被拒绝
任务被拒绝
Class transformation time: 0.018846207s for 141 classes or 1.336610425531915E-4s per class