zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java并发 - Java core I

2023-06-13 09:15:37 时间

并发

并发执行的进程数目并不是由CPU数目制约的。 操作系统将CPU的时间片分配给每一个进程,给人并行处理的感觉。

多线程程序在较低的层次上扩展了多任务的概念:一个程序同时执行多个任务。

每个任务称为一个线程(thread),它是线程控制的简称。

可以同时运行一个以上线程的程序称为多线程程序(multithreaded)

多线程与多进程的区别?

  • 本质的区别在于每个进程拥有自己的一整套变量,而线程则共享数据
  • 线程更“轻量级”,创建、撤销一个线程比启动新进程的开销小

什么是线程

使用线程给其他任务提供机会

单独线程执行任务的过程:

  1. 把任务移到实现了Runnable接口的类的run方法中
  2. 由Runnable创建一个Thread对象
  3. 启动线程

注释:也可以通过构建一个Thread类的子类定义一个线程: class MyThread extends Thread{ public void run(){ // do task code } } 然后构建一个子类的对象,调用start方法。 不推荐这种方式。应该将要并行的任务与运行机制解耦合。 如果有多个任务,要为每个任务创建一个独立的线程所付出的代价太大了。 可以使用线程池来解决问题。

警告:不要调用Thread类或Runnable对象的run方法。 直接调用run方法,只会执行同一个线程中的任务,而不会启动新线程。 应该调用Thread.start方法,这个方法将创建一个执行run方法的新线程。

中断线程

当线程的run方法执行方法体中最后一条语句后,并经由执行return语句返回时,或者出现了在方法中没有捕获的异常时,线程将终止。

Java的早期版本中,还有一个stop方法,其他线程可以调用它终止线程,但是已经被弃用了。

没有可以强制线程终止的方法。然而,interrupt方法可以用来请求终止线程。

当一个线程调用interrupt方法时,线程的中断状态被置位。这是每一个线程都具有的Boolean标志。每个线程都应该不时地检查这个标志,以判断线程是否被中断。

可以调用Thread.currentThread方法获取当前线程,然后调用isInterrupt方法,检查中断状态:

while(&Thread.currentThread().isInterrupted() && more word do){
    do more work
}

但是,如果线程被阻塞,就无法检测中断状态。 这是产生InterruptedExcepiton异常的地方。 当一个阻塞的线程(调用sleep或wait)上调用interrupt方法时阻塞调用将会被InterruptedException异常中断

没有任何语言要求一个被中断的线程应该终止。 中断一个线程不过是引起它的注意。 被中断的线程可以决定如何响应中断。某些线程是如此重要以至于应该处理完异常后,继续执行,而不理会异常。更普遍的是,线程将简单地将中断作为一个终止的请求。

Runnable r = () -> { 
    try{
        while(&Thread.currentThread().isInterrupted() && more word do){
            do more work
        }
     }catch(InterruptedException e){
         // thead was interrupted during sleep or wait
     } finally {
         cleaup, if required
     }
    // exiting the run method terminates the thread
}

如果在每次工作迭代之后都调用sleep方法(或其他的可中断方法),isInterrupted检测即没有必要也没有用处。 如果在中断状态被置位时调用sleep方法,它不会休眠。相反,它将清楚这一状态(!)并抛出InterruptedException。因此,循环调用sleep,不会检测中断状态。

Runnable r = () -> { 
    try{
        while(more word to do){
            do more work
            Thread.sleep(delay);
        }
     }catch(InterruptedException e){
         // thead was interrupted during sleep
     } finally {
         cleaup, if required
     }
    // exiting the run method terminates the thread
}

<!--有两个非常类似的方法,interrupted和isInterrupted。Interrupted方法是一个静态方法,它检测当前的线程是否被中断。而且,调用interrupted方法会清楚该线程的中断状态。另一方面,isInterrupted方法是一个实例方法,可以用来检测是否有线程被中断。调用这个方法不会改变中断状态-->

不要将InterruptedException抑制在很低的层次,如果在catch字句中没有什么好的办法的话,有两种合理的选择:

  • 在catch字句中调用Thread.currentThread().interrupt()来设置中断状态。于是,调用者可以对其进行检测。
  • 或者,更好的选择是,throws InterruptedException标记你的方法,不采用try语句块捕获异常。

线程状态

线程的6种状态:

  • New(新建)
  • Runnable(可运行)
  • Blocked(被阻塞)
  • Waiting(等待)
  • Timed Waiting(计时等待)
  • Terminated(终止)

新创建线程

当new操作符创建一个新线程时,该线程还没有开始运行。意味着它的状态是new。当一个线程处于新建状态时,程序还没有开始运行线程中的代码。在线程运行之前还有一些基础工作要做

可运行线程

一旦调用start方法,线程处于runnable状态。一个可运行的线程可能正在运行也可能没有运行,取决于操作系统给线程提供运行的时间(Java规范说明没有将它作为一个单独状态。一个正在运行的线程仍然处于可运行状态)

一旦一个线程开始运行,它不必始终保持运行。事实上,运行中的线程被中断,目的是为了让其他线程获得运行机会。线程调度的细节依赖于操作系统提供的服务。抢占式调度系统给每一个可运行线程一个时间片来执行任务。当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行机会。当选择下一个线程时,操作系统考虑线程的优先级。

现在所有的桌面以及服务器操作系统都使用抢占式调度

像手机的小型设备可能使用协作调度。在这样的设备中,一个线程只有在调用yield方法、或者被阻塞或等待时,线程才失去控制权。

具有多个处理器的机器上,每一个处理器运行一个线程,可以有多个线程并行运行。当然如果线程的数目多于处理器的数目,调度器依然采用时间片机制。

任何给定的时刻,一个可运行的线程可能正在运行也可能没有运行。

被阻塞线程和等待线程

当线程处于被阻塞或等待状态时,他暂时不活动。 它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。细节取决于它是怎样达到非活动状态的。

  • 当一个线程试图获得一个内部的锁对象,而该锁被其他线程持有,该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器运行本线程持有它的时候,该线程将变成非阻塞状态。
  • 当线程等待另一个线程通知调度器一个条件时,他自己进入等待状态。在调用Object.wait方法或Thread.join方法,或者是等待java.util.concurrent库种的Lock或Condition时,就会出现这种情况。实际上,被阻塞状态与等待状态有很大不同。
  • 有几个方法有一个超时参数。调用它们导致线程进入计时等待状态。这一状态一致保持到超时期满或者接收到适当的通知。带有超时参数的方法有Thread.sleep和Object.wait、Thread.join、Lock.tryLock以及Condition.await的计时板

当一个线程被阻塞或等待时(或终止时),另一个线程被调度为运行状态。当一个线程被重新激活(例如,因为超时期满或成功地获得了一个锁),调度器检查它是否具有当前运行线程更高的优先级。如果是这样,调度器从当前运行线程中挑选要给,剥夺其运行权,选择一个新的线程运行。

被终止的线程

有两个原因:

  • 因为run方法退出而自然死亡。
  • 因为一个没有捕获的异常终止了run方法而意外死亡

特别是,可以调用线程的stop方法杀死一个线程。该方法抛出ThreadDeath错误对象,由此杀死线程。但是,stop方法已经过时了。

线程属性

线程的各种属性:线程优先级、守护线程、线程组以及处理未捕获异常的处理器。

线程优先级

在Java程序设计语言种,每一个线程有一个优先级。 默认情况下,线程继承它的父线程的优先级。可以用setPriority方法提高或降低任何一个线程的优先级。可以将优先级设置为MIN_PRIORITY与MAX_PRIORITY之间的任意值。NORM_PRIORITY被定义为5;

每当线程调度器有机会选择新线程时,他首先选择具有较高优先级的线程。但是,线程优先级高度依赖于操作系统的。当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台的优先级上,优先级个数也许更多,也许更少。

不要将程序构建为功能的正确性依赖于优先级。

警告:如果确实要使用优先级,应该避免初学者常犯的一个错误。如果有几个高优先级的线程没有进入非活动状态,低优先级的线程可能永远也不能执行。每当调度器决定运行一个新线程时首先会在具有高优先级的线程中进行选择,尽管这样会使低优先级的线程完全饿死。

守护线程

可以通过调用 t.setDaemon(true); 将线程转换为守护线程(daemon thread)。

守护线程的唯一用途就是为其他线程提供服务。

例如:计时器线程,定时发送“计时器滴答”信号给其他线程或清空过时的高速缓存项的线程。

当只剩下守护线程时,虚拟机就退出了,由于只剩下守护线程,就没必要继续运行程序了。

守护线程应该永远不要访问固有资源。如文件、数据库,因为它们在任何时候甚至在一个操作的中间发生中断。

未捕获异常处理器

线程的run方法不能抛出任何受查异常,但是,非受查异常会导致线程终止。在这种情况下,线程就死亡了。

但是,不需要任何catch字句来处理可以被传播的异常。相反,就在线程死亡之前,异常被传递到一个用于未捕获异常的处理器。

该处理器必须属于一个实现Thread.UncaughtExceptionHandler接口的类。这个接口只有一个方法: void uncaughtException(Thread t, Throwable e)

可以用setUncaughtExceptionHandler方法为任何线程安装一个处理器。也可以用Thread类的静态方法setDefaultUncaughtExceptionHandler为所有线程安装一个默认的处理器。替换处理器可以使用日志API发送未捕获异常的报告到日志文件。

如果不安装默认的处理器,默认的处理器为空。但是,如果不为独立的线程安装处理器,此时的处理器就是该线程的ThreadGroup对象。

<!--线程组是一个可以统一管理的线程集合。默认情况下,创建的所有线程属于相同的线程组,但是,也可能会创建其他的组。现在引入了更好的特性用于线程集合的操作,所以建议不要在自己的程序中使用线程组-->

ThreadGroup类实现Thread.UncaughtExceptionHandler接口。它的uncaughtException方法做如下操作:

  • 如果线程组有父线程组,那么父线程组的uncaughtException方法被调用。
  • 否则,如果Thread.getDeafultExceptionHandler方法返回一个非空的处理器,则调用该处理器
  • 否则,如果Throwable是ThreadDeath的一个实例,什么都不做。
  • 否则,线程的名字以及Throwable的栈轨迹被输出到System.err上。

同步

大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。如果两个线程存取相同的对象,并且每一个线程都调用了一个修改该对象状态的方法,将会发生什么?根据个线程访问数据的次序,可能会产生讹误的对象。这样一个情况通常称为竞争条件(race condition)

竞争条件的一个例子

测试程序,模拟一个有若干账户的银行。随机地生成在这些账户之间转移钱款的交易。每个账户有一个线程。每一笔交易中,会从线程所服务的账户中随机转移一定的数目的钱款到另一个随机账户。

当这个程序运行时,不清楚某一时刻某一银行账户中有多少钱。但是,直到所有账户的总金额应该保持不变。

竞争条件详解

假定两个线程同时执行指令:accounts[to] += amount;

问题是这不是原子操作,该指令可能被处理如下:

  1. 将accounts[to]加载到寄存器
  2. 增加amount
  3. 将结果协会accounts[to]

假设,第一个线程执行步骤1和2,然后,他被剥夺了运行权。假定第2个线程被唤醒并修改了accounts数组的同一项。然后,第一个线程被唤醒并完成其第3步。

这样就会擦除了第二个线程做的更新。于是总金额不再正确。

<!--可以具体看一下执行我们的类中的每一个语句的虚拟机的字节码。运行命令 javap -c -v Bank 对Bank.class文件进行反编译。 例如,代码行accounts[to] += amount可以被转换成为多行字节码aload_0 getfield #2 // Field accounts:[D iload_2 dup2 daload dload_3 dadd dastore 这些代码的含义是无关紧要的。重要是增值命令是由几条指令组成的,执行它们的线程可以在任何一条指令点上被中断-->

如果没有打印语句,讹误的风险会降低一些,因为每个线程在再次睡眠之前所作的工作很少,调度器在计算过程中剥夺线程的运行权可能性很小。但是讹误的风险并没有完全消失。如果在负载很重的机器上运行许多线程,那么即使删除了打印语句,程序依然会出错。这种错误可能会几分钟、几小时或几天一次。

锁对象

有两种机制可以防止代码块受并发访问的干扰。Java语言提供了一个synchronized关键字达到这一目的。JavaSE5.0引入了ReentrantLock类。

synchronized关键字自动提供一个锁以及相关的“条件”,对于大多数需要显示锁的情况,这是很便利的。

java.util.concurrent框架为这些基础机制提供了独立的类。

ReentrantLock保护代码的基础结构如下:

mylock.lock();
try{
    critical section
}finally{
    myLock.unlock();
}

这一结构确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象。

警告:把解锁操作括在finally字句之内是至关重要的。如果在临界区的代码抛出异常,所必须被是否,否则,其他线程将永远阻塞。

<!--如果使用锁,就不能使用带资源的try语句。首先,解锁方法名不是close。不过即使将它重命名,带资源的try语句也无法正常工作。它的首部希望声明一个新变量。但是如果使用一个锁,你可能想使用多个线程共享的那个变量(而不是新变量)-->

每一个Bank对象有自己的ReetrantLock对象。如果两个线程试图访问同一个Bank对象,那么锁以串行方式提供服务。但是,如果两个线程访问不同的Bank对象,每一个线程得到不同的锁对象,两个线程之间不会发生阻塞。

线程在每一次调用lock都要调用unlock来释放锁。 由于这一特性,被一个锁保护的代码可以调用另一个使用相同的锁的方法。

例如,transfer方法调用getTotalBalance方法,这也会封锁bankLock对象,此时bankLock对象的持有计数为2,当getTotalBalance方法退出的时候,持有计数变回1。当transfer方法退出的时候,持有计数变为0 。线程锁释放。

通常,可能想要保护需若干个操作来更新或检查共享对象的代码块。要确保这些操作完成后,另一个线程才能使用相同的对象。

警告:要留心临界区中的代码,不要因为异常的抛出而跳出临界区。如果在临界区代码结束之前抛出了异常,finally字句释放锁,会使对象可能处于一个受损状态。

警告:听起来公平锁更合理一些,但是使用公平锁比使用常规锁要慢很多。 只有当你确实了解自己要做什么并且对于你要解决的问题有一个特定的理由必须使用公平锁的时候,才可以使用公平锁。即使使用公平锁,也无法确保线程调度器是公平的。如果线程调度器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了。

条件对象

通常,线程进入临界区,却发现在某一条件满足之后他才能执行。 要使用一个条件对象来管理那些已经获得一个锁但是却不能做有用工作的线程

条件对象通常被叫做条件变量(conditional variable)

例如:

public void transfer(int from, int to, int amount){
    bankLock.lock();
    try{
        while(account[from] < amount){
            // wait
            ...
        }
        // transfer funds
        ...
    }finally{
        bankLock.unlock();
    }
}

现在,当账户中没有足够的余额时,应该做什么呢?等待直到另一个线程向账户中注入了资金。但是这一个线程刚刚获得了对bankLock的排它性访问,因此别的线程没有进行存款操作的机会。这就是为什么我们需要条件对象的原因。

个锁对象可以有一个或多个相关的条件对象。可以用newCondition方法获得一个条件对象。习惯上给每一个对象命名为可以反映它所表达的条件的名字。

class Bank {
    private Condition sufficientFunds;
    ...
    public Bank(){
        ...
        sufficientFunds = bankLock.newCondition();
    }
}

如果transfer方法发现余额不足,它调用 sufficientFunds.await();

当先线程被阻塞了,并放弃了锁。我们希望这样可以使得另一个线程可以进行增加账户余额的操作。

等待获得锁的线程和调用了await方法的线程存在本质不同。一旦一个线程调用await方法,它进入该条件的等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll方法时为止。

当另一个线程转账时,他应该调用 sufficientFunds.signalAll();

这一调用重新激活因为这一条件而等待的所有线程。当这些线程从等待集中移除时,它们再次成为可运行的,调度器将再次激活它们。同时,它们将试图重新进入该对象。一旦锁成为可用的,它们中的某个将从await调用返回,获得该锁并从被阻塞的地方继续执行。

此时,线程应该再次测试该条件。由于无法确保该条件被满足-signalAll方法仅仅是通知正在等待的过程:此时有可能已经满足条件,值得再次去检测该条件。

<!--通常,对await的调用应该如下形式的循环体中 while(!(ok to proceed)) condition.await()-->

至关重要的是最终需要某个其他线程调用signalAll方法。

当一个线程调用await时,它没有办法重新激活自身。它寄托于其他线程。如果没有其他线程来重新激活等待的线程,他就永远不再运行了。这将导致死锁(deadlock)现象。

注意调用signalAll不会立即激活一个等待线程。 它仅仅解除等待线程的阻塞,以便这些线程可以在当前线程退出同步方法之后,通过竞争实现对对象的访问。

另一个方法signal,则是随机解除等待集中某个线程的阻塞状态。这比解除所有线程的阻塞更加有效,但也存在危险。如果随机选择的线程发现自己仍然不能运行,那么它再次被阻塞。如果没有其他线程再次调用signal,那么系统就死锁了。

警告:当一个线程拥有某个条件的锁时,它仅仅可以在该条件上调用await、signalAll或signal方法。

synchronized关键字

锁和条件的关键:

  • 锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码
  • 锁可以管理试图进入被保护代码段的线程
  • 锁可以拥有一个或多个相关的条件对象
  • 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。 Lock和Condition接口为程序设计人员提供了高度的锁定控制。

Java中的每一个对象都有一个内部锁。 如果一个方法使用synchronized关键字声明,那么对象的锁将保护整个方法。调用该方法,线程必须获得内部的对象锁。

public synchronized void method(){
    method body
}
等价于
public void method(){
    this.intrinsicLock.lock();
    try{
        method body
    }finally {
        this.intrinsicLock.unlock();
    }
}

内部对象锁只有一个相关条件。wait方法添加一个线程到等待集中,nofityAll/notify方法解除等待线程的阻塞状态。

<!--wait、notifyAll、notify方法是Object类的final方法。Condition方法必须被命名为awati、signalAll和signal以便它们不会与那些方法发生冲突-->

理解synchronized方法,必须了解每一个对象有一个内部锁,并且该锁有一个内部条件。 由锁来管理那些试图进入synchronized方法的线程,由条件管理那些调用wait的线程。

提示:synchronized方法时相对简单的。

将静态方法声明为synchronized也是合法的。 如果调用该方法,该方法获取相关的类对象的内部锁。如果类由一个静态同步方法,该方法被调用时,this.class对象的锁被持有。因此,没有其他线程可以调用同一类的这个或任何其他的同步静态方法。

内部锁和条件存在一些局限:

  • 不能中断一个正在试图获得锁的线程。
  • 试图获得锁时不得设定超时
  • 每个锁仅有单一的条件,可能是不够的。

代码中应该使用Lock还是synchronized?

  • 最好既不是用Lock/Condition也不使用synchronized关键字。在许多情况下你可以使用java.util.concurrent包中的一种机制,他会为你处理所有的枷锁。
  • 如果synchronized关键字适合你的程序,那么尽管使用它,这样可以减少编写的代码,减少出错的概率。
  • 如果特别需要Lock/Condition结构提供的独有特性时,才使用Lock/Condition

同步阻塞

java对象有一个锁。线程可以通过调用同步方法获得锁。还有另一种机制可以获得锁,通过进入一个同步阻塞。

synchronized(obj){
    
}

于是它获得obj的锁。

有时会有特殊的锁

public class Bank{
    private double[] accounts;
    private Object lock = new Object();
    
   ...
       
   public void transfer(int from, int to, int amount){
       synchronized(obj){
           accounts[from] -= amount;
           accounts[to] += amount;
       }
       
   }
}

此时的lock对象仅仅是用来使用每个java对象持有的锁。

有时程序员使用一个对象的锁来实现额外的原子操作,实际上称为客户端锁定(clientside locking)

public void transfer(Vector<Double> accounts, int from, int to, int amount){
    accounts.set(from, accounts.get(from) - amount);
    accounts.set(to, accounts.get(to) + amount);
}

Vector类的get和set方法是同步的,但是,对get调用已经完成之后,一个线程完全可能在transfer方法中被剥夺运行权。 于是,另一个线程可能在相同的位置存入不同的值。

public void transfer(Vector<Double> accounts, int from, int to, int amount){
    synchronized(accounts){
        accounts.set(from, accounts.get(from) - amount);
        accounts.set(to, accounts.get(to) + amount);
    }
}

这个方法可以工作,但是它完全依赖于一个事实,Vector类对自己的所有可修改方法都使用内部锁。然而,Vector类的文档没有给出这样的承若。

客户端锁是非常脆弱的。

监视器概念

锁和条件是线程同步的强大工具,但是,严格地讲,它们不是面向对象的。

可以在不需要程序员考虑如何枷锁的情况下,可以保证多线程的安全性。

最成功的解决方案之一是监视器(monitor),这一概念最早是由PerBrinchHansen和Tony Hoare在20世纪70年代提出的。

监视器具有如下特征:

  • 监视器是只包含私有域的类
  • 每个监视器类的对象有一个相关的锁
  • 使用该锁对所有的方法进行枷锁。 换句话说,如果客户端调用obj.method(),那么obj对象的锁是在方法调用开始时自动获得,并且当方法返回时自动释放该锁。 因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域
  • 该锁可以有任意多个相关条件

Java设计者以一种不是很精确的方式采用了监视器概念,Java中的每一个对象有一个内部的锁和内部的条件。 如果一个方法调用synchronized关键字声明,那么它表现的就像一个监视器方法。通过notify/wait/notifyAll来访问条件变量

Java对象不同于监视器,从而使得线程的安全性下降:

  • 域不要求必须是private
  • 方法不要求必须是synchronized。
  • 内部锁对客户是可用的。

volatile域

仅仅写一个或两个实例域就使用同步,显得开销过大了。

现在的处理器与编译器,出错的可能性很大。

  • 多处理器的计算机能够暂时在寄存器或本地缓冲区中保存内存中的值。结果是,运行在不同处理器上的线程可能在同一个内存位置取到不同的值
  • 编译器可以改变指令执行的顺序以使吞吐量最大化。 这种顺序上的变化不会改变代码语义,但是编译器假定内存的值仅仅在代码中有显示的修改指令时才会改变。然而,内存的值可以被另一个线程改变!

如果使用锁来保护被多个线程访问的代码,那么可以不考虑这种问题。编译器被要求通过在必要的时候刷新本地缓存来保持锁的效应,并且不能不正当地重新排序指令。

可以参考Java内存模型和线程规范http://www.jcp.org/en/jsr/detail?id=133

http://www-106.ibm.com/developerworks/java/library/j-jtp02244.html有一个更容易懂的概要

注释:同步格言:如果向一个变量写入值,而这个变量接下来可能会被另一个线程读取,或者,从一个变量读值,而这个变量可能是之前被另一个线程写入的,此时必须使用同步。

volatile 关键字为实例域的同步访问提供了一种免锁机制。 如果声明一个域为volatile,那么编译器和虚拟机就知道该域可能被另一个线程并发更新的。

警告:volatile变量不能提供原子性。例如方法 public void flipDone() {done = !done;}不能确保翻转域中的值。不能保证读取、翻转和写入不被中断。

final变量

除非使用锁或volatile修饰符,否则无法从多个线程中安全地读取一个域。还有一种情况可以安全地访问一个共享域,即这个域声明为final时。

final Map<String,Double> accounts = new HashMap();

其他线程会在构造函数完成构造之后才看到这个accounts变量。

如果不使用final,就不能保证其他线程看到的是accounts更新后的值,它们可能都只是看到null,而不是新构造的HashMap

原子性

假设对共享变量除了赋值之外并不完成其他操作,那么可以将这些共享变量声明为volatile。

java.util.concurrent.atomic包中有很多类使用了很高效的机器级指令(而不是使用锁)来保证其他操作的原子性。 例如:AtomicInteger类提供了方法incrementAndGet和decrementAndGet,它们分别以原子方式将一个整数自增或自减。 例如:可以安全地生成一个数字序列:

public static AtomicLong nextNumber = new AtomicLong();

long id = nextNumber.incrementAndGet();

以原子方式完成自增,然后返回自增后的值。也就是说获得值、增1然后生成新值的操作不会中断。

有很多方法可以以原子方式设置和增减值,不过,如果希望完成更复杂的更新,就必须使用compareAndSet方法。

假设希望跟踪不同线程观察的最大值,使用下面的代码是不行的

public static AtomicLong largest = new AtomicLong();
largest.set(Math.max(largest.get(), observed)) // error

这个更新不是原子的,实际上,应当在一个循环中计算新值和使用compareAndSet

do{
    oldValue = largest.get();
    newValue = Math.max(oldValue, observed);
}while(!larget.compareAndSet(oldValue, newValue))

如果另一个线程也在更新largest,就可以阻止这个线程更新。这样一来,compareAndSet会返回false,而不会设置新值。在这种情况下,循环会更次尝试,读取更新后的值,并尝试修改。

compareAndSet方法会映射到一个处理器操作,比使用锁速度更快。

largest.updateAndSet(x -> Math.max(x, observed));
或
largest.accumulateAndGet(observed, Math::max);

类AtomicInteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、AtomicLongFieldUpdater、AtomicReference、AtomicReferenceArray和AtomicReferenceFieldUpdater也提供了这些方法

如果有大量的线程要访问相同的原子值,性能会大幅下降,因为乐观锁更新需要太多次重试。

JavaSE8提供了LongAdder和LongAccumulator来解决这个问题。LongAdder包括多个变量(加数),其总和为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。

final LongAdder adder = new LongAdder();
for(...){
    pool.submit(() -> {
        ...
       if(...){
         adder.increment();
       }
    });
}
...
long total = adder.sum();

注释:increment方法不会返回原值。这样做会消除将求和和分解到多个加数所带来的性能提升。

LongAccumulator将这种思想推广到任意的累加操作。在构造器中,可以提供这个操作以及它的零元素。要加入新的值,可以调用accumulate。调用get来获得当前值。

LongAccmulator adder = new LongAccmulator(Long::sum, 0);
adder.accumulate(value);

在内部,这个累加器包含变量a1,a2...,an。每个变量初始化为零元素。

调用accumulate并提供值v时,其中一个变量会以原子方式更新为ai = ai op v,这里op时中缀形式的累加操作。

get的结果是a1 op a2 op ... op an

如果选择一个不同的操作,可以计算最小值或最大值。 一般,这个操作必须满足结合律和交换律。这说明,最终结果必须独立于所结合的中间值的顺序。

死锁

锁和条件不能解决多线程中的所有问题。

账户1:200

账户2:300

线程1:从账户1转移300到账户2

线程2:从账户2转移400到账户1

有可能会因为每一个线程要等待更过的钱款存入而导致所有线程都被阻塞。这样的状态称为死锁(deadlock)

注释:当程序挂起时,键入CTRL+\,将得到一个所有线程的列表。每一个线程有一个栈踪迹,告诉你线程被阻塞的位置。 可以使用jconsole并参考线程面板。

导致死锁的另一个途径是让第i个线程负责向第i个账户存钱,而不是从第i个账户取钱。这样一来,有可能将所有的线程都集中到一个账户上,每一个线程都试图从这个账户中取出大于该账户余额的钱。

还有一种情况,将signalAll方法转换为signal方法。

Java编程语言中没有任何东西可以避免或打破这中死锁现象。必须仔细设计程序,以确保不会出现死锁。

线程局部变量

为了避免共享变量,使用ThreadLocal辅助类为各个线程提供各自的实例。

例如:SimpleDateFormat对象

可以使用以下方法为每一个线程构造一个实例,

public static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInital(() -> new SimpleDateFormat("yyyy-MM-dd"));
// 访问时
String dateStamp = dateFormat.get().format(new Date());

在一个给定线程中首次调用get时,会调用initialValue方法。在次之后,get方法会返回属于当前线程的那个实例。

多个线程中生成随机数也存在类似的问题。

int random = ThreadLocalRandom.current().nextInt(upperBound);

锁测试与超时

线程在调用lock方法来获另一个线程所持有的锁的时候,很可能发生阻塞。应该更加谨慎地申请锁。tryLock方法试图申请一个锁,在成功获得锁后返回true,否则,立即返回false,而且线程可以立即离开去做其他事情。

if(myLock.tryLock()){
    // now the thread owns the lock
    try{
        ...
    }finally{
        myLock.unlock();
    }
}else{
    // do somthing else
}

也可以带有超时参数 tryLock(100, TimeUnit.MILLISECONDS);

lock方法不能被中断。如果一个线程在等待获得一个锁时被中断,中断线程在获得锁之前一直处于阻塞状态。如果出现死锁,那么lock方法就无法被终止。

然而,如果带有超时参数的tryLock,那么如果线程在等待期间被中断,将抛出InterruptedException异常。这个非常重要,因为允许程序打破死锁。

也可以调用lockInterruptibly方法。相当于一个超时设为无限的tryLock方法。

等待一个条件时,也可以提供一个超时: myCondition.await(100, TimUnit.MILLISECONDS);

如果一个线程被另一个线程通过调用signalAll方法或signal激活,或者超时时限以达到,或者线程被中断,那么await方法将返回。

如果等待的线程被中断,await方法将抛出一个InterruptedException异常。如果你希望在出现这种情况时继续等待,可以使用awaitUninterruptibly方法代替await。

读写锁

java.util.concurrents.locks包定义了两个锁类,ReentrantLock类和ReentrantReadWriteLock类。

如果很多线程从一个数据结构中读取数据而很少线程修改其中数据的话,后者是十分有用的。

允许对读者线程共享访问,写者线程必须是互斥访问的。

读写锁的步骤

  1. 构造对象 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  2. 抽取读锁和写锁 private Lock readLock = rwl.readLock(); private Lock writeLock = rwl.writeLock();
  3. 对所有的获取方法加读锁 public double getTotalBalance(){ readLock.lock(); try{ ... }finally{ readLock.unlock(); } }
  4. 对所有的修改方法加写锁 public void transfer(...){ writeLock.lock(); try{ ... }finally{ writeLock.unlock(); } }

为什么弃用stop和suspend方法

初始的java版本定义了一个stop方法来终止一个线程,以及一个suspend方法用来阻塞一个线程直至另一个线程调用resume。 stop和suspend方法有一个共同特点:都试图控制一个给定线程的行为。

stop方法天生不安全,经验证明suspend方法经常导致死锁。

stop方法终止所有未结束的方法,包括run方法。当线程被终止,立即释放被它锁住的所有对象的锁。这回导致对象处于不一致的状态。

当线程要终止另一个线程时,无法知道什么时候调用stop方法是安全的,什么时候导致对象被破坏。因此,该方法被停用了。 在希望停止线程的时候应该中断线程,被中断的线程会在安全的时候停止。

如果suspend挂起一个持有一个锁的线程,那么,该锁在恢复之前是不可用的。如果调用suspend方法的线程试图获得一个锁,那么程序死锁:被挂起的线程等着被恢复,而将其挂起的线程等待获得锁。

阻塞队列

对于实际编程来说,应该尽可能远离底层结构。使用由并发处理的专业人士实现的较高层次的结构要更方便、要安全得多。

对于许多线程问题,可以通过一个或多个队列以优雅且安全的方式将其形式化。

当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞。

方法

正常动作

特殊情况下的动作

add

添加一个元素

如果队列满,则抛出IllegalStateException异常

element

返回队列的头元素

如果队列空,抛出NoSuchException异常

offer

添加一个元素并返回true

如果队列满,返回false

peek

返回队列的头元素

如果队列空,则返回null

poll

移出并返回队列的头元素

如果队列空,则返回null

put

添加一个元素

如果队列满,则阻塞

remove

移出并返回头元素

如果队列空,则抛出NoSuchElementException异常

take

移出并返回头元素

如果队列空,则阻塞

注释:poll和peek方法返回空来指示失败。因此,向这些队列中插入null值是非法的。

带超时的offer和poll方法: boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);

尝试在100毫秒的时间内在队列的元素尾部插入一个元素。 如果成功返回true;否则,达到超时时,返回false。

Object head = q.poll(100, TimUnit.MILLSECONDS)

尝试用100毫秒的时间移除队列的头元素;如果成功返回头元素,否则,达到在超时时,返回null。

默认情况下LinkedBlockingQueue的容量是没有上边界的。但是,也可以指定最大容量。

LinkedBlockingDeque是一个双端的版本。

ArrayBlockingQueu在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平。如果设置了公平参数,则那么等待了最长时间的线程会优先得到处理。通常,公平会降低性能,只有在确实非常需要时才使用它。

PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出。该队列是没有容器上限,但是,如果队列是空的,取元素的操作会阻塞。

DelayQueue包含实现Delayed接口的对象:

interface Delayed extends ComparableDelayed{
    long getDelay(TimeUnit unit);
}

getDelay方法返回对象的残留延迟。负值表示延迟已经结束。 元素只有在延迟用完的情况下才能从DelayQueue移除。还必须实现comparaTo方法。DelayQueue使用该方法对元素进行排序。

se7之后增加了一个TransferQueue接口,允许生产者线程等待,直到消费者准备就绪可以接受一个元素。 调用 q.transfer(item),这个调用会阻塞,直到另一个线程将元素删除。

LinkedTransferQueue实现了这个接口。

线程安全的集合

高效的映射、集和队列

映射、有序集和队列的高效实现:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet和ConcurrentLinkedQueue。

这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分来使竞争极小化。

与大多数集合不同,size方法不必在常量时间内操作。确定这样的集合当前的大小通常需要遍历。

注释:有些应用使用庞大的并发散列映射,这些映射太过庞大,以至于无法用size方法得到它的大小,因为这个方法只能返回int。对于一个包含超过20亿条目的映射该,JavaSE8引入了一个mappingCount方法。

集合返回弱一致性(weakly consistent)的迭代器。 意味着迭代器不一定能反映出它们被构造后的所有的修改,但是,它们不会将同一个值返回两次,也不会抛出ConcurrentModificationException异常。

注释:与之形成对照的是,集合如果在迭代器构造之后发生改变,java.util包中的迭代器将抛出一个ConcurrentModificationException异常。

并发的散列映射表,可高效地支持大量的读者和一定数量的写者。 默认情况下,假定可以有多达16个写者线程同时执行。可以有更多的写者线程,但是,如果同一时间多于16个,其他线程将暂时被阻塞。 可以指定更大数目的构造器。

注释:散列映射将有相同散列码的所有条目放到同一个“桶”中。有些应用使用的散列函数不当,以至于所有条目最后都放在很少的桶中,这回严重降低性能。即使是一般意义上还算合理的散列函数,如String类的散列函数,也可能存在问题。例如,攻击者可能会制造大量有相同散列值的字符串,让程序速度减慢。 JavaSE8中,并发散列映射将桶组织为树,而不是列表,键类型实现了Comparable,从而保证性能为O(long(n));

映射条目的原子更新

ConcurrentHashMap原来的版本只有为数不多的方法可以实现原子更新,这使得编程多少有些麻烦。例如使用ConcurrentHashMap<String, Long>统计单词?

Long oldValue = map.get(word);
Long newValue = oldValue == null ? 1 : oldValue + 1;
map.put(word, newValue);

可能会有另一个线程在同时更新同一个计数。

注释:有些会奇怪为什么原本线程安全的数据结构会允许非线程安全的操作? 一:多个线程修改一个普通的HashMap,它们会破坏内部结构(一个链表数组)。 有些链接可能会丢失,或者甚至构成循环,使得这个数据结构不再可用。 二:对于ConcurrentHashMap不会发生这种情况,get和put代码不会破坏数据结构。不过,由于操作序列不是原子的,所以结果不可预知。

传统的做法是使用replace操作,他会以原子方式用一个新值替换原值,前提是之前没有其他线程把原值替换为其他值。必须一直这么做。

do{
    oldValue = map.get(word);
    newValue = oldValue == null ? 1 : oldValue + 1;
} while(!map.replace(word, oldValue, newValue))

或者ConcurrentHashMap<String, LongAdder>

map.putIfAbsent(word, new LongAdder());
map.get(word).increment();

一个条语句确保有一个LongAdder可以完成原子自增。由于putIfAbsent返回映射的值(可能是原来的值,或者是新设置的值),所以可以组合这两个语句:

map.putIfAbsent(word, new LongAdder()).increment();

JavaSE8之后提供了一个更方便的原子更新的方法。 调用compare方法时可以提供一个键和一个计算新值的函数。 这个函数接收键和相关联的值(如果没有值,则为null),它会计算新值。

map.compare(word, (key, value) -> value == null ? 1 : value + 1);

注释:ConcurrentHashMap中不允许有null值。有很多方法都是用null值来指示映射中某个给定的键不存在。

还有computeIfPresent和computeIfAbsent方法,它们分别只在已经有原值的情况下计算新值,或者只有没有原值的情况下计算新值。

// 更新一个LongAdder计数器映射, LongAdder构造器只有在确实需要一个新的计数器时才会调用
map.computeIfAbsent(word, v -> new LongAdder()).increment();		

首次增加一个键时通常需要做些特殊的处理。 利用merge方法可以非常方便地做到这一点。

这个方法有一个参数表示键不存在时使用的初始值。 否则,就会调用你提供的函数来结合原值与初始值。(与compute不同,这个函数不处理键)

map.merge(word, 1L, (existingVlaue, newValue) -> existingVlaue + newValue);
//或者
map.merge(word, 1L, Long::sum);

注释:如果传入compute和merge的函数返回null,将从映射中删除现有的条目

警告:使用compute和merge时,要记住你提供的函数不能做太多工作。这个函数运行时,可能会阻塞对映射的其他更新。当然,这个函数也不能更新映射的其他部分。

对并发散列映射的批操作

JavaSE8为并发散列映射提供了批操作,即使有其他线程在处理映射,这些操作也能安全地执行。

批操作会遍历映射,处理遍历过程中找到的元素。无需冻结当前映射的快照。除非恰好你知道批操作运行时映射不会被修改,否则就要把结果看作是映射状态的一个近似。

  • 搜索(search)为每个键或值提供一个函数,直到函数生成一个非nul的结果。然后搜索终止,返回这个函数的结果
  • 归约(reduce)组合所有键或值,这里要使用所提供的一个累加函数。
  • forEach为所有键或值提供一个函数

每个操作都有4个版本:

  • operationKeys:处理键
  • operationValues:处理值
  • operation:处理键和值
  • operationEntries:处理Map.Entry对象

对于上述各个操作,需要指定一个参数化阈值(parallelism threshold)。

如果映射包含的元素多个这个阈值,就会并行完成批操作。

如果希望批操作在一个线程中运行,可以使用阈值Long.MAX_VALUE

如果希望用尽可能多的线程运行批操作,可以使用阈值1

search方法版本:

  • U searchKeys(long threshold, BiFunction<? super K, ? extends U> f)
  • U searchValues(long threshold, BiFunction<? super V, ? extends U> f)
  • U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)
  • U searchEntries(long threshold, BiFuntion<Map.entry<K, V>, ? extends U> f)

