Using the Flink CDC 2.x changelog format (example: capturing MySQL table changes and writing them to Kafka)


Why send the data to Kafka first?

MySQL data is fed into Kafka through Flink CDC.
The reasons are:

  • multiple real-time jobs can reuse the same MySQL change stream, and
  • it avoids having several jobs each pull the MySQL tables and binlog through Flink CDC, which would hurt the performance of the MySQL instance.

Test environment

Flink 1.13.2
Flink CDC 2.0.2
Required JARs:

  1. flink-sql-connector-kafka_2.11-1.13.2.jar
  2. flink-format-changelog-json-2.0.2.jar
  3. flink-sql-connector-mysql-cdc-2.0.2.jar

JAR 3 must be placed under <FLINK_HOME>/lib/; JARs 1 and 2 can be loaded with the -j option of sql-client (first command below) or the -C option of flink run (second command below). In the second command, the trailing arguments (-sql, -checkpointDir, ...) come after the user jar, so they are program arguments consumed by the job's main class (com.flink.streaming.core.JobApplication), not Flink options.

bin/sql-client.sh embedded -j ../cdc-jars/flink-format-changelog-json-2.0.2.jar -j ../jars/flink-sql-connector-kafka_2.11-1.13.2.jar

/data/flink/flink-1.13.2/bin/flink run -yjm 1024 -ytm 1024 -p 3 -yqu default  \
-yD yarn.provided.lib.dirs="hdfs://bgdata01:8020/jars/flink/" \
 -ynm flink@mysql_cdc_write_kafka  -yd -m yarn-cluster   \
 -C http://172.25.11.17:50070/webhdfs/v1/jars/common/flink-sql-connector-kafka_2.11-1.13.2.jar?op=OPEN  \
 -C http://172.25.11.17:50070/webhdfs/v1/jars/common/flink-format-changelog-json-2.0.2.jar?op=OPEN \
 -c com.flink.streaming.core.JobApplication /data/flink/flink-streaming-platform-web/lib/flink-streaming-core-1.3.0.RELEASE.jar \
 -sql /data/flink/flink-streaming-platform-web/sql/job_sql_11.sql  \
 -checkpointDir hdfs://172.25.11.17:8020/flink/checkpoint/mysql_cdc_write_kafka  \
 -stateBackendType 2 -enableIncremental true -asynchronousSnapshots true \
 -externalizedCheckpointCleanup RETAIN_ON_CANCELLATION -type 0 


Capturing MySQL table changes and writing them to Kafka

Use Flink CDC to capture changes to the MySQL table and write them to Kafka in changelog-json format:

CREATE TABLE orders_cdc (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '172.25.6.6',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpassword',
    'database-name' = 'db_inventory_cdc',
    'table-name' = 'orders',
    'connect.timeout' = '60s',
    'scan.incremental.snapshot.chunk.size' = '25',
    'server-id' = '5406-5410'
);
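
A note on the options: 'scan.incremental.snapshot.chunk.size' = '25' is only set that low for testing (it controls how many rows go into each snapshot chunk), and the 'server-id' range 5406-5410 supplies five distinct IDs, so it must be at least as large as the source parallelism. The connector also expects the MySQL server and account to be prepared for CDC; a minimal sketch of the server-side checks and grants (assuming the flinkuser account from the DDL above):

-- Binlog must be enabled and in ROW format, otherwise the binlog phase cannot run:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW

-- Privileges the connector account needs for the snapshot and binlog phases:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;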

CREATE TABLE orders_cdc2kafka (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = '172.25.6.7:9092',
    'format' = 'changelog-json',
    'properties.zookeeper.connect' = '172.25.6.7:2181/kafka'
);
 
INSERT INTO orders_cdc2kafka SELECT * FROM orders_cdc;
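
Once the job is running, you can sanity-check the sink by tailing the topic with the stock Kafka console consumer; a sketch (broker address taken from the DDL above, sample record illustrative):

bin/kafka-console-consumer.sh \
  --bootstrap-server 172.25.6.7:9092 \
  --topic flink_test \
  --from-beginning
# changelog-json wraps every change in {"data": <row>, "op": <kind>}, where the kind is
# +I (insert), -U (update-before), +U (update-after) or -D (delete), e.g.:
# {"data":{"order_id":10120,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false},"op":"+I"}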


Consuming the data from Kafka

Note: this table must not declare any Kafka connector metadata columns, otherwise duplicate rows will show up in the materialized result, e.g.:
event_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,

CREATE TABLE orders_cdc2kafka (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test',
    'properties.bootstrap.servers' = '172.25.6.7:9092',
    'properties.group.id' = 'flink_gp_test99',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'changelog-json',
    'changelog-json.ignore-parse-errors' = 'true',
    'changelog-json.timestamp-format.standard' = 'SQL',
    'properties.zookeeper.connect' = '172.25.6.7:2181/kafka'
);

SELECT * FROM orders_cdc2kafka;


Verification

Insert, update, and delete rows in the orders table; the result shown in the SQL client stays consistent with the actual contents of the table:

INSERT INTO orders VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false);
DELETE FROM orders WHERE order_id < 10120;
UPDATE orders SET customer_name = '1111aaaaa' WHERE order_id = 10120;
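
If you switch the SQL client to changelog result mode, the statements above surface as raw change events; a sketch of what the output looks like (row values abbreviated and hypothetical; note that an update arrives as a -U/+U pair):

SET sql-client.execution.result-mode=changelog;

 op | order_id |          order_date | customer_name | ...
 +I |    10123 | 2020-07-30 10:08:22 |          Jark | ...
 -D |    10101 |                 ... |           ... | ...
 -U |    10120 |                 ... |          Jark | ...
 +U |    10120 |                 ... |     1111aaaaa | ...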

