zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Spark源码分析之四:Stage提交

源码Spark 分析 提交 之四 stage
2023-09-27 14:29:33 时间
        各位看官,上一篇《Spark源码分析之Stage划分》详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交。         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交。

        各位看官,上一篇《Spark源码分析之Stage划分》详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交。

        Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:


        与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交。在handleJobSubmitted()方法的最后两行代码,便是Stage提交的处理。代码如下:

// 提交最后一个stage

 submitStage(finalStage)

 // 提交其他正在等待的stage

 submitWaitingStages()
        从代码我们可以看出,Stage提交的逻辑顺序,是由后往前,即先提交最后一个finalStage,即ResultStage,然后再提交其parent stages,但是实际物理顺序是否如此呢?我们首先看下finalStage的提交,方法submitStage()代码如下:

/** Submits stage, but first recursively submits any missing parents. */

 // 提交stage,但是首先要递归的提交所有的missing父stage

 private def submitStage(stage: Stage) {

 // 根据stage获取jobId

 val jobId = activeJobForStage(stage)

 if (jobId.isDefined) {// 如果jobId已定义

 // 记录Debug日志信息:submitStage(stage)

 logDebug("submitStage(" + stage + ")")

 // 如果在waitingStages、runningStages或

 // failedStages任意一个中,不予处理

 // 既不在waitingStages中,也不在runningStages中,还不在failedStages中

 // 说明未处理过

 if (!waitingStages(stage) !runningStages(stage) !failedStages(stage)) {

 // 调用getMissingParentStages()方法,获取stage还没有提交的parent

 val missing = getMissingParentStages(stage).sortBy(_.id)

 logDebug("missing: " + missing)

 if (missing.isEmpty) {

 // 如果missing为空,说明是没有parent的stage或者其parent stages已提交,

 // 则调用submitMissingTasks()方法,提交tasks

 logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

 submitMissingTasks(stage, jobId.get)

 } else {

 // 否则,说明其parent还没有提交,递归,循环missing,提交每个stage

 for (parent - missing) {

 submitStage(parent)

 // 将该stage加入到waitingStages中

 waitingStages += stage

 } else {

 // 放弃该Stage

 abortStage(stage, "No active job for stage " + stage.id, None)

 }
        代码逻辑比较简单。根据stage获取到jobId,如果jobId未定义,说明该stage不属于明确的Job,则调用abortStage()方法放弃该stage。如果jobId已定义的话,则需要判断该stage属于waitingStages、runningStages、failedStages中任意一个,则该stage忽略,不被处理。顾名思义,waitingStages为等待处理的stages,spark采取由后往前的顺序处理stage提交,即先处理child stage,然后再处理parent stage,所以位于waitingStages中的stage,由于其child stage尚未处理,所以必须等待,runningStages为正在运行的stages,正在运行意味着已经提交了,所以无需再提交,而最后的failedStages就是失败的stages,既然已经失败了,再提交也还是会失败,徒劳无益啊~

        此时,如果stage不位于上述三个数据结构中,则可以继续执行提交流程。接下来该怎么做呢?

        首先调用getMissingParentStages()方法,获取stage还没有提交的parent,即missing;如果missing为空,说明该stage要么没有parent stage,要么其parent stages都已被提交,此时该stage就可以被提交,用于提交的方法submitMissingTasks()我们稍后分析。

        如果missing不为空,则说明该stage还存在尚未被提交的parent stages,那么,我们就需要遍历missing,循环提交每个stage,并将该stage添加到waitingStages中,等待其parent stages都被提交后再被提交。

        我们先看下这个missing是如何获取的。进入getMissingParentStages()方法,代码如下:

private def getMissingParentStages(stage: Stage): List[Stage] = {

 // 存储尚未提交的parent stages,用于最后结果的返回

 val missing = new HashSet[Stage]

 // 已被处理的RDD集合

 val visited = new HashSet[RDD[_]]

 // We are manually maintaining a stack here to prevent StackOverflowError

 // caused by recursively visiting

 // 待处理RDD栈,后入先出

 val waitingForVisit = new Stack[RDD[_]]

 // 定义函数visit

 def visit(rdd: RDD[_]) {

 // 通过visited判断rdd是否已处理

 if (!visited(rdd)) {

 // 添加到visited,下次不会再处理

 visited += rdd

 val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

 if (rddHasUncachedPartitions) {

 // 循环rdd的dependencies

 for (dep - rdd.dependencies) {

 dep match {

 // 宽依赖

 case shufDep: ShuffleDependency[_, _, _] = 

 // 调用getShuffleMapStage,获取ShuffleMapStage

 val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)

 if (!mapStage.isAvailable) {

 missing += mapStage

 // 窄依赖,直接将RDD压入waitingForVisit栈

 case narrowDep: NarrowDependency[_] = 

 waitingForVisit.push(narrowDep.rdd)

 // 将stage的rdd压入到waitingForVisit顶部

 waitingForVisit.push(stage.rdd)

 // 循环处理waitingForVisit,对弹出的每个rdd调用函数visit

 while (waitingForVisit.nonEmpty) {

 visit(waitingForVisit.pop())

 // 返回stage列表

 missing.toList

 }
        有没有些似曾相识的感觉呢?对了,和《Spark源码分析之Stage划分》一文中getParentStages()方法、getAncestorShuffleDependencies()方法结构类似,也是定义了三个数据结构和一个visit()方法。三个数据结构分别是:

        1、missing:HashSet[Stage]类型,存储尚未提交的parent stages,用于最后结果的返回;

        2、visited:HashSet[RDD[_]]类型,已被处理的RDD集合,位于其中的RDD不会被重复处理;

        3、waitingForVisit:Stack[RDD[_]]类型,等待被处理的RDD栈,后入先出。

        visit()方法的处理逻辑也比较简单,大致如下:

        通过RDD是否在visited中判断RDD是否已处理,若未被处理,添加到visited中,然后循环rdd的dependencies,如果是宽依赖ShuffleDependency,调用getShuffleMapStage(),获取ShuffleMapStage(此次调用则是直接取出已生成的stage,因为划分阶段已将stage全部生成,拿来主义即可),判断该stage的isAvailable标志位,若为false,则说明该stage未被提交过,加入到missing集合,如果是窄依赖NarrowDependency,直接将RDD压入waitingForVisit栈,等待后续处理,因为窄依赖的RDD同属于同一个stage,加入waitingForVisit只是为了后续继续沿着DAG图继续往上处理。

        那么,整个missing的获取就一目了然,将final stage即ResultStage的RDD压入到waitingForVisit顶部,循环处理即可得到missing。

        至此,各位可能有个疑问,这个ShuffleMapStage的isAvailable为什么能决定该stage是否已被提交呢?卖个关子,后续再分析。

        submitStage()方法已分析完毕,go on,我们再回归到handleJobSubmitted()方法,在调用submitStage()方法提交finalStage之后,实际上只是将最原始的parent stage提交,其它child stage均存储在了waitingStages中,那么,接下来,我们就要调用submitWaitingStages()方法提交其中的stage。代码如下:

/**

 * Check for waiting or failed stages which are now eligible for resubmission.

 * Ordinarily run on every iteration of the event loop.

 private def submitWaitingStages() {

 // TODO: We might want to run this less often, when we are sure that something has become

 // runnable that wasnt before.

 logTrace("Checking for newly runnable parent stages")

 logTrace("running: " + runningStages)

 logTrace("waiting: " + waitingStages)

 logTrace("failed: " + failedStages)

 // 将waitingStages转换为数组

 val waitingStagesCopy = waitingStages.toArray

 // 清空waitingStages

 waitingStages.clear()

 // 循环waitingStagesCopy,挨个调用submitStage()方法进行提交

 for (stage - waitingStagesCopy.sortBy(_.firstJobId)) {

 submitStage(stage)

 }

        很简单,既然stages的顺序已经梳理正确,将waitingStages转换为数组waitingStagesCopy,针对每个stage挨个调用submitStage()方法进行提交即可。

        还记得我卖的那个关子吗?ShuffleMapStage的isAvailable为什么能决定该stage是否已被提交呢?现在来解开这个谜团。首先,看下ShuffleMapStage的isAvailable是如何定义的,在ShuffleMapStage中,代码如下:

/**

 * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.

 * This should be the same as `outputLocs.contains(Nil)`.

 * 如果map stage已就绪的话返回true,即所有分区均有shuffle输出。这个将会和outputLocs.contains保持一致。

 def isAvailable: Boolean = _numAvailableOutputs == numPartitions
        它是通过判断_numAvailableOutputs和numPartitions是否相等来确定stage是否已被提交(或者说准备就绪可以提交is ready)的,而numPartitions很好理解,就是stage中的全部分区数目,那么_numAvailableOutputs是什么呢?

private[this] var _numAvailableOutputs: Int = 0

 * Number of partitions that have shuffle outputs.

 * When this reaches [[numPartitions]], this map stage is ready.

 * This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.

 * 拥有shuffle的分区数量。

 * 当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。

 * 这个应与outputLocs.filter(!_.isEmpty).size保持一致

 def numAvailableOutputs: Int = _numAvailableOutputs
        可以看出,_numAvailableOutputs就是拥有shuffle outputs的分区数量,当这个numAvailableOutputs达到numPartitions时,这个map stage也就准备好了。

        那么这个_numAvailableOutputs开始时默认为0,它是在何时被赋值的呢?通篇看完ShuffleMapStage的源码,只有两个方法对_numAvailableOutputs的值做修改,代码如下:

def addOutputLoc(partition: Int, status: MapStatus): Unit = {

 val prevList = outputLocs(partition)

 outputLocs(partition) = status :: prevList

 if (prevList == Nil) {

 _numAvailableOutputs += 1

 def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {

 val prevList = outputLocs(partition)

 val newList = prevList.filterNot(_.location == bmAddress)

 outputLocs(partition) = newList

 if (prevList != Nil newList == Nil) {

 _numAvailableOutputs -= 1

 }
        什么时候调用的这个addOutputLoc()方法呢?答案就在DAGScheduler的newOrUsedShuffleStage()方法中。方法主要逻辑如下:

if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {

 // 如果mapOutputTracker中存在

 // 根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象

 val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

 // 反序列化

 val locs = MapOutputTracker.deserializeMapStatuses(serLocs)

 // 循环

 (0 until locs.length).foreach { i = 

 if (locs(i) ne null) {

 // locs(i) will be null if missing

 // 将

 stage.addOutputLoc(i, locs(i))

 } else {

 // 如果mapOutputTracker中不存在,注册一个

 // Kind of ugly: need to register RDDs with the cache and map output tracker here

 // since we cant do it in the RDD constructor because # of partitions is unknown

 logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")

 // 注册的内容为

 // 1、根据shuffleDep获取的shuffleId;

 // 2、rdd中分区的个数

 mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)

 }

        这个方法在stage划分过程中,第一轮被调用,此时mapOutputTracker中并没有注册shuffle相关信息,所以走的是else分支,调用mapOutputTracker的registerShuffle()方法注册shuffle,而在stage提交过程中,第二轮被调用,此时shuffle已在mapOutputTracker中注册,则会根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象,反序列化并循环调用stage的addOutputLoc()方法,更新stage的outputLocs,并累加_numAvailableOutputs,至此,关子卖完,再有疑问,后续再慢慢分析吧。

        到了这里,就不得不分析下真正提交stage的方法submitMissingTasks()了。莫慌,慢慢看,代码如下:

/** Called when stages parents are available and we can now do its task. */

 private def submitMissingTasks(stage: Stage, jobId: Int) {

 logDebug("submitMissingTasks(" + stage + ")")

 // Get our pending tasks and remember them in our pendingTasks entry

 // 清空stage的pendingPartitions

 stage.pendingPartitions.clear()

 // First figure out the indexes of partition ids to compute.

 // 首先确定该stage需要计算的分区ID索引

 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

 // Create internal accumulators if the stage has no accumulators initialized.

 // Reset internal accumulators only if this stage is not partially submitted

 // Otherwise, we may override existing accumulator values from some tasks

 if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {

 stage.resetInternalAccumulators()

 // Use the scheduling pool, job group, description, etc. from an ActiveJob associated

 // with this Stage

 val properties = jobIdToActiveJob(jobId).properties

 // 将stage加入到runningStages中

 runningStages += stage

 // SparkListenerStageSubmitted should be posted before testing whether tasks are

 // serializable. If tasks are not serializable, a SparkListenerStageCompleted event

 // will be posted, which should always come after a corresponding SparkListenerStageSubmitted

 // event.

 // 开启一个stage时,需要调用outputCommitCoordinator的stageStart()方法,

 stage match {

 // 如果为ShuffleMapStage

 case s: ShuffleMapStage = 

 outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

 // 如果为ResultStage

 case s: ResultStage = 

 outputCommitCoordinator.stageStart(

 stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

 // 创建一个Map:taskIdToLocations,存储的是id- Seq[TaskLocation]的映射关系

 // 对stage中指定RDD的每个分区获取位置信息,映射成id- Seq[TaskLocation]的关系

 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {

 stage match {

 // 如果是ShuffleMapStage

 case s: ShuffleMapStage = 

 partitionsToCompute.map { id = (id, getPreferredLocs(stage.rdd, id))}.toMap

 // 如果是ResultStage

 case s: ResultStage = 

 val job = s.activeJob.get

 partitionsToCompute.map { id = 

 val p = s.partitions(id)

 (id, getPreferredLocs(stage.rdd, p))

 }.toMap

 } catch {

 case NonFatal(e) = 

 stage.makeNewStageAttempt(partitionsToCompute.size)

 listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

 abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

 runningStages -= stage

 return

 // 标记新的stage attempt

 stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

 // 发送一个SparkListenerStageSubmitted事件

 listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

 // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

 // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast

 // the serialized copy of the RDD and for each task we will deserialize it, which means each

 // task gets a different copy of the RDD. This provides stronger isolation between tasks that

 // might modify state of objects referenced in their closures. This is necessary in Hadoop

 // where the JobConf/Configuration object is not thread-safe.

 // 对stage进行序列化,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func

 var taskBinary: Broadcast[Array[Byte]] = null

 try {

 // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

 // 对于ShuffleMapTask,序列化并广播,广播的是rdd和shuffleDep

 // For ResultTask, serialize and broadcast (rdd, func).

 // 对于ResultTask,序列化并广播,广播的是rdd和func

 val taskBinaryBytes: Array[Byte] = stage match {

 case stage: ShuffleMapStage = 

 // 序列化ShuffleMapStage

 closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()

 case stage: ResultStage = 

 // 序列化ResultStage

 closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()

 // 通过sc广播序列化的task

 taskBinary = sc.broadcast(taskBinaryBytes)

 } catch {

 // In the case of a failure during serialization, abort the stage.

 case e: NotSerializableException = 

 abortStage(stage, "Task not serializable: " + e.toString, Some(e))

 runningStages -= stage

 // Abort execution

 return

 case NonFatal(e) = 

 abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))

 runningStages -= stage

 return

 // 针对stage的每个分区构造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks

 val tasks: Seq[Task[_]] = try {

 stage match {

 // 如果是ShuffleMapStage

 case stage: ShuffleMapStage = 

 partitionsToCompute.map { id = 

 // 位置信息

 val locs = taskIdToLocations(id)

 val part = stage.rdd.partitions(id)

 // 创建ShuffleMapTask,其中包括位置信息

 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,

 taskBinary, part, locs, stage.internalAccumulators)

 // 如果是ResultStage

 case stage: ResultStage = 

 val job = stage.activeJob.get

 partitionsToCompute.map { id = 

 val p: Int = stage.partitions(id)

 val part = stage.rdd.partitions(p)

 val locs = taskIdToLocations(id)

 // 创建ResultTask

 new ResultTask(stage.id, stage.latestInfo.attemptId,

 taskBinary, part, locs, id, stage.internalAccumulators)

 } catch {

 case NonFatal(e) = 

 abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

 runningStages -= stage

 return

 // 如果存在tasks,则利用taskScheduler.submitTasks()提交task,否则标记stage已完成

 if (tasks.size 0) {

 logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

 // 赋值pendingPartitions

 stage.pendingPartitions ++= tasks.map(_.partitionId)

 logDebug("New pending partitions: " + stage.pendingPartitions)

 // 利用taskScheduler.submitTasks()提交task

 taskScheduler.submitTasks(new TaskSet(

 tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

 // 记录提交时间

 stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

 } else {

 // Because we posted SparkListenerStageSubmitted earlier, we should mark

 // the stage as completed here in case there are no tasks to run

 // 标记stage已完成

 markStageAsFinished(stage, None)

 val debugString = stage match {

 case stage: ShuffleMapStage = 

 s"Stage ${stage} is actually done; " +

 s"(available: ${stage.isAvailable}," +

 s"available outputs: ${stage.numAvailableOutputs}," +

 s"partitions: ${stage.numPartitions})"

 case stage : ResultStage = 

 s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"

 logDebug(debugString)

 }
        submitMissingTasks()方法,最主要的就是针对每个stage生成一组Tasks,即TaskSet,并调用TaskScheduler的submitTasks()方法提交tasks。它主要做了以下几件事情:

        1、清空stage的pendingPartitions;

        2、首先确定该stage需要计算的分区ID索引,保存至partitionsToCompute;

        3、将stage加入到runningStages中,标记stage正在运行,与上面的阐述对应;

        4、开启一个stage时,需要调用outputCommitCoordinator的stageStart()方法;

        5、创建一个Map:taskIdToLocations,存储的是id- Seq[TaskLocation]的映射关系,并对stage中指定RDD的每个分区获取位置信息,映射成id- Seq[TaskLocation]的关系;

        6、标记新的stage attempt,并发送一个SparkListenerStageSubmitted事件;

        7、对stage进行序列化并广播,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func;

        8、最重要的,针对stage的每个分区构造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks;

        9、如果存在tasks,则利用taskScheduler.submitTasks()提交task,否则标记stage已完成。

        至此,stage提交的主体流程已全部分析完毕,后续的Task调度与执行留待以后分析,而stage提交部分细节或者遗漏之处,特别是task生成时的部分细节,也留待以后再细细琢磨吧~

        晚安!


Apache Spark 将支持 Stage 级别的资源控制和调度 我们需要对不同 Stage 设置不同的资源。但是目前的 Spark 不支持这种细粒度的资源配置,导致我们不得不在作业启动的时候设置大量的资源,从而导致资源可能浪费,特别是在机器学习的场景下。
Spark2.4.0源码分析之WorldCount Stage提交(DAGScheduler)(六) - 理解ShuffuleMapStage是如何转化为ShuffleMapTask并作为TaskSet提交 - 理解ResultStage是如何转化为ResultTask并作为TaskSet提交
Spark stage提交 Spark stage 提交,在stage进行提交前,每判断该stage的上级stage是否已经提交,如果没有就先提交上级stage,循环处理,所以会递归提交所有上级Stage,再提交当前Stage
Spark FinalStage处理(Stage划分) Spark FinalStage的处理,会递归找出所有的上级Stage,此时FinalStage开始,到顶级Stage已经计算完成,因为每个Stage都有上级Stage的依赖,所以此时已经进行Stage划分,只是没有进行Stage提交