zl程序教程

您现在的位置是:首页 >  后端

当前栏目

六万字【Java高并发编程入门】第十九篇:并发编程入门总览包教包会【值得收藏】

JAVA并发编程入门 收藏 值得 总览
2023-09-27 14:19:46 时间

 

您好,我是码农飞哥,感谢您阅读本文,欢迎一键三连哦阅读本文你将收获:1. 如何进行并发编程以及如何处理线程安全问题;2.了解各种并发容器的数据结构。
专栏地址

👉  点击我跳转末尾👈   获取粉丝专属 《Java高并发编程入门》源码,脱单秘籍,以及获取博主的联系方式。

为什么写这篇文章

本专栏最早更新于2020年1月份,正是那个被疫情束缚住手脚的日子里。当时看着发小们一个个结婚带孩子,而我只能一个人默默的在家肝文写博客。

本专栏起名【Java高并发编程入门】高并发编程是一个非常大的专题,涉及软件和硬件两个方面,要想熟练的掌握高并发编程,需要很系统化的学习并且最重要的是要经历高并发项目的实战才能有很深的理解。

为了帮助大家更好的了解和掌握高并发编程的必备的基础知识。故在此我写下了本文。 ​

目录

为什么写这篇文章

总览

1. 线程安全问题产生的原因

1.1. 可见性

1.2.原子性

1.3.有序性

2. Java内存模型

2.1.程序次序规则

2.2.volatile

2.3.管程(synchronized)

2.4.传递性

2.5.线程启动规则

2.6.线程终止规则

2.7.线程中断规则

2.8.对象终结规则

2.9.final

3.synchronized

3.1.synchronized的运用

3.2.锁与受保护资源的关系

4. lock锁

4.1. lock的类图

4.2. lock 与synchronized 的区别

4.3.Lock的实现原理简介

4.4. 第一个lock的实例

5.死锁的发生以及避免

5.1.死锁发生条件

5.2. 如何避免死锁

5.3. 死锁发生后如何处理

6.线程间通信

6.1. wait()与sleep() 的相同点与不同点

7. ConcurrentHashMap分析

7.1. 源码分析

7.2. ConcurrentHashMap定义的三个原子操作

7.3. 初始化initTable方法

7.4.sizeCtl 控制符标识

7.5.put方法

7.6. put方法流程图

8.线程池的使用

8.1. 为啥要用线程池呢?

8.2.线程池的优点

8.3. 创建线程的几种方式

8.4. ThreadPoolExecutor类

8.5. 创建线程池的正确姿势是啥呢?

8.6.线程池的执行

8.7. 线程池的配置

8.8. 如何优雅的关闭线程池?

9.线程池的原理

9.1. ThreadPoolExecutor类的常量

9.2. execute方法

9.3. addWorker的方法

10. ThreadLocal

10.1. ThreadLocal的定义

10.2. ThreadLocal的应用场景

10.3. ThreadLocal的demo

10.4. TheadLocal的源码解析

10.5. ThreadLocal的内存泄露

11. 粉丝专属福利


总览

下图1的思维导图展示了本专栏的目录大纲,按照高并发的学习梯度来编写高并发编程所涉及到的相关知识点。 图1​ 这里简单介绍一下思维导图的脉络架构:

  1. 首先介绍导致线程安全的三个问题:分别是:原子性问题,可见性性问题以及有序性问题。
  2. 介绍Java内存模型以及Happen-Before规则,该规则是处理线程线程安全的指导性原则
  3. 介绍各种原子类使用
  4. 介绍如何通过synchronized和lock来处理原子性问题
  5. 介绍死锁发生的条件以及如何避免死锁
  6. 介绍线程间的通信,重点是wait,notify,notifyAll这几个关键字
  7. 介绍各种并发容器的源代码:重点需要掌握ConcurrentHashMap的原理
  8. 介绍线程池的使用以及原理
  9. 介绍ThreadLocal的使用。 下面就让我们来展开说说!!!!

1. 线程安全问题产生的原因

并发编程世界里,由于线程切换导致的原子性问题,CPU缓存导致的可见性问题,以及编译器重排序导致的有序性问题是并发编程线程安全问题的根源。

1.1. 可见性

可见性:一个线程对共享变量的修改,另外一个线程能够立刻看到的特性。 共享变量指的是存放在堆内存,由所有线程所共享的变量。比如:实例变量,静态变量。 如图2所示: 图2​ 共享变量V可以由线程A和线程B同时操作,线程A和B首先从各自的CPU缓存或者寄存器中读取数值,然后由CPU的寄存器写入内存中。

public class VisibleProblemTest {
    private static Boolean aBoolean = false;
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            int i = 0;
            System.out.println("VisibleProblemTest.aBoolean====" + VisibleProblemTest.aBoolean);
            while (VisibleProblemTest.aBoolean != true) {
                i++;
            }
            System.out.println("i====" + i);
        }).start();
        //主线程休息2秒再操作共享变量,便于人眼可以直接看到
        Thread.sleep(2000);
        //修改共享变量值
        VisibleProblemTest.aBoolean = true;
        System.out.println("变量值修改");
        System.out.println("程序结束");
    }
}

运行结果是: 在这里插入图片描述

预期的结果是主线程将共享变量aBoolean的值改成true之后,子线程则会跳出循环。然而,我们看到子线程并没有跳出循环,这就是因为主线程对aBoolean值的修改对子线程没有可见。

1.2.原子性

原子性:一个或者多个操作在CPU执行过程中不被中断的特性。

一般认为一条CPU指令是具有原子性的,但是由于一条高级语句在CPU中可能会分成若干条指令来执行,每条指令执行完之后都有可能会发生线程切换。故线程切换造成的原子性问题。 例如: count=count+1 共有三个指令:

  • 指令一: 将count值从内存加载到到CPU的寄存器中
  • 指令二: 在寄存器中+1操作
  • 指令三 :将新值写入到内存中,这里的内存可能是CPU的缓存(栈),也可能是主内存(堆)中。 操作系统做任务切换可以发生在任何一条CPU指令执行完,是CPU指令执行完。
public class AtomicTest {
    private  int atest = 0;


