深入理解Spark:核心思想与源码分析. 3.11 ContextCleaner的创建与启动
3.11 ContextCleaner的创建与启动
ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。
private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
}
cleaner.foreach(_.start())
ContextCleaner的组成如下:
referenceQueue:缓存顶级的AnyRef引用;
referenceBuffer:缓存AnyRef的虚引用;
listeners:缓存清理工作的监听器数组;
cleaningThread:用于具体清理工作的线程。
ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-48。
代码清单3-48 keep Cleaning的实现
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
相关文章
- MSP360 – 十多年来不断改进的基于AWS的云备份
- 使用 Prometheus 和 PromCat 监控 Kubernetes 使用的 AWS 服务
- 利用 ECS ComposeX 自动化 ECS 容器架构部署
- 自动创建和更新 CloudFront 中的 Lambda@Edge
- python-SMTP发邮件
- 通过 AWS Lambda 和 AutoScaling 实现高可用的企业全球一张网(一)
- 通过 AWS Lambda 和 AutoScaling 实现高可用的企业全球一张网(二)
- 通过 AWS Lambda 和 AutoScaling 实现高可用的企业全球一张网(三)
- 新增功能 – 采用 AWS Graviton2 的 EC2 M6g 实例
- AWS 下调区域间数据传输 (DTIR) 价格
- EDA on AWS ——远程接入解决方案 OpenText ETX 篇
- 解析 AWS 详细账单中对于跨账户使用预留实例的计费方法
- EC2 降价 – 适用于 EC2 Instance Saving Plans 和标准预留实例
- 使用 DJL (Deep Java Library) 和 Spring Boot 在您的微服务中采用机器学习
- 新功能 – 使用 Step Functions 和 AWS CodeBuild 构建持续集成工作流程
- 新增功能 – AWS Elemental Link – 将活动和流的直播视频发布到云中
- AWS IoT 作业功能全球开放并降价超过 90%
- 新增功能 – Amazon EventBridge 架构注册表现已全面推出
- 新增功能 – 适用于 VPC 终端节点的 Amazon Simple Email Service (SES)
- 新开放 – AWS 欧洲(米兰)区域