假如,希望找到一个出现次数超过1000次的单词。

String result = map.search(threshold, (k, v) -> v > 1000 ? k : null);

resutl会设置为第一个匹配的单词,如果搜索函数对所有输入都返回null,则返回null。

forEash方法

  • 第一个只为各个映射条目提供一个消费者函数
map.forEash(threshold, (k, v) -> System.out.println(k + " -> " + v));
  • 第二种形式还有一个转换器函数,这个函数要先提供,其结果会传递到消费者
map.forEach(threshold, (k, v) -> k + " -> " + v, System.out::println);

转换器可以用作为一个过滤器。 只要转换器返回null,这个值就会被悄无声息地跳过。

reduce操作用一个累加函数组合其输入。计算所有值的总和:

Long sum = map.reduceVlaues(threshold, Long::sum);

与forEach类似,也可以提供一个转换器函数。计算最长键的长度

Integer maxlength = map.reduceKeys(threshold, 
                                  String::length,
                                  Integer::max);

转换器也可以作为一个过滤器,通过返回null来排除不想要的输入

统计多少个条目的值 > 1000

Long count = map.reduceValues(threshold,
                             v -> v > 1000 ? 1L : null,
                             Long::sum);

注释:如果映射为空,或者所有条目都被过滤掉,reduce操作会返回null。 如果只有一个元素,则返回其转换结果,不会应用累加器。