    public void countTest() {
        for (int i = 0; i < 10000; i++) {
            atest = atest + 1;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicTest atomicTest = new AtomicTest();
        Thread threadA = new Thread(() -> atomicTest.countTest());
        Thread threadB = new Thread(() -> atomicTest.countTest());
        threadA.start();
        threadB.start();
        threadA.join();
        threadB.join();
        System.out.println("*******获得到的atest值为=" + atomicTest.atest);
    }
}

每次运行该程序得到的结果都不一定是期望的值20000,而是在10000-20000之间的随机数。这就出现了线程安全问题。 原因分析:假设线程A和线程B同时执行 atest = atest + 1;, 线程A 读取到的原值是0,执行+1操作之后,得到新值1,同样的,线程B也是读取到的原值0,然后执行+1操作,得到新值1。这样就永远得不到结果2。 类推的话,循环执行10000次也是同理,线程A执行+1操作时不能及时获得线程B已经写入的值,故导致值永远不可能达到20000。

1.3.有序性

编译器重排序导致的有序性问题: 例如:双重加锁中的:

public class SingletonDemo {
    private static  SingletonDemo instance = null;

    private SingletonDemo(){

    }

    static SingletonDemo getSingletonDemo() {
        if (instance == null) {
            synchronized (SingletonDemo.class) {
                if (instance == null) {
                    instance = new SingletonDemo();    //6
                }
            }
        }
        return instance;
    }
}

这里会有有序性问题: 问题主要出在了 new SingletonDemo() 这一步 因为instance = new SingletonDemo();主要有三个指令

  1. 分配内存空间M
  2. 在内存M上初始化对象
  3. 然后M的地址赋值给instance变量

    正常顺序是1-2-3 但是CPU重排序之后执行顺序可能变成了 1-3-2 步骤如下:

  4. A首先进入synchronized,由于instance为null,所以它执行instance = new SingletonDemo();
  5. 然后线程 A执行1->JVM先画出了一些分配给SingletonDemo实例的空白内存,并赋值给instance
  6. 在还没有进行第三步(将instance引用指向内存空间)的时候,恰好发生了线程切换 ,切换到了线程B上,
  7. 如果此时线程B也执行getSingletonDemo()方法,那么线程B 在执行第一个判断是会发现instance!=null,所以直接返回了instance,而此时的instance是没有初始化的。

    详情信息可以参考第一篇(1):原子性,可见性,有序性

2. Java内存模型

Java 内存模型是一个很复杂的规范,本质上可以理解为:Java内存模型规范了JVM如何提供按需禁用缓存和编译优化(本质上是指令重排序)的方法。具体来说这些方法包括:通过volatile,synchronized,final 还有Happen-Before规则来控制。它主要用来解决可见性问题和有序性问题。下面就详细介绍下。 Happen-Before 的本质上表示的意思是,先行发生于,本质上表示的意思是 前面的操作产生的结果对后续操作可见。例如:A先行发生于B,那么执行B操作时A操作产生的数据,B操作一定能看到。Happen-Before规则共有七条规则。 | 规则 | 规则描述 | |--|--| | 顺序性规则 | 流程前的执行结果后流程后的代码可见 | | volatile规则 | volatile 修饰的变量,读操作先行发生于写操作 | | 管程规则 | 管程的解锁操作先行发生于加锁操作 | | 传递性规则 | A先行发生于B,B先行发生于C,则A先行发生于C | | 线程的start规则| 线程的start方法先行发生于该线程内的所有操作 | | 线程的join规则 | 线程内所有操作都先行发生于join操作 | |线程的interrupted规则 | 线程的interrupted先行发生于检测中断发生的方法 |

2.1.程序次序规则

程序次序规则:程序前面对某个变量的修改对后续操作一定是可见的 例如:

 public void writer() {
        a = 43;
        b = a;
    }

执行b=a时,a=43 一定是可见的。

2.2.volatile

volatile 关键字修饰的变量,表示这个变量的值变更对其他线程是可见的。例如: volatile int y=1,它表达的意思是对这个变量的读写,不能使用CPU缓存,必须从内存中直接读写。 如下程序所示:

   private volatile  boolean flag = false;
    private   int a = 0;

    public void writer() {
        a = 43;
        flag = true;
    }
    public void read() {
        if (flag) {
            System.out.println("读取到的a值是="+a);
        }
    }

在Java 1.5之后的版本中,读取到的a 值永远都是43。 故:volatile变量的规则是:volatile变量的写操作对于这个volatile变得的读操作是可见的

2.3.管程(synchronized)

管程锁定规则:管程中对一个锁的解锁先行发生于对这个锁的加锁操作 我们来看一个用synchronized修饰的方法。

   synchronized int getValue() {
        return value;
    }

如上程序,假设有线程A和线程B, 线程A 首先获取到锁,进入getValue() 方法,那么只有当线程A释放锁之后线程B才有可能获取到锁。

2.4.传递性

A先行发生于B,B先行发生于C,那么A必然先行发生于C

    public void transfer() {
        c = a + b;  //1
        a = 100;  //2
        c = a - b;//3
    }

如上程序 第一步一定优先发生于第二步,第二步一定优先于第三步。那么第一步一定先于第三步发生。

2.5.线程启动规则

线程启动规则:线程的start()方法先行发生于此线程的每一个操作 这个很好理解:就是线程只有调用了start()方法变成了 可运行状态,才会执行线程需要处理的任务。 例如:

  new Thread(new Runnable() {
            public void run() {
                longTest.countTest();
            }
        }).start();

线程里需要执行的longTest.countTest();方法一定是在start() 方法执行之后才会执行。

2.6.线程终止规则

线程终止规则:线程的join()规则,线程中所有的操作都先行发生于此线程的终止操作 也就是说调用线程的join()方法时,线程所执行的任务产生的结果一定是对 join()方法可见的。 例如:

 final LongTest longTest = new LongTest();
        Thread threadA = new Thread(new Runnable() {
            public void run() {
                longTest.countTest();
            }
        });
        Thread threadB = new Thread(new Runnable() {
            public void run() {
                longTest.countTest();
            }
        });
        threadA.start(); //1
        threadA.join();  //2
        threadB.start(); //3
        threadB.join(); //4

如上程序所示:第二步调用threadA.join();方法之后,线程A 计算所得到的值,对第三步一定是可见的。因为调用 threadA.join();之后线程A就被终止了。 在程序里我们可以通过 Thread.isAlive() 的返回值检测线程是否终止执行。

2.7.线程中断规则

线程中断规则:对线程interrupt()方法的调用先行发生于被中断的线程代码检测到中断事件的发生 也就是说:对线程的中断操作,对后续操作检测中断事件的操作一定是可见的。

    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread("线程A");
        threadA.start();
        threadA.interrupt();
        System.out.println("线程A是否终止"+threadA.isInterrupted());
    }

