Spark修炼之道(高级篇)——Spark源码阅读:第七节 resourceOffers方法与launchTasks方法解析

2023-09-14 09:00:24 时间
 // Make fake resource offers on just one executor.
 //
 // Builds a single WorkerOffer from the executor's host and currently free cores,
 // passes it to the scheduler's resourceOffers, and launches whatever tasks come back.
 // Executors listed in executorsPendingToRemove are skipped.
 //
 // NOTE(review): the final launchTasks(...) call and the closing braces were missing
 // from the extracted listing; they are restored here to match the surrounding text,
 // which says this method relies on resourceOffers and launchTasks — confirm against
 // the Spark version the article targets.
 private def makeOffers(executorId: String) {
   // Filter out executors under killing
   if (!executorsPendingToRemove.contains(executorId)) {
     val executorData = executorDataMap(executorId)
     val workOffers = Seq(
       new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
     launchTasks(scheduler.resourceOffers(workOffers))
   }
 }



上面的代码依赖于两个重要方法，它们分别是TaskSchedulerImpl的resourceOffers方法和CoarseGrainedSchedulerBackend的launchTasks方法，其源码如下：

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 *
 * @param offers the resource offers (executor id, host, free cores) to schedule against
 * @return one sequence of task descriptions per offer, in the shuffled offer order
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  // (handles executors that have just joined)
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers,
  // so that tasks are spread evenly across the worker nodes.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Fetch the TaskSetManagers ordered according to the scheduling policy.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // (tasks are scheduled closest-locality first)
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    // Keep offering at this locality level until a pass launches nothing.
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}



 // Launch tasks returned by a set of resource offers.
 //
 // Each task is serialized. If the serialized size reaches the usable Akka frame size
 // (akkaFrameSize minus AkkaUtils.reservedSizeBytes), an error message is built for the
 // owning TaskSetManager instead of sending the task. Otherwise the executor's free
 // cores are decremented by scheduler.CPUS_PER_TASK and a LaunchTask message is sent
 // to the executor endpoint.
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
     val serializedTask = ser.serialize(task)
     if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
       scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
         try {
           var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
             "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
             "spark.akka.frameSize or using broadcast variables for large values."
           // Five placeholders: task id, task index, serialized size, frame size, reserved size.
           msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
             AkkaUtils.reservedSizeBytes)
           // NOTE(review): this line was missing from the extracted listing; restored
           // from the upstream Spark source — confirm against the targeted version.
           taskSetMgr.abort(msg)
         } catch {
           case e: Exception => logError("Exception in error callback", e)
         }
       }
     }
     else {
       val executorData = executorDataMap(task.executorId)
       // Reserve the cores for this task before handing it to the executor.
       executorData.freeCores -= scheduler.CPUS_PER_TASK
       executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
     }
   }
 }