对于int、long、double输出还有相应的特殊化操作,分别有后缀ToInt、ToLong和ToDouble。 需要把输入转换为一个基本类型的值,并指定一个默认值和一个累加器函数。 映射为空时返回默认值。

long sum = map.reduceValuesToLong(threshold,
                                 Long::longValue,
                                 0,
                                 Long::sum);

警告:这些特殊化操作与对象版本的操作有所不同,对于对象版本的操作,只需要考虑一个元素。这里不是返回转换得到的元素,而是将与默认值累加。因此,默认值必须时累加器的元素。

并发集视图

假设你想要得到一个大的线程安全的集而不是映射。并没有一个ConcurrentHashSet类。

静态newKeySet方法可以生成一个Set<K>,实际上是ConcurrentHashMap<K, Boolean>的一个包装器。 (所有映射值都为Boolean.TRUE,不过因为只是要把它作为一个集,所以并不关心具体的值)

Set<String> words = ConcurrentHashMap<String>.newKeySet();

当然,如果原来有一个映射,keySet方法可以生成这个映射的键集。这个集是可变的。如果删除这个集的元素,这个键(以及相应的值)会从映射中删除。 不过,不能向键集增加元素,因为没有相应的值可以增加。

JavaSE8为ConcurrentHashMap增减了第二个keySet方法,包含一个默认值,可以在为集增加元素时使用:

