ArrayBlockingQueue 分析
2023-04-18 14:55:11 时间
ArrayBlockingQueue底层使用环形数组实现阻塞队列,因此为有界队列,其容量上限在实例化时通过传入的参数capacity决定,本质上就是实例化了一个长度为capacity的数组。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
- 使用变量:
* Condition 对象简介: Condition是在java
1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。undefinedCondition是个接口,基本的方法就是await()和signal()方法;Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition();调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用。undefinedConditon中的await()对应Object的wait();Condition中的signal()对应Object的notify();Condition中的signalAll()对应Object的notifyAll()。undefined引用自:
* Itrs 对象简介: ArrayBlockingQueue队列集合中所有的迭代器都在Itrs迭代器组中进行管理,这些迭代器将在Itrs迭代器组中以单向链表的方式进行排列。所以ArrayBlockingQueue队列需要在特定的场景下,对已经失效、甚至已经被垃圾回收的迭代器管理节点进行清理。undefined例如,当ArrayBlockingQueue队列有新的迭代器被创建时(并为非独立/无效工作模式),Itrs迭代器组就会尝试清理那些无效的迭代器,其工作逻辑主要由Itrs.doSomeSweeping(boolean)方法进行实现。undefined引用自:
final Object[] items; //底层数组实现
int takeIndex; //队列头指针
int putIndex; //队列尾指针
int count; // 当前队列中的对象(任务)数
final ReentrantLock lock; //使用可重入(默认非公平)锁对象加锁
private final Condition notEmpty; // 用于在队列满发生写阻塞时进行线程通信
private final Condition notFull; // 用于在队列空发生读阻塞时进行线程通信
transient Itrs itrs = null; // 迭代器组对象
- 底层调用方法:
* checkNotNull(Object v):检查当前传入的任务对象是否为null,若为null报空指针异常
* enqueue(E x):向队列尾插入元素,内部构建了环形队列,并维护了当前任务数
* dequeue():从队列头取出元素,内部构建了环形队列,并维护了当前任务数
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
private void enqueue(E x) {
// 断言当前线程持有锁且队列尾指针为空
final Object[] items = this.items;
items[putIndex] = x; // 将对象插入尾指针指向位置
if (++putIndex == items.length) // 此处构建环形队列
putIndex = 0;
count++; // 任务数 +1
notEmpty.signal(); // 归还锁对象,并唤醒阻塞的线程
}
private E dequeue() {
// 断言当前线程持有锁且队列头指针不为空
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // 获取当前头指针对应对象,并将指针位置置空
if (++takeIndex == items.length) // 此处构建环形队列
takeIndex = 0;
count--; // 任务数 -1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 归还锁对象,并唤醒阻塞的线程
return x;
}
- 构造方法:
* ArrayBlockingQueue(int capacity):默认非公平可重入锁实现
* ArrayBlockingQueue(int capacity, boolean fair):可通过fair参数选择是否使用公平锁
* ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c):构造时添加集合中的对象到队列中·
/**
* capacity:队列容量
* 默认非公平锁
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* capacity:队列容量 fair:是否为公平加锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 获取可重入锁(fair为true表示公平锁)
notEmpty = lock.newCondition(); //从锁对象获取读阻塞的线程通信对象
notFull = lock.newCondition(); // 从锁对象获取写阻塞的线程通信对象
}
/**
* capacity:队列容量 fair:是否为公平加锁 c:将集合中的元素放入队列
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);// 调用构造方法
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁仅为了可见性,不为互斥性
try {
int i = 0;
try {
for (E e : c) { // 将集合元素写入队列
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i; // 初始化元素数量为i
putIndex = (i == capacity) ? 0 : i; //初始化队列尾指针
} finally {
lock.unlock(); //解锁
}
}
- 入队列方法 :
* put(E e):在阻塞时触发wait使线程等待,适用于并发量较小的情形
* offer(E e):若队列满直接false,适用于并发量极大的情形
* offer(E e, long timeout, TimeUnit unit):会在超时后直接false,适用于并发量较大的情形
public void put(E e) throws InterruptedException {
checkNotNull(e); // 检查 e 不为 null,若为 null 报空指针异常
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中断加锁
try {
while (count == items.length)
notFull.await(); // 若队列满,线程 wait()
enqueue(e); // 若队列不满,放入对象,并唤醒读阻塞的线程
} finally {
lock.unlock();
}
}
public boolean offer(E e) {
checkNotNull(e); // 确保 e 不为 null
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) //若队列满直接放弃 false
return false;
else {
enqueue(e); // 队列不满直接插入
return true;
}
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout); // 获取 nano 等待时间
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); // 定时等待
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
- 出队列方法:
* take():会在阻塞时触发wait使线程等待,适用于并发量较小的情形
* poll():若队列空直接false,适用于并发量极大的情形
* poll(long timeout, TimeUnit unit):会在超时后直接false,适用于并发量较大的情形
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中断加锁
try {
while (count == 0) //若队列空,线程 wait()
notEmpty.await();
return dequeue(); // 若队列不空,取出元素,并唤醒写阻塞的线程(可能当前取出后队列才不满)
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue(); // 取出队列头元素,若队列为空返回 null
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); // 获取 nano 时间
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos); // 定时等待,若 nanos 时间耗尽,则返回 null
}
return dequeue(); // 若队列不空,取出元素,并唤醒写阻塞的线程(可能当前取出后队列才不满)
} finally {
lock.unlock();
}
}
相关文章
- centos8.2安装
- 将Hutool的db.setting放在Nacos配置中心
- mixin 传递参数
- 用yii2进行注册接口+登录接口+带token就能登录+登录后的到底是谁?(一个人使用的版本)接口:
- 用yii2进行文件上传功能实现
- 个性化推荐系统设计(2.1)推荐算法介绍
- 个性化推荐系统设计(4.1)案例分析
- Salesforce Consumer Goods Cloud 浅谈篇四之店内拜访的创建和执行
- XHR,ajax,axios,fetch傻傻分不清?
- 彻底搞懂访问者模式的静态、动态和伪动态分派
- 第一回:Matplotlib初相识
- Flutter--在浏览器打开URL
- Flutter--missing compatible arch in ffi_c.bundle
- Golang -- Json序列化
- ThreadLocal子线程共享及源码分析
- 第三章(1.2)windows下安装Anaconda+TensorFlow+配置PyCharm
- 第三章(1.4)linux下部署tensorflow环境
- 单元测试 -- IllegalAccessError: class jdk.internal.reflect.ConstructorAccessorImpl loaded by org.powe...
- 【技术种草】如何免费申请通配SSL证书
- C:基于可以自动扩展缓冲区的stringbuffer,实现内存格式化输出(bufprintf)