Spark-Sql Source Code Analysis, Part 5: Spark Planner (optimized logical plan -> spark plan) in Detail
2023-06-13 09:20:26
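The previous parts covered the logical plan: how SQL is parsed into a LogicalPlan, and how that LogicalPlan is then processed by the Analyzer and the Optimizer. This part covers how the logical plan is translated into a physical plan, i.e., a SparkPlan.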
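```scala
lazy val sparkPlan: SparkPlan = {
  SparkPlan.currentContext.set(self)
  // Take the first physical plan produced by the planner
  planner.plan(optimizedPlan).next()
}
```
Once optimizedPlan has gone through the planner, the result is the sparkPlan. So first, what is the planner?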
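```scala
protected[sql] val planner = new SparkPlanner

// Holds the strategies used to turn the optimized logical plan into a physical plan
protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext: SparkContext = self.sparkContext
  val sqlContext: SQLContext = self
  def codegenEnabled: Boolean = self.conf.codegenEnabled
  def unsafeEnabled: Boolean = self.conf.unsafeEnabled
  def numPartitions: Int = self.conf.numShufflePartitions

  // Strategies that convert a LogicalPlan into actual physical operators;
  // the operator classes live in the org.apache.spark.sql.execution package.
  def strategies: Seq[Strategy] =
    experimental.extraStrategies ++ (
      DataSourceStrategy ::
      DDLStrategy ::
      // Converts a Limit over a Sort into a TakeOrdered operation
      TakeOrdered ::
      // Plans aggregations
      HashAggregation ::
      // A left semi join returns only the left-table rows that have a match on the right, e.g.
      //   select * from table1 left semi join table2 on (table1.student_no = table2.student_no);
      // returns the table1 rows whose student_no appears in table2; it can replace an EXISTS query.
      LeftSemiJoin ::
      // Equi-joins. If a table is smaller than spark.sql.autoBroadcastJoinThreshold bytes
      // (default 10485760, i.e. 10 MB), it is broadcast and a BroadcastHashJoin is used,
      // similar to a Hive map join (the right table is checked first, then the left table).
      // For other inner equi-joins, the sizes of both sides are compared and the smaller
      // side is used to build the hash table.
      HashJoin ::
      // Scans tables cached in memory, applying filters
      InMemoryScans ::
      // Parquet-related operations
      ParquetOperations ::
      // Basic operators
      BasicOperators ::
      // Joins with no join condition, and inner joins not handled above, become a Cartesian product
      CartesianProduct ::
      // Nested-loop joins executed by broadcasting one side
      BroadcastNestedLoopJoin :: Nil)
}
```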
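To make the role of the ordered strategy list concrete, here is a minimal, self-contained Scala sketch with no Spark dependency. The types `Logical`, `Physical`, `LimitOverSort` and the two strategies are hypothetical simplifications, not Spark classes; the sketch only mimics how the planner applies the strategies lazily in order and how `planner.plan(...).next()` then keeps the first candidate, so an earlier strategy wins when several match the same node.

```scala
// Hypothetical, simplified model of strategy-based planning (not Spark's real classes).
object StrategyOrderSketch {
  sealed trait Logical
  case class LimitOverSort(n: Int) extends Logical
  case class Other(name: String) extends Logical

  case class Physical(desc: String)

  // A strategy maps a logical node to zero or more physical candidates.
  type Strategy = Logical => Seq[Physical]

  // Listed first, so it wins for LimitOverSort.
  val takeOrdered: Strategy = {
    case LimitOverSort(n) => Seq(Physical(s"TakeOrdered($n)"))
    case _                => Nil
  }

  // Also matches LimitOverSort, but only contributes a later candidate.
  val basicOperators: Strategy = {
    case LimitOverSort(n) => Seq(Physical(s"Limit($n) over Sort"))
    case Other(name)      => Seq(Physical(name))
  }

  val strategies: Seq[Strategy] = Seq(takeOrdered, basicOperators)

  // Lazily concatenates every strategy's candidates, in strategy order.
  def plan(p: Logical): Iterator[Physical] = {
    val iter = strategies.view.flatMap(s => s(p)).iterator
    assert(iter.hasNext, s"No plan for $p")
    iter
  }
}
```

Calling `StrategyOrderSketch.plan(StrategyOrderSketch.LimitOverSort(10)).next()` yields `Physical("TakeOrdered(10)")`: both strategies produce a candidate, but only the first one is kept.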
The LogicalPlan is planned by applying the strategies above. Take, for example, this SQL statement:
```java
String sql = "select SUM(id) from test group by dev_chnid";
```
Its optimizedPlan is:
```
Aggregate [dev_chnid#0], [SUM(id#17L) AS c0#43L]
 Project [dev_chnid#0,id#17L]
  Relation[dev_chnid#0,car_img_count#1,save_flag#2,dc_cleanflag#3,pic_id#4,car_img_plate_top#5L,car_img_plate_left#6L,car_img_plate_bottom#7L,car_img_plate_right#8L,car_brand#9L,issafetybelt#10,isvisor#11,bind_stat#12,car_num_pic#13,combined_pic_url#14,verify_memo#15,rec_stat_tmp#16,id#17L,dev_id#18,dev_chnnum#19L,dev_name#20,dev_chnname#21,car_num#22,car_numtype#23,car_numcolor#24,car_speed#25,car_type#26,car_color#27,car_length#28L,car_direct#29,car_way_code#30,cap_time#31L,cap_date#32L,inf_note#33,max_speed#34,min_speed#35,car_img_url#36,car_img1_url#37,car_img2_url#38,car_img3_url#39,car_img4_url#40,car_img5_url#41,rec_stat#42]
```
It is converted into the following sparkPlan:
```
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
  PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD[1] at
```
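The two Aggregate nodes split SUM into two phases: the inner one (`true`, i.e. partial) computes a PartialSum per group within each partition, and the outer one (`false`) adds the partial sums up with CombineSum once all partials for a key are together. The shuffle between the phases does not appear in the sparkPlan above. The sketch below reproduces only the arithmetic on plain Scala collections; the object name and the sample partitions are hypothetical, and rows are modeled as `(dev_chnid, id)` pairs.

```scala
// Hypothetical illustration of two-phase (partial + final) SUM aggregation
// on plain Scala collections; no Spark operators are involved.
object TwoPhaseSumSketch {
  // A row is (dev_chnid, id); each inner Seq stands for one partition.
  type Row = (String, Long)

  // Phase 1 (like "Aggregate true"): per-partition SUM(id) grouped by dev_chnid.
  def partialSums(partition: Seq[Row]): Map[String, Long] =
    partition.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }

  // Phase 2 (like "Aggregate false" with CombineSum): add up the partial sums
  // of each key, as if they had been brought together by a shuffle.
  def combineSums(partials: Seq[Map[String, Long]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def sumByKey(partitions: Seq[Seq[Row]]): Map[String, Long] =
    combineSums(partitions.map(partialSums))
}
```

The design point is that phase 1 collapses each partition to at most one row per key before any data moves, so only the partial sums need to cross the network.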
The conversion proceeds as follows.
1) First, the plan is matched by HashAggregation
```scala
object HashAggregation extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Aggregations that can be performed in two phases, before and after the shuffle.

    // Cases where all aggregates can be codegened.
    // This branch is taken only when codegen is enabled.
    case PartialAggregation(
           namedGroupingAttributes,
           rewrittenAggregateExpressions,
           groupingExpressions,
           partialComputation,
           child)
        if canBeCodeGened(
             allAggregates(partialComputation) ++
             allAggregates(rewrittenAggregateExpressions)) &&
           codegenEnabled =>
      execution.GeneratedAggregate(
        partial = false,
        namedGroupingAttributes,
        rewrittenAggregateExpressions,
        unsafeEnabled,
        execution.GeneratedAggregate(
          partial = true,
          groupingExpressions,
          partialComputation,
          unsafeEnabled,
          planLater(child))) :: Nil

    // Cases where some aggregate can not be codegened.
    // spark.sql.codegen was false in this test, so this branch is taken.
    case PartialAggregation(
           namedGroupingAttributes,
           rewrittenAggregateExpressions,
           groupingExpressions,
           partialComputation,
           child) =>
      execution.Aggregate(
        partial = false,
        namedGroupingAttributes,
        rewrittenAggregateExpressions,
        execution.Aggregate(
          partial = true,
          groupingExpressions,
          partialComputation,
          planLater(child))) :: Nil

    case _ => Nil
  }

  // canBeCodeGened and allAggregates helpers omitted
}
```
What happens next? Note the call to planLater(child): it simply continues by planning the child node, namely:
```
Project [dev_chnid#0,id#17L]
 Relation[dev_chnid#0,car_img_count#1,save_flag#2,dc_cleanflag#3,pic_id#4,car_img_plate_top#5L,car_img_plate_left#6L,car_img_plate_bottom#7L,car_img_plate_right#8L,car_brand#9L,issafetybelt#10,isvisor#11,bind_stat#12,car_num_pic#13,combined_pic_url#14,verify_memo#15,rec_stat_tmp#16,id#17L,dev_id#18,dev_chnnum#19L,dev_name#20,dev_chnname#21,car_num#22,car_numtype#23,car_numcolor#24,car_speed#25,car_type#26,car_color#27,car_length#28L,car_direct#29,car_way_code#30,cap_time#31L,cap_date#32L,inf_note#33,max_speed#34,min_speed#35,car_img_url#36,car_img1_url#37,car_img2_url#38,car_img3_url#39,car_img4_url#40,car_img5_url#41,rec_stat#42]
```
```scala
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[GenericStrategy[PhysicalPlan]]

  // Recursively plans a child node and takes the first candidate
  protected def planLater(plan: LogicalPlan) = this.plan(plan).next()

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
}
```
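The recursion through planLater can be modeled with a toy planner. In this minimal sketch every type and strategy name is hypothetical: `hashAggregation` loosely imitates HashAggregation (a final aggregate over a partial aggregate, with the child deferred to `planLater`), and `dataSource` loosely imitates the non-partitioned DataSourceStrategy branch by collapsing a Project over a Relation into a single scan node. It is a simplified illustration, not Spark's implementation.

```scala
// Hypothetical, heavily simplified model of QueryPlanner.plan / planLater.
object PlanLaterSketch {
  sealed trait Logical
  case class Aggregate(grouping: Seq[String], child: Logical) extends Logical
  case class Project(columns: Seq[String], child: Logical) extends Logical
  case class Relation(table: String) extends Logical

  sealed trait Physical
  case class AggregateExec(partial: Boolean, grouping: Seq[String], child: Physical) extends Physical
  case class PhysicalRDD(columns: Seq[String]) extends Physical

  type Strategy = Logical => Seq[Physical]

  // Final aggregate over a partial aggregate; the child is planned via planLater.
  val hashAggregation: Strategy = {
    case Aggregate(g, input) =>
      Seq(AggregateExec(partial = false, grouping = g,
        child = AggregateExec(partial = true, grouping = g, child = planLater(input))))
    case _ => Nil
  }

  // A Project directly over a Relation becomes one scan of the projected columns.
  val dataSource: Strategy = {
    case Project(cols, Relation(_)) => Seq(PhysicalRDD(cols))
    case _                          => Nil
  }

  val strategies: Seq[Strategy] = Seq(dataSource, hashAggregation)

  def plan(p: Logical): Iterator[Physical] = {
    val iter = strategies.view.flatMap(s => s(p)).iterator
    assert(iter.hasNext, s"No plan for $p")
    iter
  }

  // planLater just recurses into plan(...) and keeps the first candidate.
  def planLater(p: Logical): Physical = plan(p).next()
}
```

Planning `Aggregate(Seq("dev_chnid"), Project(Seq("dev_chnid", "id"), Relation("test")))` this way yields a final AggregateExec over a partial AggregateExec over `PhysicalRDD(Seq("dev_chnid", "id"))`, the same nesting as the sparkPlan of the example query.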
2) Next, the child node is planned
```scala
private[sql] object DataSourceStrategy extends Strategy with Logging {
  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    // Scanning partitioned HadoopFsRelation
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
        if t.partitionSpec.partitionColumns.nonEmpty =>
      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

      logInfo {
        val total = t.partitionSpec.partitions.length
        val selected = selectedPartitions.length
        // Note: this ratio is inverted; (1 - selected / total) * 100 would give
        // the percentage of partitions actually pruned.
        val percentPruned = (1 - total.toDouble / selected.toDouble) * 100
        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
      }

      // Only pushes down predicates that do not reference partition columns.
      val pushedFilters = {
        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
        filters.filter { f =>
          val referencedColumnNames = f.references.map(_.name).toSet
          referencedColumnNames.intersect(partitionColumnNames).isEmpty
        }
      }

      buildPartitionedTableScan(
        projectList,
        pushedFilters,
        t.partitionSpec.partitionColumns,
        selectedPartitions) :: Nil

    // Scanning non-partitioned HadoopFsRelation
    // (loading the Parquet file in this example takes this branch)
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
      // See buildPartitionedTableScan for the reason that we need to create a shard
      // broadcast HadoopConf.
      val sharedHadoopConf = SparkHadoopUtil.get.conf
      val confBroadcast =
        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
      pruneFilterProject( // returns a PhysicalRDD
        projectList,
        filters,
        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil

    // ... other cases omitted ...
  }
}
```
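The partitioned branch does two pieces of bookkeeping: it pushes down only the filters that reference no partition column, and it logs how many partitions were skipped. The sketch below isolates that logic under a simplifying assumption: a filter is modeled only by its description and the set of column names it references. `PartitionPruningSketch` and its `Filter` class are hypothetical, not Spark types; the pruned share is computed as (1 - selected / total) * 100.

```scala
// Hypothetical sketch of the filter-pushdown and pruning bookkeeping in the
// partitioned HadoopFsRelation branch; filters carry only their referenced columns.
object PartitionPruningSketch {
  case class Filter(description: String, references: Set[String])

  // Keep only filters whose referenced columns avoid every partition column.
  def pushedFilters(filters: Seq[Filter], partitionColumns: Set[String]): Seq[Filter] =
    filters.filter(f => f.references.intersect(partitionColumns).isEmpty)

  // Percentage of partitions skipped: (1 - selected / total) * 100.
  def percentPruned(selected: Int, total: Int): Double =
    (1 - selected.toDouble / total.toDouble) * 100
}
```

With a partition column `dt`, a filter such as `dt = cap_date` is not pushed down because it touches `dt`, while `id > 10` is; selecting 2 of 10 partitions corresponds to 80% pruned.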
So the query "select SUM(id) from test group by dev_chnid" is finally translated into:
```
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
  PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD[1]
```
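I have not studied the other strategies in depth yet; the comments on them above were collected from the web and remain to be verified. Only one aggregation example is shown here; other queries are planned in a similar way.
Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/9309.html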