zl程序教程

您现在的位置是:首页 >  其他

当前栏目

深入理解Spark:核心思想与源码分析. 3.13 创建DAGSchedulerSource和BlockManagerSource

2023-03-09 22:24:38 时间

3.13 创建DAGSchedulerSource和BlockManagerSource

在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的post-StartHook方法,其目的是为了等待backend就绪,见代码清单3-53。postStartHook的实现见代码清单3-54。

创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage. runningStages、stage. waiting-Stages、stage. allJobs、stage. activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpace-Used_MB。

代码清单3-53 创建DAGSchedulerSource和BlockManagerSource

    taskScheduler.postStartHook()

 

    private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)

    private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

 

private def initDriverMetrics() {

    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)

    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)

}

 

initDriverMetrics()

代码清单3-54 postStartHook的实现

override def postStartHook() {

        waitBackendReady()

    }

 

private def waitBackendReady(): Unit = {

    if (backend.isReady) {

        return

    }

    while (!backend.isReady) {

        synchronized {

            this.wait(100)

        }

    }

}