如上程序:输出的结果永远都是 线程A是否终止true。这样正验证了我们前面提到的规则。

2.8.对象终结规则

对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始 在Java中finalize()方法我们用的比较少。在此就不详细展开说明。

2.9.final

用final 修饰的变量的表示初始化完成之后就不能修改。

  1. final修饰的基本类型的变量,则其数值一旦在初始化之后便不能更改。
  2. final修饰的引用类型的变量, 则在对其初始化之后便不能再让其指向另一个对象。
  3. final 修饰的类不能被继承,类中的方法不能被重写。
  4. final 修饰的方法,不能被重写。 在多线程环境下可以利用其不变性 第一篇(2):Java内存模型 第二篇:原子类的说明与使用

    3.synchronized

    Java内存模型解决了 可见性问题和有序性问题,但是针对原子性问题还不能解决。那么原子性问题该如何解决呢?答案就是通过锁机制。这里的锁有两种:一种是JVM自带的synchronized锁,一种是lock锁。首先,我们来看看synchronized锁。 Java自带的锁工具是synchronized,用synchronized修饰的代码就相当于上了锁。上了锁就需要互斥执行。即:同一时刻只能有一个线程执行。 我们将一段需要互斥执行的代码称之为临界区。 例如:
   synchronized (this) {
            this.a = 100;
            System.out.println("*******test2执行");
        }

如上程序:代码System.out.println("*******test2执行");synchronized修饰,故此代码被称之为临界区。而a这是受保护的资源。其关系图如下: 在这里插入图片描述

3.1.synchronized的运用

synchronized 可以修改方法,修饰代码块。使用如下:

class SynchronizedTest {  
     public synchronized  void test1() {
        System.out.println("*******test1执行");
    }
    public void test2() {
        synchronized (this) {
            System.out.println("*******test2执行");
        }
    }
    public synchronized static void test3() {
        System.out.println("*******test3执行");
    }
}
  1. 当synchronized修饰实例方法时,锁定的就是当前实例对象this。如方法test1所示。
  2. 当synchronized修饰代码块块时,锁定的就是括号里的对象。如方法test2所示。
  3. 当synchronized修饰静态方法时,锁定的就是当前类的class对象,如方法test3所示。

3.2.锁与受保护资源的关系

在现实生活中,我们可以通过通过一把锁保护多个东西,例如,用一把大门的锁,保护你家里面的所有东西。同样的,你一个给一个东西加上两把锁。但是,在并发编程中,同一个资源只能由一把锁保护,一把锁可以保护多个资源。故,并发编程中,锁与受保护资源的关系是1:N。例如:

public class SynchronizedTest3 {
    int a = 0;
    int b = 0;
    static int c = 0;

    /**
     * 锁定的是this对象,保护了,a,b两个资源
     */
    synchronized void setValue() {
        a = 100;
        b = 20;
    }

    /**
     * 锁定的是SynchronizedTest3的class对象,
     * 保护了资源c
     * @return
     */
    synchronized static int getValue() {
        return SynchronizedTest3.c;
    }

}

如上程序,synchronized修饰的setValue方法中有a,b两个资源,因为这两个资源都所属与this对象,所以都可以受到synchronized的保护。 而synchronized修饰的getValue方法中只有资源c,而这个c是一个静态变量,属于SynchronizedTest3类,所以它也可以受到保护。 详细信息可以查看 第三篇(1) : synchonized解决原子性问题

4. lock锁

4.1. lock的类图

在这里插入图片描述​ 从该类图我们可以看出ReentrentLock实现了Lock接口,其对受保护资源的读写都要加锁。同时加锁之后需要手动的释放锁。由Lock产生Condition实例。 另外一个接口是ReentrentReadWriteLock,其内部维护了一个读锁ReadLock和一个写锁WriteLock。其中ReadLock是并发的,而WriteLock是同步的,同一时间只能有一个线程持有WriteLock。

4.2. lock 与synchronized 的区别

上面说可以通过lock来破坏不可抢占的条件,那么lock为啥可以支持呢?因为lock锁有如下三个特性:

//能够响应中断
 void lockInterruptibly() throws InterruptedException;
 //能够响应超时
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException
  //能够非阻塞的获取锁
  boolean tryLock()
  1. 能够响应中断 synchronized的问题是,持有锁A后,如果尝试获取锁B失败,那么线程就进入 阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。如果阻塞状态的 线程能够响应中断信号,也就是说当我们给阻塞线程发送中断信号的时候,能够唤醒它, 那么它就有机会释放曾经持有的锁A。

  2. 支持超时 如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误, 那么这个线程也有机会释放曾经持有的锁,

  3. 非阻塞地获取锁 通过调用tryLock()方法,如果返回true,则表示获取到锁,如果返回false,则表示获取锁失败,不过其并不会进入阻塞状态,而是直接返回。

4.3.Lock的实现原理简介

典型的lock的使用如下所示:

public class LockTest2 {
    final Lock lock = new ReentrantLock();
    int value = 0;

    public void addOne() {
        lock.lock();
        try {
            value + = 1;
        } finally {
            lock.unlock();
        }

    }
}

Java SDK里面的ReentrantLock, 内部持有一个volatile的成员变量state, 获取锁的时候,会读写state的值,解锁的时候也会读写state的值, 也就是说在执行value+=1之前,会读写一次volatile变量state,在执行value+=1之后 又读写了一次volatile变量state。 根据Happens-Before规则:

  1. 顺序性规则:对于线程T1 value+=1 Happens-Before 释放锁的操作 unlock()
  2. valatile变量规则:由于线程T1获取锁之后state为1,之后释放会后才会变成0,T2回去锁必须先读取state,所以线程T1的unlock()操作Happens-Before线程T2的lock()操作。
  3. 根据传递性:所以线程T1的value+=1 Happens-Before 线程T2的lock()操作

4.4. 第一个lock的实例

public class LockTest {

    public static void main(String[] args) {
        MyThreadService myThreadService = new MyThreadService();
        for (int i = 0; i < 5; i++) {
            MyThread myThread = new MyThread("线程" + i, myThreadService);
            myThread.start();
        }
    }

    static class MyThread extends Thread {
        MyThreadService myThreadService = null;

        public MyThread(String name, MyThreadService myThreadService) {
            super(name);
            this.myThreadService = myThreadService;
        }

