zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

基于InLong采集Mysql数据

2023-02-26 09:50:48 时间

前言

目前用户常用的两款大数据架构包括EMR(数据建模和建仓场景,支持hive、spark、presto等引擎)和DLC(数据湖分析场景,引擎支持spark、presto引擎),其中EMR场景存储为HDFS(支持本地盘和对象存储cos),数据格式支持Iceberg、orc、parquet、text等,均支持内外表;DLC场景存储为cos,内表数据格式为Iceberg,外表数据格式为orc和text。下文通过离线和实时两种模式描述如何通过Inlong实现mysql数据的同步到HDFS和DLC,同时实现下游用户可读。

采集方案能力组合

场景

类型

模式

建议场景

场景+方案推荐度

EMR

离线

Append

数据表 日志型

数据表:有保留数据天级变更状态诉求,推荐采用此方案 1、读取数据对采集源端产生压力; 2、终态数据需要业务根据主键合并; 3、增量模式采用分区的处理办法,分区可以保留源端数据变更的全状态记录;日志型:日志使用该方案较少

Overwrite

数据表

数据表:无保留数据变更状态诉求,推荐采用此方案 1、读取数据对采集源端产生压力; 2、重写过程中hive会锁定,无法使用; 3、终态数据需要业务根据主键合并; 4、全量模式适用小表,增量模式适用大表;

实时

Append

数据表 日志型

数据表:降低源端压力诉求或者binlog备份可采用此方案 1、Binlog的方式,读取数据对采集源端压力较小; 2、终态数据需要业务合并; 3、增量适用数据大小表;日志型:推荐方案

DLC

离线

Append

数据表 日志型

数据表:方案等同EMR-Append,但是DLC底层支持upsert语义,此方案并不建议 1、读取数据对采集源端产生压力; 2、终态数据需要业务合并; 3、增量模式采用分区的处理办法,分区可以保留源端数据变更的全状态记录日志型:日志使用该方案较少

Overwrite

数据表

数据表:方案等同EMR-Overwrite,但是DLC底层支持upsert语义,此方案并不建议 1、读取数据对采集源端产生压力; 2、重写过程中hive会锁定,无法使用; 3、终态数据需要业务根据主键合并; 4、全量模式适用小表,增量模式适用大表

Upsert

数据表

数据表:推荐采用此方案 1、读取数据对采集源端产生压力; 2、终态数据需要借助引擎的小文件合并能力

实时

Append

数据表 日志型

数据表:此方案适用binlog备份 1、 binlog的方式,读取数据对采集源端压力较小; 2、终态数据需要业务合并; 3、增量模式下能感知物理删除日志型:推荐方案

Upsert

数据表

数据表:推荐采用此方案 1、binlog的方式,读取数据对采集源端压力较小; 2、需要开启小文件合并; 3、数据维护成本较低

场景推荐方案

场景

用户诉求

选型

接入成本

EMR

离线同步,保留历史状态

Append+Hive分区表+Hive全量表+目标视图

离线同步,不保留历史状态

Overwrite+Hive非分区表+Hive全量表+目标视图

相对低

离线同步-小表

Overwrite模式+Hive目标表

实时同步

Append+Hive分区表+Hive全量表+目标视图

DLC

离线同步

Upsert+Hive非分区目标表

实时同步

Upsert+Hive目标表

EMR场景-CDH相同

EMR场景现在主流存储格式为HDFS-ORC。数据采集到HDFS主要由离线和实时两类方案,离线引擎为DataX,实时引擎为Flink。系统架构图如下:

说明:目前离线支持Append和Overwrite模式,实时支持Append模式,下文展开各模式的数据处理方案。

离线类型

离线采集类型目前支持两种写入模式,Append适用于增量、Overwrite适用于小表全量和大表增量场景,因HDFS数据本身不具备更新能力,所以在增量场景下需要额外的Merge任务对数据进行加工处理。

Append模式

Append模式下可以写入hive非分区表或者分区表,两类表的数据都需要落地之后经过任务合并处理。但Mysql端可能存在大量的DML操作,非分区表在积累一定时间周期后读取最新数据成本会越来越高,所以建议写入hive分区表。

备注:数据唯一性问题

1、 单表场景:单表数据存在主键即可保证数据唯一性

