Spark修炼之道 (Advanced): Spark Source Code Reading, Part 7 - Analysis of the resourceOffers and launchTasks Methods
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
  // Filter out executors under killing
  if (!executorsPendingToRemove.contains(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}
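
Taken together, makeOffers drives a three-step cycle: describe each executor's spare capacity as a WorkerOffer, let the scheduler turn the offers into task descriptions, and hand those descriptions back to the executors. The toy sketch below (not Spark source; Offer, TaskDesc and OfferLoopSketch are invented names for illustration only) models just that cycle with plain case classes:

// Toy model of the offer/assign/launch cycle (illustration only, not Spark code)
case class Offer(executorId: String, host: String, freeCores: Int)
case class TaskDesc(taskId: Long, executorId: String, name: String)

object OfferLoopSketch {
  // Pretend scheduler: give every offered core one dummy task description.
  def resourceOffers(offers: Seq[Offer]): Seq[Seq[TaskDesc]] =
    offers.map { o =>
      (0 until o.freeCores).map(i => TaskDesc(i.toLong, o.executorId, s"task-$i on ${o.host}"))
    }

  // Pretend backend: "send" each task to its executor (here we just print).
  def launchTasks(tasks: Seq[Seq[TaskDesc]]): Unit =
    for (task <- tasks.flatten)
      println(s"LaunchTask(${task.name}) -> executor ${task.executorId}")

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", "host-a", 2), Offer("exec-2", "host-b", 1))
    launchTasks(resourceOffers(offers))
  }
}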

The code above relies on two key methods: TaskSchedulerImpl's resourceOffers method and CoarseGrainedSchedulerBackend's launchTasks method.


/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  // Handle newly added executors
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  // Shuffle so tasks are spread evenly across Worker nodes
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Get the ArrayBuffer[TaskSetManager] according to the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // Schedule tasks according to locality preference
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
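
Inside the locality loop, the actual placement is delegated to resourceOfferSingleTaskSet, whose body is not shown in this article. As a rough illustration of the round-robin idea only (a simplified sketch, not the real method: pendingTasks, offerSinglePass and the one-CPU-per-task assumption are stand-ins, and locality checks are ignored), a single pass walks the shuffled offers and places at most one task on each offer that still has CPUs left; the caller repeats passes while anything was placed:

import scala.collection.mutable.ArrayBuffer

// Simplified round-robin placement: one pass over the shuffled offers places at most
// one pending task per offer, as long as that offer still has CPUs left.
object RoundRobinSketch {
  val CPUS_PER_TASK = 1  // stand-in for scheduler.CPUS_PER_TASK

  def offerSinglePass(
      pendingTasks: ArrayBuffer[String],       // tasks still waiting for a slot
      shuffledOffers: Seq[(String, Int)],      // (executorId, cores) after Random.shuffle
      availableCpus: Array[Int],               // remaining CPU budget per offer (mutable)
      assignments: Seq[ArrayBuffer[String]]    // tasks assigned to each offer so far
  ): Boolean = {
    var launchedTask = false
    for (i <- shuffledOffers.indices if pendingTasks.nonEmpty) {
      if (availableCpus(i) >= CPUS_PER_TASK) {
        assignments(i) += pendingTasks.remove(0) // place the next task on this executor
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
    launchedTask // caller repeats: do { ... } while (launchedTask)
  }

  def main(args: Array[String]): Unit = {
    val pending = ArrayBuffer("t0", "t1", "t2", "t3", "t4")
    val offers = Seq(("exec-1", 2), ("exec-2", 2))
    val cpus = offers.map(_._2).toArray
    val assigned = offers.map(_ => new ArrayBuffer[String]())
    while (offerSinglePass(pending, offers, cpus, assigned)) {}
    println(assigned) // t4 stays pending: only 4 cores were offered
  }
}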

After resourceOffers returns, launchTasks is called with the resulting task descriptions, which ultimately starts the tasks running on the Worker nodes.

// The launchTasks method in CoarseGrainedSchedulerBackend

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    // The serialized task must not exceed the configured size limit
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // The CoarseGrainedExecutorBackend on the Worker node receives the LaunchTask message
      // and starts the task on that node's Executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
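
The size check above aborts the TaskSetManager when a serialized task exceeds the Akka frame size minus the reserved bytes. In Spark 1.x the usual remedies are to raise spark.akka.frameSize (in MB) or, preferably, to move large read-only data out of the task closure and into a broadcast variable; a minimal sketch under those assumptions (the app name, sizes and table contents are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object FrameSizeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("frame-size-sketch")
      .setMaster("local[2]")
      // Raise the Akka frame size (MB) only if task descriptions are legitimately large.
      .set("spark.akka.frameSize", "64")
    val sc = new SparkContext(conf)

    // Preferred fix: broadcast the large lookup table instead of capturing it in every
    // task closure, so each serialized task stays well below the frame size.
    val bigTable = (1 to 100000).map(i => i -> ("value-" + i)).toMap
    val broadcastTable = sc.broadcast(bigTable)

    val result = sc.parallelize(1 to 10)
      .map(i => broadcastTable.value.getOrElse(i, ""))
      .collect()
    result.foreach(println)
    sc.stop()
  }
}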
