Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &WEB UI和Metrics初始化及数据更新过程分析
ShuffleMapTask的计算结果保存在哪,随后Stage中的task又是如何知道从哪里去读取的呢,这个过程一直让我困惑不已。
用比较通俗一点的说法来解释一下Shuffle数据的写入和读取过程
每一个task负责处理一个特定的data partition task在初始化的时候就已经明确处理结果可能会产生多少个不同的data partition 利用partitioner函数,task将处理结果存入到不同的partition,这些数据存放在当前task执行的机器上 假设当前是stage 2有两个task, stage 2可能输出4个不同的data partition, task 0和task 1各自运行于不同的机器上,task 0中的部分处理结果会存入到data partition 0,task 1的部分处理结果也可能存入到data partition 0. 由于stage 2产生了4个不同的data partition, 后续stage 1中的task个数就为4. task 0 就负责读取data partition 0的数据,对于(stage1, task0)来说,所要读取的data partition 0的内容由task 0和task 1中的partition 0共同组成。 现在问题的关键转换成为(stage_1, task_0)如何知道(stage_2, task_x)有没有相应的输出是属于data partition 0的呢?这个问题的解决就是MapStatus 每一个ShuffleMapTask在执行结束,都会上报一个MapStatus,在MapStatus中会反应出朝哪些data partition写入了数据,写入了数据则size为非零值,否则为零值 (stage_1,task_0)会去获取stage_2中所有task的MapStatus,以判定(stage_2, task_x)产生的数据中有自己需要读入的内容 假设(stage_1,task_0)知道(stage_2, task_0)生成了data partition 0中的数据,于是去(stage_2, task_0)运行时的机器去获取具体的数据,如果恰巧这个时候远端机器已经挂掉了,获取失败,怎么办? 上报异常,由DAGScheduler重新调度(stage_2,task_0),重新生成所需要的数据。 Spark不像Hadoop中的MapReduce有一个明显的combine阶段,在spark中combine过程有两次调用,一是Shuffle数据写入过程,另一个是Shuffle数据读取过程。如果能够明白上述的过程,并对应到相应的代码,那就无须看下述的详细解释了。
好了,让我们开始代码跟踪吧。
数据写入过程数据写入动作最原始的触发点是ShuffleMapTask.runTask函数,看一看源码先。
override def runTask(context: TaskContext): MapStatus = { metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(split, context).asInstanceOf[Iterator[_ if (writer != null) { writer.stop(success = false) throw e } finally { context.executeOnCompleteCallbacks()
managerGetWriter返回的是HashShuffleWriter,所以调用过程是:ShuffleMapTask.runTask- HashShuffleWriter.write- BlockObjectWriter.write. 注意dep.mapSideCombine这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行一次,另一次执行是在read过程。
override def write(records: Iterator[_ : Product2[K, V]]): Unit = { val iter = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { dep.aggregator.get.combineValuesByKey(records, context) } else { records } else if (dep.aggregator.isEmpty dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { records for (elem - iter) { val bucketId = dep.partitioner.getPartition(elem._1) shuffle.writers(bucketId).write(elem)
HashShuffleWriter.write中主要处理两件事:
判断是否需要进行聚合,比如 hello,1 和 hello,1 都要写入的话,那么先生成 hello,2 然后再进行后续的写入工作 利用Partitioner函数来决定 k,val 写入到哪一个文件中Partitioner是在什么时候注入的,RDD抽象类中,Partitioner为空?以reduceByKey为例,HashPartitioner会在后面combineByKey的代码创建ShuffledRDD的时候作为ShuffledRDD的构造函数传入。
def reduceByKey(func: (V, V) = V, numPartitions: Int): RDD[(K, V)] = { reduceByKey(new HashPartitioner(numPartitions), func)
Stage在创建的时候通过构造函数入参明确需要从多少Partition读取数据,生成的Partition会有多少。看一看Stage的构造函数,读取的分区数目由RDD.partitions.size决定,输出的partitions由shuffleDep决定。
private[spark] class Stage( val id: Int, val rdd: RDD[_], val numTasks: Int, val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage val parents: List[Stage], val jobId: Int, val callSite: CallSite) extends Logging { val isShuffleMap = shuffleDep.isDefined val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 private var nextAttemptId = 0
回到数据写入的问题上来,结果写入时的一个主要问题就是已经知道shuffle_id, map_id和要写入的elem,如何找到对应的写入文件。每一个临时文件由三元组(shuffle_id,map_id,reduce_id)来决定,当前已经知道了两个,还剩下一下reduce_id待确定。
reduce_id是使用partitioner计算出来的结果,输入的是elem的键值。也就是dep.partitioner.getPartition(elem._1)。 根据计算出来的bucketid找到对应的writer,然后真正写入。
在HashShuffleWriter.write中使用到的shuffle由ShuffleBlockManager的forMapTask函数生成,注意forMapTask中产生writers的代码逻辑。
每个writer分配一下文件, 文件名由三元组(shuffle_id,map_id,reduce_id)组成,如果知道了这个三元组就可以找到对应的文件。
如果consolidation没有打开,那么在一个task中,有多少个输出的partition就会有多少个中间文件。
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup() Array.tabulate[BlockObjectWriter](numBuckets) { bucketId = val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize) } else { Array.tabulate[BlockObjectWriter](numBuckets) { bucketId = val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) // Because of previous failures, the shuffle file may already exist on this machine. // If so, remove it. if (blockFile.exists) { if (blockFile.delete()) { logInfo(s"Removed existing shuffle file $blockFile") } else { logWarning(s"Failed to remove existing shuffle file $blockFile") blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
getFile负责将三元组(shuffle_id,map_id,reduce_id)映射到文件名
def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // Create the subdirectory if it doesnt already exist var subDir = subDirs(dirId)(subDirId) if (subDir == null) { subDir = subDirs(dirId).synchronized { val old = subDirs(dirId)(subDirId) if (old != null) { } else { val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) newDir.mkdir() subDirs(dirId)(subDirId) = newDir newDir new File(subDir, filename) def getFile(blockId: BlockId): File = getFile(blockId.name)
产生的文件在哪呢,如果没有更改默认的配置,生成的目录结构类似于下
/tmp/spark-local-20140723092540-7f24 /tmp/spark-local-20140723092540-7f24/0d /tmp/spark-local-20140723092540-7f24/0d/shuffle_0_0_1 /tmp/spark-local-20140723092540-7f24/0d/shuffle_0_1_0 /tmp/spark-local-20140723092540-7f24/0c /tmp/spark-local-20140723092540-7f24/0c/shuffle_0_0_0 /tmp/spark-local-20140723092540-7f24/0e /tmp/spark-local-20140723092540-7f24/0e/shuffle_0_1_1
当所有的数据写入文件并提交以后,还需要生成MapStatus汇报给driver application. MapStatus在哪生成的呢?commitWritesAndBuildStatus就干这活。
调用关系HashShuffleWriter.stop- commitWritesAndBuildStatus
private def commitWritesAndBuildStatus(): MapStatus = { // Commit the writes. Get the size of each bucket block (total block size). var totalBytes = 0L var totalTime = 0L val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter = writer.commit() writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() MapOutputTracker.compressSize(size) // Update shuffle metrics. val shuffleMetrics = new ShuffleWriteMetrics shuffleMetrics.shuffleBytesWritten = totalBytes shuffleMetrics.shuffleWriteTime = totalTime metrics.shuffleWriteMetrics = Some(shuffleMetrics) new MapStatus(blockManager.blockManagerId, compressedSizes)
compressedSize是一个非常让人疑惑的地方,原因慢慢道来,先看一下MapStatus的构造函数
class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
compressedSize是一个byte数组,每一个byte反应了该partiton中的数据大小。如Array(0)=128就表示在data partition 0中有128byte数据。
问题的问题是一个byte只能表示255,如果超过255怎么办呢?
当当当,数学闪亮登场了,注意到compressSize没,通过转换将2^8变换为1.1^256。一下子由255byte延伸到近35G.
看一看这神奇的compressSize函数吧,只是聊聊几行代码而已。
def compressSize(size: Long): Byte = { if (size == 0) { } else if (size = 1L) { } else { math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
ShuffleMapTask运行结束时,会将MapStatus结果封装在StatusUpdate消息中汇报给SchedulerBackend, 由DAGScheduler在handleTaskCompletion函数中将MapStatus加入到相应的Stage。这一过程略过,不再详述。
MapOutputTrackerMaster会保存所有最新的MapStatus.
只画张图来表示存储之后的示意。
数据读取过程
ShuffledRDD.compute函数是读取过程的触发点。
override def compute(split: Partition, context: TaskContext): Iterator[P] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[P]]
shuffleManager.getReader返回的是HashShuffleReader,所以看一看HashShuffleReader中的read函数的具体实现。
read函数处理逻辑中需要注意到一点即combine过程有可能会被再次执行。注意dep.aggregator.isDefined这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行。
override def read(): Iterator[Product2[K, C]] = { val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, Serializer.getSerializer(dep.serializer)) if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } else if (dep.aggregator.isEmpty dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { iter
一路辗转,终于来到了读取过程中非常关键的所在BlockStoreShuffleFetcher。
BlockStoreShuffleFetcher需要回答如下问题
所要获取的mapid的mapstatus的内容是什么 根据获得的mapstatus去相应的blockmanager获取具体的数据val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] for (((address, size), index) (address, splits.map(s = (ShuffleBlockId(shuffleId, s._1, reduceId), s._2))) val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)
一个ShuffleMapTask会生成一个MapStatus,MapStatus中含有当前ShuffleMapTask产生的数据落到各个Partition中的大小。如果大小为0,则表示该分区没有数据产生。MapStatus中另一个重要的成员变量就是BlockManagerId,该变量表示目标数据在哪个BlockManager当中。
MapoutputTrackerMaster拥有最新的MapStatus信息,为了执行效率,MapoutputTrackerWorker会定期更新数据到本地,所以MapoutputTracker先从本地查找,如果找不到再从MapoutputTrackerMaster上同步最新数据。
索引即是reduceId,如果array(0) == 0,就表示上一个ShuffleMapTask中生成的数据中没有任意的内容可以作为reduceId为0的ResultTask的输入。如果不能理解,返回仔细看一下MapStatus的结构图。
BlockManager.getMultiple用于读取BlockManager中的数据,根据配置确定生成tNettyBlockFetcherIterator还是BasicBlockFetcherIterator。
如果所要获取的文件落在本地,则调用getLocal读取,否则发送请求到远端blockmanager。看一下BlockFetcherIterator的initialize函数
override def initialize() { // Split local and remote blocks. val remoteRequests = splitLocalRemoteBlocks() // Add the remote requests into our queue in a random order fetchRequests ++= Utils.randomize(remoteRequests) // Send out initial requests for blocks, up to our maxBytesInFlight while (!fetchRequests.isEmpty (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size = maxBytesInFlight)) { sendRequest(fetchRequests.dequeue()) val numFetches = remoteRequests.size - fetchRequests.size logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) // Get Local Blocks startTime = System.currentTimeMillis getLocalBlocks() logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
至此,数据读取的正常流程讲述完毕。
数据读取异常如果数据读取中碰到异常怎么办?比如,
已知(stage_2,task_0)产生的parition_0的数据在机器m1, 当前任务在m2执行,于是从m2向m1发起远程获取请求,如果m2中拥有目标数据的JVM进程异常退出,则相应的目标数据无法获取。如果无法获取目标数据,就会上报FetchFailedException.
def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = { val blockId = blockPair._1 val blockOption = blockPair._2 blockOption match { case Some(block) = { block.asInstanceOf[Iterator[T]] case None = { blockId match { case ShuffleBlockId(shufId, mapId, _) = val address = statuses(mapId.toInt)._1 throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId) case _ = throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block")
FetchFailedExecption会被包装在StatutsUpdate上报给SchedulerBackend,然后一路处理下去,最终将丢失目标数据的归属Task重新提交。比如当前是(stage_1, task_0),需要读取(stage_2, task_1)产生的目标数据,但是对应的目标数据丢失,这个时候就需要将(stage_2, task_1)重新提交运行。
注意DAGScheduler中的FetchFailed处理分支,一路跟踪下去就会看到任务被重新提交了
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) = // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) runningStages -= failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + ") for resubmision due to a fetch failure") // Mark the map whose fetch failed as broken in the map stage val mapStage = shuffleToMapStage(shuffleId) if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name + "); marking it for resubmission") if (failedStages.isEmpty eventProcessActor != null) { // Dont schedule an event to resubmit failed stages if failed isnt empty, because // in that case the event will already have been scheduled. eventProcessActor may be // null during unit tests. import env.actorSystem.dispatcher env.actorSystem.scheduler.scheduleOnce( RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) failedStages += failedStage failedStages += mapStage // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.epoch))
生成的中间数据是在什么时候被清除的呢?
当Driver Application退出的时候,该Application生成的临时文件将会被一一清除,注意是application结束生命,不是job。一个application可以包含一至多个job。
以local-cluster方式运行spark-shell,观察/tmp/spark-local*目录下的文件变化,具体指令如下
MASTER=local-cluster[2,2,512] bin/spark-shell #进入spark-shell之后,输入 sc.textFile("README.md").flatMap(_.split(" ")).map(w= (w,1)).reduceByKey(_ + _)
Shuffle数据的写入和读取是Spark Core这一部分最为复杂的内容,彻底了解该部分内容才能深刻意识到Spark实现的精髓所在。
二 WEB UI和Metrics初始化及数据更新过程分析WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。
WEB UI先上图感受一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到如下页面
driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息
显示每个stage的详细信息
本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。
SparkUI在SparkContext初始化的时候创建
// Initialize the Spark UI , registering all associated listeners private [spark] val ui = new SparkUI (this) ui.bind ()
initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数
bind将真正启动jetty server.
def bind () { assert (! serverInfo .isDefined , " Attempted to bind % s more than once!". format ( className )) try { // 启 动 JettyServer serverInfo = Some( startJettyServer (" 0.0.0.0 ", port , handlers , conf)) logInfo (" Started %s at http ://%s:%d". format ( className , publicHostName , boundPort )) } catch { case e: Exception = logError (" Failed to bind %s". format ( className ) System .exit (1)
在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect
def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) server.setHandler(collection) Try { server.start() } match { case s: Success[_] = (server, server.getConnectors.head.getLocalPort) case f: Failure[_] = val nextPort = (currentPort + 1) % 65536 server.stop() pool.stop() val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort." if (f.toString.contains("Address already in use")) { logWarning(s"$msg - $f") } else { logError(msg, f.exception) connect(nextPort) val (server, boundPort) = connect(port) ServerInfo(server, boundPort, collection)
页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。
需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。
上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函
def initialize() { listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) if (live) { sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer- taskStarted- handleBeginEvent
private [ scheduler ] def handleBeginEvent (task: Task[_ ], taskInfo : TaskInfo ) { listenerBus .post( SparkListenerTaskStart (task. stageId , taskInfo )) submitWaitingStages ()
post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread
override def run (): Unit = Utils. logUncaughtExceptions { while (true) { eventLock . acquire () // Atomically remove and process this event LiveListenerBus .this. synchronized { val event = eventQueue .poll if (event == SparkListenerShutdown ) { // Get out of the while loop and shutdown the daemon thread return Option (event). foreach ( postToAll )
Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下
def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted = foreachListener(_.onStageSubmitted(stageSubmitted)) case stageCompleted: SparkListenerStageCompleted = foreachListener(_.onStageCompleted(stageCompleted)) case jobStart: SparkListenerJobStart = foreachListener(_.onJobStart(jobStart)) case jobEnd: SparkListenerJobEnd = foreachListener(_.onJobEnd(jobEnd)) case taskStart: SparkListenerTaskStart = foreachListener(_.onTaskStart(taskStart)) case taskGettingResult: SparkListenerTaskGettingResult = foreachListener(_.onTaskGettingResult(taskGettingResult)) case taskEnd: SparkListenerTaskEnd = foreachListener(_.onTaskEnd(taskEnd)) case environmentUpdate: SparkListenerEnvironmentUpdate = foreachListener(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded = foreachListener(_.onBlockManagerAdded(blockManagerAdded)) case blockManagerRemoved: SparkListenerBlockManagerRemoved = foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD = foreachListener(_.onUnpersistRDD(unpersistRDD)) case applicationStart: SparkListenerApplicationStart = foreachListener(_.onApplicationStart(applicationStart)) case applicationEnd: SparkListenerApplicationEnd = foreachListener(_.onApplicationEnd(applicationEnd)) case SparkListenerShutdown =Metrics
在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。
在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。
instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量 source 表示数据源,从哪里获取数据 sinks 数据目的地,将从source获取的数据发送到哪
JmxSink 注册到JMX,以通过JMXConsole来查看 MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据 GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控
下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。
初始化过程MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。
以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init- SparkEnv.init- MetricsSystem.createMetricsSystem
注册数据源,继续以SparkContext为例
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) initDriverMetrics()
数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示
读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。
val reporter: CsvReporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit)
Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行之后,输入http://127.0.0.1:4040/metrics/json会得到以json格式保存的metrics信息。
《Apache Spark 中文实战攻略下册》电子版地址 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
《Apache Spark 中文实战攻略上册》电子版地址 《Apache Spark 中文实战攻略(上册)》全新收录了Spark+AI Summit 2020 中文精华版峰会,Apache Spark 3.0性能优化与基础实战一书看遍!
《Apache Spark 中文实战攻略下册》电子版 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
相关文章
- [应用]Linux下" >/dev/null 2>&1 "
- App Deploy as Code! SAE & Terraform 实现 IaC 式部署应用
- Java 容器 & 泛型:三、HashSet,TreeSet 和 LinkedHashSet比较
- [Typescript] Type Guard: is & assert
- [AWS - DA] Step function & AppSync
- nginx: [emerg] unknown "scripts" variable
- 解决apache启动错误"httpd:Could not reliably determine
- Apache Spark源码走读(三)Spark on Yarn &Spark源码编译 &在YARN上运行SparkPi
- 选择列表中的列 '***' 无效,因为该列没有包含在聚合函数或 GROUP BY 子句中
- C++ & OpenCV 零散学习总结
- SAP ABAP OData gateway框架序列化和反序列化(serialization & deserialization)的实现逻辑
- 华为OD机试 - 任务总执行时长(Java & JS & Python)
- hdoj-1289-A Bug's Life【种类并查集】
- LVGL 8.2 Tabview & Window
- 字符串转换为整数”123“->123
- mac安装Parallels Tools(kali)解决(内核版本>=4.15问题)