2、 分库分表场景:数据表的主键非全局唯一,需要业务使用方识别;数据表的主键全局唯一,数据唯一性正常

分区表

配置方式

Mysql源表

CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` float DEFAULT NULL COMMENT '商品价格',
  `create_time` datetime DEFAULT NULL COMMENT '上架时间',
  `update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `description` varchar(500) DEFAULT NULL COMMENT '商品描述',
  PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Hive 分区表

CREATE TABLE `default.productinfo_offline`(
    `product_id` int COMMENT '商品ID',
    `product_name` string COMMENT '商品名称',
    `price` float COMMENT '商品价格',
    `create_time` timestamp COMMENT '上架时间',
    `update_time` timestamp COMMENT '更新时间',
    `description` string COMMENT '商品描述'
  ) PARTITIONED BY (`pt` string) stored as orc

任务配置

说明点:

1. 通常离线采集上一天数据,示例是根据update_time采集,需要在《筛选条件》处填写 update_time=${yyyyMMdd-1d},时间函数参考详见数据集成 时间参数说明-操作指南-文档中心-腾讯云

2. 这里的分区定义是根据productinfo的update_time生成,同时源字段是datetime类型,Hive分区字段pt是string类型,需要经过DATE_FORMAT(update_time,'%Y-%m-%d')做转换;

3. 离线同步采用的源端数据库函数,当前示例mysql数据源

源表配置

函数

目标表配置

DATE_FORMAT(update_time,'%Y-%m-%d')

Mysql函数

pt

字段配置示例

数据合并流程

因Append模式写入的数据并不会对主键去重,所以完成一次采集后需要经过下游业务去重处理。逻辑如下:

操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换

1. Inlong实时将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量_分区表。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量_分区表

2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区

3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113}+${T}_增量_{20230114}(根据业务属性做Merge)

4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图缺点:Overwrite模式可能存在风险,合并过程中存在锁)

5. 系统重复1-4步骤

数据合并逻辑

实现原理:当天分区最新的数据+上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下

#合并逻辑 (基于 union + row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
    product_id,
    product_name,
    price,
    create_time,
    update_time,
    description
from
    (
        select
            *,
            row_number() over(
                partition by `product_id`
                order by
                    `update_time` desc
            ) as rn
        from
            (
                select
                    *
                from
                    `T_全量_20230113`
                union
                all
                select
                    *
                from
                    `T_增量_分区表`
                where pt = '20230114'
            ) a
    ) b
where
    b.rn = 1

#创建视图
alter view `T` as select * from `T_全量_20230114`
关注点

1. 关注点1:什么时候做合并和创建视图

答复:离线分区写入任务可配置成Wedata调度任务,调度任务完成通过依赖触发下游的Merge任务

2. 关注点2:数据修复如何处理

答复:人工处理

3. 关注点3:DDL操作如何处理

答复:人工处理

4. 关注点4:下游读取view过程中,alter view是否会死锁

答复:alter view可以正常执行,下游读取的是view对应的源表,不影响

关注点5:${T}_全量_{时间}占用大量存储空间

答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据

非分区表

配置基本等同于分区表配置,差异点主要在以下两方面

1. 任务配置阶段不需要对update_time做字段转换映射hive的分区字段

2. 任务Merge过程根据update_time读取数据,但是因为非分区表,当前扫描文件的量较大,同时随着时间的积累,文件会越来越多,导致性能会越来越差。

Overwrite模式

全量表场景

导入任务写入同一个表,每次导入都是讲全量的最新数据写入到目标表,下游可直接使用

配置方式

增量表场景

增量模式的数据处理逻辑类似Append模式下的分区表逻辑,区别主要在导入表是通过Overwrite模式,通过增量数据覆盖,无法追溯数据的历史状态。

配置方式

Mysql源表

CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` float DEFAULT NULL COMMENT '商品价格',
  `create_time` datetime DEFAULT NULL COMMENT '上架时间',
  `update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `description` varchar(500) DEFAULT NULL COMMENT '商品描述',
  PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Hive 分区表

CREATE TABLE `default.productinfo_offline`(
    `product_id` int COMMENT '商品ID',
    `product_name` string COMMENT '商品名称',
    `price` float COMMENT '商品价格',
    `create_time` timestamp COMMENT '上架时间',
    `update_time` timestamp COMMENT '更新时间',
    `description` string COMMENT '商品描述'
  )  stored as orc

