zl程序教程

您现在的位置是:首页 >  工具

当前栏目

HIKARI源码之-ConcurrentBag简单分析

源码 简单 分析
2023-06-13 09:17:23 时间

ConcurrentBag

HikariCP 的核心ConcurrentBag,它是管理连接池的最重要的核心类。

该类依赖了SynchronousQueueCopyOnWriteArrayListThreadLocalAtomicInteger等JDK的并发类实现。

CopyOnWriteArrayList

Copy-On-Write思想(延时懒惰策略):在计算机中就是当你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内 存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉嘛!

CopyOnWriteArrayList是ArrayList的线程安全版本,CopyOnWriteArrayList是在有写操作的时候会copy一份数据,然后写完再设置成新的数据;

CopyOnWriteArrayList使用了ReentrantLock来支持并发操作,array就是实际存放数据的数组对象。ReentrantLock是一种支持重入的独占锁,任意时刻只允许一个线程获得锁,所以可以安全的并发去写数组;

CopyOnWriteArrayList是怎么解决线程安全问题的?答案就是----写时复制,加锁 ; 那么有没有这么一种情况,当一个线程刚好调用完add()方法,也就是刚好执行到上面1处的代码,也就是刚好将引用指向新数组,而此时有线程正在遍历呢?会不会报错呢?(`答案是不会的,因为你正在遍历的集合是旧的)

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

如何使用这个lock实现并发写,看它的add方法

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock(); //上锁,只允许一个线程进入
        try {
            Object[] elements = getArray(); // 获得当前数组对象(通过方法获取,可以保证拿到当前线程安全的数组数据,不会存在此刻有写的数组元素;
            int len = elements.length;
            // 利用高效数组拷贝方法:System.arraycopy
            Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝到一个新的数组中
            newElements[len] = e;//插入数据元素
            setArray(newElements);//将新的数组对象设置回去
            return true;
        } finally {
            lock.unlock();//释放锁
        }
    }

CopyOnWriteArrayList优缺点

缺点

  • 耗内存(集合复制,在大集合的情况下会有体现)
  • 实时性不高,操作的时候都是在旧数据上

优点

  • 数据一致性完整(加锁,并发有保障)
  • 解决ArrayList等多线程遍历迭代的问题;

CopyOnWriteArrayList的使用场景

  • 读多写少(白名单,黑名单,商品类目的访问和更新场景)
  • 集合数据不大;
  • 实时性要求不高;

COW vs 读写锁,两者都是通过读写分离的思想实现的,但又有所区别

相同点:1. 两者都是通过读写分离的思想实现;2.读线程间是互不阻塞的

不同点:

对读线程而言,为了实现数据实时性,在写锁被获取后,读线程会等待或者当读锁被获取后,写线程会等待,从而解决“脏读”等问题。(也就是说如果使用读写锁依然会出现读线程阻塞等待的情况。

COW 则完全放开了牺牲数据实时性而保证数据最终一致性,即读线程对数据的更新是延时感知的,因此读线程不会存在等待的情况。

SynchronousQueue实现原理

定义:实际上不是一个真正的队列,它不会为队列元素维护存储空间。与其他队列不同的是,他维护一组线程,这些线程都是在等待着把元素加入或移除队列。因为SynchronusQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。

SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

应用场景:Executors.newCachedThreadPool()

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
}

由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。

所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。

  • 如果应用程序确实需要比较大的工作队列容量,而又想避免无界工作队列可能导致的问题,不妨考虑SynchronousQueue。SynchronousQueue实现上并不使用缓存空间。
  • 使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。