JUC并发编程随记二——CompletableFuture
1、CompletableFuture
1.1、简介
JDK8设计出CompletableFuture,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
(1)CompletionStage
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x->square(x)).thenAccept(x->System.out.print(x)).thenRnu(()->System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是有多个阶段一起触发。
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
(2)CompletableFuture - 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式计算处理结果,也提供了转换和组合CompletableFuture的方法。
- 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现类Future和CompletionStage接口。
1.2、CompletableFuture四大静态方法
1.2.1、runAsync(Runnable()):无返回值两种
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
(1)runAsync(Runnable runnable)
/**
* @Description: 1、runAsync(Runnable()):无返回值
* @Author: yangyb
* @Date:2022/9/25 16:48
* Version: 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 不建议直接new使用
// CompletableFuture completableFuture = new CompletableFuture();
// 推荐使用四大静态方法
// 1、runAsync(Runnable()):无返回值
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(completableFuture.get());
}
}
没指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定,则使用我们自定义的或者特别指定的线程执行异步代码。
(2)unAsync(Runnable runnable,Executor executor)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description: 1、runAsync(Runnable()):无返回值,指定Executors参数
* @Author: yangyb
* @Date:2022/9/25 16:48
* Version: 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 不建议直接new使用
// CompletableFuture completableFuture = new CompletableFuture();
// 推荐使用四大静态方法
// 1、runAsync(Runnable()):无返回值
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
1.2.2、supplyAsync(Supplier supplier)有返回值两种
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
(1)supplyAsync(Supplier supplier)
/**
* @Description: 1、supplyAsync(Supplier<U> supplier):有返回值
* @Author: yangyb
* @Date:2022/9/25 16:48
* Version: 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello supplyAsync";
});
System.out.println(completableFuture.get());
}
}
(2)supplyAsync(Supplier supplier,Executor executor)
import java.util.concurrent.*;
/**
* @Description: 1、supplyAsync(Supplier<U> supplier,Executor executor):有返回值
* @Author: yangyb
* @Date:2022/9/25 16:48
* Version: 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello supplyAsync";
},threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
1.3、通用异步编程
从JAVA8开始引入了CompletableFuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回 调对象的回调方法。
/**
* @Description: 1、supplyAsync(Supplier<U> supplier,Executor executor):有返回值
* @Author: yangyb
* @Date:2022/9/25 16:48
* Version: 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try{
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"--come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1秒后的结果是:"+result);
return result;
},threadPool).whenComplete((v,e)->{
if(e==null){
System.out.println("--计算完成,更新系统:"+v);
}
}).exceptionally(e->{
e.printStackTrace();
System.out.println("异常情况:"+e.getCause()+" "+e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
CompletableFuture的优点:
(1)异步任务结束时,会自动回调某个对象的方法。
(2)主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
(3)异步任务出错时,会自动回调某个对象的方法。
1.4、链式语法和join方法介绍
1.5、电商比价案列
package com.company.juc;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @Description: TODO
* @Author: yangyb
* @Date:2022/9/26 22:09
* Version: 1.0
**/
public class CompletableFutureMallDemo {
static List<NetMall> list= Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("tianmao")
);
public static List<String> getPrice(List<NetMall> list,String productName){
return list.stream().
map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))).
collect(Collectors.toList());
}
public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName){
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList()).stream().map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args){
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
list1.forEach(System.out::println);
long endTime =System.currentTimeMillis();
System.out.println("-------costTime: "+(endTime-startTime)+" 毫秒");
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
list2.forEach(System.out::println);
long endTime2 =System.currentTimeMillis();
System.out.println("-------costTime: "+(endTime2-startTime2)+" 毫秒");
}
}
class NetMall{
public String netMallName;
public double calcPrice(String productName){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);
}
public NetMall(String netMallName) {
this.netMallName = netMallName;
}
public String getNetMallName() {
return netMallName;
}
}
2、CompletableFuture常用方法
2.1、获得结果和触发计算
。。。。。。。。。。
2.2、对计算结果进行处理
。。。。。。。。。。
2.3、对计算结果进行消费
/**
* @Description:
* @Author: yangyb
* @Date:2022/9/27 21:25
* Version: 1.0
**/
public class CompletableFutureAPI3Demo {
public static void main(String[] args){
// thenRun(Runnable runnable),任务A执行完B,并且B不需要A的结果
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());
// thenAccept(Consumer action),任务A执行完B,B需要A的结果,但是任务B无返回值
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());
// thenAccept,任务A执行完B,B需要A的结果,同时任务B有返回值
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((r)-> r+"resultB").join());
}
}
import java.util.concurrent.CompletableFuture;
/**
* @Description: thenAccept,接收任务的处理结果,并消费处理,无返回结果
* @Author: yangyb
* @Date:2022/9/27 21:25
* Version: 1.0
**/
public class CompletableFutureAPI3Demo {
public static void main(String[] args){
CompletableFuture.supplyAsync(()->{
return 1;
}).thenApply(f->{
return f+2;
}).thenApply(f->{
return f+3;
}).thenAccept(System.out::println);
}
}
2.4、对计算速度进行选择
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @Description: TODO
* @Author: yangyb
* @Date:2022/9/27 22:16
* Version: 1.0
**/
public class CompletableFutureFastDemo {
public static void main(String[] args){
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playA";
});
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "playB";
});
CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + "is winner";
});
System.out.println(Thread.currentThread().getName()+" "+"------: "+result.join());
}
}
2.5、对计算结果进行合并
import java.util.concurrent.CompletableFuture;
/**
* @Description: thenCombine
* @Author: yangyb
* @Date:2022/9/27 22:32
* Version: 1.0
**/
public class CompletableFutureCombineDemo {
public static void main(String[] args){
CompletableFuture<Integer> completableFutureA = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+" ---启动");
return 10;
});
CompletableFuture<Integer> completableFutureB = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+" ---启动");
return 13;
});
CompletableFuture<Integer> result = completableFutureA.thenCombine(completableFutureB, (x, y) -> {
System.out.println("--------结果合并");
return x + y;
});
System.out.println(result.join());
}
}
3、CompletableFuture之线程池运行选择
-
1、没有传入自定义线程池,都用默认线程池ForkJoinPool;
-
2、传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池 -
3、备注
有可能处理太快,系统优化切换原则,直接使用main线程处理。
其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。
相关文章
- Jease 2.6发布 Java开源内容框架
- JVM调优总结:反思
- JVM调优总结:调优方法
- JVM调优总结:新一代的垃圾回收算法
- JVM调优总结:典型配置举例
- JVM调优总结:分代垃圾回收详述
- JVM调优总结:垃圾回收面临的问题
- JVM调优总结:基本垃圾回收算法
- JVM调优总结:一些概念
- 用Java GUI编写的画板程序
- Java的动态绑定机制
- jOOQ 2.0.2发布 Java的ORM框架
- Java中带复选框的树的实现和应用
- Java网络编程菜鸟进阶:TCP和套接字入门
- 甲骨文与谷歌专利权之争定于今年三月开审
- Java调用C/C++编写的第三方dll动态链接库
- 集成开发环境 NetBeans IDE 7.1正式版发布
- kangle 2.7.5紧急发布 防hash碰撞攻击
- 东方通技术引领模式为国产软件“争权”
- UML中关联,组合与聚合等关系的辨析