任务配置

说明点:

1. 通常离线采集上一天数据,示例是根据update_time采集,需要在《筛选条件》处填写 update_time=${yyyyMMdd-1d},时间函数参考详见数据集成 时间参数说明-操作指南-文档中心-腾讯云

数据合并流程

因写入的数据并不会对主键去重,所以完成一次采集后需要经过下游业务去重处理。逻辑如下:

操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换

1. Inlong将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量

2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区

3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113}+${T}_增量(根据业务属性做Merge)

4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图缺点:Overwrite模式可能存在风险,合并过程中存在锁)

5. 系统重复1-4步骤

数据合并逻辑

实现原理:当天分区最新的数据+上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下

#合并逻辑 (基于 union + row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
    product_id,
    product_name,
    price,
    create_time,
    update_time,
    description
from
    (
        select
            *,
            row_number() over(
                partition by `product_id`
                order by
                    `update_time` desc
            ) as rn
        from
            (
                select
                    *
                from
                    `T_全量_20230113`
                union
                all
                select
                    *
                from
                    `T_增量`
             ) a
    ) b
where
    b.rn = 1

#创建视图
alter view `T` as select * from `T_全量_20230114`
关注点

1. 关注点1:什么时候做合并和创建视图

答复:离线分区写入任务可配置成Wedata调度任务,调度任务完成通过依赖触发下游的Merge任务

2. 关注点2:数据修复如何处理

答复:人工处理

3. 关注点3:DDL操作如何处理

答复:人工处理

4. 关注点4:下游读取view过程中,alter view是否会死锁

答复:alter view可以正常执行,下游读取的是view对应的源表,不影响

关注点5:${T}_全量_{时间}占用大量存储空间

答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据

实时类型

实时写入通过订阅Mysql-binlog,将binlog+DDL操作标识写入HDFS,后续基于操作标识Merger产出最新的数据镜像,实时写入流程图

当前实时写入hive只支持append模式,hive目标表可为非分区表或者分区表,两类表的数据都需要落地之后经过任务合并处理。但mysql端可能存在大量的DML操作,非分区表在积累一定时间周期后读取最新数据成本会越来越高,所以在实时写入场景,建议写入hive分区表

Append模式

分区表

配置方式

mysql源表

CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` float DEFAULT NULL COMMENT '商品价格',
  `create_time` datetime DEFAULT NULL COMMENT '上架时间',
  `update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `description` varchar(500) DEFAULT NULL COMMENT '商品描述',
  PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

hive 分区表

CREATE TABLE `default.productinfo_uid`(
    `uid` string COMMENT '唯一键',
    `type_Wedata_di` string,
    `ts_Wedata_di` timestamp,
    `processing_time_Wedata_di` timestamp,
    `product_id` int COMMENT '商品ID',
    `product_name` string COMMENT '商品名称',
    `price` float COMMENT '商品价格',
    `create_time` timestamp COMMENT '上架时间',
    `update_time` timestamp COMMENT '更新时间',
    `description` string COMMENT '商品描述'
  ) PARTITIONED BY (`pt` string) stored as orc

特殊字段说明

该部分字段在实时任务同步过程中引擎自定生成,主要用来实现主键扩展,DDL操作标识,为下游区分有效数据

字段

内容

database_Wedata_di

streamdemo

table_Wedata_di

productinfo

type_Wedata_di

INSERT、UPDATE、DELETE

ts_Wedata_di

2023-01-30 14:31:42.0 事件时间

processing_time_Wedata_di

2023-01-30 14:31:43.019 处理时间

任务配置

说明点:

分区定义是根据productinfo的update_time生成,同时源字段是datetime类型,hive分区字段pt是string类型,需要经过to_date(cast(update_time as string),'yyyy-MM-dd')做转换;

目标表数据唯一性

1、 单表场景:单表数据存在主键即可保证数据唯一性

2、 分库分表场景:源表的主键非全局唯一,可通过database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成唯一键;数据表的主键全局唯一,直接使用product_id作为目标表唯一键

实时同步采用的Flink函数,支持函数列表:系统(内置)函数 | Apache Flink

