从中间件团队窃取了这个组件,见识到了编码能力的天花板!!!
大家好,我是陶朱公Boy。
前言
今天跟大家分享一款基于“生产者消费者模式”下实现的组件。
该组件是作者偶然在翻阅公司一中间件源码的时候碰到的,觉得设计的非常精美、巧妙,花了点时间整理成文分享给大家。
生产者和消费者彼此之间不进行通信,中间通过一个容器(如阻塞队列)来解决强解耦问题。 阻塞队列起到了一定的数据缓冲作用,平衡了生产者和消费者对数据的处理能力。 By《Java并发编程的艺术》
生产者消费者模式示意图
组件介绍
该组件基于生产者消费者模式来编码实现,是一款本地化解决流量削峰、解耦、异步的利器。
此组件由以下知识点构成:线程池、阻塞队列、LockSupport、Executor框架、final、volatile。此外你还能接触到hash取模算法、接口回调等机制。
组件本身代码量并不大,但知识点比较密集,所以希望大家能花一点时间认真看完。我将从适用场景、架构设计、源码解析这三个角度给大家讲介绍这款组件。
适用场景
☆场景一:报表下载
现在很多后台下载功能,普适的做法是先筛选转换数据,然后对接云存储平台进行保存,最后生成一个可访问的文件地址,整个过程非常耗时。
其实完全可以生产者发送一个下载请求就结束响应,服务端异步的去消费这个任务请求,处理完生成地址后,再进行通知(比如更新对应数据库文件字段)这是一种异步体现,也解耦了生产者与消费者原来的同步交互方式,整体效率会更高。
☆场景二:日志埋点
有些应用它的QPS非常高,产生的数据本身并不是特别重要比如埋点的日志,如果实时调用埋点平台可能给平台侧造成非常大的访问压力。
所以这个时候中间的阻塞队列就起到了一定的缓冲作用,等一段时间或队列数据量达到一定量(参赛可动态配置)再一次性拿出来转换后,最后批量传递出去。
☆场景三:Yana(阿里内部一款基于邮件分享技术文章的工具)
《Java并发编程的艺术》作者方腾飞有分享过他们基于生产者消费者模式实现的一个案例。
他们团队早期有一个习惯,大家如果在平时工作当中遇到比较好的文章,会通过邮件转发到专属邮箱进行内部分享,这样其他成员就能看到这篇文章,甚至大家会在底部评论、回复、交流。
但期间遇到一个问题:一旦时间一长,以前的文章很难被检阅。邮件列表的可视化太差,也不能进行归类,有些新入职员工也看不到以往其他成员分享过的文章。
基于这些问题,有几个小伙伴自发的趁业余时间开发了一个简易工具--yana。该工具功能就是:生产者线程会先往邮箱里将所有分享的邮件下载下来(包括附件、图片、邮件回复等内容),下载完成后,通过confluence的Web Service接口,把文章保存到confluence中去。这样不仅好维护,而且留存问题也得到了解决。
不过随着这款工具在其他部门的推广,发现系统响应时间越来越长。只要单位时间内积累邮件一多,一次处理完可能就要花费几分钟。
于是他们升级了方案,把架构演进到了V2.0版本。整体思路是使用了生产者消费者模式来处理。
思路如下:生产者线程去邮件系统下载完邮件后,不会立即调用confluence的web service接口,而是选择把下载的内容放入阻塞队列后立即返回。而消费者启动CPU*2个线程数来并行处理队列中的邮件,从之前的单线程演变成了多线程处理,生产者和消费者实现了异步、解耦。经过观察,比起V1.0同步处理,速度比之前要快好了几倍。
...
架构设计
☆对象图
该组件支持“多生产者多消费者”场景,在多核时代充分利用CPU多核机制,消费者多线程并行处理阻塞队列中的数据,加快任务处理速度。
☆逻辑架构图
工作线程阻塞:
工作者线程在组件初始化的时候会根据指定threadNum数量完成新建。工作线程一开始运行run方法后,判断条件不满足运行条件,就阻塞自己。
直到生产者调用其add方法检查队列大小是否超过阈值亦或定时线程定时判断距离上次任务执行时间差是否超过指定阈值。只要满足其一,就能唤醒被阻塞的线程,让其继续执行下去。
数据存储:
该组件实例持有一个工作线程对象数组,当生产者提交数据的时候,会先经过一个route组件(采用hash取模算法),动态路由到其中一个线程对象内的阻塞队列中存储起来。
数据消费:
等到满足一定条件,工作线程就会将对应线程对象内部中阻塞队列的数据转换成指定容量的List对象(BlockQueue的drainto方法有支持),然后传递给回调函数。
☆流程图
我们一起来看下上述这张工作线程内部处理机制图:
工作线程内部处理机制
1)add方法:
生产者线程调用组件的add方法后,add方法内部通过hash取模算法,会动态路由到指定工作线程对象,继而调用其内部的add方法存储到内部的阻塞队列中去。 当工作者线程内部的add被调用后,方法底部都会检查当前队列的实际大小是否超过指定阈值(可配置),如果超过,就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。
2)timeout方法
ScheduledThreadPoolExecutor在组件初始化新建工作线程的时候,为每一个工作线程对象开启一个定时器,按固定时间间隔周期(可配置),检查工作线程距离上一次任务处理完的时间差是否超过指定阈值(可配置),如果超过就会触发LockSupport.unpark方法,唤醒被阻塞的工作线程。
源码赏析
public class ProducerAndConsumerComponet<T> {
private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);
//组件持有一个工作线程对象数组
private final WorkThread<T>[] workThreads;
private AtomicInteger index;
private static final Random r = new Random();
//任务定时器
private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
//组件初始化完成工作线程的新建
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 构造器
* @param threadNum 默认新建的消费者线程个数
* @param limitSize 队列长度阈值;超过将唤醒阻塞的线程
* @param period 工作线程对象的timeout方法前后两次执行的时间间隔周期
* @param capacity 工作线程内部的有界阻塞队列的初始容量大小
* @param processor 回调接口(初始化组价实例的时候需要传递)
*/
public ProducerAndConsumerComponet(int threadNum,int limitSize, int period, int capacity, Processor<T> processor) {
this.workThreads = new WorkThread[threadNum];
if (threadNum > 1) {
this.index = new AtomicInteger();
}
for(int i = 0; i < threadNum; ++i) {
WorkThread<T> workThread = new WorkThread("workThread"+ "_" + i, limitSize, period, capacity, processor);
this.workThreads[i] = workThread;
executorService.submit(workThread);
//开启threadNum个定时任务,每个任务各自检查各个工作线程对象内部的timeout方法,查看前后两次的timeout方法执行周期
scheduleThreadPool.scheduleAtFixedRate(workThread::timeout, r.nextInt(50), period, TimeUnit.MILLISECONDS);
}
}
/**
* 生产者线程将对象添加到对应消费者线程对象内部的阻塞队列中去<br>
* 内部采用HASH取模算法进行动态路由
* @param item 待添加的对象
* @return true:添加成功 false:添加失败
*/
public boolean add(T item) {
// log.info("add item={}",item);
int len = this.workThreads.length;
//log.info("add len..."+len);
if (len == 1) {
return this.workThreads[0].add(item);
} else {
int mod = this.index.incrementAndGet() % len;
// log.info("路由到this.workThreads[mod]={}",mod);
return this.workThreads[mod].add(item);
}
}
/**
* 消费者线程
*/
private static class WorkThread<T> implements Runnable {
/**
* 工作线程命名
*/
private final String threadName;
/**
* 队列中允许存放元素个数限制<br>
* 超出将从队列中取出此大小的元素转成List对象
*/
private final int queueSizeLimit;
/**
* 前后两个任务的执行周期
*/
private int period;
/**
* 用来记录任务的即时处理时间
*/
private volatile long lastFlushTime;
/**
* 当前工作线程对象
*/
private volatile Thread currentThread;
/**
* 工作线程对象内部的阻塞队列
*/
private final BlockingQueue<T> queue;
/**
* 回调接口
*/
private final Processor<T> processor;
/**
* 消费者线程构造器
* @param threadName 线程名
* @param queueSizeLimit 指定队列阈值(可配置)
* @param period 工作线程对象的timeout方法前后两次执行的时间间隔周期(可配置)
* @param capacity 阻塞队列初始容量
* @param processor 回调接口
*/
public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
this.threadName = threadName;
this.queueSizeLimit = queueSizeLimit;
this.period = period;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue(capacity);
}
/**
* 往阻塞队列中添加元素
* @param item 添加的对象
* @return true:添加成功 false:添加失败
*/
public boolean add(T item) {
// log.info("add result:"+item);
boolean result = this.queue.offer(item);
// log.info("resultP{}",result);
this.checkQueueSize();
return result;
}
/**
* 当前时间与上次任务处理时间差是否超过指定阈值;如果超过触发start方法
*/
public void timeout() {
log.info("start timeout ...{}",currentThread.getName());
// log.info("{}====check timeout",currentThread.getName());
if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
log.info("当前时间={}距离上次任务处理时间={}周期={}超出指定阈值={}",System.currentTimeMillis(),lastFlushTime,(System.currentTimeMillis() - this.lastFlushTime) ,period);
this.start();
}
}
/**
* 唤醒被阻塞的工作线程
*/
private void start() {
log.info("执行start方法,唤醒被阻塞的线程"+currentThread.getName());
LockSupport.unpark(this.currentThread);
}
/**
* 判断队列实际长度是否超过指定阈值;如果超过触发start方法
*/
private void checkQueueSize() {
if (this.queue.size() > this.queueSizeLimit) {
log.info("{}队列大小={}超出指定阈值={}",currentThread.getName(),this.queue.size() ,queueSizeLimit);
this.start();
}
}
/**
* 将队列中的元素添加到指定集合(初始容量限制)
*/
public void flush() {
//记录最新任务处理开始时间
this.lastFlushTime = System.currentTimeMillis();
if(queue.isEmpty()){
log.info("阻塞队列中元素为空,不需要处理...");
return;
}
List<T> temp = new ArrayList(this.queueSizeLimit);
int size = this.queue.drainTo(temp, this.queueSizeLimit);
if (size > 0) {
log.info("{}被唤醒后,开始执行任务:从队列中腾出大小为{}的数据且转成List对象",currentThread.getName(),size);
try {
//执行回调函数
this.processor.process(temp);
} catch (Throwable var4) {
System.out.println("process error");
}
}
}
/**
* 判断队列实际大小是否超过指定阈值亦或距离上次任务处理时间差是否超过指定阈值
* @return true:满足触发条件 false:不满足触发条件
*/
private boolean canFlush() {
return this.queue.size() > this.queueSizeLimit || System.currentTimeMillis() - this.lastFlushTime > (long)this.period;
}
@Override
public void run() {
this.currentThread = Thread.currentThread();
this.currentThread.setName(this.threadName);
//当前线程没有被其他线程打断
while(!this.currentThread.isInterrupted()) {
//死循环的判断是否满足触发条件(队列实际大小是否超出指定阈值或距离上次任务时间是否超出指定阈值),如果未满足将阻塞当前线程,避免死循环给系统带来性能开销
while(!this.canFlush()) {
//当前工作线程被阻塞
log.info("线程被阻塞...");
LockSupport.park(this);
}
//一旦add方法执行的时候判断存放的阻塞队列元素大小超出自定制阈值亦或距离上次任务处理时间差超出指定阈值,就会调用LockSupport.unpark方法解除阻塞的线程
//一旦线程被解除阻塞,就会触发此方法,将队列元素转成List对象且调用已经注册的回调函数
// log.info("阻塞线程被唤醒");
this.flush();
}
}
}
☆测试用例(队列元素超出指定阈值大小)
/**
* 前置条件:
* #主件初始化默认新增两个工作线程
* config.threadNum=2
* config.period=12000
* config.queueSizeLimit=3
* config.capacity=10
*/
@DisplayName("队列大小超出指定阈值")
@Test
void add() {
for(int i=0;i<10;i++){
producerAndConsumerComponet.add("1");
}
try {
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
}
结果打印:
2022-10-29 21:04:51,656 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_1队列大小=4超出指定阈值=3
2022-10-29 21:04:51,658 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_0队列大小=4超出指定阈值=3
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1714超出指定阈值=1000
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1720超出指定阈值=1000
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
☆测试用例(timeout前后两次检查时间超出指定阈值)
/**
前置条件
config.threadNum=2
config.period=500
config.queueSizeLimit=1000
config.capacity=10
*/
@Test
void timeout() {
try {
for (int i = 0; i < 100; i++) {
producerAndConsumerComponet.add("1");
TimeUnit.MILLISECONDS.sleep( new Random().nextInt(50));
}
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
}
输出
可能有部分小伙伴对在生产环境如何正确使用这款组件有疑虑,对此完整版我已经开源到GitHub上(基于springboot构建内含如何正确初始化此组件以及完整的测试用例),有兴趣的小伙伴可以自取。
github地址:
https://github.com/TaoZhuGongBoy/ProducerAndConsumerComponet
总结
好了这款组件介绍已接近尾声,接下来让我们一起做下总结:
☆是什么
我们说这款工具是一款基于“生产者-消费者模式”实现的组件。
以前生产者必须同步调用、等待相关业务操作的处理结果后才能返回(一旦有些业务场景生产者生产的速度过快,方法内部自身业务处理又比较耗时)这时如果同步等待调用结果返回,系统整体吞吐量会极具降低。
现在换了一种思路即生产者不需要同步等待业务的处理结果,当它发送一个请求后立即返回,耗时的处理由一致多个消费者线程来异步处理,加快任务整体处理速度。(异步、解耦)
☆适用场景
它比较适合处理一些重要程度不是很高的数据(比如埋点日志、下载请求等),当生产者生产数据过快,业务本身处理又比较耗时,那用这个方案是比较合适的。
为什么在这里要强调重要程度不是很高这句话呢?因为BlockQueue毕竟是基于内存的数据结构,极端情况下是存在数据丢失风险的,像埋点日志、下载请求这种数据小部分丢失其实对业务影响不大。
看到这里可能有部分小伙伴会产生一个疑问:怎么看这个组件的功能跟MQ这么像。对的,功能是相似的,但这款组件是一个本地化的解决方案,目的就是为了降低引入消息队列的复杂度才设计(设计意图)。
☆怎么实现的
消费者线程在组件初始化的时候会根据指定threadNum数量完成新建。run方法一开始就进行死循环检查,根据两个条件判断:
条件一:工作线程对象内部的阻塞队列大小实际元素个数是否超过指定阈值;
条件二:当前时间和上一次任务处理时间差是否超过指定阈值,如果两个都不满足,工作线程就通过LockSupport.park(this)将自己阻塞,等待条件之一被满足,然后被唤醒。
每个工作线程内部的add方法和time方法是触发上述条件之一的入口,详细请参详架构设计部分。
写到最后
这款组件几乎囊括了并发编程领域半壁江山的技术点,能把这些散的点串起来,根据相应场景为我所用,着实不易,也体现出原作者深厚的技术功底。
如果你是《并发编程》的初学者亦或有几年经验的老兵,都建议好好揣摩与学习一下这款组件的架构设计与编码实现;如果在你的生产场景中你刚好也碰到类似问题与场景,那么这款组件也许能帮助你。
本文完!
相关文章
- Mila唐建团队开源大分子机器学习平台TorchProtein:分析蛋白质序列及结构数据,仅需一两行代码
- 团队内训-“软件需求设计建模方法学全程实例剖析”训练方案(202208更新)
- Apifox --- 全套服务提升了团队效率,让研测之间充满了爱(记Apifox在工程中的实际应用)【云原生】
- 一年 100% 云原生化,众安保险架构演进的探索与实践 | 卓越技术团队访谈录
- 魔法成为现实?武大学生团队造出了“隐身衣”,成本不到 500 元!
- 高端麒麟处理器还在研发 消息称华为不惜一切保住海思团队
- 太阳能制氢新突破 研究团队刚搞定了低成本制氧这“半个”问题
- Facebook AI团队让机器人行走适应各种环境和路面
- 腾讯微信团队:公众号及新用户注册全部暂停 8月初升级完成后恢复
- 德尔塔病毒劲敌!杨晓明团队发现单抗有效 临床申报正在推进
- 专访《舌尖上的中国》“御用”航拍团队:以后会诞生很多优秀的航拍手
- 国内航拍团队简介之上海翼枫航空——开设全国首家DJI形象4S店
- 如何打造高效团队MySQL?(团队mysql)
- 九号公司上市后首次架构调整:成立四大机器人团队,要在未来扳回一局
- 罗杰波教授团队最新成果发表,刷新你对“推特治国”的认知
- 在百度知道团队中快速审批新成员的js脚本