Set<String> words = map.keySet(1L);

words.add("Java");

如果"Java"在words中不存在,现在它会有一个值1。

写数组的拷贝

CopyOnWriteArrayList和CopyOnWriteArraySet是线程安全的集合,其中所有的修改线程对底层数组进行复制。如果在集合上进行迭代的线程超过修改线程数,这样的安排是很有用的。

当构建一个迭代器的时候,它包含一个对当前数组的引用。

如果数组后来被修改了,迭代器仍然引用旧数组,但是,集合的数组已经被替换了。

因而,旧的迭代器拥有一致(可能过时的)视图,访问它无须任何同步开销。

并行数组算法

JavaSE8中,Arrays类提供了大量并行化操作。

静态Arrays.parallelSort方法可以对一个基本类型值或对象的数组排序:

String contents = new String(Files.readAllBytes(Paths.get("alice.txt")), StandardCharasets.UTF_8);
String[] words = contents.split("[\\P{L}]+");
Arrays.parallelSort(words);
// 对象排序时,可以提供一个comparator
Arrays.parallelSort(words, Comparator.comparing(String::length));
// 对于所有的方法都可以提供一个范围的边界
values.parallelSort(values.length / 2, vlaues.length);

注释:乍一看,这些方法名中的parallel可能有些奇怪,因为用户不用关心排序具体怎样完成。不过,API设计者希望清楚地指出排序是并行化的。这样一来,用户就会注意避免使用有副作用的比较器

