Java 内Fork/Join框架的尝试
整个周末 都是在咳嗽中度过。夏日的感冒 感觉要死的节奏。没法思考 没法学习 是最尴尬的事情 喝了药就想睡觉。
那么 只能把存货 上一下。
Fork / Join框架是使用并发分治法解决问题的框架。引入它们是为了补充现有的并发API。在介绍它们之前 现有的Executor Service实现是运行异步任务的流行选择 但是当任务同质且独立时 它们会发挥最佳作用。运行依赖的任务并使用这些实现来组合其结果并不容易。随着Fork / Join框架的引入 试图解决这一缺陷。
解决非阻塞任务
简单粗暴地直接跳入代码。在代码中创建一个任务 该任务将返回列表的所有元素的总和。以下步骤以伪代码表示我们的算法
01.查找列表的中间索引
02.在中间划分列表
03.递归创建一个新任务 该任务将计算左侧部分的总和
04.递归创建一个新任务 该任务将计算正确部分的总和
05.将左总和 中间元素和右总和的结果相加
Slf4j public class ListSummer extends RecursiveTask Integer { private final List Integer listToSum; ListSummer(List Integer listToSum) { this.listToSum listToSum; Override protected Integer compute() { if (listToSum.isEmpty()) { log.info( 空集合 return 0; int middleIndex listToSum.size() / 2; log.info( 集合 {}, 中部索引: {} , listToSum, middleIndex); List Integer leftSublist listToSum.subList(0, middleIndex); List Integer rightSublist listToSum.subList(middleIndex 1, listToSum.size()); ListSummer leftSummer new ListSummer(leftSublist); ListSummer rightSummer new ListSummer(rightSublist); leftSummer.fork(); rightSummer.fork(); Integer leftSum leftSummer.join(); Integer rightSum rightSummer.join(); int total leftSum listToSum.get(middleIndex) rightSum; log.info( 左部分和 {}, 右部分和 {}, 总和 {} , leftSum, rightSum, total); return total; }
首先 我们通过继承ForkJoinTask的子类RecursiveTask。这是能够在执行并发任务并返回结果时的子类。当任务不返回结果而仅执行效果时 继承递归操作RecursiveAction的子类。对于我们解决的大多数实际任务 这两个子类就足够了。
其次 RecursiveTask和RecursiveAction都定义了一种抽象计算方法compute。这是进行计算执行代码的地方。
第三 在计算方法内部 检查通过构造函数传递的列表的大小。如果为空 则已经知道总和的结果为零 然后立即返回。否则 将列表分为两个子列表 并创建List Summer类型的两个实例。然后 在这两个实例上调用fork 在ForkJoinTask中定义 方法
leftSummer.fork(); rightSummer.fork();
导致将这些任务安排为异步执行的原因 稍后将在本文中解释用于此目的的确切机制。
之后 调用join 方法 也在ForkJoinTask中定义 等待这两个部分的结果
Integer leftSum leftSummer.join(); Integer rightSum rightSummer.join();
然后将其与列表的中间元素相加以获得最终结果。
代码内添加了许多日志消息 以使更易于理解。
注 当我们处理包含数千个条目的列表时 进行详细的日志记录 尤其是记录整个列表 对于解决方案来说其实很不优雅。
差不多了 现在 为测试运行创建一个测试类
public class ListSummerTest { Test public void shouldSumEmptyList() { ListSummer summer new ListSummer(List.of()); ForkJoinPool forkJoinPool new ForkJoinPool(); forkJoinPool.submit(summer); int result summer.join(); assertThat(result).isZero(); Test public void shouldSumListWithOneElement() { ListSummer summer new ListSummer(List.of(5)); ForkJoinPool forkJoinPool new ForkJoinPool(); forkJoinPool.submit(summer); int result summer.join(); assertThat(result).isEqualTo(5); Test public void shouldSumListWithMultipleElements() { ListSummer summer new ListSummer(List.of( 1, 2, 3, 4, 5, 6, 7, 8, 9 ForkJoinPool forkJoinPool new ForkJoinPool(); forkJoinPool.submit(summer); int result summer.join(); assertThat(result).isEqualTo(45); }
测试类中 创建一个 ForkJoinPool的实例。
ForkJoinPool是用于运行Fork Join任务的独特Executor Service实现。它采用一种称为工作窃取算法的特殊算法。与其他执行器服务实现相反 在其他实现器服务实现中 只有一个队列包含要执行的所有任务 在工作窃取实现中 每个工作线程都获得其工作队列。每个线程都从其队列开始执行任务。具体的原理 可以查看源代码进行查看。
当我们检测到Fork Join Task可以分解为多个较小的子任务时 便将它们分解为较小的任务 然后在这些任务上调用fork 方法。这种调用导致子任务被推入执行线程的队列中。在执行期间 当一个线程用尽队列/没有要执行的任务时 它可以从另一线程的队列中“窃取”任务 因此称为“工作窃取” 。与使用任何其他Executor Service实现相比 这种窃取行为可以带来更高的吞吐量。
之前 当我们在代码中执行leftSummer和rightSummer对象的fork 时 它们被推入执行线程的工作队列中 之后它们被池中的其他活动线程“窃取” 依此类推)。
解决阻塞任务
刚才解决的问题本质上是非阻塞的。如果想解决一个阻塞操作的问题 那么为了获得更好的吞吐量 则需要改变策略。
用另一个代码实例来描述一下。假设要创建一个非常简单的网页爬虫。该爬虫将接收HTTP链接列表 执行GET请求以获取响应体 然后计算响应长度。
这是代码–
Slf4j public class ResponseLengthCalculator extends RecursiveTask Map String, Integer { private final List String links; ResponseLengthCalculator(List String links) { this.links links; Override protected Map String, Integer compute() { if (links.isEmpty()) { log.info( 没获取更多链接 return Collections.emptyMap(); int middle links.size() / 2; log.info( 索引: {} , links, middle); ResponseLengthCalculator leftPartition new ResponseLengthCalculator(links.subList(0, middle)); ResponseLengthCalculator rightPartition new ResponseLengthCalculator(links.subList(middle 1, links.size())); log.info( 执行左区域加入 leftPartition.fork(); log.info( 左区域被加入 现在执行右区域加入 rightPartition.fork(); log.info( 右区域被加入 String middleLink links.get(middle); HttpRequester httpRequester new HttpRequester(middleLink); String response; try { log.info( 执行回调{} , middleLink); ForkJoinPool.managedBlock(httpRequester); response httpRequester.response; } catch (InterruptedException ex) { log.error( 发生异常 , ex); response Map String, Integer responseMap new HashMap (links.size()); Map String, Integer leftLinks leftPartition.join(); responseMap.putAll(leftLinks); responseMap.put(middleLink, response.length()); Map String, Integer rightLinks rightPartition.join(); responseMap.putAll(rightLinks); log.info( 左部链接集合{}, 响应长度 {}, 右部链接集合{} , leftLinks, response.length(), rightLinks); return responseMap; private static class HttpRequester implements ForkJoinPool.ManagedBlocker { private final String link; private String response; private HttpRequester(String link) { this.link link; Override public boolean block() { HttpGet headRequest new HttpGet(link); CloseableHttpClient client HttpClientBuilder .create() .disableRedirectHandling() .build(); try { log.info( 正在执行请求{} , link); CloseableHttpResponse response client.execute(headRequest); log.info( 链接url{}已经被请求 , link); this.response EntityUtils.toString(response.getEntity()); } catch (IOException e) { log.error( 当从链接url{}中获取响应报错:{} , link, e.getMessage()); this.response return true; Override public boolean isReleasable() { return false; }
上述代码创建了一个ForkJoinPool.Managed Blocker的实现类HttpRequester 在其中放置阻塞的HTTP调用。该接口定义了两个方法-block 和is Releasable 。block 方法是我们进行阻塞调用的地方。在完成阻塞操作之后 我们返回true 指示不再需要进一步的阻塞。我们从is Releasable 实现中返回false 以向fork-join工作线程指示block 方法实现实际上可能在阻塞。is Releasable 实现将在调用block 方法之前先由fork-join工作线程调用。最后 我们通过调用Fork Join Pool.managed Block 静态方法将Http Requester实例提交到池中。之后 我们的阻止任务将开始执行。当它阻止HTTP请求时 Fork Join Pool.managed Block 方法还将安排在必要时激活备用线程 以确保足够的并行性。
测试代码如下所示:
public class ResponseLengthCalculatorTest { Test public void shouldReturnEmptyMapForEmptyList() { ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(Collections.emptyList()); ForkJoinPool pool new ForkJoinPool(); pool.submit(responseLengthCalculator); Map String, Integer result responseLengthCalculator.join(); assertThat(result).isEmpty(); Test public void shouldHandle200Ok() { ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of( http://huclele.ml/200 ForkJoinPool pool new ForkJoinPool(); pool.submit(responseLengthCalculator); Map String, Integer result responseLengthCalculator.join(); assertThat(result) .hasSize(1) .containsKeys( http://huclele.ml/200 ) .containsValue(0); Test public void shouldFetchResponseForDifferentResponseStatus() { ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of( http://huclele.ml/200 , http://huclele.ml/302 , http://huclele.ml/404 , http://huclele.ml/502 ForkJoinPool pool new ForkJoinPool(); pool.submit(responseLengthCalculator); Map String, Integer result responseLengthCalculator.join(); assertThat(result) .hasSize(4); }
总结
今天 聊了Fork/Join的用法 以及两个实例 还有更多玩法 可以学习。有什么问题 留言给我o~
【Java框架型项目从入门到装逼】第七节 - 学生管理系统项目搭建 本次的教程是打算用Spring,SpringMVC以及传统的jdbc技术来制作一个简单的增删改查项目,对用户信息进行增删改查,就这么简单。
相关文章
- Java三方---->pdf框架之IText的使用
- java笔试题
- java 通用查询框架Querydsl 简介
- Java基础-ArrayList和LinkedList的区别
- Java 流程控制语句——循环结构
- 52类110个主流Java组件和框架
- Java入门到精通——框架篇之Spring源码分析Spring两大核心类
- 最适合Java开发者的大数据工具和框架
- java基础(四)-----抽象类与接口
- 【Java 集合】集合框架 JCF
- java后台框架 springmvc整合mybatis框架源码 java图片爬虫 bootstrap html5 mysql oracle
- 转:初学 Java Web 开发,请远离各种框架,从 Servlet 开发
- java spring 框架学习
- Java中"=="与equals()的区别
- Java和C有什么区别,应该学习Java还是C
- JAVA三大框架的各自作用
- Install Java JDK JRE on Ubuntu/Debian with Apt-Get
- 全面理解Java内存模型(JMM)及volatile关键字
- Java实现多文件压缩打包的方法
- 【基于MVC+Swing+Java的愤怒的小鸟游戏的设计与实现(效果+代码+论文 获取~~)】
- Java报错Exception in thread “main“ java.lang.ExceptionInInitializerError
- java 测试框架
- java-框架-Quartz
- java-框架-spring概述
- java集合框架复习----(3)Set
- 《架构探险——从零开始写Java Web框架》这书不错,能看懂的入门书
- 聊聊这些年我们用过的Java日志框架
- Java集合框架:WeakHashMap