zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

JUC-Java多线程Future,CompletableFuture

2023-03-07 09:44:23 时间

多线程相关概念

1把锁:synchronized

2个并:并发(concurrent)在同一实体上的多个事件,在一台处理器上“同时处理多个任务”,同一时刻,其实是只有一个时间在发生

​        并行(parallel)在不同实体上的多个时间,在多台处理器上同时处理多个任务,同一时刻,大家都在做事情,你做你的,我做到我的,但是我们都在做

3个程:进程:在系统中运行的一个应用程序就是一个进程,每一个进程都有自己的内存空间和系统资源

​         线程:也被成为轻量级进程,在一个进程中会有1个或多个进程,是大多数操作系统进行时序调度的基本单元

​         管程:Monitor(监视器),也就是平时所说的锁

Monitor其实就是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。 JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象

 Object o = new Object();

    Thread t1 = new Thread(() -> {
            synchronized (o) {
                System.out.println(1);
            }
        }, "t1");
        t1.start();

用户线程和守护线程

一般情况下不做特别说明配置,默认都是用户线程

用户线程(User Thread): 是系统的工作线程,它会完成这个程序需要完成的业务条件。

守护线程(Daemon Thread):是一种特殊的线程为其它线程服务的,在后台默默地完成一些系统性的服务 守护线程作为一个服务线程,没有服务对象就没有必要继续运行了 ,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了,所以假如当系统只剩下守护线程的时候,java虚拟机会自动退出。

// 是否是守护线程
t1.isDaemon();
// 设置为守护线程
t1.setDaemon(true);

setDaemon(true)方法必须在start()之前设置

Future接口

Future接口(Future实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者执行完,过了一会才去获取子任务的执行结果或变更的任务状态。

总结: Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

FutureTask异步任务

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask = new FutureTask(new MyThread());

        Thread t1 = new Thread(futureTask);
        t1.start();

        System.out.println(futureTask.get());
    }

}
class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("come in call----");
        return "hello callable";
    }
}

Future+线程池异步多线程任务配合,能显著提高程序的执行效率。

futureTask.get();
futureTask.isDone();

Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

CompletableFuture

从jdk1.8开始引入,它是Future的功能增强版,减少阻塞和轮询。可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

    ExecutorService threadPool = Executors.newFixedThreadPool(3);
		
//      CompletableFuture.runAsync() 无返回值
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("come in ---");
            return "hello";
        },threadPool);

        String s = completableFuture.get();
        System.out.println(s);
        threadPool.shutdown();
//这里用自定义线程池,采用默认线程会当作一个守护线程,main方法执行完后future线程还未处理完时会直接关闭
ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("come in ----");
                int i = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("---1秒后出结果:" + i);
                return i;
            }, threadPool).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("计算完成:" + v);
                }
            }).exceptionally(e -> {
                System.out.println(e.getCause());
                return null;
            });
            System.out.println(java.lang.Thread.currentThread().getName() + "去忙其他任务了");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }

CompletableFuture join和get区别

在编译时是否报出检查型异常

CompletableFuture的优点

异步任务结束时,会自动回调某个对象的方法 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行 异步任务出错时,会自动回调某个对象的方法

ps:和javascript回调不能说相似,只能说一模一样,一通百通

Lambda表达式+Stream流式调用+CHain链式调用+Java8函数式编程

Runnable

无参数,无返回值

Function

Function<T, R> 接收一个参数,并且有返回值

Consumer

Consumer接收一个参数,并且没有返回值

BiConsumer

BiConsumer<T, U>接收两个参数(Bi,英文单词词根,代表两个的意思),没有返回值

Supplier

Supplier供给型函数接口,没有参数,有一个返回值

Predicate

一般用于做判断,返回boolean类型

总结

函数时接口名称

方法名称

参数

返回值

Runnable

run

无参数

无返回值

Function

apply

1个参数

有返回值

Consumer

accept

1个参数

无返回值

Supplier

get

没有参数

有返回值

BiConsumer

accept

2个参数

无返回值

Predicate

test

1个参数

有返回值(boolean)

日常工作中如何进行开发?

功能→性能,先完成功能实现,再考虑性能优化

CompletableFuture用例

假设要从多个电商平台查询一件商品的价格,每次查询耗时设定为1秒

普通查询:查询电商平台1→查询电商平台2→查询电商平台3 …

CompletableFuture: 同时异步执行要查询的电商平台

public class CompletableFutureDemo {

  static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );

    /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list,String productName)
    {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()
                .map(netMall ->
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName)
    {
        return list.stream().map(netMall ->
                        CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }


    public static void main(String[] args)
    {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        System.out.println("--------------------");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
    }
}

class NetMall
{
    @Getter
    private String netMallName;

    public NetMall(String netMallName)
    {
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName)
    {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

执行效率显著提高

CompletableFuture常用API

获取结果和触发计算

T get(); 容易造成阻塞,非得拿到结果,否则不往下执行

T get(long timeout, TimeUnit unit); 指定时间后还没拿到结果,直接TimtoutException

T join(); 和get一样,区别在于编译时是否报出检查型异常

T getNow(T valueIfAbsent); 没有计算完成的情况下,返回一个替代结果。立即获取结果不阻塞:计算完,返回计算完成后的结果,没计算完:返回设定的valueIfAbsend值

boolean complete(T value); 是否打断get方法立即返回括号值,计算完:不打断,返回计算后的结果,没计算完:打断返回设定的value值

allOf:多个CompletableFuture任务并发执行,所有CompletableFuture任务完成时,返回一个新的CompletableFuture对象,其返回值为Void,也就是无返回值。该方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2, c3).join();

anyOf:多个CompletableFuture任务并发执行,只要有一个CompletableFuture任务完成时,就会返回一个新的CompletableFuture对象,并返回该CompletableFuture执行完成任务的返回值。

对计算结果进行处理

thenApply 计算结果存在依赖关系,将两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。

handle 计算结果存在依赖关系,这两个线程串行化。有异常也可以往下一步走,根据带的异常参数可以进一步处理

execptionally类似 try/catch

whenCpmplete和handle类似 try/finally,有异常也会往下执行

对计算结果进行消费

thenAccept 顾名思义,消费型接口。接收任务的处理结果,并消费处理,无返回结果。

对比

API

说明

thenRun

thenRun(Runnable runnable)

任务A执行完执行B,并且B不需要A的结果

thenAccept

thenAccpet(Consumer action)

任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply

thenApply(Function fn)

任务A执行完执行B,B需要A的结果,同时任务B有返回值

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());

thenRun和thenRunAsync区别?

  1. 没有传入自定义线程池,都用默认线程池ForkJoinPool
  2. 如果执行第一个任务的时候,传入一个自定义线程池
    1. 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务时使用同一个线程池
    2. 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoinpool线程池
  3. 备注:有可能处理的太快,系统优化切换原则,直接使用main线程处理 其他如:thenAccept和thenAccpetAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理

对计算速度选用

applyToEither对比任务执行速度

   CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playB";
        });

        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winer";
        });

        System.out.println(Thread.currentThread().getName()+"\t"+"-----: "+result.join());

对计算结果进行合并

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其他分支任务。

   public static void main(String[] args) {
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10;
        });

        CompletableFuture<Integer> future2= CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10;
        });
        CompletableFuture<Integer> future = future1.thenCombine(future2, (x, y) -> {
            System.out.println("-----开始两个结果合并");
            return x + y;
        });
        System.out.println(future.join());
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end-start));
    }