goldengate_mysql_kafka同步详解大数据
2023-06-13 09:20:27 时间
goldengate架构
为了更有效、更安全的把数据库事务信息从源端投递到目标端。GoldenGate引进trail文件的概念。前面提到extract抽取完数据以 后 Goldengate会将抽取的事务信息转化为一种GoldenGate专有格式的文件。然后pump负责把源端的trail文件投递到目标端,所以源、 目标两端都会存在这种文件。 trail文件存在的目的旨在防止单点故障,将事务信息持久化,并且使用checkpoint机制来记录其读写位置,如果故障发生,则数据可以根据checkpoint记录的位置来重传 。
dynamicportlist 17800-18000 purgeoldextracts ./dirdat/*,usecheckpoints, minkeepdays 7
setenv (MYSQL_HOME="/var/lib/mysql") tranlogoptions altlogdest /var/lib/mysql/mysql-bin.index dboptions host localhost,connectionport 3306 sourcedb test, userid root,password 123456 exttrail /opt/ggs/dirdat/W3 dynamicresolution gettruncates GETUPDATEBEFORES NOCOMPRESSDELETES NOCOMPRESSUPDATES table test.wms_test;
defsfile /opt/ggs/dirdef/defgen_wpkg.prm sourcedb [email protected]:3306,userid root,password 123456 table test.wms_entry_warehouse_wpkg;
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS /opt/ggs/dirdat/*,usecheckpoints, minkeepdays 3
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test.*,TARGET test.*;
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties #The following resolves the topic name using the short table name gg.handler.kafkahandler.topicMappingTemplate=${tableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} gg.handler.kafkahandler.format=json_row gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic gg.handler.kafkahandler.BlockingSend =false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ggs/ggjava/ggjava.jar gg.handler.kafkahandler.topicMappingTemplate kafka topic名称的映射,指定topic名称,也可以通过占位符的方式,例如${tableName},每一张表对应一个topic gg.handler.kafkahandler.SchemaTopicName 表的Schema信息对应的topic名称
Replicat进程,通常我们也把它叫做应用进程。运行在目标端,是数据传递的最后一站,负责读取目标端trail文件中的内容。
为了更有效、更安全的把数据库事务信息从源端投递到目标端。GoldenGate引进trail文件的概念。前面提到extract抽取完数据以 后 Goldengate会将抽取的事务信息转化为一种GoldenGate专有格式的文件。然后pump负责把源端的trail文件投递到目标端,所以源、 目标两端都会存在这种文件。 trail文件存在的目的旨在防止单点故障,将事务信息持久化,并且使用checkpoint机制来记录其读写位置,如果故障发生,则数据可以根据checkpoint记录的位置来重传 。
goldengate版本
Oracle GoldenGate 12.3.0.1.1 for MySQL on Linux x86-64 (源端版本) Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 (目标端版本)
下载地址
http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html
mysql用户登录
dblogin sourcedb test@localhost:3306,userid root,password 123456
dynamicportlist 17800-18000 purgeoldextracts ./dirdat/*,usecheckpoints, minkeepdays 7
setenv (MYSQL_HOME="/var/lib/mysql") tranlogoptions altlogdest /var/lib/mysql/mysql-bin.index dboptions host localhost,connectionport 3306 sourcedb test, userid root,password 123456 exttrail /opt/ggs/dirdat/W3 dynamicresolution gettruncates GETUPDATEBEFORES NOCOMPRESSDELETES NOCOMPRESSUPDATES table test.wms_test;
# ADD EXTRACT ext_wpkg, tranlog,begin now
table test.wms_test;
# ADD EXTRACT pum_wpkg,EXTTRAILSOURCE /opt/ggs/dirdat/W3; # ADD RMTTRAIL /opt/ggs/dirdat/W3, EXTRACT pum_wpkg
defsfile /opt/ggs/dirdef/defgen_wpkg.prm sourcedb [email protected]:3306,userid root,password 123456 table test.wms_entry_warehouse_wpkg;
备注:用于生成表字段映射
生成defgen表字段映射
进入ogg根目录
./defgen paramfile /opt/ggs/dirprm/defgen_wpkg.prm
备注:拷贝dirdef/defgen_wpkg.prm文件到目标端dirdef/目录下
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS /opt/ggs/dirdat/*,usecheckpoints, minkeepdays 3
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test.*,TARGET test.*;
add replicat rep_wpkg, exttrail /opt/ggs/dirdat/W3,begin now
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties #The following resolves the topic name using the short table name gg.handler.kafkahandler.topicMappingTemplate=${tableName} #The following selects the message key using the concatenated primary keys gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys} gg.handler.kafkahandler.format=json_row gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic gg.handler.kafkahandler.BlockingSend =false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ggs/ggjava/ggjava.jar gg.handler.kafkahandler.topicMappingTemplate kafka topic名称的映射,指定topic名称,也可以通过占位符的方式,例如${tableName},每一张表对应一个topic gg.handler.kafkahandler.SchemaTopicName 表的Schema信息对应的topic名称
配置kafka连接信息custom_kafka_producer.properties
bootstrap.servers=localhost:9092 acks=1 reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=16384 linger.ms=0
# kafka-console-consumer.sh --zookeeper localhost:2181 --topic mySchemaTopic --from-beginning "$schema":"http://json-schema.org/draft-04/schema#", "title":"TEST.WMS_TEST", "description":"JSON schema for table TEST.WMS_TEST", "definitions":{ "tokens":{ "type":"object", "description":"Token keys and values are free form key value pairs.", "properties":{ "additionalProperties":true "type":"object", "properties":{ "table":{ "description":"The fully qualified table name", "type":"string" "op_type":{ "description":"The operation type", "type":"string" "op_ts":{ "description":"The operation timestamp", "type":"string" "current_ts":{ "description":"The current processing timestamp", "type":"string" "pos":{ "description":"The position of the operation in the data source", "type":"string" "primary_keys":{ "description":"Array of the primary key column names.", "type":"array", "items":{ "type":"string" "minItems":0, "uniqueItems":true "tokens":{ "$ref":"#/definitions/tokens" "id":{ "type":[ "integer", "null" "first_name":{ "type":[ "string", "null" "last_name":{ "type":[ "string", "null" "sex":{ "type":[ "string", "null" "address":{ "type":[ "string", "null" "flag":{ "type":[ "integer", "null" "required":[ "table", "op_type", "op_ts", "current_ts", "pos" "additionalProperties":false }
从topic读取事务日志
kafka-console-consumer.sh --zookeeper localhost:2181 --topic wms_test --from-beginning新增一条记录
INSERT INTO `test`.`wms_test` (`first_name`, `last_name`, `sex`, `address`, `flag`) VALUES (a, b, c, d, 1);在环境变量中添加如下LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.sodb2安装报错 /ggsci: error while loading shared libraries: libdb2.so.1: cannot open shared object file: No such file or directory
添加db2的lib依赖
export LD_LIBRARY_PATH=$DB2_HOME/lib64/:$LD_LIBRARY_PATH创建队列
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test运行producer
in/kafka-console-producer.sh --broker-list localhost:9092 --topic test运行consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
相关文章
- 深入理解MySQL中的数据类型(mysql中的数据类型)
- MySQL数据源驱动:快速下载体验(mysql数据源驱动下载)
- MySQL查询结果的编号索引方法(mysql查询结果编号)
- MySQL的收费方式及其优势分析(mysql如何收费)
- MySQL官网下载安装指南(mysql官网下载安装)
- MySQL多库同步之路(mysql多库同步)
- 权限MySQL权限设置指南(mysql如何设置)
- MySQL主从同步:配置实例指南(mysql主从同步配置)
- 快速编译MySQL库(编译mysql)
- MySQL实时数据同步:提升实时性能(mysql实时数据同步)
- MySQL查询语句:如何取前十条数据(mysql取前十条)
- 解决 MySQL 主从同步错误的方法:跳过错误处理(mysql主从跳过错误)
- MySQL 快速查询编码技巧(mysql查询编码)
- MySQL 主从复制不同步问题解决方案(mysql主从不同步)
- 如何实现 MySQL 数据库主从同步,同步表数据?(mysql主从同步表)
- MySQL 差异同步:异地复制解放数据管理(mysql 差异同步)
- MySQL如何设置密码?25字中文文章标题:MySQL密码设置详解(mysql密码设置)
- MySQL 数据同步到 Elasticsearch:实现数据无缝转移(mysql同步到es)
- 高效实现数据同步,MySQL两表同步方法大揭秘!(mysql两表同步)
- 一步到位:MySQL快速导入全部数据库(mysql导入全部数据库)
- MySQL主从复制验证:实现可靠的数据同步(mysql主从验证)
- MySQL中如何修改列名称,步骤详解(mysql中 修改列名称)
- MySQL中使用IF作为条件语句的方法(mysql中if as)
- 监测MySQL数据库变化C语言实现(C mysql变化监听)
- CDC实现MySQL数据同步(cdc同步 mysql)
- MySQL间的三台数据库同步(3个mysql之间同步)
- MySQL实现三主同步技术简介(mysql 三主同步)
- MySQL查询不包含某个值(mysql不包含某个值)
- MySQL实现虚拟主机上传功能(mysql上传虚拟主机)