Spark修炼之道 (Advanced) — Spark from Beginner to Mastery, Section 9: Spark SQL Execution Flow Analysis
We will use the following code to analyze the Spark SQL execution flow, so that you can see the several states a LogicalPlan passes through and understand the overall Spark SQL execution process.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
(1) Inspect the schema of teenagers
scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
(2) Inspect the execution flow
scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(name),unresolvedalias(age)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
QueryExecution captures the end-to-end Spark SQL execution flow. As the output above shows, executing a SQL statement goes through the following stages:
== (1) Parsed Logical Plan ==
Project [unresolvedalias(name),unresolvedalias(age)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== (2) Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3) Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4) Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

// Dynamic bytecode generation (code generation, CG) is enabled to improve query efficiency
Code Generation: true
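Each of these stages can also be retrieved individually from the QueryExecution object instead of printing the whole thing. Below is a minimal sketch, assuming the teenagers DataFrame defined above; the field names follow the QueryExecution API of this Spark version (SQLContext#QueryExecution).

val qe = teenagers.queryExecution
println(qe.logical)       // (1) parsed (unresolved) logical plan
println(qe.analyzed)      // (2) analyzed logical plan
println(qe.optimizedPlan) // (3) optimized logical plan
println(qe.sparkPlan)     // (4) physical plan chosen by the planner
println(qe.executedPlan)  // physical plan after preparation rules, the one actually executed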
2. Full table query execution flow

Statement to execute:
val all = sqlContext.sql("SELECT * FROM people")
Execution flow:
scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// Note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 UnresolvedRelation [people], None

// unresolvedalias(*) is analyzed into all of the fields in the schema
// UnresolvedRelation [people] is analyzed into Subquery people
== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
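As a quicker alternative to printing queryExecution, DataFrame.explain can output the plans directly. A short sketch using the all DataFrame above: explain() prints only the physical plan, while explain(true) prints the extended, four-stage output.

all.explain()     // physical plan only
all.explain(true) // parsed, analyzed, optimized and physical plans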
3. Filter query execution flow

Statement to execute:
scala> val filterQuery = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
Execution flow:
scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 // An extra Filter appears here compared with the full table query; the same applies below
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
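The same filter can be written with the DataFrame API; since it goes through the same Catalyst pipeline, its queryExecution should show an equivalent plan. A sketch, assuming the people DataFrame from the setup code above:

import org.apache.spark.sql.functions.col

val filterDF = people.filter(col("age") >= 13 && col("age") <= 19)
println(filterDF.queryExecution) // expect the same Filter over Scan PhysicalRDD as above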
4. Join query execution flow

Statement to execute:

val joinQuery = sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")
Inspect the overall execution flow:
scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Filter and the Join Inner below
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter (a.age = b.age)
  Join Inner, None
   UnresolvedRelation [people], Some(a)
   UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

// Inspect its physical plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
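For comparison, here is a sketch of the same self-join expressed with the DataFrame API; the aliases introduced by as let the join condition refer to the two age columns unambiguously, and the physical plan should again contain a SortMergeJoin:

import org.apache.spark.sql.functions.col

val a = people.as("a")
val b = people.as("b")
val joinDF = a.join(b, col("a.age") === col("b.age"))
println(joinDF.queryExecution.sparkPlan) // expect a SortMergeJoin, as in the SQL version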
The SQL statement above is equivalent to the explicit INNER JOIN form below; only the way it runs through the pipeline differs slightly. Statement to execute:
scala> val innerQuery = sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]
Inspect the overall execution flow:
scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Join Inner, and that there is no Filter this time
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Join Inner, Some((a.age = b.age))
  UnresolvedRelation [people], Some(a)
  UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note that the Optimized Logical Plan and the Analyzed Logical Plan are not
// noticeably different here; this is pointed out so it can be contrasted with
// the Analyzed/Optimized difference in the subquery example later
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

// Inspect its physical plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
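To confirm that the two formulations describe the same computation, the optimized logical plans can be compared with Catalyst's sameResult check (assumed to be available on LogicalPlan in this Spark version), which ignores differences in expression IDs such as age#3 versus age#5:

val p1 = joinQuery.queryExecution.optimizedPlan
val p2 = innerQuery.queryExecution.optimizedPlan
println(p1.sameResult(p2)) // expected: true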
5. Subquery execution flow

Statement to execute:
scala> val subQuery = sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13) a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]
Inspect the overall execution flow:
scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter (a.age <= 19)
  Subquery a
   Project [unresolvedalias(*)]
    Filter (age >= 13)
     UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note the difference between the Optimized and the Analyzed plan here:
// the two Filters have been optimized into a single one
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
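The same optimization can be observed with the DataFrame API: two chained filters should be merged into a single Filter in the optimized plan, just as the inner and outer predicates of the subquery were combined above. A small sketch, assuming the people DataFrame from the setup code:

val chained = people.filter("age >= 13").filter("age <= 19")
println(chained.queryExecution.optimizedPlan) // expect one Filter containing both predicates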
6. Aggregate query execution flow

Statement to execute:
scala> val aggregateQuery = sqlContext.sql("SELECT a.name, sum(a.age) FROM (SELECT * FROM people WHERE age >= 13) a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]
Inspect the execution flow:
scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// Note Aggregate [a.name], [unresolvedalias(a.name),unresolvedalias(sum(a.age))]:
// the GROUP BY a.name clause is parsed into unresolvedalias(a.name)
== Parsed Logical Plan ==
Aggregate [a.name], [unresolvedalias(a.name),unresolvedalias(sum(a.age))]
 Filter (a.age <= 19)
  Subquery a
   Project [unresolvedalias(*)]
    Filter (age >= 13)
     UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

// Inspect its physical plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
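An equivalent aggregation written with the DataFrame API is sketched below, assuming the people DataFrame from the setup code; its physical plan should likewise show a partial TungstenAggregate feeding a final one:

import org.apache.spark.sql.functions.{col, sum}

val aggDF = people
  .filter(col("age") >= 13 && col("age") <= 19)
  .groupBy("name")
  .agg(sum("age"))
println(aggDF.queryExecution.sparkPlan)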
You can inspect the execution flow of other SQL statements in the same way, which is a good exercise for understanding the basic ideas behind the Spark SQL implementation.
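For repeated experiments, a small hypothetical helper (not part of the original post) can save some typing: it parses an arbitrary statement into a DataFrame and prints its full QueryExecution.

def printPlans(query: String): Unit = {
  val df = sqlContext.sql(query)
  println(df.queryExecution)
}

printPlans("SELECT name FROM people ORDER BY age")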