

GoldenGate MySQL-to-Kafka Synchronization Explained

2023-06-13 09:20:27
GoldenGate architecture

[Figures: GoldenGate architecture diagrams]

GoldenGate concepts

- The Manager process is GoldenGate's control process and runs on both the source and the target. Its main duties are starting, monitoring, and restarting the other GoldenGate processes, reporting errors and events, allocating data storage space, and issuing threshold reports. There is exactly one Manager process on each side.
- The Extract process runs on the source database and captures data from the source tables or transaction logs.
- The Data Pump process also runs on the source. It takes the local trail files produced there and sends the trail data in blocks over TCP/IP to the target.
- The Collector process (more precisely, the Server Collector) is the target-side counterpart of the Data Pump. It needs no attention from us: in practice it requires no configuration at all, so it is effectively transparent. It runs on the target, and its job is to reassemble the data delivered by Extract/Pump into remote trail files.

The Replicat process, usually called the apply process, runs on the target. It is the last stop in the data path: it reads the contents of the target-side trail files and applies the changes.


To deliver database transaction information from source to target more efficiently and safely, GoldenGate introduces the concept of trail files. As mentioned above, after Extract captures the data, GoldenGate converts the transaction information into files in a proprietary GoldenGate format; the Pump then ships the source trail files to the target, so these files exist on both sides. Trail files exist to avoid single points of failure: they persist the transaction information, and a checkpoint mechanism records the read and write positions, so if a failure occurs the data can be retransmitted from the position recorded in the checkpoint.
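The checkpoint-and-resume idea can be pictured with a small sketch. This is a hypothetical model, not GoldenGate's actual implementation: a reader persists its file position after each record it hands out, so after a "crash" a new reader resumes from the last checkpoint instead of the beginning (checkpointing before delivery, so an in-flight record is not replayed).

```python
import json, os, tempfile

def read_trail(trail_path, checkpoint_path):
    """Yield records from a trail file, resuming from the saved checkpoint."""
    pos = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            pos = json.load(f)["pos"]
    with open(trail_path) as trail:
        trail.seek(pos)
        while True:
            line = trail.readline()
            if not line:
                break
            # Record the new position *before* handing the record out,
            # so a restart continues after this record.
            pos = trail.tell()
            with open(checkpoint_path, "w") as f:
                json.dump({"pos": pos}, f)
            yield line.rstrip("\n")

# Usage: process two records, simulate a crash, then resume.
d = tempfile.mkdtemp()
trail_file = os.path.join(d, "W3")
ckpt_file = os.path.join(d, "W3.ckpt")
with open(trail_file, "w") as f:
    f.write("tx1\ntx2\ntx3\n")

reader = read_trail(trail_file, ckpt_file)
first_two = [next(reader), next(reader)]   # "crash" after two records
resumed = list(read_trail(trail_file, ckpt_file))  # restart: picks up at tx3
```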

GoldenGate versions

Oracle GoldenGate 12.3.0.1.1 for MySQL on Linux x86-64 (source side)

Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 (target side)

Download

http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html

Log in to the source MySQL database from GGSCI:

dblogin sourcedb test@localhost:3306,userid root,password 123456 

Source-side Manager parameters (mgr.prm):

dynamicportlist 17800-18000
purgeoldextracts ./dirdat/*, usecheckpoints, minkeepdays 7

Source-side Extract parameters:

setenv (MYSQL_HOME="/var/lib/mysql")
tranlogoptions altlogdest /var/lib/mysql/mysql-bin.index
dboptions host localhost, connectionport 3306
sourcedb test, userid root, password 123456
exttrail /opt/ggs/dirdat/W3
dynamicresolution
gettruncates
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
table test.wms_test;
 

Register the Extract in GGSCI:

# ADD EXTRACT ext_wpkg, tranlog, begin now


Pump parameters:

table test.wms_test;

Register the Pump and the remote trail in GGSCI:

# ADD EXTRACT pum_wpkg, EXTTRAILSOURCE /opt/ggs/dirdat/W3

# ADD RMTTRAIL /opt/ggs/dirdat/W3, EXTRACT pum_wpkg

defgen parameter file (dirprm/defgen_wpkg.prm):

defsfile /opt/ggs/dirdef/defgen_wpkg.prm
sourcedb test@localhost:3306, userid root, password 123456
table test.wms_entry_warehouse_wpkg;

Note: used to generate the table field-definition (column mapping) file.


Generate the defgen table field definitions
From the OGG root directory:

./defgen paramfile /opt/ggs/dirprm/defgen_wpkg.prm

Note: copy the dirdef/defgen_wpkg.prm file to the dirdef/ directory on the target side.


Target-side Manager parameters (mgr.prm):

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS /opt/ggs/dirdat/*, usecheckpoints, minkeepdays 3

Replicat parameters:

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test.*, TARGET test.*;

Register the Replicat in GGSCI:

add replicat rep_wpkg, exttrail /opt/ggs/dirdat/W3, begin now
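The wildcard in MAP test.*, TARGET test.* can be pictured as a simple pattern rewrite: if a source table matches the MAP pattern, its short name is spliced into the TARGET pattern's schema. A hypothetical sketch, not Replicat's actual resolver:

```python
import fnmatch

def map_table(source_table, map_pattern, target_pattern):
    """Resolve a wildcard MAP: splice the matching source's short name into the target schema."""
    if not fnmatch.fnmatch(source_table, map_pattern):
        return None  # MAP clause does not apply to this table
    _, short_name = source_table.split(".")
    target_schema = target_pattern.split(".")[0]
    return f"{target_schema}.{short_name}"

mapped = map_table("test.wms_test", "test.*", "test.*")
skipped = map_table("other.tab", "test.*", "test.*")
```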

Kafka handler configuration (dirprm/kafka.props):

gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json_row
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ggs/ggjava/ggjava.jar

gg.handler.kafkahandler.topicMappingTemplate — maps operations to Kafka topic names. You can give a fixed topic name, or use a placeholder such as ${tableName}, which gives each table its own topic.
gg.handler.kafkahandler.SchemaTopicName — the topic that receives each table's schema information.
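The placeholder resolution can be sketched as a simple substitution. This is a hypothetical model of what ${tableName} and ${primaryKeys} expand to, not the handler's internal code (the real handler supports additional keywords):

```python
import re

def resolve_template(template, table, primary_key_values):
    """Expand ${...} placeholders for one operation on `table` (e.g. 'TEST.WMS_TEST')."""
    values = {
        "tableName": table.split(".")[-1],            # short table name
        "fullyQualifiedTableName": table,
        "primaryKeys": "_".join(primary_key_values),  # concatenated primary keys
    }
    return re.sub(r"\$\{(\w+)\}", lambda m: values[m.group(1)], template)

# Usage: one topic per table, message key from the primary key.
topic = resolve_template("${tableName}", "TEST.WMS_TEST", ["42"])
key = resolve_template("${primaryKeys}", "TEST.WMS_TEST", ["42"])
```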

Kafka producer connection settings (custom_kafka_producer.properties):

bootstrap.servers=localhost:9092 

acks=1 

reconnect.backoff.ms=1000 

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 

# 16KB batch per partition 

batch.size=16384 

linger.ms=0 
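These .properties files are flat key=value pairs. A quick way to inspect one from Python (a stdlib-only sketch, since configparser expects section headers that Kafka properties files lack):

```python
def load_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Usage on a fragment of the producer config above:
sample = """
bootstrap.servers=localhost:9092
acks=1
# 16KB batch per partition
batch.size=16384
linger.ms=0
"""
props = load_properties(sample)
```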


Read the table schema from the schema topic:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic mySchemaTopic --from-beginning

{
    "$schema":"http://json-schema.org/draft-04/schema#",
    "title":"TEST.WMS_TEST",
    "description":"JSON schema for table TEST.WMS_TEST",
    "definitions":{
        "tokens":{
            "type":"object",
            "description":"Token keys and values are free form key value pairs.",
            "properties":{},
            "additionalProperties":true
        }
    },
    "type":"object",
    "properties":{
        "table":{
            "description":"The fully qualified table name",
            "type":"string"
        },
        "op_type":{
            "description":"The operation type",
            "type":"string"
        },
        "op_ts":{
            "description":"The operation timestamp",
            "type":"string"
        },
        "current_ts":{
            "description":"The current processing timestamp",
            "type":"string"
        },
        "pos":{
            "description":"The position of the operation in the data source",
            "type":"string"
        },
        "primary_keys":{
            "description":"Array of the primary key column names.",
            "type":"array",
            "items":{
                "type":"string"
            },
            "minItems":0,
            "uniqueItems":true
        },
        "tokens":{
            "$ref":"#/definitions/tokens"
        },
        "id":{
            "type":["integer","null"]
        },
        "first_name":{
            "type":["string","null"]
        },
        "last_name":{
            "type":["string","null"]
        },
        "sex":{
            "type":["string","null"]
        },
        "address":{
            "type":["string","null"]
        },
        "flag":{
            "type":["integer","null"]
        }
    },
    "required":["table","op_type","op_ts","current_ts","pos"],
    "additionalProperties":false
}

Read the transaction log from the table's topic:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic wms_test --from-beginning

Insert a row:

INSERT INTO `test`.`wms_test` (`first_name`, `last_name`, `sex`, `address`, `flag`) VALUES ('a', 'b', 'c', 'd', 1);
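A downstream consumer can decode these json_row messages with any JSON library. The sketch below (stdlib-only) splits a message into metadata and column values and dispatches on op_type; the sample payload is illustrative, shaped like the schema's fields rather than captured from a live run:

```python
import json

# Illustrative json_row message: operation metadata plus the row's
# column values at the top level, per the schema fields above.
raw = """{
  "table": "TEST.WMS_TEST", "op_type": "I",
  "op_ts": "2023-06-13 09:20:27.000000", "current_ts": "2023-06-13T09:20:28.100000",
  "pos": "00000000000000002126", "primary_keys": ["id"],
  "id": 1, "first_name": "a", "last_name": "b", "sex": "c", "address": "d", "flag": 1
}"""

# Metadata keys defined by the schema; everything else is a column value.
META = {"table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "tokens"}

def handle(message):
    """Return (action, table, column values) for one json_row message."""
    op = json.loads(message)
    columns = {k: v for k, v in op.items() if k not in META}
    action = {"I": "insert", "U": "update", "D": "delete"}[op["op_type"]]
    return action, op["table"], columns

action, table, columns = handle(raw)
```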


Add the following LD_LIBRARY_PATH to the environment variables:

export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so

DB2 installation error: ./ggsci: error while loading shared libraries: libdb2.so.1: cannot open shared object file: No such file or directory

Add the DB2 library dependency:

export LD_LIBRARY_PATH=$DB2_HOME/lib64/:$LD_LIBRARY_PATH 


Create a topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Run a console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

Run a console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning