zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java源码分析之CountDownLatch

JAVA源码 分析 CountDownLatch
2023-09-27 14:29:33 时间

        一、CountDownLatch介绍

       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。

        二、CountDownLatch应用

        CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。

        1、示例应用一

        这里有一对类,一组工作线程使用两个countdown latches的示例:

        第一个是启动信号,阻止worker线程工作直到driver准备好;

        第二个是完成信号,允许driver等到所有的workers工作完成。

       代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

 class Driver {

 void main() throws InterruptedException {

 CountDownLatch startSignal = new CountDownLatch(1);

 CountDownLatch doneSignal = new CountDownLatch(20);

 for (int i = 0; i ++i) // create and start threads

 new Thread(new Worker(startSignal, doneSignal)).start();

 doSomethingElse(); // dont let run yet

 startSignal.countDown(); // let all threads proceed

 doSomethingElse();

 doneSignal.await(); // wait for all to finish

 void doSomethingElse() {

 // ...

 class Worker implements Runnable {

 private final CountDownLatch startSignal;

 private final CountDownLatch doneSignal;

 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {

 this.startSignal = startSignal;

 this.doneSignal = doneSignal;

 public void run() {

 try {

 startSignal.await();

 doWork();

 doneSignal.countDown();

 } catch (InterruptedException ex) {

 } // return;

 void doWork() {

 // ...

        2、示例应用二

        另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。

        代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.Executor;

import java.util.concurrent.Executors;

public class TestCountDownLatch2 {

 class Driver2 { // ...

 void main() throws InterruptedException {

 CountDownLatch doneSignal = new CountDownLatch(20);

 Executor e = Executors.newFixedThreadPool(20);

 for (int i = 0; i ++i) // create and start threads

 e.execute(new WorkerRunnable(doneSignal, i));

 doneSignal.await(); // wait for all to finish

 class WorkerRunnable implements Runnable {

 private final CountDownLatch doneSignal;

 private final int i;

 WorkerRunnable(CountDownLatch doneSignal, int i) {

 this.doneSignal = doneSignal;

 this.i = i;

 public void run() {

 doWork(i);

 doneSignal.countDown();

 void doWork(int i) {

 // ...

        三、CountDownLatch实现分析

        1、Sync

        在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:

 /**

 * Synchronization control For CountDownLatch.

 * Uses AQS state to represent count.

 private static final class Sync extends AbstractQueuedSynchronizer {

 private static final long serialVersionUID = 4982264981922014374L;

 Sync(int count) {

 setState(count);

 int getCount() {

 return getState();

 protected int tryAcquireShared(int acquires) {

 return (getState() == 0) ? 1 : -1;

 protected boolean tryReleaseShared(int releases) {

 // Decrement count; signal when transition to zero

 for (;;) {

 int c = getState();

 if (c == 0)

 return false;

 int nextc = c-1;

 if (compareAndSetState(c, nextc))

 return nextc == 0;

 }
        关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:

        在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。

        tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.

        2、countDown()

        countDown()方法的实现很简单,如下:

 /**

 * Decrements the count of the latch, releasing all waiting threads if

 * the count reaches zero.

 * p If the current count is greater than zero then it is decremented.

 * If the new count is zero then all waiting threads are re-enabled for

 * thread scheduling purposes.

 * p If the current count equals zero then nothing happens.

 public void countDown() {

 sync.releaseShared(1);

 }
        其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:

 /**

 * Releases in shared mode. Implemented by unblocking one or more

 * threads if {@link #tryReleaseShared} returns true.

 * @param arg the release argument. This value is conveyed to

 * {@link #tryReleaseShared} but is otherwise uninterpreted

 * and can represent anything you like.

 * @return the value returned from {@link #tryReleaseShared}

 public final boolean releaseShared(int arg) {

 if (tryReleaseShared(arg)) {

 doReleaseShared();

 return true;

 return false;

 }
        先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。

        3、await()

         await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:

 /**

 * Causes the current thread to wait until the latch has counted down to

 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.

 * p If the current count is zero then this method returns immediately.

 * p If the current count is greater than zero then the current

 * thread becomes disabled for thread scheduling purposes and lies

 * dormant until one of two things happen:

 * ul 

 * li The count reaches zero due to invocations of the

 * {@link #countDown} method; or

 * li Some other thread {@linkplain Thread#interrupt interrupts}

 * the current thread.

 * /ul 

 * p If the current thread:

 * ul 

 * li has its interrupted status set on entry to this method; or

 * li is {@linkplain Thread#interrupt interrupted} while waiting,

 * /ul 

 * then {@link InterruptedException} is thrown and the current threads

 * interrupted status is cleared.

 * @throws InterruptedException if the current thread is interrupted

 * while waiting

 public void await() throws InterruptedException {

 sync.acquireSharedInterruptibly(1);

 }
        还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:

 /**

 * Acquires in shared mode, aborting if interrupted. Implemented

 * by first checking interrupt status, then invoking at least once

 * {@link #tryAcquireShared}, returning on success. Otherwise the

 * thread is queued, possibly repeatedly blocking and unblocking,

 * invoking {@link #tryAcquireShared} until success or the thread

 * is interrupted.

 * @param arg the acquire argument

 * This value is conveyed to {@link #tryAcquireShared} but is

 * otherwise uninterpreted and can represent anything

 * you like.

 * @throws InterruptedException if the current thread is interrupted

 public final void acquireSharedInterruptibly(int arg)

 throws InterruptedException {

 if (Thread.interrupted())

 throw new InterruptedException();

 if (tryAcquireShared(arg) 0)

 doAcquireSharedInterruptibly(arg);

 }
        先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。

        doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。







一文带你理解java中的同步工具类CountDownLatch 这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见。我们将通过案例来进行讲解分析。