源表配置

函数

目标表配置

to_date(cast(update_time as string),'yyyy-MM-dd')

Flink函数

pt

database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)

Flink函数

uid(可选)

数据合并流程

操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换

1. Inlong实时将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量_{20230114}。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量

2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区

3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113}+${T}_增量_{20230114}(根据业务属性做Merge)

4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图缺点:Overwrite模式可能存在风险,合并过程中存在锁)

5. 系统重复1-4步骤

数据合并逻辑

实现原理:当天分区最新的数据+上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下

#合并逻辑 (基于 union + row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
    uid,
    type_Wedata_di
    ts_Wedata_di,
    product_id,
    product_name,
    price,
    create_time,
    update_time,
    description
from
    (
        select
            *,
            row_number() over(
                partition by `uid`
                order by
                    `ts_Wedata_di` desc
            ) as rn
        from
            (
                select
                    *
                from
                    `T_全量_20230113`
                union
                all
                select
                    *
                from
                    `T_增量_分区表`
                where pt='20230114'
            ) a
    ) b
where
    b.rn = 1
    and b.`type_Wedata_di` <> 'DELETE';

#创建视图
alter view `T` as select product_id,product_name,price,create_time,update_time,description from `T_全量_20230114`
关注点

1. 关注点1:什么时候做合并和创建视图

答复:

1. 方案1:目前整条数据链路计划延迟控制在15分钟内,所以Merge任务可以00:15分后开始执行,完成之后执行创建视图

2. 方案2:编写shell脚本检测${T}_增量当天新分区的生成情况,如果已经生成则通过Wedata-event触发Merge任务

2. 关注点2:数据修复如何处理

答复:人工处理

3. 关注点3:DDL操作如何处理

答复:人工处理

4. 关注点4:下游读取View过程中,Alter View是否会死锁

答复:Alter View可以正常执行,下游读取的是View对应的源表,不影响

关注点5:${T}_全量_{时间}占用大量存储空间

答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据

非分区表

配置基本等同于分区表配置,差异点主要在以下两方面

1. 任务配置阶段不需要对update_time做字段转换映射hive的分区字段

2. 任务Merge过程不是根据分区读取数据,而是根据update_time读取数据。当前模式文件扫描性能会越来越差

DLC场景

DLC场景现在主流存储格式为Iceberg(以下建表均为内表)。数据采集到支持离线和实时两种类型,离线引擎为DataX,实时引擎为Flink。系统架构图如下:

离线类型

Append模式

等同于EMR场景离线类型的Append模式,这里不做描述。

Overwrite模式

等同于EMR场景离线类型的Overwrite模式,这里不做描述

Upsert模式

当前模式主要通过离线写入并更新的方式生成目标数据内容,下游用户可以无感查询最新数据。当前目标表支持分区和非分区表,在实时数据湖场景下,非分区表更加适用。

非分区表

配置方式

Mysql源表

CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` float DEFAULT NULL COMMENT '商品价格',
  `create_time` datetime DEFAULT NULL COMMENT '上架时间',
  `update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `description` varchar(500) DEFAULT NULL COMMENT '商品描述',
  PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

DLC内表结构

CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`rikhuang_iceberg`.`productinfo_iceberg` (
  `product_id` int,
  `product_name` string,
  `price` float,
  `create_time` timestamp,
  `update_time` timestamp,
  `description` string
  ) TBLPROPERTIES (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.Merge.mode' = 'Merge-on-read',
  'write.parquet.bloom-filter-enabled.column.addr' = 'true',
  'write.parquet.bloom-filter-enabled.column.id' = 'true',
  'write.update.mode' = 'Merge-on-read',
  'write.upsert.enabled' = 'true',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '100',
  'write.metadata.metrics.default' = 'full',
  'dlc.ao.data.govern.inherit' = 'default',
  'dlc.ao.Merge.data.enable' = 'enable',
  'dlc.ao.Merge.data.engine' = {用户创建的spark引擎name},
  'dlc.ao.Merge.data.min-input-files' = '5',
  'dlc.ao.Merge.data.target-file-size-bytes' = '536870912',
  'dlc.ao.Merge.data.interval-min' = '60',
  'dlc.ao.expired.snapshots.enable' = 'enable',
  'dlc.ao.expired.snapshots.engine' = {用户创建的spark引擎name},
  'dlc.ao.expired.snapshots.retain-last' = '5',
  'dlc.ao.expired.snapshots.before-days' = '2',
  'dlc.ao.expired.snapshots.max-concurrent-deletes' = '25',
  'dlc.ao.expired.snapshots.interval-min' = '60',
  'dlc.ao.remove.orphan.enable' = 'enable',
  'dlc.ao.remove.orphan.engine' = {用户创建的spark引擎name},
  'dlc.ao.remove.orphan.before-days' = '3',
  'dlc.ao.remove.orphan.max-concurrent-deletes' = '25',
  'dlc.ao.remove.orphan.interval-min' = '120',
  'dlc.ao.Merge.manifests.enable' = 'enable',
  'dlc.ao.Merge.manifests.engine' = {用户创建的spark引擎name},
  'dlc.ao.Merge.manifests.interval-min' = '120'
);

