zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Apache Calcite 在 Flink 中的应用

2023-09-11 14:16:24 时间

1 Apache Calcite

1.1 什么是calcite

Apache Calcite 是一个动态数据的管理框架,可以用来构建数据库系统的语法解析模块,是高性能数据库的基础

  • Standard SQL:行业标准的SQL解析器、验证器和JDBC驱动程序
  • Query optimization: 用关系代数【relational algebra】表示查询,使用规划规则【planning rules】进行转换,并根据成本模型进行优化
  • Any data, anywhere:连接到第三方数据源,浏览元数据,并通过下推进行数据计算的优化
  • 不包含数据存储、数据处理等功能
  • 可以通过编写 Adaptor 来扩展功能,以支持不同的数据处理平台,目前实现的Adaptor,如jdbc、file、elasticsearch、MongoDB、redis、kafka等

Flink SQL 使用并对其扩展以支持 SQL 语句的解析和验证

1.2 谁在用calcite

下图是一张官方提供的生态系统图,可以看到大名鼎鼎的 Hive、Flink、Druid 以及 Spark、ES 等都可以被纳入 Calcite 生态圈。
在这里插入图片描述

1.3 基本概念

名称中文名称类名描述
Relational algebra关系代数RelNode关系表达式。它们通常以动词命名,
例如 Sort, Join, Project, Filter, Scan, Sample.
Row expressions行表达式RexNode例如RexLiteral (常量), RexVariable (变量), RexCall (调用) 等,
例如投影列表(Project)、过滤规则列表(Filter)、
JOIN 条件列表、ORDER BY 列表、WINDOW 表达式、函数调用等。
使用 RexBuilder 来构建行表达式。
Traits特征RelTraits表达式有各种特征(Trait):使用 Trait 的 satisfies() 方法来
测试某个表达式是否符合某 Trait 或 Convention.
Conventions转化特征Convention属于 Trait 的子类用于转化 RelNode 到具体平台实现
(可以将下文提到的 Planner 注册到 Convention 中).
例如 JdbcConvention,FlinkConventions.DATASTREAM 等。
同一个关系表达式的输入必须来自单个数据源,
各表达式之间通过 Converter 生成的 Bridge 来连接
flink实现的Convention:
LOGICAL: Convention
DATASET: Convention
DATASTREAM: Convention
Rules规则RelOptRule用于将一个表达式转换(Transform)为另一个表达式。
它有一个由 RelOptRuleOperand 组成的列表来决定
是否可将规则应用于树的某部分。
Planners规则器RelOptPlanner即请求优化器,它可以根据一系列规则和成本模型
(例如 基于成本的优化模型VolcanoPlanner启发式优化模型 HepPlanner
来将一个表达式转为语义等价(但效率更优)的另一个表达式
Programs程序Program

1.4 calcite架构

与传统数据库管理系统有一些相似之处,相比而言,它将数据存储、元数据存储和数据处理算法这些部分忽略掉了,这样设计带来的好处是:对于涉及多种数据源和多种计算引擎的应用(如flink、spark、hive等)而言,Calcite 因为可以兼容多种存储和计算引擎,使得 Calcite 可以提供统一查询服务,Calcite 将会是这些应用的最佳选择。

在 Calcite 架构中,最核心地方就是 Optimizer,即优化器,一个 Optimization Engine 包含三个组成部分:

  • rules配规则,Calcite 内置上百种 Rules 来优化 relational expression,当然也支持自定义 rules;
  • metadata providers:主要是向优化器提供信息,这些信息会有助于指导优化器向着目标(减少整体 cost)进行优化,信息可以包括行数、table 哪一列是唯一列等,也包括计算 RelNode 树中执行 subexpression cost 的函数;
  • planner engines:它的主要目标是进行触发 rules 来达到指定目标,比如像 cost-based optimizer(CBO)的目标是减少cost(Cost 包括处理的数据行数、CPU cost、IO cost 等)。

在这里插入图片描述

1.5 calcite 处理流程

Sql 的执行过程一般可以分为下图中的四个阶段,Calcite 同样也是这样:

  • Parser: Calcite通过Java CC将SQL解析成未经校验的AST(Abstract Syntax Tree,抽象语法树
  • Validate: 主要作用是校验AST是否合法,如验证SQL schema、字段、函数等是否存在; SQL语句是否合法等. 验证完成之后生成RelNode树
  • Optimize: 主要的作用优化RelNode树, 并将其转化成物理执行计划。主要涉及SQL规则优化如:基于规则优化(RBO)及基于代价(CBO)优化; Optimze 这一步原则上来说是可选的, 通过Validate后的RelNode树已经可以直接转化物理执行计划,但现代的SQL解析器基本上都包括有这一步,目的是优化SQL执行计划。
  • Execute: 执行阶段。主要做的是:将物理执行计划转化成可在特定的平台执行的程序。如Hive与Flink都在在此阶段将物理执行计划CodeGen生成相应的可执行代码。

1.6 模块介绍

  • Catalog: 定义元数据和命名空间,包含 Schema(库)、Table(表)、RelDataType(类型信息)
    • schema: 主要定义schema与表的集合,schema 并不是强制一定需要的,比如说有两张同名的表T1、T2,就需要schema要区分这两张表,如A.T1, B.T1
    • table:对应关系数据库的表,代表一类数据,在calcite中由RelDataType定义,Statistic 用于统计表的相关数据、特别是在CBO用于计表计算表的代价
    • RelDataType: 代表Row的数据类型,如表数据列名称、类型等。
  • SQL Parser – 将用户编写的 SQL 语句转为 SqlNode 构成的抽象语法树(AST)
    • Calcite 使用 JavaCC 做 SQL 解析,JavaCC 根据 Calcite 中定义的 Parser.jj 模板生成 LL(k) 语法分析器,然后生成一系列的 java 代码,生成的 Java 代码会把 SQL 转换成 AST 的数据结构
    • 负责处理各个 Token,逐步生成一棵 SqlNode 组成的 AST
  • SQL Validator:使用 Catalog 中的元数据检验上述 SqlNode AST 并生成 RelNode 组成的 AST
  • Query Optimizer: 将 RelNode AST 转为逻辑计划,然后优化它,最终转为实际执行方案。以下是一些常见的优化规则(Rules):
    • 移除未使用的字段
    • 合并多个投影(projection)列表
    • 使用 JOIN 来代替子查询
    • 对 JOIN 列表重排序
    • 下推(push down)投影项
    • 下推过滤条件
      整体而言,Calcite 处理流程整体可以分为 Parse(语法和语义解析,生成 SqlNode 树)、Validate(验证各对象是否已在 Catalog 中注册)、Optimize(优化、生成 RelNode 树以及物理执行计划)、Execute(具体执行)四个阶段。

1.7 优化器

calcite主要采用两种优化器,详细可参见apache calcite 优化器(二) 分别是:
HepPlanner(RBO):一个启发式的优化器,按照规则进行匹配,直到达到次数限制(match 次数限制)或者遍历一遍后不再出现 rule match 的情况才算完成;
VolcanoPlanner(RBO+CBO):会一直迭代 rules,直到找到 cost 最小的 paln。

类图如下:可以看到,HiveVolcanoPlanner 是VolcanoPlanner的子类,是一种基于代价的优化器
在这里插入图片描述

1.7.1 基于规则优化(RBO)

基于规则的优化器(Rule-Based Optimizer,RBO):根据优化规则对关系表达式进行转换,
转换是指一个关系表达式经过优化规则后会变成另外一个关系表达式,同时原有表达式会被裁剪掉,经过一系列转换后生成最终的执行计划。

RBO 中包含了一套有着严格顺序的优化规则,同样一条 SQL,无论读取的表中数据是怎么样的,最后生成的执行计划都是一样的。同时,在 RBO 中 SQL 写法的不同很有可能影响最终的执行计划,从而影响执行计划的性能

1.7.2 基于成本优化(CBO)

基于代价的优化器(Cost-Based Optimizer,CBO):根据优化规则对关系表达式进行转换,
转换是指一个关系表达式经过优化规则后会生成另外一个关系表达式,同时原有表达式也会保留,经过一系列转换后会生成多个执行计划,然后 CBO 会根据统计信息和代价模型 (Cost Model) 计算每个执行计划的 Cost,从中挑选 Cost 最小的执行计划。

由上可知,CBO 中有两个依赖:统计信息和代价模型
统计信息的准确与否、代价模型的合理与否都会影响 CBO 选择最优计划。

从上述描述可知,CBO 是优于 RBO 的,原因是 RBO 是一种只认规则,对数据不敏感的呆板的优化器,而在实际过程中,数据往往是有变化的,通过 RBO 生成的执行计划很有可能不是最优的。事实上目前各大数据库和大数据计算引擎都倾向于使用 CBO,但是对于流式计算引擎来说,使用 CBO 还是有很大难度的,因为并不能提前预知数据量等信息,这会极大地影响优化效果,CBO 主要还是应用在离线的场景

1.8 calcite针对流计算的支持

详见calcite stream

Calcite 支持部分 SQL 流处理语句,也提供了对 Tumbling / Hopping / Sliding / Cascading 等类型 Window 的支持,而 Flink 则把 Window 分为 Tumbling、Sliding (Hopping in SQL)、Session、Global 等类型,与 Calcite 提供的并不完全一致。

目前 Calcite 流处理语句已实现对 SELECT, WHERE, GROUP BY, HAVING, UNION ALL, ORDER BY 以及 FLOOR, CEIL 函数的支持。

2 flink中的calcite

2.1 Table API 与 SQL 模块以 Calcite 为核心

在Flink 系统结构中,Table API 与 SQL 模块以 Calcite 为核心,大量用到 Calcite 的各种类和方法。

在Flink Table 模块的内部架构图中,可以看到它以 Calcite Catalog 为核心,上面承载了 Table API 和 SQL API 两套表达方式,最后殊途同归,统一生成为 Calcite Logical Plan(SqlNode 树);随后验证、优化为 RelNode 树,最终通过 Rules(规则)和 Convention(转化特征)生成具体的 DataSet Plan(批处理)或 DataStream Plan(流处理),即 Flink 算子构成的处理逻辑。
在这里插入图片描述

2.2 Table API 和 SQL 的转换流程

Table API 和 SQL 的转换流程如下,绿色的节点代表 Flink Table Nodes,蓝色的节点代表 Calcite Logical Nodes。最终都转化成了相同的 Logical Plan 表现形式。
在这里插入图片描述
之后会进入优化器,Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。
这里的优化规则分为两类:

  • 一类是Calcite提供的内置优化规则(如条件下推,剪枝等),
  • 另一类是是将Logical Node转变成 Flink Node 的规则。
    这两类规则的应用体现为下图中的①和②步骤,这两步骤都属于 Calcite 的优化阶段。
    得到的 DataStream Plan 封装了如何将节点翻译成对应 DataStream/DataSet 程序的逻辑。
    步骤③就是将不同的 DataStream/DataSet Node 通过代码生成(CodeGen)翻译成最终可执行的 DataStream/DataSet 程序。
    在这里插入图片描述
    代码生成是 Table API & SQL 中最核心的一块内容。表达式、条件、内置函数等等是需要CodeGen生成具体的Function 代码的,这部分跟Spark SQL的结构很相似。CodeGen 生成的Function以字符串的形式存在。在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。

Janino的编译可参见:Apache Calcite VolcanoPlanner代价计算解析与源码调试方法

2.3 在flink-sql-parser中导入calcite的依赖

在flink-table中,包含了flink-sql-parser、flink-sql-parser-hive等模块,并在sql-parser模块中导入了calcite的依赖
目前使用calcite版本是1.26.0

<dependency>
      <groupId>org.apache.calcite</groupId>
      <artifactId>calcite-core</artifactId>
      <version>1.26.0</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>avatica-metrics</artifactId>
          <groupId>org.apache.calcite.avatica</groupId>
        </exclusion>
        <exclusion>
          <artifactId>avatica-server</artifactId>
          <groupId>org.apache.calcite.avatica</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-io</artifactId>
          <groupId>commons-io</groupId>
        </exclusion>
        <exclusion>
          <artifactId>protobuf-java</artifactId>
          <groupId>com.google.protobuf</groupId>
        </exclusion>
        <exclusion>
          <artifactId>httpclient</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
        <exclusion>
          <artifactId>httpcore</artifactId>
          <groupId>org.apache.httpcomponents</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-dbcp2</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
          <artifactId>esri-geometry-api</artifactId>
          <groupId>com.esri.geometry</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-dataformat-yaml</artifactId>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
        </exclusion>
        <exclusion>
          <artifactId>sketches-core</artifactId>
          <groupId>com.yahoo.datasketches</groupId>
        </exclusion>
        <exclusion>
          <artifactId>aggdesigner-algorithm</artifactId>
          <groupId>net.hydromatic</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-core</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-databind</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-annotations</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>json-path</artifactId>
          <groupId>com.jayway.jsonpath</groupId>
        </exclusion>
        <exclusion>
          <artifactId>joda-time</artifactId>
          <groupId>joda-time</groupId>
        </exclusion>
        <exclusion>
          <artifactId>calcite-linq4j</artifactId>
          <groupId>org.apache.calcite</groupId>
        </exclusion>
        <exclusion>
          <artifactId>janino</artifactId>
          <groupId>org.codehaus.janino</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-compiler</artifactId>
          <groupId>org.codehaus.janino</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jsr305</artifactId>
          <groupId>com.google.code.findbugs</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-lang3</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-codec</artifactId>
          <groupId>commons-codec</groupId>
        </exclusion>
        <exclusion>
          <artifactId>uzaygezen-core</artifactId>
          <groupId>com.google.uzaygezen</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.calcite</groupId>
      <artifactId>calcite-core</artifactId>
      <version>1.26.0</version>
      <type>test-jar</type>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <artifactId>avatica-server</artifactId>
          <groupId>org.apache.calcite.avatica</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-core</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-databind</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-annotations</artifactId>
          <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
        <exclusion>
          <artifactId>uzaygezen-core</artifactId>
          <groupId>com.google.uzaygezen</groupId>
        </exclusion>
        <exclusion>
          <artifactId>calcite-linq4j</artifactId>
          <groupId>org.apache.calcite</groupId>
        </exclusion>
        <exclusion>
          <artifactId>esri-geometry-api</artifactId>
          <groupId>com.esri.geometry</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jackson-dataformat-yaml</artifactId>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
        </exclusion>
        <exclusion>
          <artifactId>json-path</artifactId>
          <groupId>com.jayway.jsonpath</groupId>
        </exclusion>
        <exclusion>
          <artifactId>sketches-core</artifactId>
          <groupId>com.yahoo.datasketches</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-codec</artifactId>
          <groupId>commons-codec</groupId>
        </exclusion>
        <exclusion>
          <artifactId>aggdesigner-algorithm</artifactId>
          <groupId>net.hydromatic</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-dbcp2</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-lang3</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-io</artifactId>
          <groupId>commons-io</groupId>
        </exclusion>
        <exclusion>
          <artifactId>commons-compiler</artifactId>
          <groupId>org.codehaus.janino</groupId>
        </exclusion>
        <exclusion>
          <artifactId>janino</artifactId>
          <groupId>org.codehaus.janino</groupId>
        </exclusion>
      </exclusions>
    </dependency>

3 参考

Apache Calcite 功能简析及在 Flink 的应用
apache calcite adapter
calcite基础概念
Flink 原理与实现:Table & SQL API