parallelSetAll方法会由一个函数计算得到的值填充一个数组。这个函数接受元素索引,然后计算相应位置上的值。

Arrays.parallelSetAll(values, i -> i % 10); // 0,1,2,3,4,5,6,7,8,9,0,1,2,3...

显然并行化对这个操作很有好处。这个操作对于所有基本类型数组和对象数组都有相应的版本。

还有一个parallelPrefix方法,它会用对应一个给定结合操作的前缀的累加结果替换数组元素。

例如:数组[1,2,3,4,5....]和x操作。执行Arrays.parallelPrefix(values, (x,y) -> x*y)之后,数组将包含:

[1, 1 × 2, 1 × 2 × 3,1 × 2 × 3 × 4,1 × 2 × 3 × 4 × 5...]

较早的线程安全集合

Java的初始版本开始,Vector和Hashtable类就提供了线程安全的动态数组和散列表的实现。

现在取而代之的是ArrayList和HashMap类。这些类不是线程安全的,而集合库中提供了不同的机制

任何集合类都可以通过同步包装器变成线程安全的。

List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());

结果集合的方法使用锁加以保护,提供了线程安全访问。

应该确保没有任何线程通过原始的非同步方法访问数据结构。 最便利的方法是确保不保存任何执行原始对象的引用,简单地构造一个集合并立即传递给包装器。

