消息队列
关于消息队列
???? 文章简介:Kafka ???? 创作目的:消息队列 ☀️ 今日天气:天气很好 ???? 每日一言:“所行皆坦途 所求皆如愿。”
kafka常用于构建TB级别的异步消息系统
首先谈到对于框架的含义 :
Java 框架由一系列可重用的预编写代码组成,它们起着模板的作用,开发人员可以根据需要通过填充自定义代码来创建应用。
框架创建后可反复使用,这样开发人员即可以在一定的结构上来编写应用,而无需从头开始手动创建。
Java 框架中可以包含预定义类(例如对象类别)和函数,用于处理、输入和管理硬件设备,以及与系统软件进行交互。当然,具体的框架内容要取决于框架的类型、Java 开发人员的技能水平、他们所要完成的工作以及自己的偏好。
框架(Framework )的本质:对特定的类或方法进行封装的集成
如果我们不使用框架是否还能解决类似的问题呢 ?
答案是:
可以的,比如Kafka框架。在我们不使用Kafka的情况下,我们也能通过Java自带的API:BlockingQueue解决阻塞队列、实现消息系统或解决类似的问题、 ![6NUAJZIC_RJ`5NI4TDRZS.png](http://blog-dm-01.oss-cn-hangzhou.aliyuncs.com/articles/5509dbc218cd3763fa8bdd4298d9f36f.png)
关于Kafka使用的冷知识 现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。 方案:将kafka的日志文件全部删除,再次启动即可。 建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。 其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题
那么什么是阻塞队列呢 ?
阻塞队列—BlockingQueue(Java自带的API)
生产者&消费者
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:
存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。
上代码:
public class ProducerConsumerTest {
public static void main(String[] args) {
CubbyHole c = new CubbyHole();
Producer p1 = new Producer(c, 1);
Consumer c1 = new Consumer(c, 1);
p1.start();
c1.start();
}
}
class CubbyHole {
private int contents;
private boolean available = false;
public synchronized int get() {
while (available == false) {
try {
wait();
}
catch (InterruptedException e) {
}
}
available = false;
notifyAll();
return contents;
}
public synchronized void put(int value) {
while (available == true) {
try {
wait();
}
catch (InterruptedException e) {
}
}
contents = value;
available = true;
notifyAll();
}
}
class Consumer extends Thread {
private CubbyHole cubbyhole;
private int number;
public Consumer(CubbyHole c, int number) {
cubbyhole = c;
this.number = number;
}
public void run() {
int value = 0;
for (int i = 0; i < 10; i++) {
value = cubbyhole.get();
System.out.println("消费者 #" + this.number+ " got: " + value);
}
}
}
class Producer extends Thread {
private CubbyHole cubbyhole;
private int number;
public Producer(CubbyHole c, int number) {
cubbyhole = c;
this.number = number;
}
public void run() {
for (int i = 0; i < 10; i++) {
cubbyhole.put(i);
System.out.println("生产者 #" + this.number + " put: " + i);
try {
sleep((int)(Math.random() * 100));
} catch (InterruptedException e) { }
}
}
}
运行结果:
消费者 #1 got: 0
生产者 #1 put: 0
生产者 #1 put: 1
消费者 #1 got: 1
生产者 #1 put: 2
消费者 #1 got: 2
生产者 #1 put: 3
消费者 #1 got: 3
生产者 #1 put: 4
消费者 #1 got: 4
生产者 #1 put: 5
消费者 #1 got: 5
生产者 #1 put: 6
消费者 #1 got: 6
生产者 #1 put: 7
消费者 #1 got: 7
生产者 #1 put: 8
消费者 #1 got: 8
生产者 #1 put: 9
消费者 #1 got: 9
同等案例(二)
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @program: BlockingQueue
* @description:
* @author: DM
* @create: 2023
**/
public class BlockingQueueTests {
public static void main(String[] args) {
//因为数组实现所以要求设定队列容量
BlockingQueue queue=new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();//启动生产者线程
new Thread(new Consumer(queue)).start();//三个消费者并发消费数据
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
//生成者
class Producer implements Runnable{
//传入阻塞队列
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue){
this.queue=queue;
}
@Override
public void run() {
try {
for (int i=0;i<100;i++){
//不管企业和组件中间都有间隔
Thread.sleep(20);//停顿20毫秒
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
//消费者
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue){
this.queue=queue;
}
@Override
public void run() {
try {
while (true){
//不管企业和组件中间都有间隔
Thread.sleep(new Random().nextInt(1000));//停顿0~1000毫秒
//使用数据
queue.take();
System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
运行结果:
Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-1消费:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
//中间为生产者生产与消费者消费的过程(由于生产数据量相对较长所以省略)
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-3消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-3消费:4
Thread-1消费:3
Thread-2消费:2
Thread-3消费:1
Thread-1消费:0
Process finished with exit code 130
BlockingQueue实现类
BlockingQueue常见的有下面5个实现类,主要是应用场景不同。
- ArrayBlockingQueue 基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
- LinkedBlockingQueue 基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小
- SynchronousQueue 一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费
- PriorityBlockingQueue 实现了优先级的阻塞队列,基于数据显示,是无界队列
- DelayQueue 实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列
BlockingQueue源码解析
BlockingQueue的5种子类实现方式大同小异,这次就以最常用的ArrayBlockingQueue做源码解析。
ArrayBlockingQueue类属性
先看一下ArrayBlockingQueue类里面有哪些属性:
// 用来存放数据的数组
final Object[] items;
// 下次取数据的数组下标位置
int takeIndex;
// 下次放数据的数组下标位置
int putIndex;
// 当前已有元素的个数
int count;
// 独占锁,用来保证存取数据安全
final ReentrantLock lock;
// 取数据的条件
private final Condition notEmpty;
// 放数据的条件
private final Condition notFull;
ArrayBlockingQueue中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析。
put方法源码解析
无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。
// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据
public void put(E e) throws InterruptedException {
// 校验元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,加可中断的锁
lock.lockInterruptibly();
try {
// 如果队列已满,就一直阻塞,直到被唤醒
while (count == items.length)
notFull.await();
// 如果队列未满,就往队列添加元素
enqueue(e);
} finally {
// 结束后,别忘了释放锁
lock.unlock();
}
}
// 实际往队列添加数据的方法
private void enqueue(E x) {
// 获取数组
final Object[] items = this.items;
// putIndex 表示本次插入的位置
items[putIndex] = x;
// ++putIndex 计算下次插入的位置
// 如果本次插入的位置,正好等于队尾,下次插入就从 0 开始
if (++putIndex == items.length)
putIndex = 0;
// 元素数量加一
count++;
// 唤醒因为队列空等待的线程
notEmpty.signal();
}
源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。
像下面这样:
4.3 take方法源码
// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁,加可中断的锁
lock.lockInterruptibly();
try {
// 如果队列为空,就一直阻塞,直到被唤醒
while (count == 0)
notEmpty.await();
// 如果队列不为空,就从队列取数据
return dequeue();
} finally {
// 结束后,别忘了释放锁
lock.unlock();
}
}
// 实际从队列取数据的方法
private E dequeue() {
// 获取数组
final Object[] items = this.items;
// takeIndex 表示本次取数据的位置,是上一次取数据时计算好的
E x = (E) items[takeIndex];
// 取完之后,就把队列该位置的元素删除
items[takeIndex] = null;
// ++takeIndex 计算下次拿数据的位置
// 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据
if (++takeIndex == items.length)
takeIndex = 0;
// 元素数量减一
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒被队列满所阻塞的线程
notFull.signal();
return x;
}
总结
- ArrayBlockingQueue基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
- ArrayBlockingQueue底层采用循环队列的形式,保证数组位置可以重复使用。
- ArrayBlockingQueue存取都采用ReentrantLock加锁,保证线程安全,在多线程环境下也可以放心使用。
- 使用ArrayBlockingQueue的时候,预估好队列长度,保证生产者和消费者速率相匹配。
相关文章
- ROS1云课→06节点消息流(计算图级)
- java 实现 springboot项目 使用socket推送消息,前端实时进行接收后端推送的消息(亲测有效)
- 微信Windows端IM消息数据库的优化实践:查询慢、体积大、文件损坏等
- 分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群
- 4.Go语言项目操作之NSQ分布式消息队列实践
- Dapr 入门教程之消息队列
- rabbitmq集群安装_java实现消息队列
- 消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
- 【Spring Boot实战与进阶】集成Kafka消息队列
- 消息队列:第六章:ObjectMessage与MapMessage
- 飞书开放平台-发送文本&富文本消息
- 消息队列的过去、现在和未来
- HttpServletResponse详解:封装HTTP响应消息
- Linux中的MSGsnd函数:实现消息通信的功能(linuxmsgsnd)
- 消息称天鹅到家暂停赴美IPO计划
- 消息称三星收到苹果公司约 1.2 亿块 iPad OLED 面板订单
- 实现实时消息系统的Redis订阅功能(redis消息订阅)
- Redis:高效稳定的消息队列工具(redis作为消息队列)
- Redis消息队列开发实战篇(消息队列实战redis)
- 消息队列原理借助Redis实现(消息队列原理 redis)
- 基于Redis的消息队列服务实现(基于redis的MQ实现)
- 使用Redis构建消息队列集群(redis 消息队列集群)
- 问题深入浅出Redis消息队列断开的原因与解决方案(redis消息队列断开)