        @Override
        public void run() {
            myThreadService.printThread();
        }
    }
}


public class MyThreadService {
    final Lock lock = new ReentrantLock();

    public void printThread() {
        lock.lock();
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println("*******" + Thread.currentThread().getName() + (" " + i));
            }
        } finally {
            lock.unlock();
        }
    }

}

运行结果是: 在这里插入图片描述​ 从如上运行结果可以看出,每个线程顺序执行,lock 起到了锁的作用,被锁住的代码块同一时刻只有一个线程在执行。 详细信息可以查看 第三篇(2):lock的学习与使用

5.死锁的发生以及避免

5.1.死锁发生条件

  1. 互斥,共享资源A和B只能被一个线程占用,就是本例中的,一根筷子同一时刻只能被一个哲学家获得
  2. 占有且等待:线程T1持有共享资源A,在等待共享资源B时,不释放占用的资源,在本例中就是:哲学家1获得他左边的筷子,等待获得他右边的筷子,即使没有得到也不会放回其获得的筷子。
  3. 不可抢占:其他线程不能强行占用线程T1占用的资源,在本例中就是:每个哲学家获得的筷子不能被其他哲学家抢走。
  4. 循环等待:线程T1等待线程T2占用的资源,线程T2等待线程T1占用的资源。在本例中:所有哲学家围坐一桌,已经形成了一个申请资源的环。

5.2. 如何避免死锁

前面我们说了,死锁的发生条件是必须同时满足上述四个条件。那么避免死锁的方式就是破坏掉其中的一个条件就可以了。

对于占用且等待

对于占用且等待的情况,我们只需要一次性申请所有的资源,只有申请到了才会往下面走。对于这种情况,我们需要一个调度者,由它来统一申请资源。调度者必须是单例的,由他给哲学家分配筷子。

public class Allocator {
    private List<Object> applyList = new ArrayList<Object>();

    private final static Allocator allocator = new Allocator();

    private Allocator() {

    }

    /**
     * 只能由一个人完成,所以是单例模式
     * @return
     */
    public static Allocator getAllocator() {
        return allocator;
    }

    /**
     * 申请资源
     */
    synchronized boolean applyResource(Object from, Object to) {
        if (applyList.contains(from) ||
                applyList.contains(to)) {
            return false;
        }
        applyList.add(from);
        applyList.add(to);
        return true;
    }

    /**
     * 释放资源
     */
    synchronized void free(Object from, Object to) {
        applyList.remove(from);
        applyList.remove(to);
    }

调度者会一直申请拿到两个资源,如果能拿到这执行后续流程,拿不到的话则一直循环申请。

public void eat(Account2 target) {
        //没有申请到锁就一直循环下去,直到成功
        while (!Allocator.getAllocator().applyResource(this, target)) {
            return;
        }
        try {
            //左边
            synchronized (this) {
                //右边
                synchronized (target) {

                }
            }
        } finally {
            //释放已经申请的资源
            Allocator.getAllocator().free(this, target);
        }
    }

对于不可抢占资源

对于不可抢占资源,占有部分资源的线程进一步申请其他资源,如果申请不到则主动释放它占用的资源。在后面我们会运用lock来实现。给锁设定超时时间。如果在超时未获得需要的资源,则释放其所占资源。

class Philosopher extends Thread{
    private ReentrantLock left,right;

