zl程序教程

您现在的位置是:首页 >  Javascript

当前栏目

线程池监控:执行超时、等待超时;执行超时数量、等待超时数量

2023-02-19 12:23:10 时间

​监控线程池:执行超时、等待超时;执行超时数量、等待超时数量;

扩展线程池 ThreadPoolExecutor 的两个方法 beforeExecute 和 afterExecute

自定义Runnable 记录关键节点时间

关键时间节点参数:

  • 任务创建(提交)时间:submitTime
  • 任务开始执行时间:startExeTime
  • 任务结束执行时间:endExeTime
  • 任务在队列等待时间:任务开始执行时间 - 任务创建(提交)时间
  • 任务执行总时间:任务结束执行时间 - 任务开始执行时间

源码分析

线程池 ThreadPoolExecutor 为了提供扩展,提供了两个方法 beforeExecute 和 afterExecute,每个任务执行前后都会调用这两个方法,相当于对线程任务的执行做了一个切面。

public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* @param t 执行任务的线程
* @param
protected void beforeExecute(Thread t, Runnable r){ }

/**
* @param r 将要被执行的任务
* @param
protected void afterExecute(Runnable r, Throwable t){ }
}

源码执行逻辑:

线程池扩展代码:

public class ThreadPoolExpandTest {
// 定义线程池
public static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardOldestPolicy()
){
@Override
/**
* @param t 执行任务的线程
* @param
protected void beforeExecute(Thread t, Runnable r){
System.out.println("beforeExecute将要被执行");
}

/**
* @param r 将要被执行的任务
* @param
@Override
protected void afterExecute(Runnable r, Throwable t){
System.out.println("afterExecute已经执行完毕");
}
};
public static void main(String[] args){
poolExecutor.execute(()->{
System.out.println("任务执行");
});
}
}

运行结果:

beforeExecute执行
任务执行
afterExecute执行

总结:从测试代码可以看出,通过扩展线程池参数可以进行任务执行的监控。

自定义Runnable

通过自定义Runnable,记录任务执行的一些时间:

  • 任务创建(提交)时间
  • 任务开始执行时间
public class DynamicRunnable implements Runnable{
/**
* runnable
*/
private final Runnable runnable;
/**
* 任务创建(提交)时间
*/
private final Long submitTime;
/**
* 任务开始执行时间
*/
private Long startExeTime;

public DynamicRunnable(Runnable runnable){
this.runnable = runnable;
submitTime = System.currentTimeMillis();
}

@Override
public void run(){
runnable.run();
}

public Long getSubmitTime(){
return submitTime;
}

public void setStartExeTime(Long startExeTime){
this.startExeTime = startExeTime;
}

public Long getStartExeTime(){
return startExeTime;
}
}

继承线程池+自定义Runnable

核心参数:

/**
* 执行超时,单位(毫秒)
*/
private long runTimeout;

/**
* 等待超时,单位(毫秒)
*/
private long queueTimeout;

/**
* 执行超时数量
*/
private final AtomicInteger runTimeoutCount = new AtomicInteger();

/**
* 等待超时数量
*/
private final AtomicInteger queueTimeoutCount = new AtomicInteger();

重写ThreadPoolExecutor方法:

@Override
public void execute(Runnable command){
if (runTimeout > 0 || queueTimeout > 0) {
// 记录任务提交时间
command = new DynamicRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r){
if (!(r instanceof DynamicRunnable)) {
super.beforeExecute(t, r);
return;
}
DynamicRunnable runnable = (DynamicRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
// 记录任务开始执行时间
runnable.setStartExeTime(currTime);
}
if (queueTimeout > 0) {
// 任务开始执行时间 - 任务创建(提交)时间
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
log.error("{} execute queue timeout waitTime: {}ms", this.getThreadPoolName(),waitTime);
}
}
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t){
if (runTimeout > 0) {
DynamicRunnable runnable = (DynamicRunnable) r;
// 任务执行总时间:任务结束执行时间 - 任务开始执行时间
long runTime = System.currentTimeMillis() - runnable.getStartExeTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
log.error("{} execute, run timeout runTime: {}ms", this.getThreadPoolName(), runTime);
}
}
super.afterExecute(r, t);
}