zl程序教程

您现在的位置是:首页 >  工具

当前栏目

深入理解Spark:核心思想与源码分析. 3.3 创建metadataCleaner

源码Spark 分析 创建 深入 理解 3.3 核心思想
2023-09-11 14:16:02 时间

3.3 创建metadataCleaner

SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =

    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

我们仔细看看MetadataCleaner的实现,见代码清单3-14。

代码清单3-14 MetadataCleaner的实现

private[spark] class MetadataCleaner(

        cleanerType: MetadataCleanerType.MetadataCleanerType,

        cleanupFunc: (Long) = Unit,

        conf: SparkConf)

    extends Logging

{

    val name = cleanerType.toString

 

    private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)

    private val periodSeconds = math.max(10, delaySeconds / 10)

    private val timer = new Timer(name + " cleanup timer", true)

 

    private val task = new TimerTask {

        override def run() {

        try {

            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))

            logInfo("Ran metadata cleaner for " + name)

        } catch {

            case e: Exception = logError("Error running cleanup task for " + name, e)

        }

      }

    }

 

    if (delaySeconds 0) {

        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)

    }

 

    def cancel() {

        timer.cancel()

    }

}

从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) = Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。

private[spark] def cleanup(cleanupTime: Long) {

    persistentRdds.clearOldValues(cleanupTime)

}


Apache Spark Delta Lake 事务日志实现源码分析 Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
Spark源码分析之Spark Shell(上) 终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
Spark MapOutputTracker源码分析 ## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
Spark 源码分析之ShuffleMapTask内存数据Spill和合并 - Spark ShuffleMapTask 内存中的数据Spill到临时文件 - 临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据 - 每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中...
Spark源码分析之ResultTask处理 ResultTask 执行当前分区的计算,首先从ShuffleMapTask拿到当前分区的数据,会从所有的ShuffleMapTask都拿一遍当前的分区数据,然后调用reduceByKey自定义的函数进行计算,最后合并所有的ResultTask输出结果,进行输出