zl程序教程

您现在的位置是:首页 >  Java

当前栏目

JUC并发编程随记二——CompletableFuture

2023-04-18 16:52:35 时间

1、CompletableFuture

1.1、简介

JDK8设计出CompletableFuture,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
在这里插入图片描述
(1)CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x->square(x)).thenAccept(x->System.out.print(x)).thenRnu(()->System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是有多个阶段一起触发。
    代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
    (2)CompletableFuture
  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式计算处理结果,也提供了转换和组合CompletableFuture的方法。
  • 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现类Future和CompletionStage接口。

1.2、CompletableFuture四大静态方法

1.2.1、runAsync(Runnable()):无返回值两种


    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor after it runs the given
     * action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

(1)runAsync(Runnable runnable)

/**
 * @Description: 1、runAsync(Runnable()):无返回值
 * @Author: yangyb
 * @Date:2022/9/25 16:48
 * Version: 1.0
 **/
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 不建议直接new使用
        // CompletableFuture completableFuture = new CompletableFuture();
        // 推荐使用四大静态方法
        // 1、runAsync(Runnable()):无返回值
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(completableFuture.get());
    }

}

在这里插入图片描述
没指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定,则使用我们自定义的或者特别指定的线程执行异步代码。
(2)unAsync(Runnable runnable,Executor executor)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Description: 1、runAsync(Runnable()):无返回值,指定Executors参数
 * @Author: yangyb
 * @Date:2022/9/25 16:48
 * Version: 1.0
 **/
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 不建议直接new使用
        // CompletableFuture completableFuture = new CompletableFuture();
        // 推荐使用四大静态方法
        // 1、runAsync(Runnable()):无返回值
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },threadPool);
        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }

}

在这里插入图片描述

1.2.2、supplyAsync(Supplier supplier)有返回值两种


    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

(1)supplyAsync(Supplier supplier)

/**
 * @Description: 1、supplyAsync(Supplier<U> supplier):有返回值
 * @Author: yangyb
 * @Date:2022/9/25 16:48
 * Version: 1.0
 **/
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello supplyAsync";
        });
        System.out.println(completableFuture.get());
    }

}

在这里插入图片描述
(2)supplyAsync(Supplier supplier,Executor executor)

import java.util.concurrent.*;

/**
 * @Description: 1、supplyAsync(Supplier<U> supplier,Executor executor):有返回值
 * @Author: yangyb
 * @Date:2022/9/25 16:48
 * Version: 1.0
 **/
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello supplyAsync";
        },threadPool);
        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }

}

在这里插入图片描述

1.3、通用异步编程

从JAVA8开始引入了CompletableFuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回 调对象的回调方法。

/**
 * @Description: 1、supplyAsync(Supplier<U> supplier,Executor executor):有返回值
 * @Author: yangyb
 * @Date:2022/9/25 16:48
 * Version: 1.0
 **/
public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try{
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName()+"--come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1秒后的结果是:"+result);
                return result;
            },threadPool).whenComplete((v,e)->{
                if(e==null){
                    System.out.println("--计算完成,更新系统:"+v);
                }
            }).exceptionally(e->{
                e.printStackTrace();
                System.out.println("异常情况:"+e.getCause()+"	"+e.getMessage());
                return null;
            });
            System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }

}

在这里插入图片描述
CompletableFuture的优点:
(1)异步任务结束时,会自动回调某个对象的方法。
(2)主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
(3)异步任务出错时,会自动回调某个对象的方法。

1.4、链式语法和join方法介绍

在这里插入图片描述

1.5、电商比价案列

package com.company.juc;


import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @Description: TODO
 * @Author: yangyb
 * @Date:2022/9/26 22:09
 * Version: 1.0
 **/
public class CompletableFutureMallDemo {

    static List<NetMall> list= Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("tianmao")
    );

    public static List<String> getPrice(List<NetMall> list,String productName){
        return list.stream().
                map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))).
                collect(Collectors.toList());
    }

    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName){
        return list.stream()
                .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList()).stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args){

        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        list1.forEach(System.out::println);
        long endTime =System.currentTimeMillis();
        System.out.println("-------costTime: "+(endTime-startTime)+" 毫秒");


        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        list2.forEach(System.out::println);
        long endTime2 =System.currentTimeMillis();
        System.out.println("-------costTime: "+(endTime2-startTime2)+" 毫秒");
    }
}

class NetMall{

    public String netMallName;

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);
    }

    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    public String getNetMallName() {
        return netMallName;
    }
    
}

在这里插入图片描述

2、CompletableFuture常用方法

2.1、获得结果和触发计算

。。。。。。。。。。

2.2、对计算结果进行处理

。。。。。。。。。。

2.3、对计算结果进行消费

在这里插入图片描述

/**
 * @Description: 
 * @Author: yangyb
 * @Date:2022/9/27 21:25
 * Version: 1.0
 **/
public class CompletableFutureAPI3Demo {

    public static void main(String[] args){

        // thenRun(Runnable runnable),任务A执行完B,并且B不需要A的结果
        System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());
        // thenAccept(Consumer action),任务A执行完B,B需要A的结果,但是任务B无返回值
        System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());
        // thenAccept,任务A执行完B,B需要A的结果,同时任务B有返回值
        System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((r)-> r+"resultB").join());
    }
}

在这里插入图片描述

import java.util.concurrent.CompletableFuture;

/**
 * @Description: thenAccept,接收任务的处理结果,并消费处理,无返回结果
 * @Author: yangyb
 * @Date:2022/9/27 21:25
 * Version: 1.0
 **/
public class CompletableFutureAPI3Demo {

    public static void main(String[] args){
        CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenApply(f->{
            return f+2;
        }).thenApply(f->{
            return f+3;
        }).thenAccept(System.out::println);
    }
}

在这里插入图片描述

2.4、对计算速度进行选择

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO
 * @Author: yangyb
 * @Date:2022/9/27 22:16
 * Version: 1.0
 **/
public class CompletableFutureFastDemo {

    public static void main(String[] args){
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        });

        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + "is winner";
        });

        System.out.println(Thread.currentThread().getName()+"	"+"------: "+result.join());
    }
}

在这里插入图片描述

2.5、对计算结果进行合并

import java.util.concurrent.CompletableFuture;

/**
 * @Description: thenCombine
 * @Author: yangyb
 * @Date:2022/9/27 22:32
 * Version: 1.0
 **/
public class CompletableFutureCombineDemo {

    public static void main(String[] args){
        CompletableFuture<Integer> completableFutureA = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"	---启动");
            return 10;
        });

        CompletableFuture<Integer> completableFutureB = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"	---启动");
            return 13;
        });

        CompletableFuture<Integer> result = completableFutureA.thenCombine(completableFutureB, (x, y) -> {
            System.out.println("--------结果合并");
            return x + y;
        });

        System.out.println(result.join());
    }
}

在这里插入图片描述

3、CompletableFuture之线程池运行选择

  • 1、没有传入自定义线程池,都用默认线程池ForkJoinPool;

  • 2、传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
    调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
    调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

  • 3、备注
    有可能处理太快,系统优化切换原则,直接使用main线程处理。
    其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。