zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Flink CDC 2.x 让一切变得美好

flink 变得 一切 美好 CDC
2023-09-11 14:16:24 时间

本文基于阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flink Meetup 分享的《详解 Flink-CDC》整理。
《详解 Flink-CDC》深入讲解了最新发布的 Flink CDC 2.0.0 版本带来的核心特性,包括:全量数据的并发读取、checkpoint、无锁读取等重大改进。

项目地址
文档地址
详解Flink-CDC PPT

CDC

什么是CDC

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术

开源 CDC 方案比较

在这里插入图片描述

flink CDC Connectors

什么是Flink CDC Connectors?

Flink CDC Connectors是一组用于Apache Flink的源连接器,使用更改数据捕获(CDC)从不同的数据库接收更改。Flink CDC Connectors集成Debezium作为捕获数据更改的引擎。所以它可以充分利用Debezium的能力。

核心 feature

  • 支持读取数据库快照,并继续采用精确一次处理的方式读取binlogs,即使发生故障。
  • DataStream API的CDC连接器,用户可以在不部署Debezium和Kafka的情况下,在一个作业中消费多个数据库和表上的更改。
  • 对于Table/SQL API的CDC连接器,用户可以使用SQL DDL创建CDC源来监视单个表上的更改。
  • 全程无锁,不对线上业务产生锁的风险;
  • 多并行度,可水平扩展,全量数据的读取阶段支持水平扩展,使亿级别的大表可以通过加大并行度来加快读取速度;
  • 断点续传,支持全量阶段的 checkpoint,即使任务因某种原因退出了,也可通过保存的 checkpoint 对任务进行恢复实现数据的断点续传。

后面3点是Flink CDC 2.0中才解决的。

binlog就是一张Dynamic Table

Flink 有两个基础概念:Dynamic Table 和 Changelog Stream。

  • Dynamic Table 就是 Flink SQL 定义的动态表,动态表和流的概念是对等的。参照下图,流可以转换成动态表,动态表也可以转换成流。
  • 在 Flink SQL中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的形式,任意时刻的 Changelog Stream 可以翻译为一个表,也可以翻译为一个流。
  • 数据库的binlog用于记录一张表所有的变更。如果一直对表进行更新,binlog 日志流也一直会追加,数据库中的表就相当于 binlog 日志流在某个时刻点物化的结果;日志流就是将表的变更数据持续捕获的结果。这说明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一张不断变化的 MySQL 数据库表。

注意,Flink 提供了 changelog-json format,可以将 changelog 数据写入离线数仓如 Hive / HDFS;对于实时数仓,Flink 支持将 changelog 通过 upsert-kafka connector 直接写入 Kafka。
在这里插入图片描述

底层采集工具

flink 使用Debezium 作为 Flink CDC 的底层采集工具。
Debezium 支持全量同步,也支持增量同步,也支持全量 + 增量的同步,非常灵活,同时基于日志的 CDC 技术使得提供 Exactly-Once 成为可能。

将 Flink SQL 的内部数据结构 RowData 和 Debezium 的数据结构进行对比,可以发现两者是非常相似的。

  • 每条 RowData 都有一个元数据 RowKind,包括 4 种类型, 分别是插入 (INSERT)、更新前镜像 (UPDATE_BEFORE)、更新后镜像 (UPDATE_AFTER)、删除 (DELETE),这四种类型和数据库里面的 binlog 概念保持一致。
  • Debezium 的数据结构,也有一个类似的元数据 op 字段, op 字段的取值也有四种,分别是 c、u、d、r,各自对应 create、update、delete、read。对于代表更新操作的 u,其数据部分同时包含了前镜像 (before) 和后镜像 (after)。

通过分析两种数据结构,Flink 和 Debezium 两者的底层数据是可以非常方便地对接起来的,大家可以发现 Flink 做 CDC 从技术上是非常合适的。
在这里插入图片描述

基于flink cdc的聚合分析

  • 是一个纯 SQL 作业,这意味着只要会 SQL 的 BI,业务线同学都可以完成此类工作。同时,用户也可以利用 Flink SQL 提供的丰富语法进行数据清洗、分析、聚合。
  • 利用 Flink SQL 双流 JOIN、维表 JOIN、UDTF 语法可以非常容易地完成数据打宽,以及各种业务逻辑加工。
    在这里插入图片描述

Flink CDC 2.0 详解

需要解决的痛点问题

  • 一致性通过加锁保证:全量 + 增量读取的过程需要保证所有数据的一致性,因此需要通过加锁保证,但是加锁在数据库层面上是一个十分高危的操作。底层 Debezium 在保证数据一致性时,需要对读取的库或表加锁,全局锁可能导致数据库锁住,表级锁会锁住表的读,DBA 一般不给锁权限。
  • 不支持水平扩展:因为 Flink CDC 底层是基于 Debezium,起架构是单节点,所以Flink CDC 只支持单并发。在全量阶段读取阶段,如果表非常大 (亿级别),读取时间在小时甚至天级别,用户不能通过增加资源去提升作业速度。
  • 全量读取阶段不支持 checkpoint:CDC 读取分为两个阶段,全量读取和增量读取,目前全量读取阶段是不支持 checkpoint 的,因此会存在一个问题:当我们同步全量数据时,假设需要 5 个小时,当我们同步了 4 小时的时候作业失败,这时候就需要重新开始,再读取 5 个小时。

