zl程序教程

您现在的位置是:首页 >  Java

当前栏目

Java的所有线程知识精华全在CompletableFuture了

2023-04-18 14:27:49 时间

目录

1、Thread,Runnable,Callable

1.1 线程的概念

1.2 线程的创建

1.2.1 直接创建线程

1.2.2 实现Runnable定义任务

1.2.3 通过callable 创建可以有返回值的任务

2、function,consumer

3、Executor,Executors,ExecutorService

3.1 为什么要使用线程池

3.2Executor

3.3 ExecutorService

3.4 Executors

4、CompletableFuture

4.1 CompletionStage

4.2 CompletableFuture

4.3 看下源码到底咋回事

4.3.1 线程池怎么回事

4.3.2 怎么串行的?

总结


今天在读项目代码的过程中发现了项目中有CompletableFuture的使用,虽然很早就知道这个类,也会使用但是从来没有探究代码的实现逻辑,今天凑着一个机会从里到外扒一扒这个类,希望能讲明白。

1、Thread,Runnable,Callable

1.1 线程的概念

先讲一讲线程,我想刚入门的同学都知道线程是什么,线程是为了提升cpu利用效率,防止阻塞的执行单位,举个例子,比如你正在做饭,发现家里没有酱油了,这时候你有两个选择,一个是停下手里的活,自己去打酱油,另外一个方式就是叫你儿子去打酱油,你儿子就是相当于一个新线程,帮你解决问题。打酱油这件事情就是一个任务。

1.2 线程的创建

所有的事情都可以从生活中发现蛛丝马迹,从上面的例子中我们可以看到,线程就是一个执行单位,可以具象理解为就是一个人,而要做的事情可以理解为一件任务,所以迁移到程序中我们可以理解线程。

Thread -> 人

Runnable->没有回复的任务

callable -> 有回复的任务

Runnable 和 callable 的不同就是有没有回复结果,比如在开发中发出命令不需要回复,当存储数据库的时候不关注结果,只是发出动作可以使用runnable,比如第一个例子的打酱油,是需要计算结果的,这个时候使用callable是合适的。

1.2.1 直接创建线程

通过继承Thread 创建线程,覆盖实现run方法,将任务代码写在run内,通过start 启动线程。

class MyThread extends Thread{  // 继承Thread类,作为线程的实现类
    private String name ;       // 表示线程的名称
    public MyThread(String name){
        this.name = name ;      // 通过构造方法配置name属性
    }
    public void run(){  // 覆写run()方法,作为线程任务
        for(int i=0;i<10;i++){
            System.out.println(name + "运行,i = " + i) ;
        }
    }
};
public class ThreadDemo{
    public static void main(String args[]){
        MyThread mt1 = new MyThread("香菜A ") ;    // 实例化对象
        MyThread mt2 = new MyThread("香菜B ") ;    // 实例化对象
        mt1.start() ;   // 开始执行任务吧
        mt2.start() ;   // 开始执行任务吧
    }
}

1.2.2 实现Runnable定义任务

Runnable 是一个接口,也就是一个规范,定义了任务的基本方法run,这个在有些也叫契约(垃圾概念)

这种方式是任务和线程进行分离,在启动线程的时候告诉他执行什么任务。只定义任务,随便来个线程都可以执行

class MyThread implements Runnable{ // 实现Runnable接口
    public void run(){  // 覆写run()方法
        while(true){
            System.out.println(Thread.currentThread().getName() + "在运行。") ;
        }
    }
};
public class ThreadDemo{
    public static void main(String args[]){
        MyThread mt = new MyThread() ;  // 实例化Runnable子类对象
        Thread t = new Thread(mt,"线程");     // 实例化Thread对象
        t.setDaemon(true) ; // 此线程在后台运行
        t.start() ; // 启动线程
    }
};

1.2.3 通过callable 创建可以有返回值的任务

实现 Callable 接口, 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。

使用Callable 需要有 FutureTask的支持,再次使用上面打酱油的例子,你在让你儿子打酱油的时候说,等会打酱油回来之后可以直接放在桌子的左上角,你在做饭的过程中只要发现左上角有酱油就代表任务完成了。

FutureTask 可以理解为约定的一个地方,线程执行完之后就会把结果放在FutureTask的result容器中。具体的可以参照:《多线程系列二》不理解future怎么能有future?

 
public class TestCallable {
    public static void main(String[] args) {
        ThreadDemo td = new ThreadDemo();
        //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
        FutureTask<Integer> result = new FutureTask<>(td);
        new Thread(result).start();
        //2.接收线程运算后的结果
        try {
            Integer sum = result.get();  //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的
            System.out.println(sum);
            System.out.println("------------------------------------");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
 
}
 
class ThreadDemo implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
 
        for (int i = 0; i <= 100000; i++) {
            sum += i;
        }
        return sum;
    }
}

小结:在理解多线程的时候主要映射到现实生活的需求中,Thread 就是一个人,需要做的事情就是任务,任务分为有回复的任务和无回复的任务,也就分出Runnable和Callable 了。

2、function,consumer