如果另一个线程可能进行修改时要对集合进行迭代,仍然需要使用“客户端”锁定:

synchronized(synchHashMap){
    Iteratr<K> iter = synchHashMap.keySet().iterator();
    while(iter.hasNext()){
        ...
    }
}

如果使用“for each”循环必须使用同样的代码,因为循环使用了迭代器。注意:如果在迭代过程中,别的线程修改集合,迭代器会失效,抛出ConcurrentModificationException异常。同步仍然是需要的,因此并发的修改可以被可靠地检测出来。

最好使用java.util.concurrent包中定义的集合,不使用同步包装器中的。

特别是,假如它们访问的是不同的桶,由于ConcurrentHashMap已经精心地实现了,多线程可以访问它而且不会彼此阻塞。 有一个例外是经常被修改的数组列表。在那种情况下,同步的ArrayList可以胜过CopyOnWriteArrayList。

Callable与Future

Runnable封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。

Callable与Runnable类似,但是是有返回值。Callable接口是一个参数化的类型,只有一个方法call

public interface Callable<V>{
    V call() throws Exception();
}

类型参数时返回值的类型。

Future保存异步计算的结果。可以启动一个计算,将Future对象交给某个线程,然后忘掉他。Future对象的所有者在结果计算好之后就可以获得它。

