Spark-Sql源码解析之八 Codegen详解大数据
2023-06-13 09:20:26 时间
Codegen,动态字节码技术,那么什么是动态字节码技术呢?先看来一段代码,假设SparkPlan为Sort
case class Sort( sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan) extends UnaryNode { override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator = val ordering = newOrdering(sortOrder, child.output) iterator.map(_.copy()).toArray.sorted(ordering).iterator }, preservesPartitioning = true) override def output: Seq[Attribute] = child.output override def outputOrdering: Seq[SortOrder] = sortOrder abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { protected def newOrdering(order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[Row] = { if (codegenEnabled) {//开启动态字节码技术 GenerateOrdering.generate(order, inputSchema) } else {//否则关闭 new RowOrdering(order, inputSchema) }
可见针对Sort的SparkPlan,针对是否开启动态字节码技术的情况下会发生两种情况:当关闭的时候,其Compare函数如下:
class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(ordering.map(BindReferences.bindReference(_, inputSchema))) def compare(a: Row, b: Row): Int = { var i = 0 while (i ordering.size) { val order = ordering(i) val left = order.child.eval(a)//虚函数调用,然后装箱 val right = order.child.eval(b)//虚函数调用,然后装箱 if (left == null right == null) { // Both null, continue looking. } else if (left == null) { return if (order.direction == Ascending) -1 else 1 } else if (right == null) { return if (order.direction == Ascending) 1 else -1 } else { val comparison = order.dataType match { case n: AtomicType if order.direction == Ascending = n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)//调用具体对象的compare函数 case n: AtomicType if order.direction == Descending = n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)//调用具体对象的compare函数 case other = sys.error(s"Type $other does not support ordered operations") if (comparison != 0) return comparison i += 1 return 0 }
其涉及到虚函数调用及装箱,虚函数的调用相对普通函数而言比较耗时。
当开启动态字节码技术的时候,其Compare函数如下:
object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging { import scala.reflect.runtime.{universe = ru} import scala.reflect.runtime.universe._ protected def canonicalize(in: Seq[SortOrder]): Seq[SortOrder] = in.map(ExpressionCanonicalizer.execute(_).asInstanceOf[SortOrder]) protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] = in.map(BindReferences.bindReference(_, inputSchema)) protected def create(ordering: Seq[SortOrder]): Ordering[Row] = { val a = newTermName("a") val b = newTermName("b") val comparisons = ordering.zipWithIndex.map { case (order, i) = val evalA = expressionEvaluator(order.child) val evalB = expressionEvaluator(order.child) val compare = order.child.dataType match { case BinaryType = q""" val x = ${if (order.direction == Ascending) evalA.primitiveTerm else evalB.primitiveTerm}//直接指定类型,不涉及虚函数调用 val y = ${if (order.direction != Ascending) evalB.primitiveTerm else evalA.primitiveTerm}//直接指定类型,不涉及虚函数调用 var i = 0 while (i x.length i y.length) { val res = x(i).compareTo(y(i)) if (res != 0) return res i = i+1 return x.length - y.length """ case _: NumericType = q""" val comp = ${evalA.primitiveTerm} - ${evalB.primitiveTerm}//直接指定类型 if(comp != 0) { return ${if (order.direction == Ascending) q"comp.toInt" else q"-comp.toInt"} """ case StringType = if (order.direction == Ascending) { q"""return ${evalA.primitiveTerm}.compare(${evalB.primitiveTerm})"""//直接指定类型,不涉及虚函数调用 } else { q"""return ${evalB.primitiveTerm}.compare(${evalA.primitiveTerm})""" q""" i = $a ..${evalA.code} i = $b ..${evalB.code} if (${evalA.nullTerm} ${evalB.nullTerm}) { // Nothing } else if (${evalA.nullTerm}) { return ${if (order.direction == Ascending) q"-1" else q"1"} } else if (${evalB.nullTerm}) { return ${if (order.direction == Ascending) q"1" else q"-1"} } else { $compare """ val q"class $orderingName extends $orderingType { ..$body }" = reify { class SpecificOrdering extends Ordering[Row] { val o = ordering }.tree.children.head val code = q""" class $orderingName extends $orderingType { ..$body def compare(a: $rowType, b: $rowType): Int = { var i: $rowType = null // Holds current row being evaluated. ..$comparisons return 0 new $orderingName() """ logDebug(s"Generated Ordering: $code") toolBox.eval(code).asInstanceOf[Ordering[Row]] }
可见动态字节码技术中不涉及虚函数的调用,其本质就是scala的反射机制。关于虚调用为什么耗时的原因如下:
以具体的SQL语句 select a+b fromtable 为例进行说明,下面是它的解析过程: 1.调用虚函数Add.eval(),需确认Add两边数据类型 2.调用虚函数a.eval(),需要确认a的数据类型 3.确认a的数据类型是int,装箱 4.调用虚函数b.eval(),需确认b的数据类型 5.确认b的数据类型是int,装箱 6.调用int类型的add 7.返回装箱后的计算结果 从上面的步骤可以看出,一条SQL语句的解析需要进行多次虚函数的调用。我们知道,虚函数的调用会极大的降低效率。那么,虚函数的调用为什么会影响效率呢? 有人答案是:虚函数调用会进行一次间接寻址过程。事实上这一步间接寻址真的会显著降低运行效率?显然不是。 流水线的打断才是真正降低效率的原因。 我们知道,虚函数的调用时是运行时多态,意思就是在编译期你是无法知道虚函数的具体调用。设想一下,如果说不是虚函数,那么在编译时期,其相对地址是确定的,编译器可以直接生成jmp/invoke指令; 如果是虚函数,多出来的一次查找vtable所带来的开销,倒是次要的,关键在于,这个函数地址是动态的,譬如 取到的地址在eax里,则在call eax之后的那些已经被预取进入流水线的所有指令都将失效。流水线越长,一次分支预测失败的代价也就越大,如下所示: pf- test 001E146D mov eax,dword ptr[pf] 011E1470 mov edx,dword,ptr[eax] 011E1472 mov esi,esp 011E1474 mov ecx,dword ptr[pf] 011E1477 mov eax,dword ptr[edx] 011E1479 eax -----------------------分支预测失败 011E147B cmp esi esp 011E147D @ILT+355(__RTC_CheckEsp)(11E1168h)
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9306.html
分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集相关文章
- Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]详解大数据
- 快速执行Oracle SQL文件的姿势(oracle执行sql文件)
- sql通过Oracle实现批量执行SQL语句(oracle批量执行)
- Oracle访问SQL:初探编写方式(oracle访问sql)
- Oracle数据库删除数据的常用SQL语句(oracle删除sql)
- 深度挖掘MSSQL:从SQL手工注入谈开始(sql手工注入mssql)
- SQL 同步MSSQL实现数据一致性(sql 同步mssql)
- MySQL数据库备份还原SQL操作指南(mysql数据还原sql)
- 探索Oracle的SQL跟踪工具:优化数据库性能的重要利器(oracle跟踪sql工具)
- 精通Oracle元数据之SQL编程(oracle元数据sql)
- 在SQL中使用Redis实现数据存储(sql 操作redis)
- SQL与Redis实现数据的迅速存取(sql与redis)
- Oracle SQL语句实现列数据修改(oracle修改列sql)
- MySQL数据上传如何处理大型SQL文件大小(mysql上传sql大小)
- Oracle中SQL紧密相连(oracle中sql联系)
- 优化Oracle SQL调度的绝佳方法(oracle sql调度)
- Oracle SQL字符串截断技术研究(oracle sql截断)
- 掌握Oracle SQL写法,激发你的SQL能力(oracle sql写法)
- Oracle10 SQL进入新一代数据库时代(oracle10 sql)