zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java 内Fork/Join框架的尝试

JAVA框架 Join 尝试 fork
2023-09-27 14:25:56 时间
写在前面

整个周末 都是在咳嗽中度过。夏日的感冒 感觉要死的节奏。没法思考 没法学习 是最尴尬的事情 喝了药就想睡觉。

那么 只能把存货 上一下。

Fork / Join框架是使用并发分治法解决问题的框架。引入它们是为了补充现有的并发API。在介绍它们之前 现有的Executor Service实现是运行异步任务的流行选择 但是当任务同质且独立时 它们会发挥最佳作用。运行依赖的任务并使用这些实现来组合其结果并不容易。随着Fork / Join框架的引入 试图解决这一缺陷。


解决非阻塞任务

简单粗暴地直接跳入代码。在代码中创建一个任务 该任务将返回列表的所有元素的总和。以下步骤以伪代码表示我们的算法

01.查找列表的中间索引

02.在中间划分列表

03.递归创建一个新任务 该任务将计算左侧部分的总和

04.递归创建一个新任务 该任务将计算正确部分的总和

05.将左总和 中间元素和右总和的结果相加

 Slf4j

public class ListSummer extends RecursiveTask Integer {

 private final List Integer listToSum;

 ListSummer(List Integer listToSum) {

 this.listToSum listToSum;

 Override

 protected Integer compute() {

 if (listToSum.isEmpty()) {

 log.info( 空集合 

 return 0;

 int middleIndex listToSum.size() / 2;

 log.info( 集合 {}, 中部索引: {} , listToSum, middleIndex);

 List Integer leftSublist listToSum.subList(0, middleIndex);

 List Integer rightSublist listToSum.subList(middleIndex 1, listToSum.size());

 ListSummer leftSummer new ListSummer(leftSublist);

 ListSummer rightSummer new ListSummer(rightSublist);

 leftSummer.fork();

 rightSummer.fork();

 Integer leftSum leftSummer.join();

 Integer rightSum rightSummer.join();

 int total leftSum listToSum.get(middleIndex) rightSum;

 log.info( 左部分和 {}, 右部分和 {}, 总和 {} , leftSum, rightSum, total);

 return total;

}

首先 我们通过继承ForkJoinTask的子类RecursiveTask。这是能够在执行并发任务并返回结果时的子类。当任务不返回结果而仅执行效果时 继承递归操作RecursiveAction的子类。对于我们解决的大多数实际任务 这两个子类就足够了。

其次 RecursiveTask和RecursiveAction都定义了一种抽象计算方法compute。这是进行计算执行代码的地方。

第三 在计算方法内部 检查通过构造函数传递的列表的大小。如果为空 则已经知道总和的结果为零 然后立即返回。否则 将列表分为两个子列表 并创建List Summer类型的两个实例。然后 在这两个实例上调用fork 在ForkJoinTask中定义 方法

leftSummer.fork();

rightSummer.fork();

导致将这些任务安排为异步执行的原因 稍后将在本文中解释用于此目的的确切机制。

之后 调用join 方法 也在ForkJoinTask中定义 等待这两个部分的结果

Integer leftSum leftSummer.join();

Integer rightSum rightSummer.join();

然后将其与列表的中间元素相加以获得最终结果。

代码内添加了许多日志消息 以使更易于理解。

注 当我们处理包含数千个条目的列表时 进行详细的日志记录 尤其是记录整个列表 对于解决方案来说其实很不优雅。

差不多了 现在 为测试运行创建一个测试类

public class ListSummerTest {

 Test

 public void shouldSumEmptyList() {

 ListSummer summer new ListSummer(List.of());

 ForkJoinPool forkJoinPool new ForkJoinPool();

 forkJoinPool.submit(summer);

 int result summer.join();

 assertThat(result).isZero();

 Test

 public void shouldSumListWithOneElement() {

 ListSummer summer new ListSummer(List.of(5));

 ForkJoinPool forkJoinPool new ForkJoinPool();

 forkJoinPool.submit(summer);

 int result summer.join();

 assertThat(result).isEqualTo(5);

 Test

 public void shouldSumListWithMultipleElements() {

 ListSummer summer new ListSummer(List.of(

 1, 2, 3, 4, 5, 6, 7, 8, 9

 ForkJoinPool forkJoinPool new ForkJoinPool();

 forkJoinPool.submit(summer);

 int result summer.join();

 assertThat(result).isEqualTo(45);

}

测试类中 创建一个 ForkJoinPool的实例。

ForkJoinPool是用于运行Fork Join任务的独特Executor Service实现。它采用一种称为工作窃取算法的特殊算法。与其他执行器服务实现相反 在其他实现器服务实现中 只有一个队列包含要执行的所有任务 在工作窃取实现中 每个工作线程都获得其工作队列。每个线程都从其队列开始执行任务。具体的原理 可以查看源代码进行查看。

当我们检测到Fork Join Task可以分解为多个较小的子任务时 便将它们分解为较小的任务 然后在这些任务上调用fork 方法。这种调用导致子任务被推入执行线程的队列中。在执行期间 当一个线程用尽队列/没有要执行的任务时 它可以从另一线程的队列中“窃取”任务 因此称为“工作窃取” 。与使用任何其他Executor Service实现相比 这种窃取行为可以带来更高的吞吐量。

之前 当我们在代码中执行leftSummer和rightSummer对象的fork 时 它们被推入执行线程的工作队列中 之后它们被池中的其他活动线程“窃取” 依此类推)。


解决阻塞任务

刚才解决的问题本质上是非阻塞的。如果想解决一个阻塞操作的问题 那么为了获得更好的吞吐量 则需要改变策略。

用另一个代码实例来描述一下。假设要创建一个非常简单的网页爬虫。该爬虫将接收HTTP链接列表 执行GET请求以获取响应体 然后计算响应长度。

这是代码–

 Slf4j

public class ResponseLengthCalculator extends RecursiveTask Map String, Integer {

 private final List String links;

 ResponseLengthCalculator(List String links) {

 this.links links;

 Override

 protected Map String, Integer compute() {

 if (links.isEmpty()) {

 log.info( 没获取更多链接 

 return Collections.emptyMap();

 int middle links.size() / 2;

 log.info( 索引: {} , links, middle);

 ResponseLengthCalculator leftPartition new ResponseLengthCalculator(links.subList(0, middle));

 ResponseLengthCalculator rightPartition new ResponseLengthCalculator(links.subList(middle 1, links.size()));

 log.info( 执行左区域加入 

 leftPartition.fork();

 log.info( 左区域被加入 现在执行右区域加入 

 rightPartition.fork();

 log.info( 右区域被加入 

 String middleLink links.get(middle);

 HttpRequester httpRequester new HttpRequester(middleLink);

 String response;

 try {

 log.info( 执行回调{} , middleLink);

 ForkJoinPool.managedBlock(httpRequester);

 response httpRequester.response;

 } catch (InterruptedException ex) {

 log.error( 发生异常 , ex);

 response 

 Map String, Integer responseMap new HashMap (links.size());

 Map String, Integer leftLinks leftPartition.join();

 responseMap.putAll(leftLinks);

 responseMap.put(middleLink, response.length());

 Map String, Integer rightLinks rightPartition.join();

 responseMap.putAll(rightLinks);

 log.info( 左部链接集合{}, 响应长度 {}, 右部链接集合{} , leftLinks, response.length(), rightLinks);

 return responseMap;

 private static class HttpRequester implements ForkJoinPool.ManagedBlocker {

 private final String link;

 private String response;

 private HttpRequester(String link) {

 this.link link;

 Override

 public boolean block() {

 HttpGet headRequest new HttpGet(link);

 CloseableHttpClient client HttpClientBuilder

 .create()

 .disableRedirectHandling()

 .build();

 try {

 log.info( 正在执行请求{} , link);

 CloseableHttpResponse response client.execute(headRequest);

 log.info( 链接url{}已经被请求 , link);

 this.response EntityUtils.toString(response.getEntity());

 } catch (IOException e) {

 log.error( 当从链接url{}中获取响应报错:{} , link, e.getMessage());

 this.response 

 return true;

 Override

 public boolean isReleasable() {

 return false;

}

上述代码创建了一个ForkJoinPool.Managed Blocker的实现类HttpRequester 在其中放置阻塞的HTTP调用。该接口定义了两个方法-block 和is Releasable 。block 方法是我们进行阻塞调用的地方。在完成阻塞操作之后 我们返回true 指示不再需要进一步的阻塞。我们从is Releasable 实现中返回false 以向fork-join工作线程指示block 方法实现实际上可能在阻塞。is Releasable 实现将在调用block 方法之前先由fork-join工作线程调用。最后 我们通过调用Fork Join Pool.managed Block 静态方法将Http Requester实例提交到池中。之后 我们的阻止任务将开始执行。当它阻止HTTP请求时 Fork Join Pool.managed Block 方法还将安排在必要时激活备用线程 以确保足够的并行性。

测试代码如下所示:

public class ResponseLengthCalculatorTest {

 Test

 public void shouldReturnEmptyMapForEmptyList() {

 ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(Collections.emptyList());

 ForkJoinPool pool new ForkJoinPool();

 pool.submit(responseLengthCalculator);

 Map String, Integer result responseLengthCalculator.join();

 assertThat(result).isEmpty();

 Test

 public void shouldHandle200Ok() {

 ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of(

 http://huclele.ml/200 

 ForkJoinPool pool new ForkJoinPool();

 pool.submit(responseLengthCalculator);

 Map String, Integer result responseLengthCalculator.join();

 assertThat(result)

 .hasSize(1)

 .containsKeys( http://huclele.ml/200 )

 .containsValue(0);

 Test

 public void shouldFetchResponseForDifferentResponseStatus() {

 ResponseLengthCalculator responseLengthCalculator new ResponseLengthCalculator(List.of(

 http://huclele.ml/200 ,

 http://huclele.ml/302 ,

 http://huclele.ml/404 ,

 http://huclele.ml/502 

 ForkJoinPool pool new ForkJoinPool();

 pool.submit(responseLengthCalculator);

 Map String, Integer result responseLengthCalculator.join();

 assertThat(result)

 .hasSize(4);

}

总结

今天 聊了Fork/Join的用法 以及两个实例 还有更多玩法 可以学习。有什么问题 留言给我o~


【Java框架型项目从入门到装逼】第七节 - 学生管理系统项目搭建 本次的教程是打算用Spring,SpringMVC以及传统的jdbc技术来制作一个简单的增删改查项目,对用户信息进行增删改查,就这么简单。