    public Philosopher(ReentrantLock left, ReentrantLock right) {
        super();
        this.left = left;
        this.right = right;
    }
    public void run(){
        try {
            while(true){
                Thread.sleep(1000);//思考一段时间
                left.lock();
                try{
                    if(right.tryLock(1000,TimeUnit.MILLISECONDS)){
                        try{
                            Thread.sleep(1000);//进餐一段时间
                        }finally {
                            right.unlock();
                        }
                    }else{
                        //没有获取到右手的筷子,放弃并继续思考
                    }
                }finally {
                    left.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
    }
}

如上程序,我们将right锁的超时时间设置为1秒,如果不能获取到,右手的筷子,则放弃吃面并继续思考。

对于循环等待

对于循环等待,我们可以按序申请资源来预防,所谓的按序申请,是指资源是有线性顺序的,申请的时候可以先申请序号小的,再申请序号大的。 我们在Chopsticks中添加一个id 字段,作为资源的序号。然后在申请资源时按照序号从小到大开始申请。

   static class Chopsticks {
        private int id;

        public Chopsticks(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }

     public Philosopher(Chopsticks left, Chopsticks right) {
            if (left.getId() > right.getId()) {
                this.left = right;
                this.right = left;
            } else {
                this.left = left;
                this.right = right;
            }
        }

5.3. 死锁发生后如何处理

当检测到死锁时,一个可行的做法是释放所有锁,回退,并且等待一段随机时间后重试。这个和简单的加锁超时类似,不一样的是只有死锁已经发生了才回退 。而不会是因为加锁的请求超时了,虽然有回退和等待,但是如果有大量线程竞争同一批锁,它们还是会重复地死锁。

一个更好的方案是给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。可以再死锁发生的时候设置随机的优先级。

详细信息可以查看 第三篇(3):死锁的发生与避免

6.线程间通信

线程间的通信主要通过wait() ,notify(), notifyAll()这三个方法。 当线程被调用wait()方法之后会进入等待池,即当前线程的状态变成了阻塞状态。 当调用notify() 方法时会唤醒等待池中的某个线程,而notifyAll()方法则会唤醒等待线程池中的所有线程,具体哪个线程可以获得执行权还要看CPU切换的情况。

6.1. wait()与sleep() 的相同点与不同点

相同点:都会让当前线程挂起一段时间,让渡CPU的执行时间,等待再次调度 不同点:

  1. wait(),notify(),notifyAll()一定是在synchronized{}内部调用,等待和通知的对象必须要对应。而sleep可以再任何地方调用
  2. wait 会释放锁对象的“锁标志”,当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中。知道调用notifyAll()方法,而sleep则不会释放,也就是说在休眠期间,其他线程仍然不能访问共享数据。
  3. wait可以被唤醒,sleep的只能等其睡眠结束
  4. wait()是在Object 类里,而sleep是在Thread 里的。 详细信息可以查看 第三篇(4):线程间通信

    7. ConcurrentHashMap分析

    HashMap在多线程环境下会有线程安全问题,虽然也有线程安全容器Hashtable,但是其通过synchronized修饰方法,通过独占锁的方式锁定类对象,效率不高,所以Java 又提供了线程安全容器ConcurrentHashMap,与HashMap的底层的数据结构相同,ConcurrentHashMap也是采用的“散列表+链表+红黑树”,不过红黑树中存储的不是TreeNode,而是TreeBin。在JDK1.8中 ConcurrentHashMap 大量采用CAS算法,unsafe.compareAndSwapInt(this, valueOffset, expect, update); CAS(compareAndSwap)比较并交换,就是比较valueOffset位置上的值是否等于expect,如果等于的话则返回true,并更新值。(PS:在JDK1.7中采用的是分段锁的方式)。在扩容,设值的过程中大量采用CAS无锁不阻塞的方式支持并发操作,但是是不是就不需要加锁了呢?答案是否定的。

7.1. 源码分析

首先,我们来看看ConcurrentHashMap中三个重要的原子操作。这三个方法的作用分别的 1.获得在i位置上的Node节点 2.利用CAS算法设置i位置上的Node节点 3.设置节点位置的值,仅在上锁区被调用

7.2. ConcurrentHashMap定义的三个原子操作

//获得在i位置上的Node节点
  static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //利用CAS算法设置i位置上的Node节点,之所以能实现并发是因为他指定了原来这个节点的值是多少。
    //在CAS算法中,会比较内存中的值与指定的值是是否相等,如果相等则更新,并返回true,如果不相等则不更新,直接返回false。
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    //设置节点位置的值,仅在上锁区被调用
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

接下来我们来看看容器初始化的过程,调用构造函数只是设置了相关的参数,并没有实际的创建容器,分配内存,initTable 是在调用put方法时才会调用的,也就是说只有设置了第一个元素,才会真正的初始化容器。

7.3. 初始化initTable方法

 private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //sizeCtl 表示有其他线程正在进行初始化操作,把线程挂起,对于table的初始化操作,只能有一个线程进行
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  //利用CAS方法把sizeCtl的值设置为-1,表示本线程正在进行初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);  //相当于0.75*n  设置一个扩容的阈值
                    }
                } finally {
                    //将sc赋值给sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

如上代码:我们初始化容器主要就是创建一个大小为n的Node数组,可以看出与HashMap中初始化容器最大的不同就是加了并发控制,通过共享变量sizeCtl来控制的,如果sizeCtl小于0,表示表示有其他线程正在进行初始化操作,把线程挂起,对于table的初始化操作,只能有一个线程进行。否则,通过CAS的方式将sizeCtl的值设置为-1,表示本线程正在进行初始化。最后在finally代码块中将sizeCtl设置为0.75*n

7.4.sizeCtl 控制符标识

这里我们特别需要注意的一个变量是sizeCtl。

    private transient volatile int sizeCtl; //控制标识符

sizeCtl是一个用于同步多个线程的共享变量,是控制符标识符,扩容控制标识符,负数表示正在进行初始化或者扩容,操作,-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。 正数或者0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,类似于扩容阈值,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。

7.5.put方法

put 方法是ConcurrentHashMap的最核心方法,前面做的一系列铺垫也是为了解释put方法做准备。与HashMap类似,put操作采用CAS+synchronized实现并发插入或更新操作。其主要的过程也是如下几步:

  1. 计算hash值,然后根据hash值计算出key的存储位置i。
  2. 如果位置i没有值,则通过CAS算法直接放进去,不需要加锁
  3. 如果table[i]的节点的hash等于MOVED,则检测到正在扩容,则帮助其扩容
  4. 如果不等于MOVED,则给链表的头结点上锁,然后遍历链表,通过尾插法的方式将元素插入到链表的尾部。

    说明:

  5. 获取index位置上的元素时通过 U.getObjectVolatile 来获取的,而不是直接通过table[index] 这是为什么呢? 在java内存模型中,我们知道每个线程都有一个工作内存,里面存储的是table的副本,虽然table是volatile修饰的,但是不能保证线程每次都拿到table的最新元素。 U.getObjectVolatile 可以直接获取指定内存的数据,保证了每次取到的数据都是最新的。
  6. ConcurrentHashMap和HashMap的区别还有一点,就是HashMap允许一个key和value为null,而ConcurrentHashMap则不允许key和value为null,如果发现key或者value为null,则会抛出NPE,这一点需要特别注意,而这也说明,在ConcurrentHashMap中可以通过使用get操作来测试是否具有某个记录,因为只要get方法返回null,就说明table中必然不存在一个记录和当前查询的匹配,而在HashMap中,get操作返回null有可能是我们查询的记录的value就是null,所以不能使用get方法来测试某个记录是否存在于table中。

    7.6. put方法流程图

    在这里插入图片描述​ 详细信息可以查看。 第六篇:走近ConcurrentHashMap(基于JDK1.8)

    8.线程池的使用

    8.1. 为啥要用线程池呢?

    第一个问题来了,为啥要使用线程池呢?直接new一个线程它不香么?就像这样
new Thread(new Runnable() {
            public void run() {
                longTest.countTest();
            }
        }).start()

简单又快捷,其中run方法的作用是用来执行一个任务单元(也就是一段代码)。start方法的作用是用来创建一个新线程,同时设置好这个线程的上下文,比如这个线程的栈,线程的状态等一系列的信息。 这些信息处理好之后这个线程才可以被调度,一旦调度,就会执行run()方法。 但是在实际项目中是禁止这样做了,在阿里出的JAVA开发手册中就明确说了原因:

所以,直接new一个线程不香,原因主要在于创建大量相同的线程会大量的消耗系统内存,甚至会导致系统内存耗尽;同时,大量的线程会竞争CPU的调度,导致CPU过度切换。 在这里插入图片描述​ 接下来我们就看看香香的线程池的优点。

8.2.线程池的优点

  1. 减少在创建和销毁线程上所花费的时间和系统资源的开销
  2. 提高响应速度,当任务到达之后,任务可以不需要等到线程创建就能被立即执行
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制的的创建,不仅会消耗系统资源,还会降低系统性能,使用线程池可以进行统一分配,调优和监控。

8.3. 创建线程的几种方式

  1. 创建一个缓存线程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

由于SynchronousQueue是一个无界的队列,当任务过多时,大量的任务堆积到队列里可能会发生OOM异常(Java内存溢出异常),同时线程池的最大线程数也没有限制,创建大量线程缺点前面也说了,综上所述不推荐使用这种方式创建线程池

  1. 创建固定容量的线程池
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

这个线程池多了一个参数ThreadFactory,这个参数的作用可以自定义线程的名称如下:

 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("测试-%s").build();

线程池的容量固定为传入值nThreads,任务的阻塞队列用的是LinkedBlockingQueue,没有指定队列的容量,所以队列的最大容量可达Integer.MAX_VALUE。当有大量请求时,可能造成任务的大量堆积,发生OOM异常(Java内存溢出异常)。综上所说实际项目中不推荐使用这种方式创建线程池

  1. 创建单个线程的线程池
 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

这个线程池中线程的容量只有1个,可用于一些特殊的场景下。 从上,我们知道Executors类中各种创建线程池的方法,其实内部都调用的是ThreadPoolExecutor的构造器,那么我们就来看看ThreadPoolExecutor类。 在这里插入图片描述

8.4. ThreadPoolExecutor类

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor类的构造方法参数比较多,下面分别介绍下各个参数:

  1. corePoolSize: 核心线程数的大小,就是线程池中始终保留的存活线程的数量,这个就相当于项目组的常驻组员。
  2. maximumPoolSize: 线程池中允许最大的线程数,当任务比较多时,可以适当增加到线程,只要总的线程数量不超过最大线程数。就相当于项目组常驻组员数+外援组员数<=最大的组员数
  3. keepAliveTime :空闲线程允许的最大存活时间,当一个线程空闲超过这段时间,就会被回收
  4. unit :存活时间的时间单位
  5. workQueue :保存任务的阻塞队列,所有需要线程执行的任务都会保存到这个队列中,当线程被CPU调度之后就会从队列中取出一个任务来执行。
  6. threadFactory: 线程工厂用来创建线程,通过这个参数可以自定义如何创建线程,例如:你可以给线程指定一个有意义的名字
  7. handler: 拒绝策略,针对当队列满了是 新来任务的处理方式,如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接受,至于拒绝的策略,可以通过handler这个参数来指定,ThreadPoolExector已经提供了以下4中策略。
    • CallerRunsPolicy: 提交任务的线程自己去执行该任务
    • AbortPolicy: 默认的拒绝策略,会抛出 RejectedExecutionException。
    • DiscardPolicy: 直接丢弃任务,没有任何异常抛出
    • DiscardOldestPolicy: 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

8.5. 创建线程池的正确姿势是啥呢?

      ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("测试-%d").build();
        ExecutorService executorService = new ThreadPoolExecutor(10, 15, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

就像如上所示,创建的线程池,指定了核心线程数以及最大线程数,同时,指定了任务队列的容量,这个很重要,最后,给线程指定了一个有意义的名字。 这里的相关的参数的设置要根据不同的场景进行不同的设置:

  1. 高并发,任务执行时间短的场景 这种场景应该是CPU密集型的计算场景,可以将核心线程数设置为CPU数+1,减少线程上下文之间的切换
  2. 并发不高,任务执行时间长的场景
    • 假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以加大线程池中的线程数目,让CPU处理更多的业务。
    • 假如是业务时间长集中在CPU操作上,也就是CPU密集型的任务,这就只能跟(1)一样,线程池中的线程数设置的少一点,减少线程上下文的切换
  3. 高并发,任务执行时间长的场景 解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,然后增加服务器是第二步,再是考虑线程池的设置。

    8.6.线程池的执行

    se.execute(new Job);
    
    这样的方式提交一个任务到线程池,所以核心的逻辑就是execute()函数。

    8.7. 线程池的配置

    按照经验,我们首先可以分析线程池需要执行的任务是那种类型: 对于 IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如CPU个数*2 对于CPU密集型任务(大量复杂的运算)应当分配较少的线程,比如CPU个数相当的大小。

    8.8. 如何优雅的关闭线程池?

    关闭线程池无非就是两种方法shutdown()/shutdownNow()。 这两个方法有着重要的区别:
  4. shutdown()执行后停止接受新任务,会把队列的任务执行完毕。
  5. shutdownNow()也是停止接受新任务,但会中断所有的任务,将线程池的状态改成stop。 shutdownNow()更加的简单粗暴,可以根据实际场景来选用不同的方法。 详细信息可以查看。 第十六篇:面试必备的线程池知识-线程池的使用

    9.线程池的原理

    9.1. ThreadPoolExecutor类的常量

     //用来存放工作线程数量和线程池状态
     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     private static final int COUNT_BITS = Integer.SIZE - 3;  //32-3=29
     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
     // runState is stored in the high-order bits
     //运行状态,可以执行任务
     private static final int RUNNING    = -1 << COUNT_BITS;
     //不能接受新任务,但是可以执行完正在执行任务
     private static final int SHUTDOWN   =  0 << COUNT_BITS;
     //不能接受新任务,也不能执行已有的任务
     private static final int STOP       =  1 << COUNT_BITS;
     //所有任务都终止,工作线程数归零
     private static final int TIDYING    =  2 << COUNT_BITS;
     //终止状态执行完成
     private static final int TERMINATED =  3 << COUNT_BITS;
    
     //获取线程池的状态
     private static int runStateOf(int c)     { return c & ~CAPACITY; }
     //获取工作线程的数量
     private static int workerCountOf(int c)  { return c & CAPACITY; }
     private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    ctl 变量主要是为了把工作线程数量和线程池状态放在一个整型变量存储而设置的一个原子类型的变量。在ctl中,低位的29位表示工作线程的数量,高位用来表示RUNNING,SHUTDOWN,STOP等线程池状态。上面定义的三个方法只是为了计算得到线程池的状态和工作线程的数量以及得到ctl。 下面是一段线程池的测试代码,定义线程池,并调用execute方法添加任务,并执行任务。

public class ExectorTest {
    public static void main(String[] args) {
       //给线程设置一个自定义名称
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("测试线程-%d").build();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                6,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                threadFactory
//                , new ThreadPoolExecutor.CallerRunsPolicy()
        );
        for (int i=0;i<20;i++) {
            executorService.execute(()->{
               //模拟耗时的任务
                System.out.println(Thread.currentThread().getName()+" 开始执行任务");
                int j = 10000 * 10000;
                while (j >0) {
                    j--;
                }

                System.out.println(Thread.currentThread().getName()+" 执行结束");

            });
        }
    }
}

利用debug模式得到的调试栈如下:

在这里插入图片描述​ 提交任务execute方法是整个线程池的执行入口,下面我就从它开始分析。

9.2. execute方法

    public void execute(Runnable command) {
    //如果任务为空,则抛出NPE异常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
         //获取线程池状态
        int c = ctl.get();
        //1.如果工作线程的数量小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //调用addWorker增加一个新线程,并执行一个任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态是运行状态,并且任务加入到了工作队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //双重检查,再次检查线程池的状态。
            int recheck = ctl.get();
            //如果线程池的状态不是运行状态并且移除任务成功则调用拒绝策略
            if (! isRunning(recheck) && remove(command))
                //调用RejectedExecutionHandler.rejectedExecution()方法。根据不同的拒绝策略去处理
                 reject(command);
            //如果工作线程的数量为0,说明工作队列中可能有任务没有线程执行,此时则新建一个线程来执行任务,由于执行的是队列中已经堆积的任务,所以没有传入具体的任务。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果前面的新增work,放入队列都失败,则会继续新增worker,此时线程池中的工作线程数达到corePoolSize,阻塞队列任务已满,只能基于maximumPoolSize来继续增加work,如果还是失败
        else if (!addWorker(command, false))
           //如果还是失败,则调用RejectedExecutionHandler.rejectedExecution()方法。根据不同的拒绝策略去处理
            reject(command);
    }

从上代码中,我们可以总结出execute方法主要有如下三个流程

  1. 如果线程池中当前工作线程数小于核心线程数(corePoolSize),则创建一个新线程来执行传入的任务(执行这一步骤需要获取全局锁)
  2. 如果工作线程数大于等于核心线程数,并且线程池是运行状态,则将传入的任务加入到工作队列(BlockingQueue)中。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(执行这一步骤需要获取全局锁)
  4. 如果创建新线程将使得当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。根据不同的拒绝策略去处理 运行的流程图如下: 在这里插入图片描述​ 从execute()方法可以看到新增线程并且执行任务核心逻辑在addWorker方法中。

    9.3. addWorker的方法

    首先第一段代码,这段代码有两个死循环,外层的死循环主要是检查线程池的状态,更新线程池的状态。内层的死循环,是检查工作线程的数量,并且通过CAS的方式在ctl中更新工作线程的数量。

    for (;;) {
             int c = ctl.get();
             int rs = runStateOf(c);
    
             //检查线程池的状态是否是运行状态,并且队列不为空
             if (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                 return false;
    
             for (;;) {
                 //获取工作线程数
                 int wc = workerCountOf(c);
                 //工作线程数
                 if (wc >= CAPACITY ||
                     wc >= (core ? corePoolSize : maximumPoolSize))
                     return false;
                     //通过CAS的方式来在ctl中增加工作线程的数量
                 if (compareAndIncrementWorkerCount(c))
                     break retry;
                 //再次获取状态
                 c = ctl.get();  // Re-read ctl
                 //如果状态更新失败,则循环更新
                 if (runStateOf(c) != rs)
                     continue retry;
                 // else CAS failed due to workerCount change; retry inner loop
             }
         }
    

    分析完了前置的一些检查工作的代码,接下来,来看下主流程的代码:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //1. 新建一个工作线程,Work后面会说
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //再次获取线程池的状态,在新建线程或者释放锁时,都会重新检查。
                    int rs = runStateOf(ctl.get());
                    //如果线程池的状态不是关闭状态,则进入下面的分支
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //检查新建的线程是否是可运行状态
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将工作线程添加到HashSet类型的集合中
                        workers.add(w);
                        int s = workers.size();
                        //如果工作线程的集合数大于largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //新建工作线程成功之后,将操作标志workerAdded设为true,表示新增工作线程成功,后续流程用
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }
                //如果新建工作线程成功,则调用start() 方法启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果workerStarted为false,表示新建工作线程失败
            if (! workerStarted)
                //移除已经创建的工作线程
                addWorkerFailed(w);
        }
        return workerStarted;

如上,该主流程的代码逻辑也是比较清晰的,首先是新建一个工作线程,然后就是在同步代码块中检查线程池的状态,如果不是SHUTDOWN状态,则将新增的线程放在HashSet类型线程的集合中,放入成功之后,将创建work的标识workerAdded改成true,然后释放锁。接着就是调用start()方法使得线程可以执行任务。接下来就来看看Worker的结构。 详细信息可以查看。 第十七篇:面试必备的线程池知识-线程池的原理

10. ThreadLocal

10.1. ThreadLocal的定义

JDK对ThreadLocal的定义如下: TheadLocal提供了线程内部的局部变量:每个线程都有自己的独立的副本;ThreadLocal实例通常是类中的private static字段,该类一般与线程状态相关(或线程上下文)中使用。只要线程处于活动状态且ThreadLocal实例时可访问的状态下,每个线程都持有对其线程局部变量的副本的隐式引用,在线程消亡后,ThreadLocal实例的所有副本都将进行垃圾回收。

10.2. ThreadLocal的应用场景

ThreadLocal 不是用来解决多线程访问共享变量的问题,所以不能替换掉同步方法。一般而言,ThreadLocal的最佳应用场景是:按照线程多实例(每个线程对应一个实例)的对象的访问。 例如:在事务中,connection绑定到当前线程来保证这个线程中的数据库操作用的是同一个connection。

10.3. ThreadLocal的demo

public class ThreadLocalTest {
    public static void main(String[] args) {
        ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("张三");

        new Thread(()->{
            threadLocal.set("李四");
            System.out.println("*******"+Thread.currentThread().getName()+"获取到的数据"+threadLocal.get());
        },"线程1").start();
        new Thread(()->{
            threadLocal.set("王二");
            System.out.println("*******"+Thread.currentThread().getName()+"获取到的数据"+threadLocal.get());
        },"线程2").start();
        new Thread(()->{
            System.out.println("*******"+Thread.currentThread().getName()+"获取到的数据"+threadLocal.get());
        },"线程3").start();
        System.out.println("线程=" + Thread.currentThread().getName() + "获取到的数据=" + threadLocal.get());
    }
}

运行结果:

从运行结果,我们可以看出线程1和线程2在ThreadLocal中设置的值相互独立,每个线程只能取到自己设置的那个值。 运行结果

10.4. TheadLocal的源码解析

ThreadLocal存储数据的逻辑是:每个线程持有一个自己的ThreadLocalMap,key为ThreadLocal对象的实例,value 是我们需要设值的值。

ThreadLocal的set方法

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

getMap的方法如下:

public class Thread implements Runnable {

    //每个线程自己的ThreadLocalMap对象通过ThreadLocal保存下来
  ThreadLocal.ThreadLocalMap threadLocals = null;

  ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
}

首先获取当前线程的ThreadLocalMap对象,该对象是通过实例变量threadLocals保存的。

  1. 如果获取得到ThreadLocalMap,则直接设值,key为当前ThreadLocal类的this实例,如果获取不到调用createMap方法创建ThreadLoalMap实例,并将值设置到这个ThreadLocalMap中,后面我们会重点介绍ThreadLocal的createMap方法。 接下来我们就来看看ThreadLocal的get方法。

ThreadLocal的get方法

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

1.首先获取当前线程的ThreadLocalMap对象,没有的话,设置初始值(null)并返回

  1. 如果可以获取到ThreadLocalMap 则获取其Entry对象,如果不为空则直接返回value 说完了ThreadLocal的set方法和get方法。我就来具体看看前面提到的ThreadLocalMap。

    ThreadLocalMap的结构

public class ThreadLocal<T> {

    private static AtomicInteger nextHashCode =new AtomicInteger();

    //初始的Hash值是0x61c88647
     private static final int HASH_INCREMENT = 0x61c88647;

    //每次调用就原子性的将hash值增加HASH_INCREMENT
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

    static class ThreadLocalMap {
         //Entry继承WeakReference
          static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

                private static final int INITIAL_CAPACITY = 16;


                 private void setThreshold(int len) {
                    threshold = len * 2 / 3;
                }


            void createMap(Thread t, T firstValue) {
                t.threadLocals = new ThreadLocalMap(this, firstValue);
            }

             ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
                    table = new Entry[INITIAL_CAPACITY];
                    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
                    table[i] = new Entry(firstKey, firstValue);
                    size = 1;
                    setThreshold(INITIAL_CAPACITY);
                }
            }
    }

如上,ThreadLocalMap作为ThreadLocal的静态内部类,由ThreadLocal所持有,每个线程内部通过ThreadLocal来获取自己的ThreadLocalMap实例。结构如下图所示: 在这里插入图片描述​ 从上述代码我们可以看出ThreadLocalMap实际上没有继承Map接口,其只是一个可扩展的散列表结构。初始大小是16。大于等于数据的1/2 的时候会扩容为2倍的原数组的rehash。初始的hashCode值为0x61c88647。每创建一个Entry对象,hash值就会增加一个固定大小0x61c88647。同时,我们注意到,ThreadLocalMap的Entry是继承WeakReference,和HashMap很大的区别是,Entry中没有next字段,所以不存在链表的情况。那么没有链表结构,发生hash冲突了怎么办呢?要解答这个问题就需要看看ThreadLocalMap的set方法了。

ThreadLocalMap的set方法

