定制并发类(八)自定义在 Fork/Join 框架中运行的任务
声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷
自定义在 Fork/Join 框架中运行的任务
执行者框架分开了任务的创建和运行。这样,你只要实现 Runnable 对象来使用 Executor 对象。你可以发送 Runnable 任务给执行者,然后它会创建,管理,并终结必要的线程来执行这些任务。
Java 7 在 Fork/Join 框架中提供了特殊的执行者。这个框架是设计用来解决那些可以使用 divide 和 conquer 技术分成更小点的任务的问题。在一个任务内,你要检查你要解决的问题的大小,如果它比设定的大小还大,你就把问题分成2个或多个任务,再使用框架来执行这些任务。
如果问题的大小比设定的大小要小,你可以在任务中直接解决问题,可选择返回结果。Fork/Join 框架实现 work-stealing 算法来提高这类问题的整体表现。
Fork/Join 框架的主要类是 ForkJoinPool 类。它内部有以下2个元素:
默认情况,被 ForkJoinPool类执行的任务是 ForkJoinTask 类的对象。你也可以发送 Runnable 和 Callable 对象给 ForkJoinPool 类,但是他们就不能获得所以 Fork/Join 框架的好处。通常情况,你将发送ForkJoinTask 类的这2个子类中的一个给 ForkJoinPool 对象:
RecursiveAction: 如果你的任务没有返回结果 RecursiveTask: 如果你的任务返回结果在这个指南,你将学习如何为 Fork/Join 框架实现你自己的任务,实现一个任务扩展ForkJoinTask类。你将要实现的任务是计量运行时间并写入操控台,这样你可以控制它的进展(evolution)。你也可以实现你自己的 Fork/Join 任务来写日志信息,为了获得在这个任务中使用的资源,或者来 post-process 任务的结果。
怎么做呢…
按照这些步骤来实现下面的例子::
//1. 创建一个类,名为 MyWorkerTask,并特别扩展 ForkJoinTask 类参数化 Void type。 public abstract class MyWorkerTask extends ForkJoinTask Void { //2. 声明一个私有 String 属性,名为 name,用来储存任务的名字。 private String name; //3. 实现类的构造函数,初始化它的属性。 public MyWorkerTask(String name) { this.name=name; //4. 实现 getRawResult() 方法。这是 ForkJoinTask 类的抽象方法之一。由于任务不会返回任何结果,此方法返回的一定是null值。 @Override public Void getRawResult() { return null; //5. 实现 setRawResult() 方法。这是 ForkJoinTask 类的另一个抽象方法。由于任务不会返回任何结果,方法留白即可。 @Override protected void setRawResult(Void value) { //6. 实现 exec() 方法。这是任务的主要方法。在这个例子,把任务的算法委托给 compute() 方法。计算方法的运行时间并写入操控台。 @Override protected boolean exec() { Date startDate=new Date(); compute(); Date finishDate=new Date(); long diff=finishDate.getTime()-startDate.getTime(); System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete.\n",name,diff); return true; //7. 实现 getName() 方法来返回任务的名字。 public String getName(){ return name; //8. 声明抽象方法 compute()。像我们之前提到的,此方法实现任务的算法,必须是由 MyWorkerTask 类的子类实现。 protected abstract void compute(); //9. 创建一个类,名为 Task,延伸 MyWorkerTask 类。 public class Task extends MyWorkerTask { //10. 声明一个私有 int 值 array,名为 array。 private int array[]; //11. 实现类的构造函数,初始化它的属性值。 public Task(String name, int array[], int start, int end){ super(name); this.array=array; this.start=start; this.end=end; //12. 实现 compute() 方法。此方法通过 start 和 end 属性来决定增加array的元素块。如果元素块的元素超过100个,把它分成2部分,并创建2个Task对象来处理各个部分。再使用 invokeAll() 方法把这些任务发送给池。 protected void compute() { if (end-start 100){ int mid=(end+start)/2; Task task1=new Task(this.getName()+"1",array,start,mid); Task task2=new Task(this.getName()+"2",array,mid,end); invokeAll(task1,task2); //13.如果元素块的元素少于100,使用for循环增加全部的元素。 } else { for (int i=start; i i++) { array[i]++; //14. 最后,让正在执行任务的线程进入休眠50毫秒。 try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); //15. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。 public class Main { public static void main(String[] args) throws Exception { //16. 创建一个10,000元素的 int array。 int array[]=new int[10000]; //17. 创建一个 ForkJoinPool 对象,名为 pool。 ForkJoinPool pool=new ForkJoinPool(); //18. Create a 创建一个 Task 对象来增加array的全部元素。构造函数的参数是:任务的名字 Task,array对象,和0 和10000来向这个任务表示要处整个array. Task task=new Task("Task",array,0,array.length); //19. 使用 execute() 方法发送任务给池。 pool.invoke(task); //20. 使用 shutdown() 方法关闭池。 pool.shutdown(); //21. 在操控台写个信息表明程序结束。 System.out.printf("Main: End of the program.\n");
它是怎么工作的…
在这个指南,你实现了扩展 ForkJoinTask 类的 MyWorkerTask 类。这是你的基本类,它可以在 ForkJoinPool 执行者中执行,并且可以获得这个执行者的所以好处,如 work-stealing 算法。这个类等同于 RecursiveAction 和 RecursiveTask 类。
当你扩展 ForkJoinTask 类,你必须实现以下3个方法:
setRawResult(): 此方法是用来设立任务的结果。由于你的任务不返回任何结果,所以留白即可。 getRawResult(): 此方法是用来返回任务的结果。由于你的任务不返回任何结果,此方法会返回null值。 exec(): 此方法实现了任务的算法。在这个例子,你把算法委托给抽象方法 compute() (如 RecursiveAction 类和 RecursiveTask 类),且在exec()方法你计算了方法的运行时间,并写入到操控台。最后,在例子的主类,你创建了一个有10,000个元素的array,一个 ForkJoinPool 执行者,和一个处理整个array的 Task 对象。运行程序,你可以发现不同的任务运行后都在操控台写入他们的运行时间。
参见
第五章,Fork/Join 框架:创建 Fork/Join 池
第七章,定制并发类:实现ThreadFactory接口生成自定义的线程给Fork/Join框架
文章转自 并发编程网-ifeve.com
Java7任务并行执行神器:Fork&Join框架 Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任务分别计算出结果再合并起来,最后将汇总的结果作为大任务结果。其思想和MapReduce的思想非常类似。对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。
【高并发】如何使用Java7中提供的Fork/Join框架实现高并发程序? 在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。
ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
相关文章
- 聊聊并发(八)——Fork/Join框架介绍
- 测试并发应用(三)监控Executor框架
- 定制并发类(八)自定义在 Fork/Join 框架中运行的任务
- 违反并发性: UpdateCommand影响了预期 1 条记录中的 0 条 解决办法
- 高并发高可用系统以及面试分析
- [转] asp.net解决高并发的方案
- Akka框架——第一节:并发编程简介
- Scala Actor并发编程入门示例
- 【项目实战】并发编程之Java集合框架中的一个线程安全的队列实现 ——BlockingQueue入门介绍
- 【项目实战】并发编程之Java集合框架中的一个线程安全的队列实现 —— LinkedBlockingQueue入门介绍
- TCP 网络编程 TCP C/S 架构 socket connect send recv bind listen accept 三次握手 四次挥手 多进程实现并发
- java并发编程之CountDownLatch
- [手游项目3]-9-Go语言sync.Map(在并发环境中使用的map)
- 构建自己的Java并发模型框架
- 第三十八章 linux-并发解决方法三(信号量)
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger
- 谈谈高并发系统的限流
- 并发编程— wait 与 notify 为什么是 Object 的成员方法?