zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【详解】线程池及其自定义线程池的实现

线程 实现 详解 自定义 及其
2023-09-11 14:17:55 时间

1. 为什么使用线程池?

线程池是运用场景最多的并发框架,几乎所有需要一步或者并发执行任务的程序都可以使用线程池。使用线程池一般有以下三个好处:

降低资源的消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。

提高相应速度,当任务到达的时候,任务可以不需要等到线程创建就能立刻执行。

提高线程的可管理性,线程是稀缺资源,使用线程池可以统一的分配、调优和监控。

2. Java中的线程池ThreadPool

  • jdk中的线程池构造方法
public ThreadPoolExecutor(
				int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
             	TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {...}
  • 线程池参数介绍

1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。

  1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。

  2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。

  3. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于Linked-BlockingQueue。

  4. PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。

4)keepAliveTime(线程活动保持时间): 线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

5)ThreadFactory(线程工厂):用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

6)RejectedExecutionHandler(拒绝策略): 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。

AbortPolicy:直接抛出异常。

CallerRunsPolicy:只用调用者所在线程来运行任务。

DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

DiscardPolicy:不处理,丢弃掉。

3. 简单实现一个自定义线程池

  • 创建流程
	 * 创建流程:
     *            1.优先创建正式员工去处理;
     *            2.加入阻塞队列
     *            3.创建零时工去处理
     *            4.拒绝策略
  • 处理模型

在这里插入图片描述

  • 主类
import java.util.concurrent.*;

/**
 * 自定义线程池
 */
public class MyThreadPoolExecutor implements Executor {

    //创建线程的工厂对象
    private final ThreadFactory threadFactory;
    //临时工的空闲时间上限
    private final long keepAliveTime;
    private final TimeUnit unit;
    //当前正式员工的数量
    private int currentCoreSize;
    //最大正式员工的数量
    private final int corePoolSize;

    //当前临时工的数量
    private int currentTemporarySize;
    //临时员工的上限
    private int temporaryPoolSize;

    //传递任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    //构造方法

    /**
     * @param corePoolSize      正式员工的数量
     * @param maximumPoolSize   员工上限
     * @param keepAliveTime     临时工执行时间上限
     * @param unit      	 时间单位
     * @param workQueue      工作队列
     * @param threadFactory 线程工厂
     * @param handler   拒绝策略
     */
    public MyThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler
    ){
        this.corePoolSize=corePoolSize;
        this.temporaryPoolSize=maximumPoolSize-corePoolSize;
        this.workQueue=workQueue;
        this.threadFactory=threadFactory;
        this.keepAliveTime =keepAliveTime;
        this.unit=unit;
    }
    //向线程池中提交任务
    @Override
    public void execute(Runnable command) {
                //1.当正式员工的数量小于员工上限时,则优先创建正式员工处理
        if (currentCoreSize <corePoolSize){
            //优先创建正式员工
            //创建一个线程,这个线程中的任务就是不断的取任务-做任务,不考虑退出的问题
            CoreJob job =new CoreJob(workQueue,command);
            //线程工厂创建线程
            Thread thread =threadFactory.newThread(job);

            String name=String.format("正式员工-%d",currentCoreSize);
            thread.setName(name);
            thread.start();

            currentCoreSize++;
            return;
        }
        //走到这里,表示正式员工的数量==正式员工的上限
        //2.将任务放入优先级队列中,如果放入成功,execute执行结束,否则还需要继续
        //带阻塞的放入,需要立即看到结果
         boolean success = workQueue.offer(command);
        if (success==true){
            //说明放入成功
            return;
        }
        //队列已满
        //3.继续判断临时工的数量是否到达上限
        if (currentTemporarySize<temporaryPoolSize){
            //创建临时工进行处理
            //创建一个线程,这个线程中的任务就是不断的取任务-做任务,不考虑退出的问题
            TemporaryJob job =new TemporaryJob(keepAliveTime,unit,workQueue,command);
            //线程工厂创建线程
            Thread thread =threadFactory.newThread(job);

            String name=String.format("临时员工-%d",currentTemporarySize);
            thread.setName(name);
            thread.start();
            currentTemporarySize++;
            return;
        }
        //4.临时员工到达上限,执行拒绝策略
        //为实现方便,暂不考虑其他策略
       throw  new RejectedExecutionException();
    }
}
  • 核心线程任务
import java.util.concurrent.BlockingQueue;

//一个正式员工要要完成的任务
public class CoreJob  implements Runnable{

    //需要阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    private Runnable firstCommand;

    public CoreJob(BlockingQueue<Runnable> workQueue,Runnable firstCommand){
        this.workQueue=workQueue;
        this.firstCommand =firstCommand;
    }
    @Override
    public void run() {
        try {
            //优先把刚提交的任务执行掉
            firstCommand.run();
            firstCommand =null;             //设置为空的目的是为不影响firstCommand对象被GC回收
            while (true){
                //不断的从队列中去取任务,执行
                Runnable command =workQueue.take();
                command.run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 临时线程任务
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

//一个临时员工要要完成的任务
public class TemporaryJob implements Runnable{

    //需要阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    //临时工的空闲时间上限
    private final long keepAliveTime;
    private final TimeUnit unit;
    private Runnable firstCommand;

    public TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand){
        this.keepAliveTime=keepAliveTime;
        this.unit=unit;
        this.workQueue=workQueue;
        this.firstCommand =firstCommand;
    }
    @Override
    public void run() {
        try {
            //优先把刚提交的任务执行掉
            firstCommand.run();
            firstCommand =null;             //设置为空的目的是为不影响firstCommand对象被GC回收

        //与正式工不同的是,临时工在一定时间内没有任务时会退出
            //keepAliveTime+unit 记录起来
            //如果一定时间内无法从任务队列中取出任务,则认为到达时间上限
            while (true){
                //不断的从队列中去取任务,执行

                final Runnable command = workQueue.poll(keepAliveTime, unit);
                if (command==null){
                    //表明没有取到任务,超时时间已到,退出
                    break;
                }
                command.run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 测试
import java.util.concurrent.*;

public class Test {
    static  class Task implements Runnable{
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class MyThreadFactory implements ThreadFactory{
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    }
    public static void main(String[] args) {

         BlockingQueue<Runnable> workQueue =new ArrayBlockingQueue<>(5);
        //3正式,2临时,队列中5,最多任务上限10,第11个任务执行拒绝策略
        MyThreadPoolExecutor executor =new MyThreadPoolExecutor(
                3,5,20, TimeUnit.SECONDS,
                workQueue,new MyThreadFactory(),new ThreadPoolExecutor.AbortPolicy()
        );

        //测试,创建线程任务
        for (int i = 0; i <50 ; i++) {
            System.out.println("提交任务:"+i);
            Task task =new Task();
            executor.execute(task);
        }
    }
}
  • 测试结果
    执行第11个任务时,会执行拒绝策略。在这里插入图片描述