zl程序教程

您现在的位置是:首页 >  其他

当前栏目

JMeter5.1核心类ThreadGroup源码分析

2023-04-18 14:26:45 时间

概述

  • 线程组是一个测试计划的开始点
  • 在一个测试计划中的所有元件都必须在某个线程组下
  • 线程组决定 Jmeter 执行测试计划的线程数

作用:

  • 设置线程数
  • 设置ramp-up period:达到指定线程数所需要的时间
  • 设置执行测试的次数
  • 延迟创建线程:直到线程被需要的采样器开始执行时才会被创建,避免资源浪费

调度器

  • Duration (seconds) :持续时间,单位为秒
  • Startup Delay (seconds):启动延迟,单位为秒

每个线程组都会独立的运行测试计划,互不干扰,多个线程组用于模仿对服务器的并发访问。

源码解读

ThreadGroup类继承AbstractThreadGroup类

SetupThreadGroup和PostThreadGroup类继承ThreadGroup

主要变量

    /** Ramp-up time */
    public static final String RAMP_TIME = "ThreadGroup.ramp_time";

    /** Whether thread startup is delayed until required */
    public static final String DELAYED_START = "ThreadGroup.delayedStart";

    /** Whether scheduler is being used */
    public static final String SCHEDULER = "ThreadGroup.scheduler";

    /** Scheduler duration, overrides end time */
    public static final String DURATION = "ThreadGroup.duration";

    /** Scheduler start delay, overrides start time */
    public static final String DELAY = "ThreadGroup.delay";

    // 核心变量
    private transient Thread threadStarter;

    // List of active threads
    private final ConcurrentHashMap<JMeterThread, Thread> allThreads = new ConcurrentHashMap<>();
    
    private transient Object addThreadLock = new Object();

    /** Is test (still) running? */
    private volatile boolean running = false;

    /** Thread Group number */
    private int groupNumber;

    /** Are we using delayed startup? */
    private boolean delayedStartup;

    /** Thread safe class */
    private ListenerNotifier notifier;

    /** This property will be cloned */
    private ListedHashTree threadGroupTree; 

start

启动ThreadGroup线程组

    public void start(int groupNum, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
        // 设置线程运行标志
        this.running = true;
        // 线程组编号
        this.groupNumber = groupNum;
        // 通知类
        this.notifier = notifier;
        this.threadGroupTree = threadGroupTree;
        // 子线程数
        int numThreads = getNumThreads();
        // 预期线程组的所有线程从启动-运行-释放的总时间
        int rampUpPeriodInSeconds = getRampUp();
        float perThreadDelayInMillis = (float) (rampUpPeriodInSeconds * 1000) / (float) getNumThreads();

        //  延迟创建线程的标志
        delayedStartup = isDelayedStartup(); // Fetch once; needs to stay constant
        log.info("Starting thread group... number={} threads={} ramp-up={} perThread={} delayedStart={}", groupNumber,
                numThreads, rampUpPeriodInSeconds, perThreadDelayInMillis, delayedStartup);
        // 延迟创建线程直到需要
        if (delayedStartup) {
            // 创建延时启动线程ThreadStarter
            threadStarter = new Thread(new ThreadStarter(notifier, threadGroupTree, engine), getName()+"-ThreadStarter");
            // 设置为守护线程
            threadStarter.setDaemon(true);
            // 启动线程
            threadStarter.start();
            // N.B. we don't wait for the thread to complete, as that would prevent parallel TGs
        } else {
            long now = System.currentTimeMillis(); // needs to be same time for all threads in the group
            final JMeterContext context = JMeterContextService.getContext();
            // 多线程执行JMeterThread子线程
            for (int threadNum = 0; running && threadNum < numThreads; threadNum++) {
                startNewThread(notifier, threadGroupTree, engine, threadNum, context, now, (int)(threadNum * perThreadDelayInMillis));
            }
        }
        log.info("Started thread group number {}", groupNumber);
    } 

startNewThread

创建JMeterThread子线程

    private JMeterThread startNewThread(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine,
            int threadNum, final JMeterContext context, long now, int delay) {
        JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNum, context);
        // 调度器
        scheduleThread(jmThread, now); // set start and end time
        jmThread.setInitialDelay(delay);
        Thread newThread = new Thread(jmThread, jmThread.getThreadName());
        // 线程启动时存储线程,主要结合serialized来使用
        registerStartedThread(jmThread, newThread);
        // 启动JMeterThread子线程
        newThread.start();
        return jmThread;
    }

makeThread

创建JMeterThread线程,并初始化属性

    private JMeterThread makeThread(
            ListenerNotifier notifier, ListedHashTree threadGroupTree,
            StandardJMeterEngine engine, int threadNumber, 
            JMeterContext context) { // N.B. Context needs to be fetched in the correct thread
        boolean onErrorStopTest = getOnErrorStopTest();
        boolean onErrorStopTestNow = getOnErrorStopTestNow();
        boolean onErrorStopThread = getOnErrorStopThread();
        boolean onErrorStartNextLoop = getOnErrorStartNextLoop();
        String groupName = getName();
        final JMeterThread jmeterThread = new JMeterThread(cloneTree(threadGroupTree), this, notifier);
        jmeterThread.setThreadNum(threadNumber);
        jmeterThread.setThreadGroup(this);
        jmeterThread.setInitialContext(context);
        String distributedPrefix = 
                JMeterUtils.getPropDefault(JMeterUtils.THREAD_GROUP_DISTRIBUTED_PREFIX_PROPERTY_NAME, "");
        // 获取线程名字
        final String threadName = distributedPrefix + (distributedPrefix.isEmpty() ? "":"-") +groupName + " " + groupNumber + "-" + (threadNumber + 1);
        jmeterThread.setThreadName(threadName);
        jmeterThread.setEngine(engine);
        jmeterThread.setOnErrorStopTest(onErrorStopTest);
        jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
        jmeterThread.setOnErrorStopThread(onErrorStopThread);
        jmeterThread.setOnErrorStartNextLoop(onErrorStartNextLoop);
        return jmeterThread;
    }

