Spark修炼之道 (Advanced) — Spark from Beginner to Master, Section 9: Analyzing the Spark SQL Execution Flow


We will use the following code to walk through the Spark SQL execution flow, so that the several states a LogicalPlan passes through, and the overall Spark SQL execution pipeline, become clear.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

(1) Inspect the schema of teenagers

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

(2) Inspect the execution flow

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(name),unresolvedalias(age)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

QueryExecution represents the entire Spark SQL execution pipeline. From the output above you can see that executing a SQL statement goes through the following steps:

== (1) Parsed Logical Plan ==
Project [unresolvedalias(name),unresolvedalias(age)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== (2) Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (3) Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== (4) Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

// Dynamic bytecode generation (code generation, CG) is enabled to speed up query execution
Code Generation: true
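Besides printing the whole pipeline at once, the individual stages can be pulled out one at a time. A minimal sketch, assuming the fields that QueryExecution exposes in this Spark version (logical, analyzed, optimizedPlan, sparkPlan):

// Each stage of the pipeline shown above is exposed as a field of
// QueryExecution, so a single phase can be inspected in isolation.
val qe = teenagers.queryExecution
println(qe.logical)       // (1) parsed (unresolved) logical plan
println(qe.analyzed)      // (2) analyzed logical plan
println(qe.optimizedPlan) // (3) optimized logical plan
println(qe.sparkPlan)     // (4) physical plan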
2. Full-table query execution flow

Statement:

val all= sqlContext.sql("SELECT * FROM people")

Execution flow:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// Note that * is parsed as unresolvedalias(*)
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 UnresolvedRelation [people], None

== Analyzed Logical Plan ==
// unresolvedalias(*) is analyzed into all of the fields in the schema
// UnresolvedRelation [people] is analyzed into Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
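As a side note, the same four plans can be printed without touching queryExecution at all; a small sketch using DataFrame.explain:

// explain(extended = true) prints the parsed, analyzed, optimized
// and physical plans for this query to the console.
all.explain(true)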

3. Filter query execution flow

Statement:


scala> val filterQuery = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

Execution flow:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter ((age >= 13) && (age <= 19))
  UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 // The Filter below is the extra operator compared with the full-table query; the same holds in the plans that follow
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
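For comparison, here is a sketch of the same filter expressed through the DataFrame API rather than SQL, reusing the people DataFrame and the $ syntax imported earlier; both forms go through the same Catalyst pipeline, so the optimized plans should match:

// Same predicate via the DataFrame API; the optimized plan should be
// the same Filter over the same scan as in the SQL version above.
val filterDF = people.filter($"age" >= 13 && $"age" <= 19)
println(filterDF.queryExecution.optimizedPlan)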

4. Join query execution flow

Statement:

val joinQuery = sqlContext.sql("SELECT * FROM people a, people b where a.age=b.age")

Inspect the overall execution flow:

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Filter
// and the Join Inner
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter (a.age = b.age)
  Join Inner, None
   UnresolvedRelation [people], Some(a)
   UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4]...

// Inspect its Physical Plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
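Note that sparkPlan is the physical plan before the preparation rules run. The plan that actually executes is executedPlan, which in this version is expected to additionally contain the shuffle and sort operators that SortMergeJoin requires; a sketch to compare the two:

// sparkPlan: physical plan as selected by the planner
println(joinQuery.queryExecution.sparkPlan)
// executedPlan: the same plan after preparation rules have inserted
// the exchange (shuffle) and sort operators SortMergeJoin needs
println(joinQuery.queryExecution.executedPlan)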

The previous example is equivalent to the one below; only the way the join is expressed differs slightly. Statement:

scala> val innerQuery = sqlContext.sql("SELECT * FROM people a inner join people b on a.age=b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]

Inspect the overall execution flow:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// Note the Join Inner
// and that this time there is no Filter
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Join Inner, Some((a.age = b.age))
  UnresolvedRelation [people], Some(a)
  UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note that the Optimized Logical Plan applies no special optimization
// beyond the Analyzed Logical Plan; this is highlighted so it can be
// contrasted with the Analyzed/Optimized difference in the subquery example later on
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder ...

// Inspect its Physical Plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]

5. Subquery execution flow

Statement:

scala> val subQuery = sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

Inspect the overall execution flow:


scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
Project [unresolvedalias(*)]
 Filter (a.age <= 19)
  Subquery a
   Project [unresolvedalias(*)]
    Filter (age >= 13)
     UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// Note the difference between Optimized and Analyzed here:
// the two Filters have been optimized into one
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
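The same collapse can be reproduced with two chained DataFrame filters; Catalyst's filter-combining and predicate-pushdown rules merge them into a single Filter over the scan. A minimal sketch:

// Two separate filters in the user code...
val chained = people.filter($"age" >= 13).filter($"age" <= 19)
// ...become one combined Filter in the optimized plan.
println(chained.queryExecution.optimizedPlan)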

6. Aggregate SQL execution flow

Statement:

scala> val aggregateQuery = sqlContext.sql("SELECT a.name,sum(a.age) FROM (SELECT * FROM people WHERE age >= 13)a where a.age <= 19 group by a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]

Inspect the execution flow:


scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// Note Aggregate [a.name], [unresolvedalias(a.name),unresolvedalias(sum(a.age))]:
// that is, group by a.name is parsed into unresolvedalias(a.name)
== Parsed Logical Plan ==
Aggregate [a.name], [unresolvedalias(a.name),unresolvedalias(sum(a.age))]
 Filter (a.age <= 19)
  Subquery a
   Project [unresolvedalias(*)]
    Filter (age >= 13)
     UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

// Inspect its Physical Plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
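The two TungstenAggregate operators implement a two-phase aggregation: a Partial aggregate computes per-partition sums, and a Final aggregate merges them after the shuffle. As a sketch, the equivalent DataFrame query goes through the same Catalyst pipeline and should plan the same Partial/Final pair:

import org.apache.spark.sql.functions.sum

// DataFrame equivalent of the aggregate SQL above.
val aggDF = people.filter($"age" >= 13 && $"age" <= 19)
  .groupBy("name")
  .agg(sum("age"))
println(aggDF.queryExecution.sparkPlan)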

You can inspect the execution flow of any other SQL statement with the same approach, and thereby grasp the basic ideas behind Spark SQL's implementation.
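To make that exercise easier, here is a hypothetical helper (explainSQL is not a Spark API, just a REPL convenience defined for this tutorial) that runs any statement and dumps its full QueryExecution:

// Hypothetical helper: run a SQL statement and print every stage of
// its QueryExecution (parsed, analyzed, optimized, physical).
def explainSQL(query: String): Unit =
  println(sqlContext.sql(query).queryExecution)

explainSQL("SELECT name FROM people ORDER BY age")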