表描述:DLC表采用MOR的模式写入和读取,如果不做治理,实时读取性能会非常糟糕,比对数据详见附录

表治理:DLC当前可以配置表治理规则,表治理工作可由DLC的内置调度机制完成,表治理规则如下:

任务配置

目标表数据唯一性

1、 单表场景:单表数据存在主键即可保证数据唯一性

2、 分库分表场景:源表的主键非全局唯一,当前场景会出现数据覆盖的问题;数据表的主键全局唯一,直接使用product_id作为目标表唯一键

关注点

1. 关注点1:数据修复如何处理

答复:人工处理

2. 关注点2:DDL操作如何处理

答复:人工处理

3. 关注点3:DLC表的治理规则

答复:可在DLC数据管理,创建原生表阶段可视化配置

分区表

配置基本等同于非分区表配置,差异点主要在以下两方面

1. 任务配置阶段需要对update_time做字段转换映射DLC的分区字段

2. 目标表的唯一键除上述常规配置之外需要加入分区字段(不然会报错)

实时类型

Upsert模式

当前模式主要通过实时写入并更新的方式生成目标数据内容,下游用户可以无感查询最新数据。当前目标表支持分区和非分区表,在实时数据湖场景下,非分区表更加适用。

非分区表

配置方式

Mysql源表

CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
  `product_id` int(11) NOT NULL COMMENT '商品ID',
  `product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `price` float DEFAULT NULL COMMENT '商品价格',
  `create_time` datetime DEFAULT NULL COMMENT '上架时间',
  `update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `description` varchar(500) DEFAULT NULL COMMENT '商品描述',
  PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

DLC内表结构

CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`rikhuang_iceberg`.`productinfo_iceberg` (
  `uid` string,
  `product_id` int,
  `product_name` string,
  `price` float,
  `create_time` timestamp,
  `update_time` timestamp,
  `description` string
  ) TBLPROPERTIES (
  'format-version' = '2',
  'write.distribution-mode' = 'hash',
  'write.Merge.mode' = 'Merge-on-read',
  'write.parquet.bloom-filter-enabled.column.addr' = 'true',
  'write.parquet.bloom-filter-enabled.column.id' = 'true',
  'write.update.mode' = 'Merge-on-read',
  'write.upsert.enabled' = 'true',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '100',
  'write.metadata.metrics.default' = 'full',
  'dlc.ao.data.govern.inherit' = 'default',
  'dlc.ao.Merge.data.enable' = 'enable',
  'dlc.ao.Merge.data.engine' = {用户创建的spark引擎name},
  'dlc.ao.Merge.data.min-input-files' = '5',
  'dlc.ao.Merge.data.target-file-size-bytes' = '536870912',
  'dlc.ao.Merge.data.interval-min' = '60',
  'dlc.ao.expired.snapshots.enable' = 'enable',
  'dlc.ao.expired.snapshots.engine' = {用户创建的spark引擎name},
  'dlc.ao.expired.snapshots.retain-last' = '5',
  'dlc.ao.expired.snapshots.before-days' = '2',
  'dlc.ao.expired.snapshots.max-concurrent-deletes' = '25',
  'dlc.ao.expired.snapshots.interval-min' = '60',
  'dlc.ao.remove.orphan.enable' = 'enable',
  'dlc.ao.remove.orphan.engine' = {用户创建的spark引擎name},
  'dlc.ao.remove.orphan.before-days' = '3',
  'dlc.ao.remove.orphan.max-concurrent-deletes' = '25',
  'dlc.ao.remove.orphan.interval-min' = '120',
  'dlc.ao.Merge.manifests.enable' = 'enable',
  'dlc.ao.Merge.manifests.engine' = {用户创建的spark引擎name},
  'dlc.ao.Merge.manifests.interval-min' = '120'
);