public interface Future<V>{
    V get() throws ...;
    V get(long timeout, TimeUnit unit) throws ...;
    void cacel(boolean mayInterrupt);
    boolean isCallcelled();
    boolean isDone();
}

第一个get方法的调用被阻塞,直到计算完成。

如果在计算完成之前,第二个方法的调用超时,抛出一个TimeoutException异常。

如果运行该计算的线程被中断,两个方法都会抛出InterruptedException。

如果计算已经完成,那么get方法立即返回。

如果计算还在进行,isDone方法返回false;如果完成了,则返回true;

可以用cancel方法取消该计算。如果计算还没有开始,它被取消且不再开始。如果计算处于运行中,那么如果myInterrupt参数为true,它就被中断。

FutureTask包装器是一种非常便利的机制,可将Callable转换成Future和Runnable,它同时实现二者的接口。

Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task); // It's a Runnable
t.start();
...
Integer result = task.get();

执行器

构建一个新的线程是有一定代价的,因为涉及与操作系统的交互。如果程序中创建了大量的生命周期很短的线程,应该使用线程池(thread pool)。

一个线程池中包含许多准备运行的空闲线程。

将Runnable对象交给线程池,就会有一个线程调用run方法。当run方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。

另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。

执行器(Executor)类有许多静态工厂方法用来创建线程池。

方法

描述

newCachedThreadPool

必要时创建新线程;空闲线程会被保留60秒

newFixedThreadPool

该池包含固定数量的线程;空闲线程会一直保留

newSingleThreadExecutor

只有一个线程的“池”,该线程顺序执行每一个提交的任务

newScheduleThreadPool

用于预定执行而构建的固定线程池,替代java.util.Timer

newSingleThreadScheduledExecutor

用于预定执行而构建的单线程“池”

线程池

newCachedThreadPool方法创建一个线程池,对于每个任务,如果有空闲线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。

newFixedThreadPool方法创建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到服务的任务放置到队列中。当其他任务完成以后再运行它们。

newSingleThreadExecutor是一个退化了的大小为1的线程池:由一个线程执行提交的任务,一个接着一个。

上面三个方法返回实现了ExecutorService接口的ThreadPoolExecutor类的对象。

可以用下面的方法之一将一个Runnable对象或Callable对象提交给ExecutorService

Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task);

该池会在方便的时候尽早执行提交的任务。调用submit时,会得到一个Future对象,可以用来查询该任务的状态。

第一个submit方法返回一个奇怪样子的Future<?>。 可以使用这样一个对象来调用isDone、cancel或isCancelled。但是,get方法在完成的时候只是简单地返回null。

第二个submit也提交一个Runnable,并且Future的get方法在完成的时候返回指定的result对象。

第三个submit提交一个Callable,并且返回的Future对象将在计算结果准备好的时候得到它。

当用完一个线程池的时候,调用shutdown。 该方法启动该池的关闭序列。别关闭的执行器不再接受新的任务。当所有任务都完成以后,该线程池中的线程死亡。另一种方法是调用shutdownnow。该池取消尚未开始的所有任务并试图中断正在运行的线程。

使用链接池应该做的事情:

  1. 调用Executors类中的静态方法newCachedThreadPool或newFixedThreadPool
  2. 调用submit提交Runnable或Callable对象。
  3. 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象
  4. 当不再提交任何任务时,调用shutdown。

预定执行

ScheduleExecutorService接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。

它是一种允许使用线程池机制的java.util.Timer的泛化。

Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法将返回实现了ScheduledExecutorService接口的对象。

可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性运行。

控制任务组

执行器服务作为线程池使用,以提高执行任务的效率。

使用执行器可以控制一组相关任务。例如,可以在执行器中

invokeAny方法提交所有对象到一个Callable对象的集合中,并返回某个已经完成了的任务的结果。无法知道返回的究竟是哪个任务的结果。 对于搜索问题,如果愿意接受任何一个解决方案的话,可以使用这个方法。

invokeAll方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的解决方案。但计算结果可获得时,可以进行如下处理:

List<Callable> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result : results){
    procesFurther(result.get());
}

这个方法的缺点是如果第一个任务恰巧花去了很多时间,则可能不得不进行等待。 将结果按可获得的顺序保存起来更有实际意义。 可以用ExecutorCompletionService来进行排序。

常规的方法获得一个执行器。然后,构建一个ExecutorCompletionService,提交任务给完成服务(completion service)。 该服务管理Future对象的阻塞队列,其中包含已经提交的任务的执行结果(这些结果成为可用时)。

ExecutorCompletionService<T> service = new ExecutorCompletionService<T>(executor);
for(Callable<T> task : tasks){
    service.submit(task);
}

for(int i = 0; i < tasks.size(); i++){
    processFurther(service.take().get());
}

Fork-Join框架

有些应用使用了大量线程,但其中大多数都是空闲的。

一个web服务器可能为每个连接分别使用一个线程。

另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像处理或视频处理

JavaSE7之后引入了Fork-Join框架,专门支持后一类应用。

假设有一个处理任务,它可以很自认地分解为子任务:

if(problemSize < threshold){
	// solve problem directly
}else{
    // break problem into subproblems
    // recursively solve each subproblem
    // combine the results
}

图像处理就是一个例子,增强一个图像,可以变换上半部分和下部部分。如果有足够多空闲的处理器,这些操作可以并行运行。

假设想统计一个数组有多少个元素满足某个特定的属性。可以将数组一分为二,分别对两部分进行统计,再将结果相加。

要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask<T>的类(如果计算会生成一个类型为T的结果)或者提供一个扩展RecursiveAction的类(如果不生成任何结果)。再覆盖compute方法来生成并调用子任务,然后合并其结果。