  private void set(ThreadLocal<?> key, Object value) {
            Entry[] tab = table;
            int len = tab.length;
            //1.根据ThreadLocal对象的hash值,定位到table中的位置i
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();
                //判断Entry.key等于当前的ThreadLoacl对象key,则覆盖旧值,退出。
                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

前面我们提到了每个ThreadLocal对象都有一个hash值threadLocalHashCode,每创建一个Entry对象,hash值就增加一个固定的大小0x61c88647。 1.根据ThreadLocal对象的hash值,定位到table中的位置i 2.如果table[i]的Entry不为null 2.1. 判断Entry.key等于当前的ThreadLoacl对象key,则覆盖旧值,退出。 2.2. 如果Entry.key为null,将执行删除两个null 槽之间的所有过期的stale的entry, 并把当前的位置i上初始化一个Entry对象,退出 2.3 继续查找下一个位置i++ 3.如果找到了一个位置k,table[k]为null,初始化一个Entry对象。

ThreadLocalMap的getEntry方法

     private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

         private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                    return e;
                if (k == null)
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }
  1. 根据当前ThreadLocal的hashCode mod table.length,计算直接索引的位置i,如果e不为null并且key相同则返回e。
  2. 如果e为null,返回null
  3. 如果e不为空且key不相同,则查找下一个位置,继续查找比较,直到e为null退出
  4. 在查找的过程中如果发现e不为空,且e的k为空的话,删除当前槽和下一个null槽之间的所有过期entry对象。 总结ThreadLocalMap:

