JMeter 5.1 Core Class StandardJMeterEngine: Source Code Analysis

Time: 2023-04-18 14:26:56

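Overview

StandardJMeterEngine is JMeter's default single-machine load-test engine. It runs a JMeter test and is used directly by local GUI and non-GUI invocations, or is started by RemoteJMeterEngineImpl when JMeter runs in server mode.

StandardJMeterEngine implements the JMeterEngine and Runnable interfaces, so it is essentially a task that executes on its own thread.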

API documentation: https://jmeter.apache.org/api/org/apache/jmeter/engine/StandardJMeterEngine.htm

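Key points:

  • HashTree is the underlying data structure.
  • SearchByClass walks the HashTree and collects every element of a given class, for example TestPlan, ThreadGroup, Sampler, or ResultCollector. The tree holds the test plan's configuration, and SearchByClass is how the engine gets hold of the objects it needs.
  • Any collected object that is a TestStateListener receives lifecycle callbacks: testStarted before the test and testEnded when it finishes. ResultCollector is one such listener; its testEnded callback completes the writing of the report.
  • PreCompiler resolves Arguments: it adds the parameters configured on the TestPlan node to the test thread context as JMeterVariables, and performs variable and function substitution.
  • ThreadGroup manages a group of threads: their number, start-up, shutdown, and so on.
  • StopTest is an inner class that is not visible from outside. It is a Runnable whose job is to stop the test asynchronously; the stopTest method is implemented through it.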
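The "collect by class while traversing a tree" idea can be illustrated with a small stand-alone sketch. It does not use JMeter's classes: Node and ClassCollector are hypothetical stand-ins for HashTree and SearchByClass, and only the general technique (a depth-first walk that keeps every element of a requested type) is assumed to match.

```java
import java.util.ArrayList;
import java.util.List;

class CollectorDemo {
    // Builds a tiny tree and collects every String element from it.
    static List<String> collectStrings() {
        Node root = new Node("plan")
                .add(new Node(42).add(new Node("sampler")))
                .add(new Node("listener"));
        ClassCollector<String> collector = new ClassCollector<>(String.class);
        collector.traverse(root);
        return collector.getSearchResults();
    }

    public static void main(String[] args) {
        System.out.println(collectStrings()); // prints [plan, sampler, listener]
    }
}

// Hypothetical tree node; not JMeter's HashTree.
class Node {
    final Object element;
    final List<Node> children = new ArrayList<>();

    Node(Object element) {
        this.element = element;
    }

    Node add(Node child) {
        children.add(child);
        return this;
    }
}

// Hypothetical collector; not JMeter's SearchByClass.
class ClassCollector<T> {
    private final Class<T> type;
    private final List<T> results = new ArrayList<>();

    ClassCollector(Class<T> type) {
        this.type = type;
    }

    // Depth-first walk that keeps every element that is an instance of the target type.
    void traverse(Node node) {
        if (type.isInstance(node.element)) {
            results.add(type.cast(node.element));
        }
        for (Node child : node.children) {
            traverse(child);
        }
    }

    List<T> getSearchResults() {
        return results;
    }
}
```

The elements already exist as objects in the tree; the collector only filters them by type, which is why the same tree can be searched repeatedly for different classes.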

Source code walkthrough

Main fields

    private static volatile StandardJMeterEngine engine;

    private volatile boolean running = false;

    private volatile boolean active = false;

    private volatile boolean serialized = false;

    private volatile boolean tearDownOnShutdown = false;

    private HashTree test;

    private final String host;

Constructors

    // No-argument constructor, used for single-machine load tests
    public StandardJMeterEngine() {
        this(null);
    }

    // Constructor taking a host, used for distributed load tests
    public StandardJMeterEngine(String host) {
        this.host = host;
        // Hack to allow external control
        initSingletonEngine(this);
    }

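Main methods

configure(HashTree testTree)

Configures the engine. HashTree is the data structure JMeter relies on to execute a test; configure prepares the test data before the test runs.

    // Locate the TestPlan element in the HashTree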
    SearchByClass<TestPlan> testPlan = new SearchByClass<>(TestPlan.class);
    testTree.traverse(testPlan);
    Object[] plan = testPlan.getSearchResults().toArray();
    if (plan.length == 0) {
        throw new IllegalStateException("Could not find the TestPlan class!");
    }
    TestPlan tp = (TestPlan) plan[0];

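    // A test may contain several thread groups. If serialized is true, StandardJMeterEngine
    // runs them one after another, waiting for each ThreadGroup to finish before starting
    // the next; otherwise all thread groups run in parallel.
    serialized = tp.isSerialized();

    // tearDownOnShutdown works together with PostThreadGroup and is used for cleanup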
    tearDownOnShutdown = tp.isTearDownOnShutdown();
    active = true;
    test = testTree;

runTest

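runTest() starts the test. It does not execute the test on the calling thread: StandardJMeterEngine implements Runnable, so runTest() creates a new thread, and that thread executes the engine's run() method.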

    public void runTest() throws JMeterEngineException {
        if (host != null){
            long now=System.currentTimeMillis();
            System.out.println("Starting the test on host " + host + " @ "+new Date(now)+" ("+now+")"); // NOSONAR Intentional
        }
        try {
            // The engine is a Runnable; hand it to a new thread
            Thread runningThread = new Thread(this, "StandardJMeterEngine");
            runningThread.start();
        } catch (Exception err) {
            stopTest();
            throw new JMeterEngineException(err);
        }
    }
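The pattern used by runTest() can be shown in isolation. The sketch below is not JMeter code: MiniEngine is a hypothetical class, and the CountDownLatch is added only so the demo can wait for the background thread. It shows that the caller returns immediately while run() executes on the newly named thread.

```java
import java.util.concurrent.CountDownLatch;

class EngineDemo {
    // Hypothetical miniature engine: a Runnable, not a Thread subclass.
    static class MiniEngine implements Runnable {
        final CountDownLatch finished = new CountDownLatch(1);
        volatile String ranOn;

        // Returns right away; the real work happens on the new thread.
        void runTest() {
            Thread runningThread = new Thread(this, "MiniEngine");
            runningThread.start();
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
            finished.countDown();
        }
    }

    // Starts the engine, waits for run() to complete, and reports the thread it ran on.
    static String startAndWait() {
        MiniEngine engine = new MiniEngine();
        engine.runTest();
        try {
            engine.finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return engine.ranOn;
    }

    public static void main(String[] args) {
        System.out.println("run() executed on thread: " + startAndWait());
    }
}
```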

run

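The run method of StandardJMeterEngine performs the following steps in order.

JMeterContextService.startTest() initializes state: it sets numberOfActiveThreads to 0, resets the testStart time, and resets the thread-local JMeterContext.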

    JMeterContextService.startTest();

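PreCompiler is a HashTreeTraverser (it does not extend HashTree). While traversing the tree it initializes the Arguments of the TestPlan, substitutes parameterized values, and stores them in JMeterVariables.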

    PreCompiler compiler = new PreCompiler();
    test.traverse(compiler);

SearchByClass is then used to collect every TestStateListener in the tree.

A TestStateListener receives lifecycle callbacks: testStarted before the test and testEnded when it finishes.

        SearchByClass<TestStateListener> testListeners = new SearchByClass<>(TestStateListener.class); // TL - S&E
        test.traverse(testListeners);

        // Add the listeners already registered in testList
        testListeners.getSearchResults().addAll(testList); 
        testList.clear();

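Before the test starts, testStarted is called on each listener. For example, TestPlan loads dependency jars, BackendListener starts a daemon thread to collect SampleResults, ResultCollector increments instanceCount and initializes fileOutput, DataSourceElement creates the database connection, and so on.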

    notifyTestListenersOfStart(testListeners);

    private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
        for (TestStateListener tl : testListeners.getSearchResults()) {
            if (tl instanceof TestBean) {
                TestBeanHelper.prepare((TestElement) tl);
            }
            if (host == null) {
                tl.testStarted();
            } else {
                tl.testStarted(host);
            }
        }
    }
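The listener lifecycle can be sketched without JMeter on the classpath. StateListener and RecordingListener below are hypothetical; the sketch only assumes the behaviour described in this article: a null host selects the no-argument callbacks, a non-null host selects the host-aware ones, and a failure in one listener's end callback does not stop the remaining listeners from being notified.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {
    // Hypothetical listener contract with local and host-aware callbacks.
    interface StateListener {
        void testStarted();
        void testStarted(String host);
        void testEnded();
        void testEnded(String host);
    }

    // Records each callback so the order can be inspected afterwards.
    static class RecordingListener implements StateListener {
        final List<String> events = new ArrayList<>();
        public void testStarted() { events.add("started"); }
        public void testStarted(String host) { events.add("started@" + host); }
        public void testEnded() { events.add("ended"); }
        public void testEnded(String host) { events.add("ended@" + host); }
    }

    // host == null means a local run; otherwise the host-aware variants are called.
    static List<String> runLifecycle(String host) {
        RecordingListener listener = new RecordingListener();
        List<StateListener> listeners = new ArrayList<>();
        listeners.add(listener);
        for (StateListener l : listeners) {
            if (host == null) {
                l.testStarted();
            } else {
                l.testStarted(host);
            }
        }
        // ... the thread groups would run here ...
        for (StateListener l : listeners) {
            try {
                if (host == null) {
                    l.testEnded();
                } else {
                    l.testEnded(host);
                }
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from being notified.
                System.err.println("listener failed: " + e);
            }
        }
        return listener.events;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(null));       // prints [started, ended]
        System.out.println(runLifecycle("10.0.0.1")); // prints [started@10.0.0.1, ended@10.0.0.1]
    }
}
```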

SearchByClass is used again to collect all thread groups (SetupThreadGroup, ThreadGroup, and PostThreadGroup).

    SearchByClass<SetupThreadGroup> setupSearcher = new SearchByClass<>(SetupThreadGroup.class);
    SearchByClass<AbstractThreadGroup> searcher = new SearchByClass<>(AbstractThreadGroup.class);
    SearchByClass<PostThreadGroup> postSearcher = new SearchByClass<>(PostThreadGroup.class);

    test.traverse(setupSearcher);
    test.traverse(searcher);
    test.traverse(postSearcher);

    Iterator<SetupThreadGroup> setupIter = setupSearcher.getSearchResults().iterator();
    Iterator<AbstractThreadGroup> iter = searcher.getSearchResults().iterator();
    Iterator<PostThreadGroup> postIter = postSearcher.getSearchResults().iterator();

A ListenerNotifier instance is created to notify listeners such as BackendListener and ResultCollector when events occur.

    ListenerNotifier notifier = new ListenerNotifier();

All SetupThreadGroups are started (a typical test plan has none), and the engine waits for them to finish.

    if (setupIter.hasNext()) {
        log.info("Starting setUp thread groups");
        while (running && setupIter.hasNext()) {//for each setup thread group
            AbstractThreadGroup group = setupIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting setUp ThreadGroup: {} : {} ", groupCount, groupName);
            startThreadGroup(group, groupCount, setupSearcher, testLevelElements, notifier);
            // When running serially, wait for this group before starting the next one
            if (serialized && setupIter.hasNext()) {
                log.info("Waiting for setup thread group: {} to finish before starting next setup group", 
                        groupName);
                group.waitThreadsStopped();
            }
        }    
        log.info("Waiting for all setup thread groups to exit");
        //wait for all Setup Threads To Exit
        waitThreadsStopped();
        log.info("All Setup Threads have ended");
        groupCount=0;
        JMeterContextService.clearTotalThreads();
    }

    groups.clear();

Start the main thread groups and wait for all of them to finish

    
    JMeterContextService.getContext().setSamplingStarted(true);
    boolean mainGroups = running; // still running at this point, i.e. setUp was not cancelled

    while (running && iter.hasNext()) {// for each thread group
        AbstractThreadGroup group = iter.next();
        //ignore Setup and Post here.  We could have filtered the searcher. but then
        //future Thread Group objects wouldn't execute.
        if (group instanceof SetupThreadGroup ||
                group instanceof PostThreadGroup) {
            continue;
        }
        groupCount++;
        String groupName = group.getName();
        log.info("Starting ThreadGroup: {} : {}", groupCount, groupName);
        startThreadGroup(group, groupCount, searcher, testLevelElements, notifier);
        // When running serially, wait for this group before starting the next one
        if (serialized && iter.hasNext()) {
            log.info("Waiting for thread group: {} to finish before starting next group", groupName);
            // Delegate to the thread group's waitThreadsStopped method
            group.waitThreadsStopped();
        }
    } // end of thread groups
    if (groupCount == 0){ // No TGs found
        log.info("No enabled thread groups found");
    } else {
        if (running) {
            log.info("All thread groups have been started");
        } else {
            log.info("Test stopped - no more thread groups will be started");
        }
    }

    //wait for all Test Threads To Exit
    waitThreadsStopped();
    groups.clear();
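The effect of the serialized flag on start order can be reproduced with plain threads. In this sketch MiniGroup is a hypothetical stand-in for a thread group, and the 20 ms sleep stands in for sampler work; only the control flow (start a group, optionally wait for it, then wait for every started group at the end) follows the loop above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SerializedDemo {
    // Hypothetical thread group: a set of worker threads that can be started and awaited.
    static class MiniGroup {
        private final List<Thread> threads = new ArrayList<>();

        MiniGroup(String name, int numThreads, List<String> log) {
            for (int i = 0; i < numThreads; i++) {
                threads.add(new Thread(() -> {
                    try {
                        Thread.sleep(20); // stands in for sampler work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    log.add(name + " done");
                }));
            }
        }

        void start() {
            threads.forEach(Thread::start);
        }

        void waitThreadsStopped() {
            for (Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Starts groups A and B; in serialized mode B is started only after A has finished.
    static List<String> run(boolean serialized) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<MiniGroup> started = new ArrayList<>();
        String[] names = {"A", "B"};
        for (int i = 0; i < names.length; i++) {
            MiniGroup group = new MiniGroup(names[i], 2, log);
            log.add("start " + names[i]);
            group.start();
            started.add(group);
            if (serialized && i < names.length - 1) {
                group.waitThreadsStopped();
            }
        }
        // Wait for every started group, as the engine-level waitThreadsStopped() does.
        for (MiniGroup group : started) {
            group.waitThreadsStopped();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("serialized: " + run(true));
        System.out.println("parallel:   " + run(false));
    }
}
```

In serialized mode "start B" always appears after both "A done" entries. In parallel mode the interleaving depends on thread scheduling.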

All PostThreadGroups are then run (a typical test plan has none), and the engine waits for them to finish.

    if (postIter.hasNext()){
        groupCount = 0;
        JMeterContextService.clearTotalThreads();
        log.info("Starting tearDown thread groups");
        if (mainGroups && !running) { // i.e. shutdown/stopped during main thread groups
            running = tearDownOnShutdown; // re-enable for tearDown if necessary
        }
        while (running && postIter.hasNext()) {//for each tearDown thread group
            AbstractThreadGroup group = postIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting tearDown ThreadGroup: {} : {}", groupCount, groupName);
            startThreadGroup(group, groupCount, postSearcher, testLevelElements, notifier);
            if (serialized && postIter.hasNext()) {
                log.info("Waiting for post thread group: {} to finish before starting next post group", groupName);
                group.waitThreadsStopped();
            }
        }
        waitThreadsStopped(); // wait for Post threads to stop
    }
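The flag handling before the tearDown loop is easy to misread, so here it is reduced to a pure function. tearDownWillRun is a hypothetical helper, not a JMeter method; it mirrors only the two lines that re-enable running when the test was stopped during the main thread groups.

```java
class TearDownDemo {
    // Returns whether the tearDown loop would start any group, given the engine flags.
    static boolean tearDownWillRun(boolean mainGroupsStarted, boolean running, boolean tearDownOnShutdown) {
        if (mainGroupsStarted && !running) {
            // Stopped during the main thread groups: re-enable only if configured to do so.
            running = tearDownOnShutdown;
        }
        return running;
    }

    public static void main(String[] args) {
        System.out.println(tearDownWillRun(true, true, false));  // normal finish: true
        System.out.println(tearDownWillRun(true, false, true));  // stopped, tearDown on shutdown: true
        System.out.println(tearDownWillRun(true, false, false)); // stopped, no tearDown on shutdown: false
        System.out.println(tearDownWillRun(false, false, true)); // stopped during setUp: false
    }
}
```

The last case shows that tearDownOnShutdown has no effect when the test was already cancelled during the setUp phase, because mainGroups is false in that situation.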

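When the test ends, testEnded is called on each test listener. For example:

JavaSampler calls teardownTest on the AbstractJavaSamplerClient that actually ran, which can print the total time that JavaSamplerClient spent testing;

ResultCollector writes the test results to file to produce the report;

TestPlan closes the files;

BackendListener cleans up its BackendListenerClients.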

    notifyTestListenersOfEnd(testListeners);
    JMeterContextService.endTest();

    private void notifyTestListenersOfEnd(SearchByClass<TestStateListener> testListeners) {
        log.info("Notifying test listeners of end of test");
        for (TestStateListener tl : testListeners.getSearchResults()) {
            try {
                if (host == null) {
                    tl.testEnded();
                } else {
                    tl.testEnded(host);
                }
            } catch (Exception e) {
                log.warn("Error encountered during shutdown of "+tl.toString(),e);
            }
        }
        if (host != null) {
            log.info("Test has ended on host {} ", host);
            long now=System.currentTimeMillis();
            System.out.println("Finished the test on host " + host + " @ "+new Date(now)+" ("+now+")" // NOSONAR Intentional
                    +(EXIT_AFTER_TEST ? " - exit requested." : ""));
            if (EXIT_AFTER_TEST){
                // Exit the remote server
                exit();
            }
        }
        active=false;
    }

startThreadGroup

Starts a thread group.

    private void startThreadGroup(AbstractThreadGroup group, int groupCount, SearchByClass<?> searcher, List<?> testLevelElements, ListenerNotifier notifier)
    {
        try {
            // Read the thread group's configured properties
            int numThreads = group.getNumThreads();
            JMeterContextService.addTotalThreads(numThreads);
            boolean onErrorStopTest = group.getOnErrorStopTest();
            boolean onErrorStopTestNow = group.getOnErrorStopTestNow();
            boolean onErrorStopThread = group.getOnErrorStopThread();
            boolean onErrorStartNextLoop = group.getOnErrorStartNextLoop();
            String groupName = group.getName();
            log.info("Starting {} threads for group {}.", numThreads, groupName);
            if (onErrorStopTest) {
                log.info("Test will stop on error");
            } else if (onErrorStopTestNow) {
                log.info("Test will stop abruptly on error");
            } else if (onErrorStopThread) {
                log.info("Thread will stop on error");
            } else if (onErrorStartNextLoop) {
                log.info("Thread will start next loop on error");
            } else {
                log.info("Thread will continue on error");
            }
            // Get the HashTree subtree that belongs to this ThreadGroup
            ListedHashTree threadGroupTree = (ListedHashTree) searcher.getSubTree(group);
            threadGroupTree.add(group, testLevelElements);
    
            // Remember the thread group in the groups list
            groups.add(group);
            // Call the ThreadGroup's start method
            group.start(groupCount, notifier, threadGroupTree, this);
        } catch (JMeterStopTestException ex) { // NOSONAR Reported by log
            JMeterUtils.reportErrorToUser("Error occurred starting thread group :" + group.getName()+ ", error message:"+ex.getMessage()
                +", \nsee log file for more details", ex);
            return; // no point continuing
        }
    }   

waitThreadsStopped

Waits for the threads of every started thread group to finish.

    private void waitThreadsStopped() {
        // ConcurrentHashMap does not need synch. here
        for (AbstractThreadGroup threadGroup : groups) {
            // Delegate to the thread group's waitThreadsStopped method
            threadGroup.waitThreadsStopped();
        }
    }

stopTest

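Stops the test. If now is true, the stop takes effect immediately; if it is false, the engine waits until the currently running test has completed at least one iteration. The no-argument overload calls stopTest(true).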

    public synchronized void stopTest() {
        stopTest(true);
    }

    public synchronized void stopTest(boolean now) {
        Thread stopThread = new Thread(new StopTest(now));
        stopThread.start();
    }

Both overloads delegate to the inner class StopTest. Because it implements Runnable, the new thread executes its run method:

    public void run() {
        running = false;
        resetSingletonEngine();
        // now == true: stop immediately
        if (now) {
            // Tell every thread group to stop
            tellThreadGroupsToStop();
            pause(10L * countStillActiveThreads());
            // Check whether all thread groups have stopped
            boolean stopped = verifyThreadsStopped();
            // The stop failed
            if (!stopped) {  // we totally failed to stop the test
                if (JMeter.isNonGUI()) {
                    log.error(JMeterUtils.getResString("stopping_test_failed")); //$NON-NLS-1$
                    if (SYSTEM_EXIT_ON_STOP_FAIL) { // default is true
                        log.error("Exiting");
                        System.out.println("Fatal error, could not stop test, exiting"); // NOSONAR Intentional
                        System.exit(1); // NOSONAR Intentional
                    } else {
                        System.out.println("Fatal error, could not stop test"); // NOSONAR Intentional                            
                    }
                } else {  // GUI mode: report the failure to the user
                    JMeterUtils.reportErrorToUser(
                            JMeterUtils.getResString("stopping_test_failed"), //$NON-NLS-1$
                            JMeterUtils.getResString("stopping_test_title")); //$NON-NLS-1$
                }
            } // else will be done by threadFinished()
        } else {
            stopAllThreadGroups();
        }
    }
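The difference between the two stop modes can be modelled with a single worker thread. This is a simplified model, not JMeter's implementation: Worker is hypothetical, and the sketch assumes that an immediate stop means "clear the running flag and interrupt", while a graceful stop means "clear the running flag and let the current iteration complete".

```java
import java.util.concurrent.CountDownLatch;

class StopDemo {
    // Hypothetical worker whose loop body stands in for one test iteration.
    static class Worker extends Thread {
        final CountDownLatch inIteration = new CountDownLatch(1);
        private volatile boolean running = true;
        private int completed = 0; // read by other threads only after join()

        @Override
        public void run() {
            while (running) {
                inIteration.countDown(); // signal that an iteration is in progress
                try {
                    Thread.sleep(200); // stands in for one sampler call
                } catch (InterruptedException e) {
                    return; // abandoned mid-iteration
                }
                completed++;
            }
        }

        void stopGracefully() {
            running = false;
        }

        void stopNow() {
            running = false;
            interrupt();
        }

        int completedIterations() {
            return completed;
        }
    }

    // Stops the worker during its first iteration and reports how many iterations completed.
    static int completedAfterStop(boolean now) {
        Worker worker = new Worker();
        worker.start();
        try {
            worker.inIteration.await();
            if (now) {
                worker.stopNow();
            } else {
                worker.stopGracefully();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.completedIterations();
    }

    public static void main(String[] args) {
        System.out.println("graceful stop, completed iterations: " + completedAfterStop(false)); // 1
        System.out.println("immediate stop, completed iterations: " + completedAfterStop(true)); // 0
    }
}
```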

Tell every thread group to stop

        private void tellThreadGroupsToStop() {
            // ConcurrentHashMap does not need protecting
            for (AbstractThreadGroup threadGroup : groups) {
                threadGroup.tellThreadsToStop();
            }
        }

Verify that all thread groups have stopped

    private boolean verifyThreadsStopped() {
            boolean stoppedAll = true;
            // ConcurrentHashMap does not need synch. here
            for (AbstractThreadGroup threadGroup : groups) {
                stoppedAll = stoppedAll && threadGroup.verifyThreadsStopped();
            }
            return stoppedAll;
    }

Ask every thread group to stop gracefully, letting running work finish

    private void stopAllThreadGroups() {
            // ConcurrentHashMap does not need synch. here
            for (AbstractThreadGroup threadGroup : groups) {
                threadGroup.stop();
            }
    }