zl程序教程

您现在的位置是:首页 >  其他

当前栏目

spark sql非join情况的谓词下推优化器PushPredicateThroughNonJoin

2023-03-14 22:57:57 时间

spark sql谓词下推逻辑优化器PushDownPredicates包含了三个规则:

PushPredicateThroughNonJoin是sparksql中非join情况的谓词下推的逻辑执行计划优化器

谓词可以下推的前提:不影响查询结果,即要保证下推前和下推后两个sql执行得到的效果相同

PushPredicateThroughNonJoin优化器处理了6类可以下推的情况

处理Filter节点下为Project节点的情况

为啥要project里的字段必须是确定性的才能下推?

因为如果project里的字段是非确定性的话,下推前和下推后的查询效果不一样

比如:

sql里用到了monotonically_increasing_id()函数(产生64位整数自增id的非确定性expression)

select a,b,id from (
  select  A,B,monotonically_increasing_id() as id from 
  testdata2  where a>2 
)tmp  where  b<1
如果下推,就相当于:

select a,b,id from (
  select  A,B,monotonically_increasing_id() as id from 
  testdata2  where a>2 and  b<1
)tmp

上面两个sql相比,过滤a>2 和 过滤(a>2 and b<1)两种情况下,该sql的数据得到的对应的自增id的情况是不一样的

其它的还有rand()函数, 过滤a>2 和 过滤(a>2 and b<1)两种情况下,取rand() 的效果肯定也是不一样的

处理Filter节点下为Aggregate节点的情况
select a,b from (
  select  A,B,count(1) as c from testdata2 
  where a>2  group by a,b
)tmp where c=1 and b<5

c字段是由子查询count(1)得来的,c=1不能下推,而b<5下推了

处理Filter节点下为Window节点的情况

这个和处理Aggregate有点相似,可以下推的条件:

  • 谓词的表达式必须是窗口聚合的分区key
  • 谓词必须是确定性的
select a,b,rn from (
select  A,B,row_number() over(partition by a order by b desc )
 as rn from testdata2
)tmp where a>1 and b<5

a>1下推到window函数执行之前了,因为b不在partition by后的字段中,因此b<5

没有被下推

处理Filter节点下为Union节点的情况
select tmpc from (
select  A as tmpc  from testdata2
union all
select  b as tmpc  from testdata2
) tmp where tmpc>1 and rand()>0.1

确实性的字段被下推了,而非确定性的rand()函数没有被下推

处理Filter节点下为其他节点的情况

其他节点,列出了可以用统一逻辑处理下推的节点:

def canPushThrough(p: UnaryNode): Boolean = p match {
    // Note that some operators (e.g. project, aggregate, union) are being handled separately
    // (earlier in this rule).
    case _: AppendColumns => true
    case _: Distinct => true
    case _: Generate => true
    case _: Pivot => true
    case _: RepartitionByExpression => true
    case _: Repartition => true
    case _: ScriptTransformation => true
    case _: Sort => true
    case _: BatchEvalPython => true
    case _: ArrowEvalPython => true
    case _ => false
  }

统一处理的逻辑:

总结
  • 非join情况下,PushPredicateThroughNonJoin可以优化的情况:Filter节点子节点为Project、Aggregate、Window、Union、EventTimeWatermark(实时的情况)、 AppendColumns 、 Distinct 、 Generate 、Pivot 、RepartitionByExpression 、 Repartition 、 ScriptTransformation 、 Sort 、BatchEvalPython 、ArrowEvalPython 的情况下,可进行优化操作
  • 字段或者表达式为确定性的是非常重要的条件,在做优化时,一般会把Filter中的condition以是否确定性_.deterministic 给分成可下推的部分和不可下推的部分,分别做操作