zl程序教程

您现在的位置是:首页 >  后端

当前栏目

ForkJoin 线程池[通俗易懂]

线程 通俗易懂 ForkJoin
2023-06-13 09:14:40 时间

大家好,又见面了,我是你们的朋友全栈君。

一、分而治之

严格来讲,分而治之不算一种模式,而是一种思想。它可以将一个大任务拆解为若干个小任务并行执行,提高系统吞吐量。主要讲两个场景,Master-Worker 模式,ForkJoin 线程池。

ForkJoin 线程池是Jdk7之后引入的一个并行执行任务的框架。其核心思想是将任务分割为子任务,有可能子任务还是很大,还需要进一步拆解,最终得到足够小的任务。将分割出来的子任务放入双端队列中,然后几个启动线程从双端队列中获取任务执行。子任务执行的结果放到一个队列里,另起线程从队列中获取数据,合并结果。

二、ForkJoin 与传统线程池的区别

采用 “工作窃取”模式(work-stealing):当执行新的任务时,它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。 相较于一般的线程池,ForkJoin 的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而 ForkJoin,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,提高了性能。

三、案例

计算从 0 到 100L 的累加求和。CountTask 继承自 RecursiveTask,可以携带返回值。每次分解大任务,简单的将任务划分为 10 个等规模的小任务,并使用 fork() 提交子任务。在子任务中通过 THRESHOLD(门槛) 设置子任务分解的阈值,如果当前需要求和的总数大于 THRESHOLD,则子任务需要再次分解,如果子任务可以直接执行,则进行求和操作,返回结果。最终等待所有的子任务执行完毕,对所有结果求和。

public class CountTask extends RecursiveTask<Long> { 

//任务分解的阈值
private static final int THRESHOLD = 11;
private long start;
private long end;
public CountTask(long start, long end) { 

this.start = start;
this.end = end;
}
public Long compute() { 

long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute) { 

for (long i = start; i <= end; i++) { 

sum += i;
}
} else { 

//分成100个小任务
long step = (start + end) / 10;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 10; i++) { 

long lastOne = pos + step;
if (lastOne > end) { 

lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
//将子任务推向线程池
subTasks.add(subTask);
subTask.fork();
}
for (CountTask task : subTasks) { 

//对结果进行join
sum += task.join();
}
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException { 

ForkJoinPool pool = new ForkJoinPool();
// 累加求和 0 -> 100L
CountTask task = new CountTask(0, 100L);
ForkJoinTask<Long> result = pool.submit(task);
System.out.println("sum result : " + result.get());
}
}

ForkJoin 线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能被挂起。挂起的线程将被压入由线程池维护的栈中,待将来有任务可用时,再从栈中唤醒这些线程。Java8 的并行流就是基于 ForkJoin,并进行了优化

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/191575.html原文链接:https://javaforall.cn