  5. ThreadLocalMap的散列表采用开放地址,线性探测的方法处理hash冲突,在hash冲突较大的时候效率低下,因为ThreadLoaclMap是一个Thread的一个属性,所以即使在自己的代码中控制设置的元素个数,但还是不能控制其他代码的行为。

  6. ThreadLocalMap的set、get、remove操作中都带有删除过期元素的操作,类似缓存的lazy淘汰。 在这里插入图片描述

    10.5. ThreadLocal的内存泄露

    ThreadLocal可能导致内存泄露,为什么?先看看Entry的实现:
      static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

通过之前的分析我们已经知道,当使用ThreadLocal保存一个value时,会在ThreadLoalMap中的数组插入一个Entry对象,按理来说key-value都可以以强引用保存在Entry对象中,但在ThreadLocalMap的实现中,key被保存到了WeakReference对象(弱引用)中,即ThreadLocalMap弱引用ThreadLocal。 Key的引用链是 ThreadLocalRef---->ThreadLocal, 这就导致了一个问题,当一个ThreadLocal没有强引用时,threadLocal会被GC清理,会形成一个key为null的Map的引用。 但是value是强引用的,只有当当前线程结束了value的强引用才会结束,但线程迟迟未结束时,就会出现 ThreadRef---->Thread---->ThreadLocalMap--->Entry--->value这条强引用链条。 废弃threadLocal占用的内存会在三种情况下清理:

  1. thread结束,那么与之相关的threadlocal value会被清理
  2. GC后,thread.threadLocal(map) 的threadhold超过最大值时,会清理
  3. GC后,thread.threadlocals(maps)添加新的Entry时,hash算法没有命中既有Entry时,会清理

那么何时会“内存泄漏”?当Thread长时间不结束,存在大量废弃的ThreadLocal,而又不再添加新的ThreadLocal时。

11. 粉丝专属福利

软考资料:实用软考资料

面试题:5G 的Java高频面试题

学习资料:50G的各类学习资料

脱单秘籍:回复【脱单】

并发编程:回复【并发编程】

   👇🏻 验证码 可通过搜索下方 公众号 获取👇🏻