如何解决

Netflix 的 DBLog 论文中 Chunk 读取算法 + 基于 FLIP-27 来优雅地实现的

整体流程可以概括为,首先通过主键对表进行 Snapshot Chunk 划分,再将 Snapshot Chunk 分发给多个 SourceReader,每个 Snapshot Chunk 读取时通过算法实现无锁条件下的一致性读,SourceReader 读取时支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 读取完成后,下发一个 binlog chunk 进行增量部分的 binlog 读取,这便是 Flink CDC 2.0 的整体流程,如下图所示:
在这里插入图片描述

使用示例

mysql-cdc为例,讲解如何使用

模拟电商公司的订单表和物流表,通过flink join将订单表打宽 【线上运行时,需要使用时态表join进行优化】
需求:需要对订单数据进行统计分析,对于不同的信息需要进行关联后续形成订单的大宽表后,交给下游的业务方使用 ES 做数据分析,
这个案例演示了如何只依赖 Flink 不依赖其他组件,借助 Flink 强大的计算能力实时把 Binlog 的数据流关联一次并同步至 ES

环境是flink 1.13.2, flink-sql-connector-mysql-cdc-2.0.2.jar (放在<FLINK_HOME>/lib/下面)

注意:

  • flink-sql-connector-mysql-cdc-2.0.2.jar不能通过sql-client.sh 的-j或 -l参数指定,否则报:io.debezium.relational.RelationalDatabaseConnectorConfig NoSuchFieldError: PASSWORD 异常,导致整个flink集群退出
  • 在mysql-cdc 2.x中默认开启了scan.incremental.snapshot.enabled, 如果表没有主键,则会导致增量快照读( incremental snapshot reading)失败,则需要将scan.incremental.snapshot.enabled设置为false
  • 快照数据块分割采用的算法是:chunk reading algorithm ,块切分采用固定的步长,由参数scan.incremental.snapshot.chunk.size确定,默认值是:8096
    • 针对自增的数字,则按主键从小到大进行切换
    • 针对其它主键,则按SELECT MAX(STR_ID) AS chunk_high FROM (SELECT * FROM TestTable WHERE STR_ID > ‘uuid-001’ limit 25) 获取切换的范围
    • 每个chunk reader执行Offset Signal Algorithm以获得快照块的最终一致输出
  • 如果需要并行运行,每个并行Reader应该有一个唯一的服务器id,所以’ server-id ‘必须是’ 5400-6400 '这样的范围,并且范围必须大于并行度 (并行度通过 SET ‘parallelism.default’ = 8; 设置)。

mysql开启binlog

修改配置

在my.cnf中的mysqld中增加如下配置

  		[mysqld]
        # 前面还有其他配置
        # 添加的部分
        server-id = 12345
        log-bin = mysql-bin
        # 必须为ROW
        binlog_format = ROW
        # 必须为FULL,MySQL-5.7后才有该参数
        binlog_row_image  = FULL
        expire_logs_days  = 15

验证

SHOW VARIABLES LIKE ‘%binlog%’;
在这里插入图片描述

设置权限

		-- 设置拥有同步权限的用户
        CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
        -- 针对MySQL 8.0
        ALTER USER 'flinkuser' IDENTIFIED WITH mysql_native_password BY 'flinkpassword';
        -- 赋予同步相关权限
        GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
        
        FLUSH PRIVILEGES;
        
        创建用户并赋予权限成功后,使用该用户登录MySQL,可以使用以下命令查看主从同步相关信息
        SHOW MASTER STATUS
        SHOW SLAVE STATUS
        SHOW BINARY LOGS

MySQL中创建表

drop table if exists products;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

drop table if exists orders;
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- 是否下单
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

drop table if exists shipments;
CREATE TABLE shipments (
  shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_id INTEGER NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 1001;;

INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),
       (default,10003,'Shanghai','Hangzhou',false);

flink sql client中配置

set table.exec.source.cdc-events-duplicate = true;
CREATE TABLE products (
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'products',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5401-5405'
);

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'orders',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5406-5410'
);

CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY(shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.25.21.29',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpassword',
  'database-name' = 'db_inventory_cdc',
  'table-name' = 'shipments',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '25',
  'server-id'='5411-5415'
);

CREATE TABLE enriched_orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  product_name STRING,
  product_description STRING,
  shipment_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://172.25.11.77:9401',
    'index' = 'enriched_orders'
);

INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

在执行完insert into这条语句后,则sql-client会提交一个flink job到本地的flink cluster, 详见下面的截图:
在这里插入图片描述
在这里插入图片描述

数据验证

  • 通过ES的kinba查询数据,发现enriched_orders索引中已经有了三条数据
  • 通过增加、修改、删除订单和物流记录,则ES中的enriched_orders的索引数据跟着做实时变化。
    注意:必须要设置checkpoint,否则程序会卡到全量阶段的最后一步,进不到增量读取阶段
INSERT INTO orders VALUES (default, '2021-09-27 15:22:00', 'Jark', 29.71, 104, false);
INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);
--更新记录
UPDATE orders SET order_status = true WHERE order_id = 10004;
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1001;
--删除记录
DELETE FROM orders WHERE order_id = 10004;

在这里插入图片描述