CyclicBarrier的使用
2023-03-07 09:39:47 时间
CyclicBarrier
CyclicBarrier机制
和join()类似, 对于需要等待多个线程执行完成后再继续的场景, 都可以使用CyclicBarrier. 其使用方法是在主线程创建一个CyclicBarrier实例, 设置信号量, 设置结束时的回调方法, 然后在各个工作子线程的末尾调用这个CyclicBarrier实例的await()方法. 其时间顺序为:
- 子线程在调用await()时, 会阻塞在那里直到CyclicBarrier实例的信号量变0
- 当调用await()的子线程数量达到时, 执行若CyclicBarrier实例预先设置的回调方法
- 子线程await()得到返回结果, 这个结果是其调用await时CyclicBarrier实例的信号量, 按从大到小, 值为0的就是最后一个子线程.
使用示例
设置5个工作线程, 在线程都完成后打印输出
public class CyclicBarrierTest { private CyclicBarrier barrier; public void start() { this.barrier = new CyclicBarrier(5, ()->System.out.println("Last thread arrived.")); for (int i = 0; i < 5; i++) { int k = i; new Thread(()->{ try { Thread.sleep((long)(Math.random() * 1000)); System.out.println(k + ": awaiting"); System.out.println(k + ": await returned, barrier remains:" + barrier.await()); } catch (Exception e) { System.out.println("Exception occurs"); } }).start(); } } public static void main(String[] args) { new CyclicBarrierTest().start(); System.out.println("End of code reached"); } }
输出结果如下, 可以看到start()方法是不会阻塞的, 立即执行后面的代码, 但是程序本身会阻塞直到执行完成.
End of code reached 3: awaiting 2: awaiting 1: awaiting 0: awaiting 4: awaiting Last thread arrived. 4: await returned, barrier remains:0 0: await returned, barrier remains:1 1: await returned, barrier remains:2 2: await returned, barrier remains:3 3: await returned, barrier remains:4
在很多场景中, 是需要start()阻塞的, 这样方便做循环, 例如一批处理完成后, 再处理下批, 这需要一个小小的技巧, 即多设置一个信号量, 并在主线程中await(), 代码如下
public class CyclicBarrierTest { private CyclicBarrier barrier; public void start() { this.barrier = new CyclicBarrier(3, ()->System.out.println("Last thread arrived.")); for (int i = 0; i < 2; i++) { int k = i; new Thread(()->{ try { Thread.sleep((long)(Math.random() * 1000)); System.out.println(k + ": awaiting"); barrier.await(); } catch (Exception e) { System.out.println("Exception occurs"); } }).start(); } try { this.barrier.await(); } catch (BrokenBarrierException | InterruptedException e) { System.out.println("Exception occurs"); } } public static void main(String[] args) { for (int i = 0; i < 3; i++) { new CyclicBarrierTest().start(); System.out.println("End of cycle reached"); } } }
输出结果如下, 可以看到多增加的一个await()使得start()被阻塞, 并等到全部完成后才开始下一轮
1: awaiting 0: awaiting Last thread arrived. End of cycle reached 0: awaiting 1: awaiting Last thread arrived. End of cycle reached 0: awaiting 1: awaiting Last thread arrived. End of cycle reached
区分ExecutorCompletionService的使用场景
ExecutorCompletionService是CompletionService在JUC里唯一的实现类, 这个类实现了非排序异步获取任务结果的功能, 在功能上和CyclicBarrier相似. 在具体使用场景上, 如果子任务数大于工作线程数, 且需要等待多于线程的结果返回后一并处理的, 用ExecutorCompletionService会更加简便. 这个是JDK文档中提供的示例代码
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); }
相关文章
- 在 Go 里用 CGO?这 7 个问题你要关注!
- 9款优秀的去中心化通讯软件 Matrix 的客户端
- 求职数据分析,项目经验该怎么写
- 在OKR中,我看到了数据驱动业务的未来
- 火山引擎云原生大数据在金融行业的实践
- OpenHarmony富设备移植指南(二)—从postmarketOS获取移植资源
- 《数据成熟度指数》报告:64%的企业领袖认为大多数员工“不懂数据”
- OpenHarmony 小型系统兼容性测试指南
- 肯睿中国(Cloudera):2023年企业数字战略三大趋势预测
- 适用于 Linux 的十大命令行游戏
- GNOME 截图工具的新旧截图方式
- System76 即将推出的 COSMIC 桌面正在酝酿大变化
- 2GB 内存 8GB 存储即可流畅运行,Windows 11 极致精简版系统 Tiny11 发布
- 迎接 ecode:一个即将推出的具有全新图形用户界面框架的现代、轻量级代码编辑器
- loongarch架构介绍(三)—地址翻译
- Go 语言怎么解决编译器错误“err is shadowed during return”?
- 敏捷:可能被开发人员遗忘的部分
- Denodo预测2023年数据管理和分析的未来
- 利用数据推动可持续发展
- 在 Vue3 中实现 React 原生 Hooks(useState、useEffect),深入理解 React Hooks 的