zl程序教程

您现在的位置是:首页 >  Java

当前栏目

Java多线程的Callable, Future, FutureCallback, CompletableFuture

2023-03-07 09:40:05 时间

Callable可以看成是一个增强版的Runnable, 带返回结果, 需要通过Future或者FutureTask来提交任务或运行线程, 然后通过Future/FutureTask的get方法得到返回结果.

Callable在子线程中运行, 在主线程中异步得到执行结果(get()方法是阻塞的), 或者检查是否已取消, 是否已完成(检查取消和完成的方法是非阻塞的)

通过Thread子线程启动

这种方式, 需要创建一个FutureTask对象, 再用这个FutureTask对象创建一个Thread来运行. 后续操作都通过FutureTask进行.

public class DemoCallableFuture {
    public static void main(String[] args) {
        FutureTask<String> task = new FutureTask<>(()->{
            System.out.println("task start");
            Thread.sleep(1000);
            System.out.println("task done");
            return "task get";
        });

        new Thread(task).start();

        FutureTask<String> task2 = new FutureTask<>(()->{
            System.out.println("task2 start");
            Thread.sleep(1000);
            System.out.println("task2 done");
            return "task2 get";
        });

        new Thread(task2).start();

        if (task.isCancelled()) {
            System.out.println("task cancelled yes");
        } else {
            System.out.println("task cancelled no");
        }
        if (task.isDone()) {
            System.out.println("task done yes");
        } else {
            System.out.println("task done no");
        }

        try {
            System.out.println(task.get());
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }

        if (task2.isCancelled()) {
            System.out.println("task2 cancelled yes");
        } else {
            System.out.println("task2 cancelled no");
        }
        if (task2.isDone()) {
            System.out.println("task2 done yes");
        } else {
            System.out.println("task2 done no");
        }
        try {
            System.out.println(task2.get());
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果

task start
task cancelled no
task done no
task2 start
task2 done
task done
task get
task2 cancelled no
task2 done yes
task2 get

 

通过ExecutorCompletionService

ExecutorCompletionService是CompletionService在JUC里唯一的实现类, 这个类实现了非排序异步获取任务结果的功能, 通过ExecutorCompletionService.submit() 可以添加Callable并返回Future, 而通过多次调用 ExecutorCompletionService.take().get(), 可以按任务完成的顺序依次取回结果. 避免了使用Future.get()时, 无法根据结果返回次序获取结果的问题.

 

通过CompletableFuture

CompletableFuture.supplyAsync()可以输入一个Supplier<U>作为参数, 这个参数和Callable<U>的作用是相似的. 而后可以用thenApply加上结果处理方法, 适合全异步的处理, 例如

public void asyncGet(String url, Charset charset, Function<Response, Response> function) {
    CompletableFuture.supplyAsync(() -> get(url, charset)).thenApply(function);
}

 

通过ExecutorService线程池启动

这种方式, 通过线程池submit一个Callable对象, 就会得到一个Future对象, 根据这个Future对象做后续操作

public class DemoCallableFuture2 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        Future<String> future = service.submit(()->{
            System.out.println("task start");
            Thread.sleep(1000);
            System.out.println("task done");
            return "task get";
        });

        if (future.isCancelled()) {
            System.out.println("task cancelled yes");
        } else {
            System.out.println("task cancelled no");
        }
        if (future.isDone()) {
            System.out.println("task done yes");
        } else {
            System.out.println("task done no");
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (future.isCancelled()) {
            System.out.println("task cancelled yes");
        } else {
            System.out.println("task cancelled no");
        }
        if (future.isDone()) {
            System.out.println("task done yes");
        } else {
            System.out.println("task done no");
        }

        try {
            System.out.println(future.get());
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果

task cancelled no
task done no
task start
task cancelled no
task done no
task done
task get

.

FutureCallback

FutureCallback是Google Guava中的一个类, 解决的是Future中get阻塞的问题, 让全过程异步. 需要使用ListeningExecutorService的线程池提交.

代码例子

public class DemoFutureCallback {
    public static void main(String[] args) {
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

        for (int i = 0; i < 5; i++) {
            int j = i;
            ListenableFuture<String> future = service.submit(()->{
                System.out.println("task start");
                Thread.sleep(1000);
                System.out.println("task done");
                return "task return " + j;
            });

            Futures.addCallback(future, new FutureCallback<String>() {
                @Override
                public void onSuccess(String s) {
                    System.out.println("callback success: " + s);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
        System.out.println("thread ongoing");
        service.shutdown();
    }
}

运行结果

task start
task start
task start
thread ongoing
task start
task start
task done
callback success: task return 0
task done
callback success: task return 1
task done
callback success: task return 3
task done
callback success: task return 4
task done
callback success: task return 2

Process finished with exit code 0