Syncing Oracle to Kafka with OGG Microservices (Full + Incremental)
Time: 2023-06-13 09:18:57
Environment preparation
Oracle environment
-- Create a dedicated network
docker network create --subnet=172.72.7.0/24 ora-network
-- Oracle stress-testing tool
docker pull lhrbest/lhrdbbench:1.0
docker rm -f lhrdbbench
docker run -d --name lhrdbbench -h lhrdbbench \
--net=ora-network --ip 172.72.7.33 \
-v /sys/fs/cgroup:/sys/fs/cgroup \
--privileged=true lhrbest/lhrdbbench:1.0 \
/usr/sbin/init
-- Oracle 12c
docker rm -f lhrora1221
docker run -itd --name lhrora1221 -h lhrora1221 \
--net=ora-network --ip 172.72.7.34 \
-p : -p : \
--privileged=true \
lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init
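-- Oracle database configuration
1. Enable archive log mode, if it is not already enabled
2. Enable database-level supplemental logging, if minimal supplemental logging is not already enabled
3. Enable force logging, if it is not already enabled
4. Set the ENABLE_GOLDENGATE_REPLICATION parameter to TRUE
5. Create the OGG users, including the source user, the target user, and the OGG extract user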
alter database add supplemental log data;
alter database add supplemental log data (all) columns;
alter database force logging;
alter system set enable_goldengate_replication=TRUE;
select name,supplemental_log_data_min , force_logging, log_mode from v$database;
alter system set streams_pool_size = M;
alter system set sga_max_size = g scope=spfile;
alter system set sga_target = g scope=spfile;
alter system set pga_aggregate_target=g;
startup force
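The prerequisite query above (name, supplemental_log_data_min, force_logging, log_mode) can be checked mechanically. The following is a minimal sketch, not part of OGG or Oracle: check_ogg_prereqs is a hypothetical helper name, and it assumes the single data row is piped in without headings (for example from sqlplus -s with "set heading off feedback off").

```shell
# Hypothetical helper: reads one whitespace-separated result row of
#   select name,supplemental_log_data_min,force_logging,log_mode from v$database;
# on stdin and reports which prerequisites are still missing.
# Assumption: headings and feedback are switched off, so only the data row arrives.
check_ogg_prereqs() {
  awk '
    NF >= 4 {
      ok = 1
      # supplemental_log_data_min is NO when minimal supplemental logging is off
      if ($2 == "NO")         { print "missing: minimal supplemental logging"; ok = 0 }
      if ($3 != "YES")        { print "missing: force logging"; ok = 0 }
      if ($4 != "ARCHIVELOG") { print "missing: archivelog mode"; ok = 0 }
      if (ok) { print "ready: " $1 }
      seen = 1
    }
    END { if (!seen) { print "no row read" } }
  '
}
```

For example, `echo 'LHRSDB YES YES ARCHIVELOG' | check_ogg_prereqs` reports the database as ready, while a row containing NOARCHIVELOG lists archivelog mode as missing.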
-- OGG admin user
CREATE USER ogg identified by lhr;
GRANT DBA to ogg;
grant SELECT ANY DICTIONARY to ogg;
GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg;
grant select any transaction to ogg;
grant select any table to ogg;
grant flashback any table to ogg;
grant alter any table to ogg;
exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE);
-- Application user
CREATE USER lhr identified by lhr;
alter user lhr identified by lhr;
GRANT DBA to lhr ;
grant SELECT ANY DICTIONARY to lhr;
GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr;
-- Start the listener
vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora
lsnrctl start
lsnrctl status
Oracle data initialization
-- Initialize the source data
/usr/local/swingbench/bin/oewizard -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml -create \
-version 2.0 -cs //172.72.7.34/lhrsdb -dba "sys as sysdba" -dbap lhr -dt thin \
-ts users -u lhr -p lhr -allindexes -scale 0.0001 -tc -v -cl
col TABLE_NAME format a30
SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ;
select object_type,count(*) from dba_objects where owner='LHR' group by object_type;
select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status;
select sum(bytes)// from dba_segments where owner='LHR';
-- Check that the keys are valid: https://www.xmmup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html
-- Otherwise, after OGG starts it reports errors: OGG-01296, OGG-06439, OGG-01169 Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present.
select owner, constraint_name, constraint_type, status, validated
from dba_constraints
where owner='LHR'
and VALIDATED='NOT VALIDATED';
select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';'
from dba_constraints
where owner='LHR'
and VALIDATED='NOT VALIDATED';
-- Drop foreign keys
SELECT 'ALTER TABLE LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';'
FROM DBA_constraints d where owner='LHR' and d.CONSTRAINT_TYPE='R';
sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb
@/oggoracle/demo_ora_create.sql
@/oggoracle/demo_ora_insert.sql
SQL> select * from tcustmer;
CUST NAME CITY ST
---- ------------------------------ -------------------- --
WILL BG SOFTWARE CO. SEATTLE WA
JANE ROCKY FLYER INC. DENVER CO
-- Create 2 tables with CLOB and BLOB columns
sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb @/oggoracle/demo_ora_lob_create.sql
exec testing_lobs;
select * from lhr.TSRSLOB;
drop table IMAGE_LOB;
CREATE TABLE IMAGE_LOB (
T_ID VARCHAR2 () NOT NULL,
T_IMAGE BLOB,
T_CLOB CLOB
);
-- Insert BLOB files
CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/';
grant all on DIRECTORY D1 TO PUBLIC;
CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID VARCHAR2,
FILENAME VARCHAR2,
name VARCHAR2) AS
F_LOB BFILE;
B_LOB BLOB;
BEGIN
INSERT INTO IMAGE_LOB
(T_ID, T_IMAGE,T_CLOB)
VALUES
(TID, EMPTY_BLOB(),name) RETURN T_IMAGE INTO B_LOB;
F_LOB := BFILENAME('D1', FILENAME);
DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY);
DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB));
DBMS_LOB.FILECLOSE(F_LOB);
COMMIT;
END;
/
BEGIN
IMG_INSERT('1','1.jpg','xmmup.com');
IMG_INSERT('2','2.jpg','www.xmmup.com');
END;
/
select * from IMAGE_LOB;
----- All Oracle tables
SQL> select * from tab;
TNAME TABTYPE CLUSTERID
------------------------------ ------- ----------
ADDRESSES TABLE
CARD_DETAILS TABLE
CUSTOMERS TABLE
IMAGE_LOB TABLE
INVENTORIES TABLE
LOGON TABLE
ORDERENTRY_METADATA TABLE
ORDERS TABLE
ORDER_ITEMS TABLE
PRODUCTS VIEW
PRODUCT_DESCRIPTIONS TABLE
PRODUCT_INFORMATION TABLE
PRODUCT_PRICES VIEW
TCUSTMER TABLE
TCUSTORD TABLE
TSRSLOB TABLE
TTRGVAR TABLE
WAREHOUSES TABLE
18 rows selected.
SELECT COUNT(*) FROM LHR.ADDRESSES UNION ALL
SELECT COUNT(*) FROM LHR.CARD_DETAILS UNION ALL
SELECT COUNT(*) FROM LHR.CUSTOMERS UNION ALL
SELECT COUNT(*) FROM LHR.IMAGE_LOB UNION ALL
SELECT COUNT(*) FROM LHR.INVENTORIES UNION ALL
SELECT COUNT(*) FROM LHR.LOGON UNION ALL
SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA UNION ALL
SELECT COUNT(*) FROM LHR.ORDERS UNION ALL
SELECT COUNT(*) FROM LHR.ORDER_ITEMS UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTMER UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTORD UNION ALL
SELECT COUNT(*) FROM LHR.TSRSLOB UNION ALL
SELECT COUNT(*) FROM LHR.TTRGVAR UNION ALL
SELECT COUNT(*) FROM LHR.WAREHOUSES
;
COUNT(*)
----------
150
150
100
2
900724
239
4
143
773
1000
1000
2
2
1
0
1000
16 rows selected.
In the end, the Oracle side holds 16 tables and 2 views; two of the tables, TSRSLOB and IMAGE_LOB, contain BLOB and CLOB columns.
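The hand-written UNION ALL count query above returns bare numbers, so each count has to be matched to its table by position. As a hedged sketch, the query could instead be generated with the table name included in every row. gen_count_sql is a hypothetical helper name, not something shipped with Oracle or OGG.

```shell
# Hypothetical helper: print a UNION ALL row-count query for the given tables,
# with the table name as the first column of every result row.
# Usage: gen_count_sql OWNER TABLE [TABLE...]
gen_count_sql() {
  owner=$1; shift
  first=1
  for t in "$@"; do
    # Join the per-table SELECTs with UNION ALL (not before the first one)
    if [ "$first" -eq 1 ]; then first=0; else printf ' UNION ALL\n'; fi
    printf "SELECT '%s' AS table_name, COUNT(*) AS cnt FROM %s.%s" "$t" "$owner" "$t"
  done
  printf ';\n'
}
```

For example, `gen_count_sql LHR ADDRESSES CARD_DETAILS CUSTOMERS > counts.sql` writes a script that can be run with sqlplus in the same way as the other scripts in this section.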
Target-side Kafka environment
docker pull lhrbest/kafka:3.2.0
docker rm -f lhrkafka
docker run -itd --name lhrkafka -h lhrkafka \
--net=ora-network --ip 172.72.7.44 \
-p 9092:9092 -p 2181:2181 \
-v /sys/fs/cgroup:/sys/fs/cgroup \
--privileged=true lhrbest/kafka:3.2.0 \
/usr/sbin/init
docker exec -it lhrkafka bash
-- Start (already started by default)
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
[root@lhrkafka /]# jps
QuorumPeerMain
Kafka
Jps
[root@lhrkafka /]# ps -ef|grep java
root : ? :: java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis= -XX:InitiatingHeapOccupancyPercent= -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel= -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles= -XX:GCLogFileSize=M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp <jars under /usr/local/kafka/bin/../libs/, list omitted> org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
root : ? :: java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis= -XX:InitiatingHeapOccupancyPercent= -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel= -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles= -XX:GCLogFileSize=M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port= -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp <jars under /usr/local/kafka/bin/../libs/, list omitted> kafka.Kafka /usr/local/kafka/config/server.properties
root : pts/ :: grep --color=auto java
[root@lhrkafka /]# netstat -tulnp | grep java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
tcp 0.0.0.0: 0.0.0.0:* LISTEN /java
Kafka listens on port 9092 by default, and ZooKeeper on port 2181.
Kafka log:
tail -f /usr/local/kafka/logs/server.log
As a test, create a topic named test on the server and produce a few messages:
-- Producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello
>world
-- In another session, start a console consumer on the test topic; the messages are received
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
hello
world
-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Source-side OGG for Oracle microservices environment
-- OGG host
docker pull lhrbest/ogg213maoracle:v1.0
docker rm -f lhrogg213maoracle
docker run -d --name lhrogg213maoracle -h lhrogg213maoracle \
--net=ora-network --ip 172.72.7.100 \
-p 9391:3389 -p 29000-29005:9000-9005 \
-v /sys/fs/cgroup:/sys/fs/cgroup \
--privileged=true lhrbest/ogg213maoracle:v1.0 \
/usr/sbin/init
docker exec -it lhrogg213maoracle bash
su - oracle
adminclient
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr
Open http://192.168.1.35:29001 (username: oggadmin, password: lhr)
Create the credential and add trandata
ogg@172.72.7.34/lhrsdb
Target-side OGG for Big Data microservices environment
docker pull lhrbest/ogg214mabigdata:v1.0
docker rm -f lhrogg214mabigdata
docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
--net=ora-network --ip 172.72.7.101 \
-p 9191:3389 -p 9000-9005:9000-9005 \
-v /sys/fs/cgroup:/sys/fs/cgroup \
--privileged=true lhrbest/ogg214mabigdata:v1.0 \
/usr/sbin/init
docker exec -it lhrogg214mabigdata bash
-- Configure the Kafka parameters
vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
vi /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
bootstrap.servers=172.72.7.44:9092
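After editing the two files above, it can help to confirm that the keys really carry the intended values before starting a replicat. The following is a minimal sketch under stated assumptions: get_prop is a hypothetical helper, it handles only simple key=value lines (no line continuations or ':' separators), and it does not trim whitespace around the value.

```shell
# Hypothetical helper: print the value of KEY from a Java-style .properties file.
# Skips '#' comment lines and lines without '='; the last occurrence of KEY wins.
# Returns non-zero when the key is absent.
# Usage: get_prop KEY FILE
get_prop() {
  awk -v k="$1" '
    /^[ \t]*#/ { next }
    index($0, "=") == 0 { next }
    {
      key = substr($0, 1, index($0, "=") - 1)
      gsub(/^[ \t]+|[ \t]+$/, "", key)
      if (key == k) { val = substr($0, index($0, "=") + 1); found = 1 }
    }
    END { if (found) { print val } else { exit 1 } }
  ' "$2"
}
```

With the paths used in this article, `get_prop bootstrap.servers /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties` should print 172.72.7.44:9092, and `get_prop gg.handler.kafkahandler.schemaTopicName /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props` should print LHR_OGG.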
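Open http://192.168.1.35:9001 (username: oggadmin, password: lhr)
Full synchronization
Note: during this phase, application activity on the source must be stopped so that no new data is generated.
Create the initial load on the source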
EXTRACT ext0
USERIDALIAS ora12c domain OGGMA
rmthost 172.72.7.101,mgrport 9003
rmtfile ./dirdat/e0
tableexclude LHR.PRODUCTS;
tableexclude LHR.PRODUCT_PRICES;
TABLE LHR.*;
Check the report; it shows that the data has been transferred to the target, as follows:
Check from the OS:
[root@lhrogg214mabigdata dirdat]# pwd
/ogg214c/ogg_deploy/var/lib/data/dirdat
[root@lhrogg214mabigdata dirdat]# ll
total 84236
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
[root@lhrogg214mabigdata dirdat]#
[root@lhrogg214mabigdata dirdat]# ll -h
total 83M
-rw-r----- 1 oracle oinstall 83M Jul 22 12:52 e0000000
Full initial load into Kafka on the target
REPLICAT rep0
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
end runtime
map LHR.*, target LHR.*;
After it finishes running, it stops automatically:
Checking the full synchronization result
-- View all historical data
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic LHR_OGG --from-beginning
-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
-- Topic details
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic LHR_OGG
One topic per table, as follows:
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
ADDRESSES
CARD_DETAILS
CUSTOMERS
EMP
IMAGE_LOB
INVENTORIES
LHR_OGG
LOGON
ORDERENTRY_METADATA
ORDERS
ORDER_ITEMS
PRODUCT_DESCRIPTIONS
PRODUCT_INFORMATION
TCUSTMER
TCUSTORD
TSRSLOB
WAREHOUSES
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | wc -l
19
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic WAREHOUSES
Topic: WAREHOUSES TopicId: HR3273rMTK6JsQt8OTjKNA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: WAREHOUSES Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic WAREHOUSES --from-beginning | wc -l
^CProcessed a total of 1000 messages
The full data synchronization is complete.
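The check above compares one table by eye (1000 rows in WAREHOUSES, 1000 messages in the WAREHOUSES topic). A hedged sketch of doing the same comparison for every table follows. compare_counts is a hypothetical helper; it assumes two non-empty text files with one "TABLE COUNT" pair per line, one built from the Oracle row counts and one from the per-topic message counts, however those were collected.

```shell
# Hypothetical helper: compare source row counts with Kafka message counts.
# Both files hold one "TABLE COUNT" pair per line.
# Usage: compare_counts SOURCE_COUNTS_FILE KAFKA_COUNTS_FILE
compare_counts() {
  awk '
    # First file: remember the source count of every table
    NR == FNR { src[$1] = $2; next }
    # Second file: report topics that are unexpected or whose count differs
    {
      seen[$1] = 1
      if (!($1 in src)) {
        print "EXTRA " $1 " kafka=" $2
      } else if (src[$1] != $2) {
        print "MISMATCH " $1 " source=" src[$1] " kafka=" $2
      }
    }
    # Tables that never showed up on the Kafka side
    END { for (t in src) { if (!(t in seen)) { print "MISSING " t " source=" src[t] } } }
  ' "$1" "$2"
}
```

No output means every table matches. A MISSING line with source=0 is not necessarily a problem: in the listings in this article, TTRGVAR has 0 rows and no TTRGVAR topic appears.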
Incremental synchronization
Oracle-side configuration
Data directory: /ogg213c/ogg_deploy/var/lib/data/dirdat
EXTRACT ext1
USERIDALIAS ora12c DOMAIN OGGMA
DDL INCLUDE MAPPED
DDLOPTIONS REPORT
EXTTRAIL ./dirdat/e1
table LHR.*;
Configure the Distribution Service on the source
Log in: http://192.168.1.35:29002
trail://172.72.7.100:9002/services/v2/sources?trail=./dirdat/e1
ogg://172.72.7.101:9003/services/v2/targets?trail=./dirdat/e1
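The two URIs above follow a fixed pattern: a trail:// source on the distribution port of the source deployment and an ogg:// target on the receiver port of the target deployment, both naming the same trail. As a small sketch that only mirrors the two lines shown here (distpath_uris is a hypothetical helper, not an OGG command), the pair could be generated so that host, port, and trail name stay consistent:

```shell
# Hypothetical helper (not part of OGG): print the source and target URIs
# of a distribution path in the form used above.
# Usage: distpath_uris SRC_HOST SRC_PORT DST_HOST DST_PORT TRAIL
distpath_uris() {
  printf 'trail://%s:%s/services/v2/sources?trail=%s\n' "$1" "$2" "$5"
  printf 'ogg://%s:%s/services/v2/targets?trail=%s\n' "$3" "$4" "$5"
}
```

Calling `distpath_uris 172.72.7.100 9002 172.72.7.101 9003 ./dirdat/e1` reproduces the two lines above.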
At this point, the Big Data deployment automatically adds a Receiver Service path:
The file has been transferred to the target:
[root@lhrogg214mabigdata dirdat]# ll
total 84252
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
-rw-r----- 1 oracle oinstall 13994 Jul 22 15:37 e1000000000
[root@lhrogg214mabigdata dirdat]#
Kafka-side replicat configuration
The target offers many options, including Warehouse, Cassandra, HBase, HDFS, JDBC, Kafka, and Kafka Connect.
REPLICAT rep1
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
map LHR.*, target LHR.*;
Incremental test
LHR@lhrsdb> delete from ADDRESSES where rownum<=;
1 row deleted.
LHR@lhrsdb> commit;
Commit complete.
Source side:
Data distribution:
Kafka side:
Command-line consumer:
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic ADDRESSES
LHR.ADDRESSESD42022-07-22 15:47:58.00481242022-07-22 15:48:01.593000(00000000000000026625&2022-06-28 16:15:0025Street NameTalgarthBerkshireV
As shown, the data is synchronized incrementally.
Viewing Kafka data with Kafka Manager
Reference: https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html
docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6
docker rm -f lhrkafkamanager
docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
--net=ora-network --ip 172.72.7.45 \
-p 9100:9000 \
-v /sys/fs/cgroup:/sys/fs/cgroup \
--privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
/usr/sbin/init
docker exec -it lhrkafkamanager bash
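Web login URL: http://192.168.1.35:9100/
Summary
1. When configuring the Distribution Service, pay attention to the location of dirdat.
2. Distribution uses port 9002; receiving uses port 9003.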