scheduleThread

调度器的作用:控制每个线程组运行的持续时间以及它在多少秒后再启动

  • Duration (seconds) :持续时间;线程组运行的持续时间
  • Startup Delay (seconds):启动延迟;测试计划开始后,线程组的线程将在多少秒后再启动运行
    private void scheduleThread(JMeterThread thread, long now) {

        if (!getScheduler()) { // if the Scheduler is not enabled
            return;
        }

        if (getDelay() >= 0) { // Duration is in seconds
            // 设置线程开始时间
            thread.setStartTime(getDelay() * 1000 + now);
        } else {
            throw new JMeterStopTestException("Invalid delay " + getDelay() + " set in Thread Group:" + getName());
        }

        // set the endtime for the Thread
        if (getDuration() > 0) {// Duration is in seconds
            // 设置线程运行的持续时间
            thread.setEndTime(getDuration() * 1000 + (thread.getStartTime()));
        } else {
            throw new JMeterStopTestException("Invalid duration " + getDuration() + " set in Thread Group:" + getName());
        }
        // Enables the scheduler
        thread.setScheduled(true);
    }

registerStartedThread

线程启动时存储线程,结合serialized属性一起使用,用于线程组是串行执行还是并行执行

    private void registerStartedThread(JMeterThread jMeterThread, Thread newThread) {
        allThreads.put(jMeterThread, newThread);
    }

ThreadStarter

ThreadStarter是延迟启动线程类,主要结合delayedStartup和schedule属性一起使用,该类继承Runnable接口,也是一个线程类对象。

构造函数

    public ThreadStarter(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
        super();
        this.notifier = notifier;
        this.threadGroupTree = threadGroupTree;
        this.engine = engine;
        // Store context from Root Thread to pass it to created threads
        this.context = JMeterContextService.getContext();
    }

run

执行启动延迟的线程,调用的仍然是JMeterThread类

    public void run() {
    try {
        // Copy in ThreadStarter thread context from calling Thread
        JMeterContextService.getContext().setVariables(this.context.getVariables());
        long endtime = 0;
        // 获取调度器标志
        final boolean usingScheduler = getScheduler();
        if (usingScheduler) {
            // set the start time for the Thread
            // 启动延迟时间
            if (getDelay() > 0) {// Duration is in seconds
                delayBy(getDelay() * 1000); 
            }
            // set the endtime for the Thread
            // 持续时间
            endtime = getDuration();  
            if (endtime > 0) {// Duration is in seconds, starting from when the threads start
                // 线程执行结束时间
                endtime = endtime *1000 + System.currentTimeMillis();
            }
        }
        // 获取线程组的执行线程数
        final int numThreads = getNumThreads();
        // ramp-up delay = 达到指定线程数所需要的时间(秒) / 线程数,最后取整
        final int perThreadDelayInMillis = Math.round((float) (getRampUp() * 1000) / (float) numThreads);
        for (int threadNumber = 0; running && threadNumber < numThreads; threadNumber++) {
            if (threadNumber > 0) {
                pause(perThreadDelayInMillis); // ramp-up delay (except first)
            }
            if (usingScheduler && System.currentTimeMillis() > endtime) {
                break; // no point continuing beyond the end time
            }
            JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNumber, context);
            // 这里要注意下:父线程已经进行线程等待了,子线程就不需要设置等待时间了
            jmThread.setInitialDelay(0);   // Already waited
            if (usingScheduler) {
                jmThread.setScheduled(true);
                jmThread.setEndTime(endtime);
            }
            Thread newThread = new Thread(jmThread, jmThread.getThreadName());
            newThread.setDaemon(false); // ThreadStarter is daemon, but we don't want sampler threads to be so too
            registerStartedThread(jmThread, newThread);
            newThread.start();
        }
    } catch (Exception ex) {
        log.error("An error occurred scheduling delay start of threads for Thread Group: {}", getName(), ex);
    }
}

delayBy

设置启用延迟时间

        private void delayBy(long delay) {
            if (delay > 0) {
                //  获取当前时间
                long start = System.currentTimeMillis();
                // 启动延迟时间
                long end = start + delay;
                long now;
                long pause = RAMPUP_GRANULARITY; // maximum pause to use
                while(running && (now = System.currentTimeMillis()) < end) {
                    long togo = end - now;
                    // 比较大小
                    if (togo < pause) {
                        pause = togo;
                    }
                    // 线程等待
                    pause(pause); // delay between checks
                }
            }
        }

pause

线程等待

    private void pause(long ms){
            try {
                 // 实现上也是调用Thread.sleep方法
                TimeUnit.MILLISECONDS.sleep(ms); 
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }