Spark SQL Source Code Analysis, Part 6. PrepareForExecution: spark plan -> executed plan
2023-06-13 09:20:26
This stage inserts Shuffle operations into the SparkPlan tree: whenever the outputPartitioning of two adjacent SparkPlans does not match, a Shuffle must be placed between them. Aggregate functions are a typical case: aggregation runs first locally and then globally, and since the local and global phases use different partitioning rules, a Shuffle is required in between.
Take the SQL statement: select SUM(id) from test group by dev_chnid
Its logical plan is converted into the following physical plan:
```
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
  PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD[1]
```
The first constructor parameter of Aggregate, partial, determines its requiredChildDistribution, i.e. the partitioning that the child SparkPlan's output must satisfy:
```scala
case class Aggregate(
    partial: Boolean,
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends UnaryNode {

  override def requiredChildDistribution: List[Distribution] = {
    if (partial) {
      // partial = true: any child partitioning will do
      UnspecifiedDistribution :: Nil
    } else {
      if (groupingExpressions == Nil) {
        AllTuples :: Nil
      } else {
        // partial = false: the child must be partitioned by the grouping
        // expressions, here dev_chnid
        ClusteredDistribution(groupingExpressions) :: Nil
      }
    }
  }
```
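The satisfies check behind this can be sketched with a toy model. The classes below are simplified stand-ins written for this article, not Spark's actual implementations:

```scala
// A toy model (not Spark's actual classes) of how a child's outputPartitioning
// is checked against its parent's requiredChildDistribution.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case object AllTuples extends Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

sealed trait Partitioning { def satisfies(required: Distribution): Boolean }

// What the partial Aggregate's raw output looks like: no known clustering,
// so it only satisfies "no requirement at all".
case object UnknownPartitioning extends Partitioning {
  def satisfies(required: Distribution): Boolean = required == UnspecifiedDistribution
}

// What an Exchange with hash partitioning produces: rows clustered by the
// hash of the listed expressions.
case class HashPartitioning(expressions: Seq[String]) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution           => true
    case ClusteredDistribution(clustering) => expressions.forall(clustering.contains)
    case _                                 => false
  }
}

// The final Aggregate requires ClusteredDistribution(Seq("dev_chnid")):
// only the hash-partitioned output satisfies it.
val required = ClusteredDistribution(Seq("dev_chnid"))
val rawOk  = UnknownPartitioning.satisfies(required)                // false
val hashOk = HashPartitioning(Seq("dev_chnid")).satisfies(required) // true
```

This is exactly the asymmetry that forces an Exchange between the two Aggregates: the upstream output fails the check, while a hash-partitioned output passes it.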
If the SparkPlan above were executed as-is, its data flow would look like this:
The output of Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L] has no particular distribution, while Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L] requires its input to be partitioned by the group-by column. A transformation must therefore sit between them, turning the unconstrained output of the former Aggregate into the clustered input the latter requires. This is exactly what prepareForExecution is responsible for.
```scala
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)

protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
  val batches =
    Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil
}

private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
  // TODO: Determine the number of partitions.
  def numPartitions: Int = sqlContext.conf.numShufflePartitions

  def apply(plan: SparkPlan): SparkPlan = plan.transformUp { // children first, then the node itself
    case operator: SparkPlan =>
      // True iff every child's outputPartitioning satisfies the corresponding
      // required data distribution.
      def meetsRequirements: Boolean =
        operator.requiredChildDistribution.zip(operator.children).forall {
          case (required, child) =>
            val valid = child.outputPartitioning.satisfies(required)
            logInfo(
              s"${if (valid) "Valid" else "Invalid"} distribution," +
                s"required: $required current: ${child.outputPartitioning}")
            valid
        }

      // True iff any of the children are incorrectly sorted.
      def needsAnySort: Boolean =
        operator.requiredChildOrdering.zip(operator.children).exists {
          case (required, child) => required.nonEmpty && required != child.outputOrdering
        }

      // True iff outputPartitionings of children are compatible with each other.
      // It is possible that every child satisfies its required data distribution
      // but two children have incompatible outputPartitionings. For example,
      // a dataset is range partitioned by "a.asc" (RangePartitioning) and another
      // dataset is hash partitioned by "a" (HashPartitioning). Tuples in these two
      // datasets are both clustered by "a", but these two outputPartitionings are not
      // compatible.
      // TODO: ASSUMES TRANSITIVITY?
      def compatible: Boolean =
        !operator.children
          .map(_.outputPartitioning)
          .sliding(2)
          .map {
            case Seq(a)    => true
            case Seq(a, b) => a.compatibleWith(b)
          }.exists(!_)

      // Adds Exchange or Sort operators as required
      def addOperatorsIfNecessary(
          partitioning: Partitioning,
          rowOrdering: Seq[SortOrder],
          child: SparkPlan): SparkPlan = {
        val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
        val needsShuffle = child.outputPartitioning != partitioning
        val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)

        if (needSort && needsShuffle && canSortWithShuffle) {
          Exchange(partitioning, rowOrdering, child)
        } else {
          val withShuffle = if (needsShuffle) {
            Exchange(partitioning, Nil, child)
          } else {
            child
          }

          val withSort = if (needSort) {
            if (sqlContext.conf.externalSortEnabled) {
              ExternalSort(rowOrdering, global = false, withShuffle)
            } else {
              Sort(rowOrdering, global = false, withShuffle)
            }
          } else {
            withShuffle
          }

          withSort
        }
      }

      if (meetsRequirements && compatible && !needsAnySort) {
        // Every requirement is already met: leave the operator untouched.
        operator
      } else {
        // At least one child does not satisfy its required data distribution or
        // at least one child's outputPartitioning is not compatible with another
        // child's outputPartitioning. In this case, we need to add Exchange operators.
        val requirements =
          (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)

        // Insert an intermediate SparkPlan per requirement.
        val fixedChildren = requirements.zipped.map {
          case (AllTuples, rowOrdering, child) =>
            addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
          case (ClusteredDistribution(clustering), rowOrdering, child) =>
            // e.g. SUM with GROUP BY: hash-partition on the grouping columns
            addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
          case (OrderedDistribution(ordering), rowOrdering, child) =>
            addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
          case (UnspecifiedDistribution, Seq(), child) =>
            child
          case (UnspecifiedDistribution, rowOrdering, child) =>
            if (sqlContext.conf.externalSortEnabled) {
              ExternalSort(rowOrdering, global = false, child)
            } else {
              Sort(rowOrdering, global = false, child)
            }
          case (dist, ordering, _) =>
            sys.error(s"Don't know how to ensure $dist with ordering $ordering")
        }

        operator.withNewChildren(fixedChildren)
      }
  }
}
```
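The overall effect of the rule can be sketched with a stripped-down plan tree that models only hash clustering on a single key. Plan, Scan, Agg, and Exchange below are illustrative toy classes written for this article, not Spark's:

```scala
// A stripped-down sketch of EnsureRequirements: walk the plan bottom-up and,
// wherever a child's output clustering does not satisfy the parent's
// requirement, wrap the child in an Exchange node.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(name: String) extends Plan { val children = Nil }
case class Agg(partial: Boolean, key: String, child: Plan) extends Plan {
  val children = Seq(child)
  // partial = true puts no requirement on the child; partial = false
  // requires the child to be clustered by the grouping key.
  def requiredKey: Option[String] = if (partial) None else Some(key)
}
case class Exchange(key: String, child: Plan) extends Plan { val children = Seq(child) }

// What a node's output is clustered on, in this toy model.
def outputKey(p: Plan): Option[String] = p match {
  case Exchange(k, _)   => Some(k)
  case Agg(false, k, _) => Some(k) // a final Agg only runs on input clustered by k
  case _                => None
}

def ensureRequirements(p: Plan): Plan = p match {
  case Agg(partial, key, child) =>
    val fixedChild = ensureRequirements(child) // children first, like transformUp
    Agg(partial, key,
      if (Agg(partial, key, child).requiredKey.exists(k => outputKey(fixedChild) != Some(k)))
        Exchange(key, fixedChild)
      else
        fixedChild)
  case other => other
}

val before = Agg(partial = false, "dev_chnid", Agg(partial = true, "dev_chnid", Scan("test")))
val after  = ensureRequirements(before)
// after == Agg(false, "dev_chnid", Exchange("dev_chnid", Agg(true, "dev_chnid", Scan("test"))))
```

Run on the example query's plan, the rule leaves the partial Aggregate alone (its requirement is unspecified) and inserts one Exchange between the partial and final Aggregate, since the partial Aggregate's output has no known clustering.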
After prepareForExecution, the SparkPlan therefore becomes:
```
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Exchange (HashPartitioning 200)
  Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
   PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD[1]
```
The resulting data flow: the Exchange makes the actual output of the upstream operator consistent with the input distribution the downstream operator requires.
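What the rewritten plan computes can be illustrated without Spark at all: partial sums per input partition, a hash shuffle keyed on the grouping column, then a final combine. This is a hand-rolled sketch of the semantics, not Spark code, and the sample data is invented:

```scala
// Two input partitions of (dev_chnid, id) rows, as in the example query.
val inputPartitions: Seq[Seq[(String, Long)]] = Seq(
  Seq(("a", 1L), ("b", 2L), ("a", 3L)),
  Seq(("b", 4L), ("a", 5L))
)

// Phase 1: Aggregate(partial = true) — local sums inside each partition.
// No shuffle is needed first, because its requirement is UnspecifiedDistribution.
val partialSums: Seq[Map[String, Long]] =
  inputPartitions.map(_.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum })

// Exchange with hash partitioning on dev_chnid: route each partial result
// to the reducer partition chosen by the hash of the grouping key.
val numPartitions = 2
val shuffled: Map[Int, Seq[(String, Long)]] =
  partialSums.flatten.groupBy { case (k, _) => math.abs(k.hashCode) % numPartitions }

// Phase 2: Aggregate(partial = false) — CombineSum of the partial sums;
// correct because every row with the same key now sits in one partition.
val finalSums: Map[String, Long] =
  shuffled.values.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
// finalSums == Map("a" -> 9L, "b" -> 6L)
```

Summing the partial sums gives the same answer as summing the raw rows directly, which is why Spark is free to split the aggregation around the shuffle.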
Original article by ItWorker; if reposting, please credit the source: https://blog.ytso.com/9308.html