Spark修炼之道(高级篇)——Spark源码阅读:第七节 resourceOffers方法与launchTasks方法解析
2023-09-14 09:00:24 时间
// Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Filter out executors under killing if (!executorsPendingToRemove.contains(executorId)) { val executorData = executorDataMap(executorId) val workOffers = Seq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) launchTasks(scheduler.resourceOffers(workOffers)) }
上面的代码依赖于两个重要方法,它们分别是TaskSchedulerImpl resourceOffers方法及CoarseGrainedSchedulerBackend的launchTasks方法
* Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added // 处理新的executor加入 var newExecAvail = false for (o - offers) { executorIdToHost(o.executorId) = o.host activeExecutorIds += o.executorId if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) newExecAvail = true for (rack - getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host // Randomly shuffle offers to avoid always placing tasks on the same set of workers. //随机打散,使Task均匀分配各Worker节点上 val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o = new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o = o.cores).toArray //根据调度策略获取ArrayBuffer[TaskSetManager] val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet - sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY //按就近原则进行Task调度 var launchedTask = false for (taskSet - sortedTaskSets; maxLocality - taskSet.myLocalityLevels) { do { launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) if (tasks.size 0) { hasLaunchedTask = true return tasks
调用完resourceOffers方法后,再调用launchTasks方法,最终在Worker节点上启动任务的运行
//CoarseGrainedSchedulerBackend中的launchTasks方法 // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task - tasks.flatten) { val serializedTask = ser.serialize(task) //序列化后的任何不能超过设定的大小 if (serializedTask.limit = akkaFrameSize - AkkaUtils.reservedSizeBytes) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr = try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) taskSetMgr.abort(msg) } catch { case e: Exception = logError("Exception in error callback", e) else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK //Worker节点上的CoarseGrainedExecutorBackend对象将接受LaunchTask消息,在Worker节点的Executor上启动Task的执行 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) }
Spark Netty与Jetty (源码阅读十一) spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。 创建了一个线程工厂,生成的线程都给定一个前缀名。 像一般的netty框架一样,创建Netty的EventLoopGroup: 在常用...
Spark之SQL解析(源码阅读十) 如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么。之前总结的已经写了传统数据库与Spark的sql解析之间的差别。那么我们下来直切主题~ 如今的Spark已经支持多种多样的数据源的查询与加载,兼容了Hive,可用JDBC的方式或者ODBC来连接Spark SQL。
Spark BlockManager的通信及内存占用分析(源码阅读九) 之前阅读也有总结过Block的RPC服务是通过NettyBlockRpcServer提供打开,即下载Block文件的功能。然后在启动jbo的时候由Driver上的BlockManagerMaster对存在于Executor上的BlockManager统一管理,注册Executor的BlockManager、更新Executor上Block的最新信息、询问所需要Block目前所在的位置以及当Executor运行结束时,将Executor移除等等。
Spark Job的提交与task本地化分析(源码阅读八) 我们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程)、NODE_LOCAL(本地节点)、NODE_PREF、RACK_LOCAL(本地机架)、ANY(任何)几种。
Spark Shuffle数据处理过程与部分调优(源码阅读七) shuffle。。。相当重要,为什么咩,因为shuffle的性能优劣直接决定了整个计算引擎的性能和吞吐量。相比于Hadoop的MapReduce,可以看到Spark提供多种计算结果处理方式,对shuffle过程进行了优化。
Spark常用函数(源码阅读六) 源码层面整理下我们常用的操作RDD数据处理与分析的函数,从而能更好的应用于工作中。 连接Hbase,读取hbase的过程,首先代码如下: def tableInitByTime(sc : SparkContext,tableName : String,columns : Strin...
相关文章
- invalidate方法知多少[-View-] 源码级
- SpringBoot日志源码解析:日志监听器的注册方法及触发
- 【Android 性能优化】应用启动优化 ( 安卓应用启动分析 | Launcher 应用简介 | Launcher 应用源码简介 | Launcher 应用快捷方式图标点击方法分析 )
- 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 )
- 【EventBus】EventBus 源码解析 ( 注册订阅者 | 注册订阅方法详细过程 )
- 【EventBus】EventBus 源码解析 ( 事件发送 | 发布线程为 子线程 切换到 主线程 执行订阅方法的过程分析 )
- 【Android 逆向】整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )
- Django MemCache 缓存使用方法和源码
- Ubuntu环境源码编译安装xdebug的方法
- Spark源码分析之spark-submit详解大数据
- python3开发进阶-Django框架中form的查看校验方法is_valid()的源码,自定义验证方法详解编程语言
- Linux下修改用户属组的方法(linux修改用户属组)
- 实现Oracle数据库高效运行的优化方法(oracle优化方法)
- ajax下载文件的方法 php下载图片的方法详解编程语言
- PHP源码编译报错解决方法详解编程语言
- JSP Request.getServerPort()方法:获取服务器的端口号
- Kubernetes源码探疑:Pod IP泄露排查及解决方法
- 解决Oracle删除用户连接的正确方法(oracle删除用户连接)
- Oracle数据库高可用性技术及实现方法(oracle高可用性)
- 掌握Linux服务器源码下载的方法!(linux服务器源码下载)
- Linux下定时删除文件的方法(linux定时删除)
- 深入探索Linux下查询电脑配置的方法(linux查电脑配置)
- 使用 MySQL PDO 连接数据库的优势和实现方法(mysql_pdo)
- Oracle的双重排序提升效率的有效方法(oracle两个排序条件)
- 初识Redis连接池的使用方法(redis连接池怎么使用)
- Fedora环境下装MySQL命令方法介绍
- C#禁用鼠标中间键的方法
- 解决java查看JDK中底层源码的实现方法
- Windows下获取Android源码方法的详解