Flink MySQL CDC 使用总结
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
学习总结Flink MySQL CDC,主要目的是同步MySQL数据至其他数据源如Hudi、MySQL等,本文主要以 MySQL2Hudi、MySQL2MySQL两个场景进行示例验证。
版本
Flink | 版本 |
---|---|
Flink | 1.14.3、1.15.4、1.16.1 |
Hudi | 0.13.0 |
MYSQL CDC | 2.3.0 |
安装
将下面的Jar包拷贝到flink/lib下面 (以flink1.15.4为例)
- MySQL CDC(CDC读取MySQL): flink-sql-connector-mysql-cdc-2.3.0.jar,下载地址: https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar
- Hudi (Sink Hudi): hudi-flink1.15-bundle-0.13.0.jar,自己对应版本的打包
- Jdbc (Sink MySQL): flink-connector-jdbc-1.15.4.jar, 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar
Flink CDC,只是对于Source表,比如MySQL CDC,就是抽取MySQL Source表,CDC 官方文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#,可以查看官方文档了解目前Flink CDC支持哪些数据源,每一种数据源都需要下载对应的Jar包
MySQL CDC 参数
CREATE TABLE mysql_cdc_source (
id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致
name string,
price double,
ts bigint,
dt string
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '19.168.44.128',
'port' = '3306',
'username' = 'root',
'password' = 'root-123',
'database-name' = 'cdc',
'table-name' = 'mysql_cdc_source'
);
要使用MySQL CDC Source首先要开启MySQL binlog日志,其他参数和详细信息可以查看官方文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html#id6
示例
创建MySQL Source物理表
mysql -uroot -proot-123 cdc
CREATE TABLE `mysql_cdc_source` (
`id` int(11) NOT NULL,
`name` text,
`price` double DEFAULT NULL,
`ts` int(11) DEFAULT NULL,
`dt` text,
`insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
造数
insert into mysql_cdc_source(id,name,price,ts,dt) values (1,'hudi1',1.1,1000,'20230331');
insert into mysql_cdc_source(id,name,price,ts,dt) values (2,'hudi2',2.2,2000,'20230331');
......
CDC MySQL2Hudi
set yarn.application.name=cdc_mysql2hudi;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2hudi;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;
CREATE TABLE mysql_cdc_source (
id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致
name string,
price double,
ts bigint,
dt string
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '19.168.44.128',
'port' = '3306',
'username' = 'root',
'password' = 'root-123',
'database-name' = 'cdc',
'table-name' = 'mysql_cdc_source'
);
CREATE TABLE hudi_cdc_sink (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
)
WITH (
'connector' = 'hudi',
'path' = '/tmp/cdc/hudi_cdc_sink',
'write.operation'='upsert', --写类型,可选
'write.tasks'='1', --并行度,可选,需要传参
'table.type'='COPY_ON_WRITE', --表类型,可选
'precombine.field' = 'ts', --可选,预合并字段和历史比较字段,当新来的数据该字段大于历史值时才会更新,默认为ts(如果有这个ts字段的话),需要传参,没有可不填,建议将该值设置为update_time
'hoodie.datasource.write.recordkey.field' = 'id', -- 可选,和primary key效果一样,二者至少选一个
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', --该参数目前版本有bug
'index.type' = 'BUCKET', -- flink只支持两种index,默认state index,默认state index对于数据量比较大的情况会因为tm内存不足导致GC OOM
'hoodie.bucket.index.num.buckets' = '16', -- 桶数
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
'hive_sync.db' = 'cdc',
'hive_sync.table' = 'hudi_cdc_sink',
'hoodie.datasource.hive_sync.create_managed_table' = 'true' --是否为内部表,0.13.0版本开始支持
);
insert into hudi_cdc_sink select * from mysql_cdc_source;
注意,要求source表和sink表字段顺序要对应
CDC MySQL2Mysql
创建MySQL Sink物理表
CREATE TABLE `test_sink_mysql` (
`id` int(11) NOT NULL,
`name` text,
`price` double DEFAULT NULL,
`ts` int(11) DEFAULT NULL,
`dt` text,
`insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
set yarn.application.name=cdc_mysql2mysql;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;
CREATE TABLE mysql_cdc_source (
id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致
name string,
price double,
ts bigint,
dt string
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '19.168.44.128',
'port' = '3306',
'username' = 'root',
'password' = 'root-123',
'database-name' = 'cdc',
'table-name' = 'mysql_cdc_source'
);
create table test_sink_mysql (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink_mysql',
'sink.buffer-flush.max-rows' = '1000000'
);
insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
验证
对源表mysql_cdc_source执行insert/update/delete操作,查看目标表数据同步情况,发现数据一致 对源表执行truncate操作,目标表数据不会同步truncate
相关文章
- MySQL学习:从零开始使用MySQL软件(学习mysql用什么软件)
- 技巧极速轻松:MySQL千万级数据快速删除技巧(mysql千万数据删除)
- MySQL三表联合查询:一窥究竟(mysql三表关联查询)
- 令人振奋:使用 Yum 升级 Mysql”(yum升级mysql)
- PHP轻松访问MySQL数据:使用简明易懂的方法获取数据(php获取mysql数据)
- 探究MySQL:学习难度如何?(mysql好学吗)
- 使用MySQL轻松建立数据库(mysql建数据库)
- MySQL中使用week函数进行时间计算(mysql中week函数)
- MySQL中repeat函数的使用详解(mysql中repeat)
- MySQL中使用lt操作符进行小于比较的条件查询(mysql中 lt)
- MySQL中使用key关键字来定义索引(mysql中key关键字)
- MySQL中使用Go语言的好处(mysql中go什么)
- MySQL引擎个性化个性化引擎的使用与掌握(mysql个性化引擎)
- MySQL Case使用指南(case的使用mysql)
- MySQL数据库实现canal同步分析(canal同步mysql)
- MySQL加法操作教程详解如何使用MySQL进行加法计算(mysql中加法怎么做)
- MySQL约束删除操作(mysql中删除约束)
- MySQL 64位下载及安装教程(mysql下载64位教程)
- 解决方案MySQL不能联合两个表,应该采用什么替代方法(mysql不能 两个表)