深入理解Spark:核心思想与源码分析. 3.3 创建metadataCleaner
3.3 创建metadataCleaner
SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
我们仔细看看MetadataCleaner的实现,见代码清单3-14。
代码清单3-14 MetadataCleaner的实现
private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) = Unit,
conf: SparkConf)
extends Logging
{
val name = cleanerType.toString
private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
private val task = new TimerTask {
override def run() {
try {
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran metadata cleaner for " + name)
} catch {
case e: Exception = logError("Error running cleanup task for " + name, e)
}
}
}
if (delaySeconds 0) {
timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
}
def cancel() {
timer.cancel()
}
}
从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) = Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
}
Apache Spark Delta Lake 事务日志实现源码分析 Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
Spark源码分析之Spark Shell(上) 终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
Spark MapOutputTracker源码分析 ## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
Spark 源码分析之ShuffleMapTask内存数据Spill和合并 - Spark ShuffleMapTask 内存中的数据Spill到临时文件 - 临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据 - 每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中...
Spark源码分析之ResultTask处理 ResultTask 执行当前分区的计算,首先从ShuffleMapTask拿到当前分区的数据,会从所有的ShuffleMapTask都拿一遍当前的分区数据,然后调用reduceByKey自定义的函数进行计算,最后合并所有的ResultTask输出结果,进行输出
相关文章
- Spark修改spark-shell启动LOGO
- 深入理解Spark:核心思想与源码分析
- 深入理解Spark:核心思想与源码分析. 导读
- 深入理解Spark:核心思想与源码分析. 1.2 Spark初体验
- 深入理解Spark:核心思想与源码分析. 2.1 初识Spark
- 深入理解Spark:核心思想与源码分析. 2.2 Spark基础知识
- 深入理解Spark:核心思想与源码分析. 3.13 创建DAGSchedulerSource和BlockManagerSource
- Spark 源码分析 -- RDD
- Spark源码分析 – Dependency
- Spark源码分析 – SchedulerBackend
- Spark MLlib - Decision Tree源码分析
- macOS SwiftUI 图表组件之 Spark line Chart火花线图 (教程含源码)
- spark-RDD源码分析
- SparkCore任务调度源码阅读_简述Spark任务的调度原理