class Counter extends RecursiveTask<Integer> {
    ...
    protected Integer compute(){
		if(to - from < THRESHOLD){
			// solve problem directly
        }else{
            int mid = (from + to) / 2;
            Counter first = new Counter(values, from, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
}

在这里,invokeAll方法接收到很多任务并阻塞,知道所有这些任务都已经完成。join方法将生成结果。我们对每个子任务应用了join,并返回其总和。

注释:还有一个get方法可以得到当前结果,不过一般不太使用,因为它可能抛出已检查异常,而在compute方法中不允许抛出这些异常。

在后台,fork-join框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。 每个工作线程都有一个双端队列(deque)来完成任务。一个工作线程将子任务压入其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁)一个工作线程空闲时,它会从另一个双端队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。

可完成Future

处理非阻塞调用的传统方法是使用时间处理器,程序员为任务完成之后要出现的动作注册一个处理器。

当然,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。

尽管程序员会认为“先做步骤1,再做步骤2,再完成步骤3”,但实际上程序逻辑分散到不同的处理器中。

如果必须增加错误处理,情况会更糟糕。假设步骤2是“用户登录”。可能需要重复这个步骤,因为用户输入凭据时可能出错。要尝试在一组事件处理器中实现这样一个控制流,或者想要理解所实现的这样一组处理器,会很有困难。

JavaSE8的CompletableFuture类提供了一种候选方案。与事件处理器不同,“可完成future”可以“组合(composed)”

例如,假如我们希望从一个Web页面提取所有链接来建立一个网络爬虫。假设方法

public void CompletableFuture<String> readPage(URL url);

Web页面可用时这会生成这个页面的文本。

方法

public static List<URL> getLinks(String page);

生成一个HTML页面中的URL,可以调度当页面可用时再调用这个方法

CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URL>> links = contents.thenApply(Parser::getLinks);

thenApply方法不会阻塞。它会返回一个future。第一个future完成时,其结果会提供给getLinks,这个方法的返回值就是最终的结果。

利用可完成Future,可以指定你希望做什么,以及希望以什么顺序执行这些工作。当然,这不会立即发生,不过重要的是所有代码都在一处。

从概念上讲,CompletableFuture是一个简单API,不过有很多不同方法来组合可完成future。

下面处理单个future的方法。对于这里所示的每个方法,还有两个Async形式,不过这里没有给出,其后一种形式使用一个共享的ForkJoinPool,另一种形式有一个Executor参数。

表中用来简写记法表示复杂的函数式接口,这里会把Function<? super T,U> 写成 T -> U。

方法

参数

描述

thenApply

T -> U

对结果应用一个函数

thenCompose

T -> CompletableFuture<U>

对结果调用函数并执行返回的future

handle

(T,Throwable) -> U

处理结果或错误

thenAccept

T -> void

类似于thenApply,不过结果为void

whenComplete

(T, Throwable) -> void

类似于handle,不过结果为void

thenRun

Runnable

执行Runnable,结果为void

组合多个future的方法

方法

参数

描述

thenCombine

CompletableFuture<U>,(T,U)->V

执行两个动作并用给定函数组合结果

thenAcceptBoth

CompletableFuture<U>,(T,U) -> void

与thenCombine类似,不过结果为void

runAfterBoth

CompletableFuture<?>,Runnable

两个都完成后执行runnable

applyToEither

CompletableFuture<T>,T-V

得到其中一个的结果时,传入给定的函数

acceptEither

CompletableFuture<T>,T->void

与applyToEither类似,不过结果为void

runAfterEither

CompletableFuture<?>,Runnable

其中一个完成后执行runnable

static allOf

CompletableFuture<?> ...

所有给定的future都完成后完成,结果为void

static anyOf

CompletableFuture<?> ...

任意给定的future完成后则完成,结果为void

前3个方法并行运行一个CompletableFuture<T>和一个CompletableFuture<U>动作,并组合结果。

接下来的3个方法并行运行两个CompletableFuture<T>动。一旦其中一个动作完成,就传递它的结果,并忽略另一个结果。

最后的静态allOf和anyOf方法取一组可完成future(数目可变),并生成一个CompletableFuture<Void>,它会在所有这些future都完成时或其中任意一个future完成时结束。不会传递任何结果。

理论上讲,这一节介绍的方法接受CompletionStage类型的参数,而不是CompletableFuture。这个接口有几乎40个抽象方法,只由CompletableFuture实现。提供这个接口是为了让第三方框架可以实现这个接口。

同步器

Java.util.concurrent包包含了几个能帮助人们管理相互合作的线程集的类。

这些机制具有为线程之间的共用集结点模式(common rendezvous patterns)提供“预置功能”(canned functionality)。

如果有一个互相合作的线程集满足这些行为模式之一,那么应该直接重用合适的库类而不要试图提供手工的锁与条件的集合。

它能做什么

说明

CycliBarrier

允许线程等待直到其中预定数目的线程到达一个公共障栅(barrier),然后可以选择执行一个处理障栅的动作

当大量的线程需要在它们的结果可用之前完成时

Phaser

类似于循环障栅,不过有一个可变的计数

Java SE7引入

CountDownLatch

允许线程集等待直到计数器减为0

当一个或多个线程需要等待直到指定数目的事件发生。

Exchanger

允许两个线程在要交换的对象准备好时交换对象

当两个线程工作在同一个数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清楚数据。

Semaphore

允许线程集等待直到被允许继续运行为止。

限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止

SynchronousQueue

允许一个线程把对象交给另一个线程

在没有显示同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时。

信号量

概念上讲,一个信号量管理许多的许可证(permit)。

为了通过信号量,线程通过调用acquire请求许可。其实没有实际的许可对象,信号量仅维护一个计数。

许可的数目是固定的由此限制了通过的线程数量。

其他线程可以通过调用release释放许可

而且,许可不是必须由获取它的线程释放。事实上,任何线程都可以释放任意数目的许可,这可能会增加许可数目以至于超出初始数目。

倒计时门栓

一个倒计时门栓(CountDownLatch)让一个线程集等待直到计数变成0。倒计时门栓是一次性的。一旦计数为0,就不能再重用了。

一个有用的特例是计数值为1的门栓。实现一个只能通过一次的门。线程在门外等候直到另一个线程将计数器值置位0 。

举例,假定一个线程集需要一些初始的数据来完成工作。工作器线程被启动并在门外等候。另一个线程准备数据。当数据准备好的时候,调用countDown,所有工作器线程就可以继续运行了。

然后,可以使用第二个门栓检查什么时候所有工作器线程完成工作。用线程数初始化门栓。每个工作器线程在结束前将门栓计数减1.另一个获取工作结果的线程在门外等待,一旦所有工作器线程终止该线程继续工作。

障栅

CycliBarrier类实现了一个集结点(rendeavous)称为障栅(barrier)。

考虑大量线程运行在一次计算的不同部分的情形。当所有部分都准备好时,需要把结果组合在一起。当一个线程完成了它的那部分任务后,我们让它们运行到障栅处。一旦所有的线程都到达了这个障栅,障栅都撤销,线程就可以继续运行。

细节实现。构建一个障栅,并给出参与的线程数

CyclicBarier barrier = new CycliBarrier(nthreads);

每一个线程做一些工作,完成后在障栅上调用await

public void run(){
    dowork();
    barrier.await();
    ...
}

await方法有一个可选的超时参数: barrier.await(100, TimeUnit.MILLISECONDS);

如果任何一个在障栅上等待的线程离开了障栅,那么障栅就被破坏了(线程可能离开是因为它调用await设置了超时,或者因为它被中断了)。 在这种情况下,所有其他线程的await方法抛出BrokenBarrierException异常。那些已经在等待的线程立即终止await的调用。

可以提供一个可选的障栅动作(barrier action),当所有线程到达障栅的时候会执行这一动作

Runnable barrierAction = ...;
CyclicBarrier barrier = new CyclicBarrier(nthreads, barrierAction);

该动作可以收集哪些单个线程的运行结果。

障栅被称为是循环的(cycli),因为可以在等待所有线程被释放后被重用。区别与ConutDownLatch

Phaser类增加了更大的灵活性,允许改变不同阶段中参与线程的个数。

交换器

当两个线程在同一个数据缓冲区的两个实例上工作的时候,就可以使用交换器(Exchanger)。典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当它们都完成之后,相互交换缓冲区。

同步队列

同步队列是一种将生产者与消费者线程配对的机制。

当一个线程调用SynchronousQueue的put方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。

与Exchanger的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。

即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的size方法总是返回0.