关于阻塞队列
1 概念和数据结构
支持以下操作的队列:
-
在检索元素时等待队列变为非空
-
在存储元素时等待队列中的空间变为可用。
BlockingQueue方法有四种形式,具有不同的处理操作的方法,这些操作不能立即得到满足,但可能在将来的某个时候得到满足:
-
第一个抛出异常,
-
第二个返回一个特殊值(null或false,取决于操作),
-
第三个阻塞当前线程,直到操作成功,
-
第四个阻塞只有给定的最大时间限制,然后放弃。
这些方法总结如下表: BlockingQueue方法的总结
接下来是BlockingQueue的继承关系图:
BlockingQueue的七大实现类:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- DelayedWorkQueue
- ForwardingBlockingQueue
- SynchronousQueue
- DelayQueue
- LinkedBlockingQueue:由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。
- PriorityBlockingQueue
阻塞队列的基本原型:
2 应用场景
- 消息队列/消息中间件
- 生产-消费者问题
- …
3 代码
3.1 常用阻塞队列
3.1.1 ArrayBlockingQueue
由数组结构组成的有界阻塞队列。
/**
* ArrayBlockingQueueTest
*/
public static void ArrayBlockingQueueTest() {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
try {
//向队列中添加元素add()方法会抛出异常
blockingQueue.add("zs");
blockingQueue.add("ls");
blockingQueue.add("ww");
blockingQueue.add("zl");
} catch (Exception e) {
//发生异常时超出部分会添加失败
System.err.println("err -> " + e.getMessage());
}
//检查栈顶元素
System.out.println("栈顶元素 -> " + blockingQueue.element());
try {
//从队头排出元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
} catch (Exception e) {
System.err.println("err -> " + e.getMessage());
}
//其他API与Queue使用类似
}
3.1.2 LinkedBlockingQueue
由链表结构组成的有界(大小默认为Integer.MAX_VALUE)的阻塞队列。
/**
* LinkedBlockingQueueTest
*/
public static void LinkedBlockingQueueTest() {
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.add("zs");
System.out.println(blockingQueue.remove());
}
3.1.3 SynchronousQueue
不存储元素的阻塞队列,即单个元素的队列,每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
/**
* SynchronousQueueTest
*/
public static void SynchronousQueueTest() throws InterruptedException {
SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
//必须要使用两个线程,一个put一个take
//以下情况会造成线程阻塞
//原因:均在主线程中
/**
* synchronousQueue.put("zs");
* synchronousQueue.take();
*/
//正常使用
new Thread(() -> {
try {
synchronousQueue.put("zs");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
3.2 使用阻塞队列解决生产-消费者问题
/**
* @desc: 使用阻塞队列完成生产-消费者模型
* @author: YanMingXin
* @create: 2021/8/1-16:00
**/
public class Test02 {
/**
* 声明一个队列
*/
SynchronousQueue queue = new SynchronousQueue<Integer>();
/**
* 生产次数
*/
public static volatile int productNum = 0;
/**
* 消费次数
*/
public static volatile int consumeNum = 0;
/**
* 生产者
*/
public void producer(Integer val) throws Exception {
queue.put(val);
System.out.println("生产 -> " + val);
}
/**
* 消费者
*/
public void consumer() throws Exception {
System.out.println("消费 -> " + queue.take());
}
/**
* 测试
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(2);
Test02 test = new Test02();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
test.producer(i);
productNum += 1;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
test.consumer();
consumeNum += 1;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
latch.await();
System.out.println("生产次数 = " + productNum);
System.out.println("消费次数 = " + consumeNum);
}
}
4 底层原理
在这里我们拿ArrayBlockingQueue举例:
ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列,可以按照 FIFO原则对元素进行排序。
-
ArrayBlockingQueue是基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。 并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
-
ArrayBlockingQueue内部通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是在创建创建ArrayBlockingQueue时候指定的。
-
ArrayBlockingQueue和ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁。ArrayBlockingQueue就是根据ReentrantLock互斥锁实现"多线程对共享资源的访问"。ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。
-
ArrayBlockingQueue和Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。使用通知模式实现:所谓通知模式,当生产者往满的队列里面添加元素的时候,会阻塞生产者(调用Condition notFull.await()进行等待);当消费者消费了一个队列中的元素后,会通知(调用Condition notFull.signal()唤醒生产者)生产者当前队列可用。反之,当消费者消费的时候,发现队列是空的,则消费者会被阻塞(通过Condition的 notEmpty.await()进行等待),当生产者插入了队列中的一个元素后,则会调用notEmpty.signal()唤醒消费者继续消费。
/**
* 构造方法
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//声明一个ReentrantLock
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* put方法
*/
public void put(E e) throws InterruptedException {
//检查不为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//插入元素
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* take方法
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
//提取元素当前的取位、前进和信号,仅在持有锁时调用。
return dequeue();
} finally {
lock.unlock();
}
}
相关文章
- 【万字总结】图解堆算法、链表、栈与队列(多图预警)
- Ajax 请求队列解决方案并结合elementUi做全局加载状态
- UI线程和Windows消息队列
- HDU 5945 Fxx and game (DP+单调队列)
- java 线程池线程忙碌且阻塞队列也满了时给一个拒接的详细报告
- MQ在高并发环境下,如果队列满了,如何防止消息丢失?
- tp5,把耗时操作转为队列,queue + redis + supervisor消息推送(队列的执行异步不异步不知道,workman,swoole可以异步)
- linux 内核 工作队列
- Java的优先级任务队列的实践
- 【Kafka】Apache Kafka消息队列JavaAPI实战
- 【Kafka】Apache Kafka消息队列Topic管理
- Java 优先级队列