表描述:DLC表采用MOR的模式写入和读取,如果不做治理,实时读取性能会非常糟糕,比对数据详见附录

表治理:DLC当前可以配置表治理规则,表治理工作可由DLC的内置调度机制完成,表治理规则如下:

特殊字段说明

该部分字段在实时任务同步过程中引擎自定生成,主要用来实现主键扩展,DDL操作标识,为下游区分有效数据。当前DLC表模式下,如果源端非分库分表或者分库分表模式下主键不冲突,如下特殊字段可以不用采集。

目标表数据唯一性

1、 单表场景:单表数据存在主键即可保证数据唯一性

2、 分库分表场景:源表的主键非全局唯一,可通过database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成唯一键;数据表的主键全局唯一,直接使用product_id作为目标表唯一键

字段

内容

database_Wedata_di

streamdemo

table_Wedata_di

productinfo

type_Wedata_di

INSERT、UPDATE、DELETE

ts_Wedata_di

2023-01-30 14:31:42.0 事件时间

processing_time_Wedata_di

2023-01-30 14:31:43.019 处理时间

任务配置

说明点:

1. hive表中的主键uid字段由database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成

2. 实时同步采用的Flink函数,支持函数列表:系统(内置)函数 | Apache Flink

3. uid字段必须配置再唯一键中

源表配置

函数

目标表配置

database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)

Flink函数

uid

关注点

1. 关注点1:数据修复如何处理

答复:人工处理

2. 关注点2:DDL操作如何处理

答复:人工处理

3. 关注点3:DLC表的治理规则

答复:可在DLC数据管理,创建原生表阶段可视化配置

分区表

配置基本等同于非分区表配置,差异点主要在以下两方面

3. 任务配置阶段需要对update_time做字段转换映射DLC的分区字段

4. 目标表的唯一键除上述常规配置之外需要加入分区字段(不然会报错)

Append模式

当前模式主要写入数据内容+DDL标识符,数据并不会自动更新。配置方式和原理等同于EMR场景-实时类型同步。这里不做描述。

附录

字典表

比对语法

执行SQL

count(1)

select count(id) from {table};

max(id)

select max(id),min(id) from {table}

id asc

select * from {table} order by id ;

id desc

select * from {table} order by id desc;

k-max(id)

select * from {table} where id =max(id) ;

k-min(id)

select * from {table} where id =min(id) ;

all max

select max(id),max(name),max(addr),max(remark) from {table} ;

SparkSQL 16CU

sparkSQL(16CUs)

表类型

COS(orc)外部表

合并表(更新少)

合并表(更新多)

非合并表(更新少)

非合并表(更新多)

写入量

1000万

1000万

2000万

1000万

2000万

是否合并

/

数据量

1000万

787万

930万

1000万

2000万

容量

1.33GB

1.57GB

1.71GB

3.42GB

比对语法

耗时

扫描量

耗时

扫描量

耗时

扫描量

耗时

扫描量

耗时

扫描量

count(id)

6s

34MB

7s

35MB

6s

40MB

37s

172MB

108s

566MB

max(id)

6s

34MB

8s

35MB

7s

40MB

36s

172MB

108s

566MB

id asc

40s

3.3GB

36s

2.7GB

43s

3.1GB

104s

3.6GB

257s

7.7GB

id desc

38s

3.3GB

34s

2.7GB

41s

3.1GB

102s

3.6GB

255s

7.7GB

k-max(id)

6s

5.1MB

5s

35MB

5s

127MB

8s

51MB

10s

50MB

k-min(id)

7s

16MB

7s

127MB

7s

48MB

12s

51MB

12s

104MB

all max

12s

1.7GB

12s

1.3GB

15s

1.6GB

47s

1.8GB

128s

3.8GB