第二个话题其实是Java8 的新特性,在之前的工作经历中,发现还有很多人停留在Java6的语法上,不愿意学习也不愿意接受新的语法特性,究其原因是没理解,不能很好的掌握,这个我也写过一篇文章,里面有详细的说明。

一篇文章掌握lambda,function下41个类 我相信读完一定能一下掌握所有的function。

简单一句话概括,所有的function功能上和定义的函数一样,只不过定义了通用的结构,相当于通用的规则,只要填写代码就可以了,而这一组函数按不同的需求可以分为

Function 有返回值的函数,使用accept函数

Consumer 必须有参数的函数,使用apply函数

Supplier 有返回值的函数,使用get函数

format,png

3、Executor,Executors,ExecutorService

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3BlcmZlY3QyMDEx,size_16,color_FFFFFF,t_70

3.1 为什么要使用线程池

线程池这个大家都知道,是为了提高效率,可以类比生活,如果开个店,需要几个员工,正常的操作都是雇佣员工,而不是每天使用临时工,这样用完就解雇掉,对于店主来说招人的成本太高,还需要培训,我想正常的都不会这么做,线程池也是同样的道理,避免了创建和销毁线程的开销。

3.2Executor

java线程池中的一个顶级接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,在Executor中,可以使用Executor而不用显示地创建线程:

executor.execute(new RunnableTask()); // 异步执行

3.3 ExecutorService

ExecutorService:是一个比Executor使用更广泛的子类接口,提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

3.4 Executors

Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3BlcmZlY3QyMDEx,size_16,color_FFFFFF,t_70

newScheduledThreadPool 定时执行的线程池

newCachedThreadPool 缓存使用过的线程

newFixedThreadPool 固定数量的线程池

newWorkStealingPool 将大任务分解为小任务的线程池

可以看到提供了一些具体的线程池模型,可以根据自己的需求使用。

4、CompletableFuture

终于到我今天想说的了,上面的基本上都可以在我的博客中找到相关的专题,今天主要聊一下CompletableFuture。

4.1 CompletionStage

CompletionStage 的接口一般都返回新的CompletionStage,表示执行完一些逻辑后,生成新的CompletionStage,构成链式的阶段型的操作。ef1c998d327447fbaa3db2cf6a6861ee.png

CompletionStage的接口方法可以从多种角度进行分类,可

以从函数的命名和函数进行分类

每个函数名可以拆分为三段,第一个单词表示触发时机,第二个表述stage之间的关系,最后一个表示执行的方式

从函数的参数可以看到主要是有返回值的function,消费型的Consumer以及runnable

4.2 CompletableFuture

先来个例子吧,可以运行下面的例子看下打酱油的全过程,并不影响你做菜。

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture
                //让儿子去打酱油
                .supplyAsync(()-> {
                    try {
                        System.out.println("儿子跑步去打酱油");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("酱油打好了");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "酱油";
                })
                //告诉我一声
                .thenAccept(oil->{
                    System.out.println("做菜用酱油:" + oil);
                });
        System.out.println("继续做菜");
        Thread.currentThread().join();
    }

一个completetableFuture就代表了一个任务,看名字就知道和Future的关系。使用future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。

4.3 看下源码到底咋回事

4.3.1 线程池怎么回事

从上面的源码我们看到并没有我们之前知道的线程池相关的东西,也没使用线程池,到底是怎么做的呐?

打开源码瞧一瞧。

   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

Supplier 是一个有返回值的函数,在上面的例子中我们反悔了一个酱油的字符串。

在上面的return语句中我们看到使用了一个asyncPool,这是个什么东西?我们找下定义

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

从上面的代码注释中我们看到,这是一个缺省的线程池,使用ForkJoinPool.commonPool(),这下我们明白了原来是使用了缺省的线程池。这个线程池默认创建的线程数是 CPU 的核数

4.3.2 怎么串行的?

CompletionStage的作用就是为了链式编程而存在的,所以可以猜测下CompletionStage 扮演了重要的作用,看下源码吧。

   //1
   public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    //2
    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

asyncSupplyStage()方法中,调用指定的线程池,并执行execute(new AsyncSupply(d,f)),这里d就是我们的“源任务”,接下来thenApply()要依赖着这个源任务进行后续逻辑操作,f是Supplier的函数式编程。

 
  static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }
​
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }
​
        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

在run()方法里。在run()方法里,先判断d.result == null,判断该任务是否已经完成,防止并发情况下其他线程完成此任务了。f.get()就是调用的Supplier的函数式编程。

主线程会在asyncSupplyStage()方法中返回d,就是我们的“依赖任务”,而这个任务此时还处在阻塞中。接下来main线程会继续执行CompletableFuture的thenAccept(Comsumer<? super T> action)方法,然后调用CompletableFuture的uniAcceptStage()方法。

CompletableFuture中有“源任务”和“依赖任务”,“源任务”的完成能够触发“依赖任务”的执行,这里的完成可以是返回正常结果或者是异常。

总结

CompletableFuture 是一个自带缺省线程池的,并且支持链式编程的,免去线程之间关系的类,在多线程编程中可以减少代码量,减少线程的调用,推荐

码字不易,求三连,求支持,为爱发电