Java源码分析之CountDownLatch
一、CountDownLatch介绍
CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。
二、CountDownLatch应用
CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。
1、示例应用一
这里有一对类,一组工作线程使用两个countdown latches的示例:
第一个是启动信号,阻止worker线程工作直到driver准备好;
第二个是完成信号,允许driver等到所有的workers工作完成。
代码如下:
package com.pengli.jdk; import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { class Driver { void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(20); for (int i = 0; i ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // dont let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish void doSomethingElse() { // ... class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; void doWork() { // ...
2、示例应用二
另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。
代码如下:
package com.pengli.jdk; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class TestCountDownLatch2 { class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(20); Executor e = Executors.newFixedThreadPool(20); for (int i = 0; i ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; public void run() { doWork(i); doneSignal.countDown(); void doWork(int i) { // ...三、CountDownLatch实现分析
1、Sync
在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); int getCount() { return getState(); protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; }关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:
在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。
tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.
2、countDown()
countDown()方法的实现很简单,如下:
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * p If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * p If the current count equals zero then nothing happens. public void countDown() { sync.releaseShared(1); }其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; return false; }先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。
3、await()
await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:
/** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. * p If the current count is zero then this method returns immediately. * p If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: * ul * li The count reaches zero due to invocations of the * {@link #countDown} method; or * li Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * /ul * p If the current thread: * ul * li has its interrupted status set on entry to this method; or * li is {@linkplain Thread#interrupt interrupted} while waiting, * /ul * then {@link InterruptedException} is thrown and the current threads * interrupted status is cleared. * @throws InterruptedException if the current thread is interrupted * while waiting public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) 0) doAcquireSharedInterruptibly(arg); }先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。
doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。
一文带你理解java中的同步工具类CountDownLatch 这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见。我们将通过案例来进行讲解分析。
相关文章
- 开涛spring3(12.4) - 零配置 之 12.4 基于Java类定义Bean配置元数据
- Java容器 | 基于源码分析List集合体系
- java.util.HashMap源码要点浅析
- 备战金九银十Java面试八股文来了,包含中级-高级-源码面试题解析,内涵25个专题,200+面试题解析
- 多家大厂JAVA面试题整理分布式+微服务+高并发+性能优调+框架源码
- 基于JAVA SpringBoot的综合博客系统的设计与实现源码
- javascript实现java的map对象,js实现new map()
- Java ArrayList源码剖析
- java.io.ByteArrayOutputStream 源码分析
- java.io.ByteArrayInputStream 源码分析
- java.io.BufferedInputStream 源码分析
- Java集合框架之一:ArrayList源码分析
- 十大基础排序算法[java源码+动静双图解析+性能分析]
- java.util.concurrent解析——FutureTask源码解析
- Java基础知识【下】( 转载)
- java做小游戏扫雷(附源码)
- 浅析Unix domain socket是什么、Java如何使用UnixSocket调用Docker API对容器进行操作(jnr-unixsocket的使用)
- Java虚拟机:深入详细分析Java ClassLoader原理与源码
- java项目之疫情网课管理系统(springboot+vue源码)
- 源码分析 There is no getter for property named '*' in 'class java.lang.String
- 蓝桥杯VIP试题 之 基础练习 芯片测试 - JAVA
- Java NIO——Selector机制源码分析---转
- 曹工说Redis源码(1)-- redis debug环境搭建,使用clion,达到和调试java一样的效果
- Java多线程基础(一)---Thread API(join深度详解、源码分析和案例分析之代码实现,优雅关闭线程三种方式)