使用线程执行框架的一次经历
场景
一个线程从某个地方接收消息(数据),可以是其他主机或者消息队列,然后转由另外的一个线程池来执行具体处理消息的逻辑,并且消息的处理速度小于接收消息的速度。这种情景很常见,试想一下,你会怎么设计和实现?
直观想法
很显然采用JUC的线程框架,可以迅速写出代码。
消息接收者:
private static volatile boolean inited = false; private static volatile boolean shutdown = false; private static volatile int cnt = 0; private MessageHandler messageHandler; public void start(){ Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { while(!shutdown){ init(); recv(); } } }); } /** * 模拟消息接收 */ public void recv(){ Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); }
消息处理:
private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("解析消息:" + message); Thread.sleep(5000); System.out.println("============================"); } catch (InterruptedException e) { e.printStackTrace(); } } }
效果:这种方案导致的现象是接收到的消息会迅速堆积,我们从消息队列(或者其他地方)取出了大量消息,但是处理线程的速度又跟不上,所以导致的问题是大量的Task会堆积在线程池底层维护的一个阻塞队列中,这会极大的耗费存储空间,影响系统的性能。
分析:当execute()一个任务的时候,如果有空闲的worker线程,那么投入运行,否则看设置的最大线程个数,没有达到线程个数限制就创建新线程,接新任务,否则就把任务缓冲到一个阻塞队列中,问题就是这个队列,默认的大小是没有限制的,所以就会大量的堆积任务,必然耗费heap空间。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue Runnable } public LinkedBlockingQueue() { this(Integer.MAX_VALUE); // capacity }
计数限制
面对上述问题,想到了要限制消息接收的速度,自然就想到了各种线程同步的原语,不过在这里最简单的就是使用一个Volatile的计数器。
消息接收者:
private static volatile boolean inited = false; private static volatile boolean shutdown = false; private static volatile int cnt = 0; private MessageHandler messageHandler; public void start(){ Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { while(!shutdown){ init(); recv(); } } }); } /** * 模拟消息接收 */ public void recv(){ Message msg = new Message("Msg" + System.currentTimeMillis()); System.out.println(String.format("接收到消息(%d): %s", ++cnt, msg)); messageHandler.handle(msg); } public void init(){ if(!inited){ messageHandler = new MessageHandler(); inited = true; } } public static void main(String[] args) { new Receiver().start(); }
消息处理:
private static final int THREAD_POOL_SIZE = 1; private ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); public void handle(Message msg){ try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message){ try { Thread.sleep(10000); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); }finally { Receiver.limit --; } }
效果:通过控制消息的个数来阻塞消息的接收过程,就不会导致任务的堆积,系统的内存消耗会比较平缓,限制消息的个数本质就和下面限制任务队列大小一样。
使用同步队列 SynchronousQueue
SynchronousQueue 虽名为队列,但是其实不会缓冲任务的对象,只是作为对象传递的控制点,如果有空闲线程或者没有达到最大线程限制,就会交付给worker线程去执行,否则就会拒绝,我们需要自己实现对应的拒绝策略RejectedExecutionHandler,默认的是抛出异常RejectedExecutionException。
消息接收者同上。
消息处理:
ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue Runnable (), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义拒绝策略"); try { executor.getQueue().put(r); System.out.println("重新放任务回队列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { System.out.println(service.getTaskCount()); System.out.println(service.getQueue().size()); System.out.println(service.getCompletedTaskCount()); service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { while (true) { try { System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
效果:能够控制消息的接收速度,但是我们需要在rejectedExecution中实现某种阻塞的操作,但是选择在发生拒绝的时候把任务重新放回队列,带来的问题就是这个Task会发生饥饿现象。
使用大小限制的阻塞队列
使用LinkedBlockingQueue作为线程框架底层的任务缓冲区,并且设置大小限制,思想上和上述方案一样,都是有一个阻塞的点,但是通过最后的jvm monitor看到这里的CPU消耗更少,内存使用有所降低,并且波动小(具体原因有待探索)。
消息接收者同上。
消息处理:
private static final int THREAD_POOL_SIZE = 4; private static final int BLOCK_QUEUE_CAP = 500; ThreadPoolExecutor service = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue Runnable (BLOCK_QUEUE_CAP), new SimpleThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义拒绝策略"); try { executor.getQueue().put(r); System.out.println("重新放任务回队列"); } catch (InterruptedException e) { e.printStackTrace(); } } }); public void handle(Message msg) { try { service.execute(new Runnable() { @Override public void run() { parseMsg(msg); } }); } catch (Throwable e) { System.out.println("消息处理异常" + e); } } /** * 比较耗时的消息处理流程 */ public void parseMsg(Message message) { try { Thread.sleep(5000); System.out.println("线程名:" + Thread.currentThread().getName()); System.out.println("解析消息:" + message); } catch (InterruptedException e) { e.printStackTrace(); } } static class SimpleThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Thread-" + System.currentTimeMillis()); return thread; } }
总结
多线程是比较容易出问题的地方,特别当对方法不熟悉的时候
作者:Chown
来源:51CTO
如何处理JDK线程池内线程执行异常?讲得这么通俗,别还搞不懂 本篇 《如何处理 JDK 线程池内线程执行异常》 这篇文章适合哪些小伙伴阅读呢? 适合工作中使用线程池却不知异常的处理流程,以及不知如何正确处理抛出的异常
【多线程】面试官:如何利用线程工具,防止多线程同时操作一个资源? 通过前面的学习,知道了线程的利与弊,正确的使用多线程,会尽最大的可能去压榨我们系统的资源,从而提高效率,但是如果不合理使用线程,可能会造成副作用,给系统带来更大的压力,进一步的思考,如何才能防止多线程操作一个资源?
面试必问 | 一个线程从创建到消亡要经历哪些阶段? 在【精通高并发系列】中的《高并发之——线程与多线程》一文中,我们简单介绍了线程的生命周期和线程的几个重要状态,并以代码的形式实现了线程是如何进入各个状态的。
相关文章
- c#Winform程序调用app.config文件配置数据库连接字符串 SQL Server文章目录 浅谈SQL Server中统计对于查询的影响 有关索引的DMV SQL Server中的执行引擎入门 【译】表变量和临时表的比较 对于表列数据类型选择的一点思考 SQL Server复制入门(一)----复制简介 操作系统中的进程与线程
- 启动hbase时出现HMaster Aborted错误(HMaster启动之(HMaster线程的调用))
- python线程的注意点(线程之间执行是无序的、主线程会等待所有的子线程执行结束再结束(守护主线程)、线程之间共享全局变量、线程之间共享全局变量数据出现错误问题(线程等待(join)、互斥锁))
- python线程执行带有参数的任务(args、kwargs)
- 如何优雅地停止一个线程?
- 国内首位中间件Oracle ACE:WebLogic执行线程耗尽解决方案
- 线程执行者(九)执行者取消一个任务
- 线程系列06,通过CLR代码查看线程池及其线程
- Java 线程和多线程执行过程分析
- 线程属性--十分重要的概念
- 想在子线程里面触发的信号的槽函数在子线程执行,信号槽连接必须使用DirectConnection 方式(即使跨线程,也可以强迫DirectConnection,而不能是AutoConnection)
- 《CUDA C编程权威指南》——3.2 理解线程束执行的本质
- 使用 CountDownLatch 控制多个线程执行顺序
- Java并发编程(十一)线程池的使用
- 进程与线程的区别
- 异常信息:CLR无法从COM 上下文0x645e18 转换为COM上下文0x645f88,这种状态已持续60秒。拥有目标上下文/单元的线程很有可能执行的是非泵式等待或者在不发送 Windows 消息的
- java获取异步线程执行结果示例,也是Executors框架的基本原理
- C# 线程