Using the changelog-json format in Flink CDC 2.x [example: capturing MySQL table changes and writing them to Kafka]
2023-09-11 14:16:24
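Why does the data go into Kafka first?
MySQL data is brought into Kafka through Flink CDC for two reasons:
- so that multiple real-time jobs can reuse the data coming from MySQL;
- to avoid having several jobs each connect to the MySQL tables and binlog through Flink CDC, which would add load to the MySQL database.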
Test environment
Flink 1.13.2
Flink CDC 2.0.2
Required JARs:
- flink-sql-connector-kafka_2.11-1.13.2.jar
- flink-format-changelog-json-2.0.2.jar
- flink-sql-connector-mysql-cdc-2.0.2.jar
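flink-sql-connector-mysql-cdc-2.0.2.jar must be placed under <FLINK_HOME>/lib/. The Kafka connector JAR and the changelog-json format JAR can instead be passed in with the sql-client -j option or the flink run -C option: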
bin/sql-client.sh embedded -j ../cdc-jars/flink-format-changelog-json-2.0.2.jar -j ../jars/flink-sql-connector-kafka_2.11-1.13.2.jar
/data/flink/flink-1.13.2/bin/flink run -yjm 1024 -ytm 1024 -p 3 -yqu default \
-yD yarn.provided.lib.dirs="hdfs://bgdata01:8020/jars/flink/" \
-ynm flink@mysql_cdc_write_kafka -yd -m yarn-cluster \
-C http://172.25.11.17:50070/webhdfs/v1/jars/common/flink-sql-connector-kafka_2.11-1.13.2.jar?op=OPEN \
-C http://172.25.11.17:50070/webhdfs/v1/jars/common/flink-format-changelog-json-2.0.2.jar?op=OPEN \
-c com.flink.streaming.core.JobApplication /data/flink/flink-streaming-platform-web/lib/flink-streaming-core-1.3.0.RELEASE.jar \
-sql /data/flink/flink-streaming-platform-web/sql/job_sql_11.sql \
-checkpointDir hdfs://172.25.11.17:8020/flink/checkpoint/mysql_cdc_write_kafka \
-stateBackendType 2 -enableIncremental true -asynchronousSnapshots true \
-externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -type 0
Capture MySQL table changes and write them to Kafka
Use Flink CDC to capture changes to the MySQL table and write them to Kafka in changelog-json format:
CREATE TABLE orders_cdc (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.25.6.6',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpassword',
'database-name' = 'db_inventory_cdc',
'table-name' = 'orders',
'connect.timeout' = '60s',
'scan.incremental.snapshot.chunk.size' = '25',
'server-id'='5406-5410'
);
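Two options in this source DDL are worth a closer look. In Flink CDC 2.x each parallel source reader needs its own MySQL server ID, so the 'server-id' range should contain at least as many IDs as the source parallelism. Here, 5406-5410 gives five IDs for a job started with -p 3. 'scan.incremental.snapshot.chunk.size' sets how many rows go into each snapshot chunk.

The Python sketch below only models these two rules. parse_server_ids, assign_server_ids and split_chunks are hypothetical helpers. The assumption that subtask i takes the i-th ID and the dense integer key used for chunking are simplifications, not the connector's actual code.

```python
# Hypothetical helpers that model two mysql-cdc options; this is not connector code.

def parse_server_ids(spec: str) -> list:
    """Expand a 'server-id' value such as '5406-5410' into individual IDs."""
    if "-" in spec:
        start, end = (int(part) for part in spec.split("-", 1))
        if start > end:
            raise ValueError(f"invalid server-id range: {spec}")
        return list(range(start, end + 1))
    return [int(spec)]


def assign_server_ids(spec: str, parallelism: int) -> dict:
    """Give every source subtask its own ID (assumption: subtask i takes the i-th ID)."""
    ids = parse_server_ids(spec)
    if len(ids) < parallelism:
        raise ValueError(f"{len(ids)} server IDs cannot cover parallelism {parallelism}")
    return {subtask: ids[subtask] for subtask in range(parallelism)}


def split_chunks(min_key: int, max_key: int, chunk_size: int) -> list:
    """Split a dense integer key range into [start, end) chunks of at most chunk_size rows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks = []
    start = min_key
    while start <= max_key:
        end = min(start + chunk_size, max_key + 1)
        chunks.append((start, end))
        start = end
    return chunks


print(assign_server_ids("5406-5410", 3))  # {0: 5406, 1: 5407, 2: 5408}
print(len(split_chunks(10001, 10100, 25)))  # 4
```

With a tiny chunk size such as 25, even a small test table is read as several chunks, which makes the incremental snapshot easy to observe.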
create table orders_cdc2kafka (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
)
with (
'connector' = 'kafka',
'topic' = 'flink_test',
'properties.bootstrap.servers' = '172.25.6.7:9092',
'format' = 'changelog-json',
'properties.zookeeper.connect' = '172.25.6.7:2181/kafka'
);
insert into orders_cdc2kafka select * from orders_cdc;
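In the Kafka topic, changelog-json stores each change as a JSON object with two fields:
- data holds the row;
- op holds the row kind: +I for inserts, -U and +U for the before and after images of an update, and -D for deletes.

The Python sketch below imitates that layout to show which messages a single MySQL insert, update or delete turns into. encode_change and changes_for are hypothetical names, not part of the format's API. The real serializer's field order and number formatting may differ.

```python
import json
from typing import Optional

ROW_KINDS = ("+I", "-U", "+U", "-D")


def encode_change(op: str, row: dict) -> str:
    """Lay out one change the way changelog-json does: {"data": <row>, "op": <row kind>}."""
    if op not in ROW_KINDS:
        raise ValueError(f"unknown row kind: {op}")
    return json.dumps({"data": row, "op": op}, sort_keys=True)


def changes_for(before: Optional[dict], after: Optional[dict]) -> list:
    """Turn one MySQL row event (before/after images) into changelog-json messages."""
    if before is None and after is not None:  # INSERT
        return [encode_change("+I", after)]
    if before is not None and after is None:  # DELETE
        return [encode_change("-D", before)]
    if before is not None and after is not None:  # UPDATE: retract old row, emit new row
        return [encode_change("-U", before), encode_change("+U", after)]
    raise ValueError("an event needs a before image, an after image, or both")


old = {"order_id": 10120, "customer_name": "Jark"}
new = dict(old, customer_name="1111aaaaa")
for message in changes_for(old, new):
    print(message)
# {"data": {"customer_name": "Jark", "order_id": 10120}, "op": "-U"}
# {"data": {"customer_name": "1111aaaaa", "order_id": 10120}, "op": "+U"}
```

Because an update is written as a -U/+U pair rather than a single row, a downstream consumer can rebuild the current table state from the topic alone.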
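Consume the data from Kafka
Note: do not include Kafka connector metadata columns in this table. Otherwise the in-memory result will contain duplicate rows. For example, avoid a column such as:
event_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,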
create table orders_cdc2kafka (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
)
with (
'connector' = 'kafka',
'topic' = 'flink_test',
'properties.bootstrap.servers' = '172.25.6.7:9092',
'properties.group.id' = 'flink_gp_test99',
'scan.startup.mode' = 'group-offsets',
'format' = 'changelog-json',
'changelog-json.ignore-parse-errors' = 'true',
'changelog-json.timestamp-format.standard' = 'SQL',
'properties.zookeeper.connect' = '172.25.6.7:2181/kafka'
);
select * from orders_cdc2kafka;
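One plausible explanation for the duplicate rows mentioned in the note above rests on an assumption: the SQL client's result view removes a row only when an identical row (equal in every column) is already present. The -U message for an update is a separate Kafka record with its own timestamp. A timestamp metadata column therefore makes the retracted row differ from the row inserted earlier, the retraction finds nothing to remove, and the old version stays.

The Python sketch below models that behaviour. materialize and its extra_columns hook are hypothetical.

```python
from collections import Counter


def materialize(messages, extra_columns=None):
    """Replay (op, row) pairs into a result view, modelling full-row retraction.

    +I/+U add a row; -U/-D remove one row that is equal in every column.
    extra_columns(position) models a per-message metadata column such as the
    Kafka 'timestamp' (hypothetical hook for this sketch).
    """
    view = Counter()
    for position, (op, row) in enumerate(messages):
        full_row = dict(row)
        if extra_columns is not None:
            full_row.update(extra_columns(position))
        key = tuple(sorted(full_row.items()))
        if op in ("+I", "+U"):
            view[key] += 1
        elif op in ("-U", "-D"):
            if view[key] > 0:
                view[key] -= 1
            # else: no identical row exists, so the retraction removes nothing
        else:
            raise ValueError(f"unknown row kind: {op}")
    return [dict(key) for key, count in view.items() for _ in range(count)]


log = [
    ("+I", {"order_id": 10120, "customer_name": "Jark"}),
    ("-U", {"order_id": 10120, "customer_name": "Jark"}),
    ("+U", {"order_id": 10120, "customer_name": "1111aaaaa"}),
]
print(materialize(log))  # one row: the updated order
print(materialize(log, extra_columns=lambda pos: {"event_time": pos}))  # two rows for 10120
```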
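Verification
After inserting, updating and deleting rows in the MySQL orders table, the in-memory result shown in the SQL client matches the actual data in the table: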
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false);
delete from orders where order_id < 10120;
update orders set customer_name='1111aaaaa' where order_id=10120;
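The same consistency check can be modelled offline. This hypothetical Python stand-in is not part of the original setup and has two parts:
- apply_sql mimics the three statements above on an in-memory table. It also records the changelog the CDC source would be expected to emit.
- replay_by_key folds that changelog into a view keyed by order_id, the declared primary key.

The initial rows and the auto-increment value 10121 are made up for illustration.

```python
def apply_sql(table: dict, next_id: int, log: list) -> None:
    """Hypothetical stand-in for the three MySQL statements in the verification step.

    Mutates `table` (order_id -> row) and appends the (op, row) changelog entries
    that a CDC source would be expected to produce for each change.
    """
    # INSERT INTO orders VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false);
    row = {"order_id": next_id, "order_date": "2020-07-30 10:08:22", "customer_name": "Jark",
           "price": 50.5, "product_id": 102, "order_status": False}
    table[next_id] = row
    log.append(("+I", dict(row)))
    # delete from orders where order_id < 10120;
    for order_id in sorted(k for k in table if k < 10120):
        log.append(("-D", table.pop(order_id)))
    # update orders set customer_name='1111aaaaa' where order_id=10120;
    if 10120 in table:
        before = table[10120]
        after = dict(before, customer_name="1111aaaaa")
        table[10120] = after
        log.extend([("-U", before), ("+U", after)])


def replay_by_key(log: list) -> dict:
    """Fold changelog entries into a view keyed by order_id (the PRIMARY KEY)."""
    view = {}
    for op, row in log:
        if op in ("+I", "+U"):
            view[row["order_id"]] = row
        else:  # "-U" or "-D": drop the current version of that key
            view.pop(row["order_id"], None)
    return view


mysql = {
    10119: {"order_id": 10119, "customer_name": "Alice"},
    10120: {"order_id": 10120, "customer_name": "Bob"},
}
log = [("+I", dict(row)) for row in mysql.values()]  # snapshot phase
apply_sql(mysql, 10121, log)
print(replay_by_key(log) == mysql)  # True
```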