Apache Spark源码走读(六)Task运行期之函数调用关系分析 &存储子系统分析
本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
spark已经安装完毕 spark运行在local mode或local-cluster mode local-cluster modelocal-cluster模式也称为伪分布式,可以使用如下指令运行
MASTER=local[1,2,1024] bin/spark-shell
[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M
Driver Programme的初始化过程分析 初始化过程的涉及的主要源文件 SparkContext.scala 整个初始化过程的入口 SparkEnv.scala 创建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager DAGScheduler.scala 任务提交的入口,即将Job划分成各个stage的关键 TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行 SchedulerBackend 最简单的单机运行模式的话,看LocalBackend.scala 如果是集群模式,看源文件SparkDeploySchedulerBackend初始化过程步骤详解
步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager
private[spark] val env = SparkEnv.create( conf, conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt, isDriver = true, isLocal = isLocal) SparkEnv.set(env)
步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) taskScheduler.start()
TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测
override def start() { backend.start() if (!isLocal conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks()
步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) dagScheduler.start()
步骤4:启动WEB UI
ui.start()RDD的转换过程
还是以最简单的wordcount为例说明rdd的转换过程
sc.textFile("README.md").flatMap(line= line.split(" ")).map(word = (word, 1)).reduceByKey(_ + _)
上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果
步骤1:val rawFile = sc.textFile("README.md")textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
scala sc.textFile("README.md") 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750 14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB) 14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13步骤2: val splittedText = rawFile.flatMap(line = line.split(" "))
flatMap将原来的MappedRDD转换成为FlatMappedRDD
def flatMap[U: ClassTag](f: T = TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f))步骤3:val wordCount = splittedText.map(word = (word, 1))
利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD
步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala
细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd)
这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。
接下来再看一看reduceByKey的定义:
def reduceByKey(func: (V, V) = V): RDD[(K, V)] = { reduceByKey(defaultPartitioner(self), func) def reduceByKey(partitioner: Partitioner, func: (V, V) = V): RDD[(K, V)] = { combineByKey[V]((v: V) = v, func, func, partitioner) def combineByKey[C](createCombiner: V = C, mergeValue: (C, V) = C, mergeCombiners: (C, C) = C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)] = { if (getKeyClass().isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) = { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else if (mapSideCombine) { val combined = self.mapPartitionsWithContext((context, iter) = { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) = { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) } else { // Dont apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) = { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true)
reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD
Log输出能证明我们的分析
res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13RDD转换小结
小结一下整个RDD转换过程
HadoopRDD- MappedRDD- FlatMappedRDD- MappedRDD- PairRDDFunctions- ShuffleRDD- MapPartitionsRDD
整个转换过程好长啊,这一切的转换都发生在任务提交之前。
运行过程分析 数据集操作分类在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?
对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input- processing- output"。input和output对应于数据集dataset.
在此基础上作一下简单的分类
one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作 many-one 多个dataset合并为一个dataset,如combine, join one-many 一个dataset分裂为多个dataset, 如groupBy Task运行期的函数调用task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation
TaskRunner.run Task.run Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask) RDD.iterator RDD.computeOrReadCheckpoint RDD.compute或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例
override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)
注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator
80 at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 81 at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 82 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 83 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 84 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 85 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 86 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 87 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 88 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 89 at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 90 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 91 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 92 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 93 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 94 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 95 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 96 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 97 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 98 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 99 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 100 at org.apache.spark.scheduler.Task.run(Task.scala:53) 101 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)ResultTask
compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。
override def runTask(context: TaskContext): U = { metrics = Some(context.taskMetrics) try { func(context, rdd.iterator(split, context)) } finally { context.executeOnCompleteCallbacks()计算结果的传递
上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.
那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下
ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler DAGScheduler将MapStatus保存到MapOutputTrackerMaster中 ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location 根据返回的结果调用BlockManager.getMultiple获取真正的数据BlockStoreShuffleFetcher的fetch函数伪码
val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)
注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。
有关Shuffle的详细解释,请参考”详细探究Spark的shuffle实现一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/
二 存储子系统分析Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。
存储子系统概览上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下
CacheManager RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果 BlockManager CacheManager在进行数据读取和存取的时候主要是依赖BlockManager接口来操作,BlockManager决定数据是从内存(MemoryStore)还是从磁盘(DiskStore)中获取 MemoryStore 负责将数据保存在内存或从内存读取 DiskStore 负责将数据写入磁盘或从磁盘读入 BlockManagerWorker 数据写入本地的MemoryStore或DiskStore是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复,数据复制的操作是异步完成,由BlockManagerWorker来处理这一部分事情 ConnectionManager 负责与其它计算结点建立连接,并负责数据的发送和接收 BlockManagerMaster 注意该模块只运行在Driver Application所在的Executor,功能是负责记录下所有BlockIds存储在哪个SlaveWorker上,比如RDD Task运行在机器A,所需要的BlockId为3,但在机器A上没有BlockId为3的数值,这个时候Slave worker需要通过BlockManager向BlockManagerMaster询问数据存储的位置,然后再通过ConnectionManager去获取,具体参看“数据远程获取一节”支持的操作
由于BlockManager起到实际的存储管控作用,所以在讲支持的操作的时候,以BlockManager中的public api为例
put 数据写入 get 数据读取 remoteRDD 数据删除,一旦整个job完成,所有的中间计算结果都可以删除 启动过程分析上述的各个模块由SparkEnv来创建,创建过程在SparkEnv.create中完成
val blockManagerMaster = new BlockManagerMaster(registerOrLookup( "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf)), conf) val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf) val connectionManager = blockManager.connectionManager val broadcastManager = new BroadcastManager(isDriver, conf) val cacheManager = new CacheManager(blockManager)
这段代码容易让人疑惑,看起来像是在所有的cluster node上都创建了BlockManagerMasterActor,其实不然,仔细看registerOrLookup函数的实现。如果当前节点是driver则创建这个actor,否则建立到driver的连接。
def registerOrLookup(name: String, newActor: = Actor): ActorRef = { if (isDriver) { logInfo("Registering " + name) actorSystem.actorOf(Props(newActor), name = name) } else { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
初始化过程中一个主要的动作就是BlockManager需要向BlockManagerMaster发起注册
数据写入过程分析数据写入的简要流程:
RDD.iterator是与storage子系统交互的入口 CacheManager.getOrCompute调用BlockManager的put接口来写入数据 数据优先写入到MemoryStore即内存,如果MemoryStore中的数据已满则将最近使用次数不频繁的数据写入到磁盘 通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据 将写入的数据与其它slave worker进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1 序列化与否写入的具体内容可以是序列化之后的bytes也可以是没有序列化的value. 此处有一个对scala的语法中Either, Left, Right关键字的理解。
数据读取过程分析def get(blockId: BlockId): Option[Iterator[Any]] = { val local = getLocal(blockId) if (local.isDefined) { logInfo("Found block %s locally".format(blockId)) return local val remote = getRemote(blockId) if (remote.isDefined) { logInfo("Found block %s remotely".format(blockId)) return remote None
首先在查询本机的MemoryStore和DiskStore中是否有所需要的block数据存在,如果没有则发起远程数据获取。
远程获取调用路径, getRemote- doGetRemote, 在doGetRemote中最主要的就是调用BlockManagerWorker.syncGetBlock来从远程获得数据
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = { val blockManager = blockManagerWorker.blockManager val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val responseMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) responseMessage match { case Some(message) = { val bufferMessage = message.asInstanceOf[BufferMessage] logDebug("Response message received " + bufferMessage) BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage = { logDebug("Found " + blockMessage) return blockMessage.getData case None = logDebug("No response message received") null
上述这段代码中最有意思的莫过于sendMessageReliablySync,远程数据读取毫无疑问是一个异步i/o操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?
别急,继续去看看sendMessageReliablySync的定义
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) : Future[Option[Message]] = { val promise = Promise[Option[Message]] val status = new MessageStatus( message, connectionManagerId, s = promise.success(s.ackMessage)) messageStatuses.synchronized { messageStatuses += ((message.id, status)) sendMessage(connectionManagerId, message) promise.future
要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字Promise和Future没。
如果这个future执行完毕,返回s.ackMessage。我们再看看这个ackMessage是在什么地方被写入的呢。看一看ConnectionManager.handleMessage中的代码片段
case bufferMessage: BufferMessage = { if (authEnabled) { val res = handleAuthentication(connection, bufferMessage) if (res == true) { // message was security negotiation so skip the rest logDebug("After handleAuth result was true, returning") return if (bufferMessage.hasAckId) { val sentMessageStatus = messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) = { messageStatuses -= bufferMessage.ackId status case None = { throw new Exception("Could not find reference for received ack message " + message.id) null sentMessageStatus.synchronized { sentMessageStatus.ackMessage = Some(message) sentMessageStatus.attempted = true sentMessageStatus.acked = true sentMessageStaus.markDone()
注意,此处的所调用的sentMessageStatus.markDone就会调用在sendMessageReliablySync中定义的promise.Success. 不妨看看MessageStatus的定义。
class MessageStatus( val message: Message, val connectionManagerId: ConnectionManagerId, completionHandler: MessageStatus = Unit) { var ackMessage: Option[Message] = None var attempted = false var acked = false def markDone() { completionHandler(this) }
我想至此调用关系搞清楚了,scala中的Future和Promise理解起来还有有点费劲。
TachyonStore在Spark的最新源码中,Storage子系统引入了TachyonStore. TachyonStore是在内存中实现了hdfs文件系统的接口,主要目的就是尽可能的利用内存来作为数据持久层,避免过多的磁盘读写操作。
有关该模块的功能介绍,可以参考http://www.meetup.com/spark-users/events/117307472/
一点点疑问,目前在Spark的存储子系统中,通信模块里传递的数据即有“心跳检测消息”,“数据同步的消息”又有“数据获取之类的信息流”。如果可能的话,要将心跳检测与数据同步即数据获取所使用的网卡分离以提高可靠性。
《Apache Spark 中文实战攻略下册》电子版地址 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
《Apache Spark 中文实战攻略上册》电子版地址 《Apache Spark 中文实战攻略(上册)》全新收录了Spark+AI Summit 2020 中文精华版峰会,Apache Spark 3.0性能优化与基础实战一书看遍!
《Apache Spark 中文实战攻略下册》电子版 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
相关文章
- Apache系列:Centos7.2下安装与配置apache
- spring 之 lookup-method & replaced-method II
- 取消普通域用户帐号加域权限&授权特定普通域用户加域权限
- Apache Spark源码走读(三)Spark on Yarn &Spark源码编译 &在YARN上运行SparkPi
- Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &WEB UI和Metrics初始化及数据更新过程分析
- 华为OD机试 - 信号发射和接收(Java & JS & Python)
- 华为OD机试 - 查找充电设备组合(Java & JS & Python)
- ML之LassoR&RidgeR:基于datasets糖尿病数据集利用LassoR和RidgeR算法(alpha调参)进行(9→1)回归预测
- AI之Robot:带你玩转机器人&DIY机器人——让你成为机器人的真正主人
- AGI:走向通用人工智能的【生命学&哲学&科学】第一篇——生命、意识、五行、易经、量子
- 华为云新加坡峰会发布Cloud&AI创新实验室,四大核心优势助力智能化升级
- 图嵌入&知识表征の初体验
- 【nodejs原理&源码赏析(7)】【译】Node.js中的事件循环,定时器和process.nextTick
- 欧拉计划·第四题
- $('#checkbox').attr('checked'); 回报checked或undefined该解决方案
- 树莓派玩耍笔记1 -- 开箱 & 安装系统以及简单配置
- [WF4.0 现实] WF4.0 Receive && Send
- 事件监听 & 页面滚动(页面滚动到某一位置时显示/隐藏某元素,Vue环境)
- Linux - nohup - 实现后台运行程序及查看(nohup与&)