利用Doug Lea的并发包实现带超时机制的线程池
2023-03-14 10:15:58 时间
jdk5引入的concurrent包来自于Doug Lea的卓越贡献。最近我在查找服务器OOM的原因之后,决定采用这个包重写应用中一个servlet,这个servlet调用了一个阻塞方法,当被阻塞之后,服务器中的线程数(因为阻塞了,后续请求不断地新增线程)突然增加导致了服务器当机,因此决定采用一个线程池,并且设置超时,如果阻塞方法超过一定时间就取消线程。因为我们的项目仍然跑在jdk 1.4.2上面,短期内不可能升级到jdk5,还是要利用这个并发包。去这里下载源码并自己打包成jar,加入项目的lib,然后利用PooledExecutor和TimedCallable来实现我们的需求。首先是线程池,相当简单:
静态初始化并设置一个PoolExecutor,设置空闲线程的存活时间为5分钟,设置最小线程数为4,最大线程数为30,一开始创建5个线程以待使用(根据各自的应用调整这些参数),另外提供了shutdown方法以供ServeltContextListener的contextDestroyed方法调用以关闭线程池。那么,结合TimedCallable来实现提交线程的超时机制,调用类似:
import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* <p>类说明:线程池</p>
* <p>注意事项:</p>
* <pre></pre>
* <p>创建日期:Sep 7, 2007 1:25:33 PM</p>
* @author:dennis zane
* @version $Id:$
*/
public class MyThreadPool{
private static PooledExecutor exec = new PooledExecutor(new BoundedBuffer(
20), 30);
static {
exec.setKeepAliveTime(1000 * 60 * 5);
exec.createThreads(5);
exec.setMinimumPoolSize(4);
}
public static void execute(final Runnable r) throws InterruptedException{
exec.execute(r);
}
public static void shutdown() {
exec.shutdownAfterProcessingCurrentlyQueuedTasks();
exec = null;
}
}
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* <p>类说明:线程池</p>
* <p>注意事项:</p>
* <pre></pre>
* <p>创建日期:Sep 7, 2007 1:25:33 PM</p>
* @author:dennis zane
* @version $Id:$
*/
public class MyThreadPool{
private static PooledExecutor exec = new PooledExecutor(new BoundedBuffer(
20), 30);
static {
exec.setKeepAliveTime(1000 * 60 * 5);
exec.createThreads(5);
exec.setMinimumPoolSize(4);
}
public static void execute(final Runnable r) throws InterruptedException{
exec.execute(r);
}
public static void shutdown() {
exec.shutdownAfterProcessingCurrentlyQueuedTasks();
exec = null;
}
}
静态初始化并设置一个PoolExecutor,设置空闲线程的存活时间为5分钟,设置最小线程数为4,最大线程数为30,一开始创建5个线程以待使用(根据各自的应用调整这些参数),另外提供了shutdown方法以供ServeltContextListener的contextDestroyed方法调用以关闭线程池。那么,结合TimedCallable来实现提交线程的超时机制,调用类似:
//设置超时时间
private static final long timeout = 1000;
......
......
try{
Callable callable = new Callable() {
public Object call() {
return new YourProgram().run();
}
};
TimedCallable timedCallable = new TimedCallable(callable, timeout);
FutureResult future = new FutureResult();
Runnable cmd = future.setter(timedCallable);
//提交给线程池来执行
MyThreadPool.execute(cmd);
//获取任务结果
YourObject obj= (YourObject) future.get();
......
......
} catch (InterruptedException e) {
if (e instanceof TimeoutException) {
log.error("任务超时");
...
}
}catch(InvocationTargetException e)
{
//清理任务..
}
......
private static final long timeout = 1000;
......
......
try{
Callable callable = new Callable() {
public Object call() {
return new YourProgram().run();
}
};
TimedCallable timedCallable = new TimedCallable(callable, timeout);
FutureResult future = new FutureResult();
Runnable cmd = future.setter(timedCallable);
//提交给线程池来执行
MyThreadPool.execute(cmd);
//获取任务结果
YourObject obj= (YourObject) future.get();
......
......
} catch (InterruptedException e) {
if (e instanceof TimeoutException) {
log.error("任务超时");
...
}
}catch(InvocationTargetException e)
{
//清理任务..
}
......
如果不是很理解这段代码,那么也许你应该先看看jdk5引入的Future、FutureTask等类,或者看看这里的文档。对于超时时间的大小估算,你应当在生产环境中计算该阻塞方法的调用时间,正常运行一段时间,利用脚本语言(比如ruby、python)分析日志以得到一个调用花费时间的最大值作为timeout,这里的单位是毫秒。而线程池大小的估算,要看你提交给线程执行的任务的类型:如果是计算密集型的任务,那么线程池的大小一般是(cpu个数+1);如果是IO密集型的任务(一般的web应用皆是此类),那么估算有一个公式,
假设N-cpu是cpu个数,U是目标CPU使用率,W/C是任务的等待时间与计算时间的比率,那么最优的池大小等于:
N-threads=N-cpu*U*(1+W/C)
文章转自庄周梦蝶 ,原文发布时间
相关文章
- ChatGPT使用注册教程和插件,无需注册在线体验chatgpt方式
- 元宇宙,逃离「元宇宙」
- 互联网嬗变,催生金融科技新变革
- 产业互联网,互联网的再出发
- 大型集团企业数据治理实践,推进全域数据资产体系建设 | 数字化标杆
- C/C++ CreateFileMapping 共享内存
- C/C++ 简单特征码匹配
- vue3 中如何实现数组响应式
- vite配置别名
- BSI模型
- 9大项目管理领域
- 软件安全知识
- ChatGPT 最常用的五个使用场景
- PS/AE渐变工具色带波纹色彩过渡不均匀怎么办
- Topaz Photo AI for Mac(人工智能降噪软件)
- Capture One 23 Enterprise for Mac(RAW图像编辑软件)
- ?️ 递归思想
- Fix My iPhone for Mac(iOS系统修复软件)
- MacX Video Converter Pro for mac(视频转换工具)
- Typora for Mac(文本编辑器)