zl程序教程

您现在的位置是:首页 >  其他

当前栏目

CyclicBarrier的使用

2023-03-07 09:39:47 时间

CyclicBarrier

CyclicBarrier机制

和join()类似, 对于需要等待多个线程执行完成后再继续的场景, 都可以使用CyclicBarrier. 其使用方法是在主线程创建一个CyclicBarrier实例, 设置信号量, 设置结束时的回调方法, 然后在各个工作子线程的末尾调用这个CyclicBarrier实例的await()方法. 其时间顺序为:

  • 子线程在调用await()时, 会阻塞在那里直到CyclicBarrier实例的信号量变0
  • 当调用await()的子线程数量达到时, 执行若CyclicBarrier实例预先设置的回调方法
  • 子线程await()得到返回结果, 这个结果是其调用await时CyclicBarrier实例的信号量, 按从大到小, 值为0的就是最后一个子线程.

使用示例

设置5个工作线程, 在线程都完成后打印输出

public class CyclicBarrierTest {
    private CyclicBarrier barrier;

    public void start() {
        this.barrier = new CyclicBarrier(5, ()->System.out.println("Last thread arrived."));
        for (int i = 0; i < 5; i++) {
            int k = i;
            new Thread(()->{
                try {
                    Thread.sleep((long)(Math.random() * 1000));
                    System.out.println(k + ": awaiting");
                    System.out.println(k + ": await returned, barrier remains:" + barrier.await());
                } catch (Exception e) {
                    System.out.println("Exception occurs");
                }
            }).start();
        }
    }

    public static void main(String[] args) {
        new CyclicBarrierTest().start();
        System.out.println("End of code reached");
    }
}

输出结果如下, 可以看到start()方法是不会阻塞的, 立即执行后面的代码, 但是程序本身会阻塞直到执行完成.

End of code reached
3: awaiting
2: awaiting
1: awaiting
0: awaiting
4: awaiting
Last thread arrived.
4: await returned, barrier remains:0
0: await returned, barrier remains:1
1: await returned, barrier remains:2
2: await returned, barrier remains:3
3: await returned, barrier remains:4

在很多场景中, 是需要start()阻塞的, 这样方便做循环, 例如一批处理完成后, 再处理下批, 这需要一个小小的技巧, 即多设置一个信号量, 并在主线程中await(), 代码如下

public class CyclicBarrierTest {
    private CyclicBarrier barrier;

    public void start() {
        this.barrier = new CyclicBarrier(3, ()->System.out.println("Last thread arrived."));
        for (int i = 0; i < 2; i++) {
            int k = i;
            new Thread(()->{
                try {
                    Thread.sleep((long)(Math.random() * 1000));
                    System.out.println(k + ": awaiting");
                    barrier.await();
                } catch (Exception e) {
                    System.out.println("Exception occurs");
                }
            }).start();
        }
        try {
            this.barrier.await();
        } catch (BrokenBarrierException | InterruptedException e) {
            System.out.println("Exception occurs");
        }

    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new CyclicBarrierTest().start();
            System.out.println("End of cycle reached");
        }
    }
}

输出结果如下, 可以看到多增加的一个await()使得start()被阻塞, 并等到全部完成后才开始下一轮

1: awaiting
0: awaiting
Last thread arrived.
End of cycle reached
0: awaiting
1: awaiting
Last thread arrived.
End of cycle reached
0: awaiting
1: awaiting
Last thread arrived.
End of cycle reached

区分ExecutorCompletionService的使用场景

ExecutorCompletionService是CompletionService在JUC里唯一的实现类, 这个类实现了非排序异步获取任务结果的功能, 在功能上和CyclicBarrier相似. 在具体使用场景上, 如果子任务数大于工作线程数, 且需要等待多于线程的结果返回后一并处理的, 用ExecutorCompletionService会更加简便. 这个是JDK文档中提供的示例代码

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         for (Future<Result> f : futures)
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }