


Using OGG Microservices to Synchronize Oracle to Kafka (Full + Incremental)


Environment preparation

Oracle environment

-- Create a dedicated network
docker network create --subnet=172.72.7.0/24 ora-network




-- Oracle load-testing tool
docker pull lhrbest/lhrdbbench:1.0 

docker rm -f lhrdbbench
docker run -d --name lhrdbbench -h lhrdbbench \
  --net=ora-network --ip 172.72.7.33 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrdbbench:1.0 \
  /usr/sbin/init




-- Oracle 12c
docker rm -f lhrora1221
docker run -itd --name lhrora1221 -h lhrora1221 \
  --net=ora-network --ip 172.72.7.34 \
  -p : -p : \
  --privileged=true \
  lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init




-- Oracle database configuration
1. Enable archivelog mode, if it is not already enabled (see the sketch after this list)
2. Enable database-level supplemental logging, if minimal supplemental logging is not already enabled
3. Enable force logging, if it is not already enabled
4. Set the ENABLE_GOLDENGATE_REPLICATION parameter to TRUE
5. Create the OGG users, including the source-side business user, the target-side user, and the OGG extract user
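
Step 1 above (archivelog mode) is not covered by the statements that follow; a minimal sketch of checking and enabling it from a sysdba session (the restart through MOUNT is required, and the default recovery destination is assumed) looks like this:

-- Check the current log mode first
archive log list;
-- If it reports "No Archive Mode", enable archiving
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;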


alter database add supplemental log data;
alter database add supplemental log data (all) columns;
alter database force logging;
alter system set enable_goldengate_replication=TRUE;

select name,supplemental_log_data_min , force_logging, log_mode from v$database;


alter system set streams_pool_size = M;
alter system set sga_max_size = g scope=spfile;
alter system set sga_target = g scope=spfile;
alter system set pga_aggregate_target=g;
startup force


-- OGG administration user
CREATE USER ogg identified by lhr;
GRANT DBA to ogg;
grant SELECT ANY DICTIONARY to ogg;
GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg;
grant select any transaction to ogg;
grant select any table to ogg;
grant flashback any table to ogg;
grant alter any table to ogg;

exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE); 


-- Business (schema) user
CREATE USER lhr identified by lhr;
alter user lhr identified by lhr;
GRANT DBA to lhr ;
grant SELECT ANY DICTIONARY to lhr;
GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr;



-- Start the listener
vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora
lsnrctl start
lsnrctl status

Oracle data initialization

-- Initialize data on the source
/usr/local/swingbench/bin/oewizard  -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml -create \
-version 2.0  -cs //172.72.7.34/lhrsdb  -dba "sys as sysdba" -dbap lhr -dt thin \
-ts users -u lhr -p lhr -allindexes  -scale 0.0001  -tc  -v -cl


col TABLE_NAME format a30
SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ;
select object_type,count(*) from dba_objects where owner='LHR' group by object_type;
select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status;
select sum(bytes)/1024/1024 from dba_segments where owner='LHR';   -- size in MB; the divisors were stripped in the original

-- Check that the keys are correct: https://www.xmmup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html
-- Otherwise, after OGG starts it will report errors such as OGG-01296, OGG-06439, OGG-01169: Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present.
select owner, constraint_name, constraint_type, status, validated 
from dba_constraints 
where owner='LHR' 
and VALIDATED='NOT VALIDATED';

select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';' 
from dba_constraints 
where owner='LHR'
and VALIDATED='NOT VALIDATED';


-- Generate statements to drop the foreign keys
SELECT 'ALTER TABLE  LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';' 
FROM DBA_constraints d where  owner='LHR' and d.CONSTRAINT_TYPE='R';




sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb

@/oggoracle/demo_ora_create.sql
@/oggoracle/demo_ora_insert.sql


SQL> select * from tcustmer;

CUST NAME                           CITY                 ST
---- ------------------------------ -------------------- --
WILL BG SOFTWARE CO.                SEATTLE              WA
JANE ROCKY FLYER INC.               DENVER               CO


-- Create two tables with CLOB and BLOB columns
sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb  @/oggoracle/demo_ora_lob_create.sql
exec testing_lobs;
select * from lhr.TSRSLOB;

drop table IMAGE_LOB;
CREATE TABLE IMAGE_LOB (
   T_ID VARCHAR2(100) NOT NULL,   -- length assumed; the original value was stripped
   T_IMAGE BLOB,
   T_CLOB  CLOB 
   );

-- Insert BLOB files
CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/';
grant all on DIRECTORY  D1 TO PUBLIC;
CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID      VARCHAR2,
                                                      FILENAME VARCHAR2,
                                                      name VARCHAR2) AS
    F_LOB BFILE;
    B_LOB BLOB;
BEGIN
    INSERT INTO IMAGE_LOB
        (T_ID, T_IMAGE,T_CLOB)
    VALUES
        (TID, EMPTY_BLOB(),name) RETURN T_IMAGE INTO B_LOB;
    F_LOB := BFILENAME('D1', FILENAME);
    DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY);
    DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB));
    DBMS_LOB.FILECLOSE(F_LOB);
    COMMIT;
END;
/


BEGIN
    IMG_INSERT('1','1.jpg','xmmup.com');
    IMG_INSERT('2','2.jpg','www.xmmup.com');
 END;
/


select * from IMAGE_LOB;





-- All tables in the Oracle schema
SQL> select * from tab;

TNAME                          TABTYPE  CLUSTERID
------------------------------ ------- ----------
ADDRESSES                      TABLE
CARD_DETAILS                   TABLE
CUSTOMERS                      TABLE
IMAGE_LOB                      TABLE
INVENTORIES                    TABLE
LOGON                          TABLE
ORDERENTRY_METADATA            TABLE
ORDERS                         TABLE
ORDER_ITEMS                    TABLE
PRODUCTS                       VIEW
PRODUCT_DESCRIPTIONS           TABLE
PRODUCT_INFORMATION            TABLE
PRODUCT_PRICES                 VIEW
TCUSTMER                       TABLE
TCUSTORD                       TABLE
TSRSLOB                        TABLE
TTRGVAR                        TABLE
WAREHOUSES                     TABLE

18 rows selected.



SELECT COUNT(*) FROM LHR.ADDRESSES                      UNION ALL
SELECT COUNT(*) FROM LHR.CARD_DETAILS                   UNION ALL
SELECT COUNT(*) FROM LHR.CUSTOMERS                      UNION ALL
SELECT COUNT(*) FROM LHR.IMAGE_LOB                      UNION ALL
SELECT COUNT(*) FROM LHR.INVENTORIES                    UNION ALL
SELECT COUNT(*) FROM LHR.LOGON                          UNION ALL
SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA            UNION ALL
SELECT COUNT(*) FROM LHR.ORDERS                         UNION ALL
SELECT COUNT(*) FROM LHR.ORDER_ITEMS                    UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS           UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION            UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTMER                       UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTORD                       UNION ALL
SELECT COUNT(*) FROM LHR.TSRSLOB                        UNION ALL
SELECT COUNT(*) FROM LHR.TTRGVAR                        UNION ALL
SELECT COUNT(*) FROM LHR.WAREHOUSES
;

  COUNT(*)
----------
       150
       150
       100
         2
    900724
       239
         4
       143
       773
      1000
      1000
         2
         2
         1
         0
      1000

16 rows selected.
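
The UNION ALL output above does not show which count belongs to which table; a labelled variant of the same query (only the first two and last tables are shown here, the remaining LHR tables follow the same pattern) is:

SELECT 'ADDRESSES'    AS table_name, COUNT(*) AS cnt FROM LHR.ADDRESSES    UNION ALL
SELECT 'CARD_DETAILS',               COUNT(*)        FROM LHR.CARD_DETAILS UNION ALL
-- ... repeat for the remaining LHR tables ...
SELECT 'WAREHOUSES',                 COUNT(*)        FROM LHR.WAREHOUSES;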

In the end, the Oracle side contains 16 tables and 2 views; two of the tables, TSRSLOB and IMAGE_LOB, contain BLOB and CLOB columns.

Target-side Kafka environment

docker pull lhrbest/kafka:3.2.0


docker rm -f lhrkafka
docker run -itd --name lhrkafka -h lhrkafka \
  --net=ora-network --ip 172.72.7.44 \
  -p 9092:9092 -p 2181:2181 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafka:3.2.0 \
  /usr/sbin/init

docker exec -it lhrkafka bash


-- Start ZooKeeper and Kafka (already started by default)
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@lhrkafka /]# jps
QuorumPeerMain
Kafka
Jps
[root@lhrkafka /]# ps -ef | grep java
(output trimmed: one java process running org.apache.zookeeper.server.quorum.QuorumPeerMain with /usr/local/kafka/config/zookeeper.properties, and one running kafka.Kafka with /usr/local/kafka/config/server.properties)
[root@lhrkafka /]# netstat -tulnp | grep java
(output trimmed: both java processes are listening, including on ports 2181 and 9092)

Kafka uses port 9092 by default and ZooKeeper uses port 2181.

Kafka log:

tailf /usr/local/kafka/logs/server.log

As a quick test, create a topic named test on the server and produce a few messages:

-- Producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello
>world



-- On another machine, start a console consumer listening on the test topic; the messages are received
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic test --from-beginning
hello
world




-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092

Source-side OGG for Oracle Microservices environment

-- OGG host
docker pull lhrbest/ogg213maoracle:v1.0

docker rm -f lhrogg213maoracle
docker run -d --name lhrogg213maoracle -h lhrogg213maoracle \
  --net=ora-network --ip 172.72.7.100 \
  -p 9391:3389 -p 29000-29005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg213maoracle:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg213maoracle bash

su - oracle
adminclient
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr

Access http://192.168.1.35:29001 with username oggadmin and password lhr.

Create the credential (database login alias) and add trandata. The connection string used is shown below, followed by an Admin Client sketch of these steps.

ogg@172.72.7.34/lhrsdb
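
The article performs these steps in the web UI; an equivalent Admin Client sketch (the alias ora12c and domain OGGMA match the parameter files used later, and the password shown is an assumption) would be roughly:

-- In adminclient, after CONNECT to the source deployment
ALTER CREDENTIALSTORE ADD USER ogg@172.72.7.34/lhrsdb PASSWORD lhr ALIAS ora12c DOMAIN OGGMA
DBLOGIN USERIDALIAS ora12c DOMAIN OGGMA
ADD TRANDATA LHR.*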

Target-side OGG for Big Data Microservices environment

docker pull lhrbest/ogg214mabigdata:v1.0

docker rm -f lhrogg214mabigdata
docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
  --net=ora-network --ip 172.72.7.101 \
  -p 9191:3389 -p 9000-9005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg214mabigdata:v1.0 \
  /usr/sbin/init


docker exec -it lhrogg214mabigdata bash


-- Configure the Kafka handler parameters
vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
gg.handler.kafkahandler.schemaTopicName=LHR_OGG


vi  /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
bootstrap.servers=172.72.7.44:9092
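
Only two properties are edited above. For orientation, a typical kafka.props for the OGG for Big Data Kafka handler looks roughly like the sketch below (the property names are standard handler settings; the values are illustrative assumptions). The topicMappingTemplate of ${tableName} is what produces the one-topic-per-table layout seen after the full load:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op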

Access http://192.168.1.35:9001 with username oggadmin and password lhr.

Full synchronization (initial load)

Note: during this phase, business activity on the source must be stopped so that no new data is generated.

Create the initial-load extract on the source

EXTRACT ext0
USERIDALIAS ora12c  domain OGGMA
rmthost 172.72.7.101,mgrport 9003
rmtfile ./dirdat/e0
tableexclude LHR.PRODUCTS;
tableexclude LHR.PRODUCT_PRICES;
TABLE LHR.*;
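
Because this extract reads directly from the source tables (an initial-load extract), it is added with SOURCEISTABLE; a minimal Admin Client sketch, assuming the deployment connection above, would be:

ADD EXTRACT ext0, SOURCEISTABLE
START EXTRACT ext0
-- check progress / completion
INFO EXTRACT ext0, DETAIL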

The extract report shows that the data has been transferred to the target, as follows:

Check from the OS:

[root@lhrogg214mabigdata dirdat]# pwd
/ogg214c/ogg_deploy/var/lib/data/dirdat
[root@lhrogg214mabigdata dirdat]# ll
total 84236
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
[root@lhrogg214mabigdata dirdat]# 
[root@lhrogg214mabigdata dirdat]# ll -h
total 83M
-rw-r----- 1 oracle oinstall 83M Jul 22 12:52 e0000000

Full data initialization into Kafka on the target

REPLICAT rep0
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
end runtime
map LHR.*, target LHR.*;
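
With END RUNTIME in the parameter file, this replicat applies the received file once and then stops. A hedged Admin Client sketch to add and run it (the SPECIALRUN/EXTFILE options are assumptions about how the e0 file is registered; adjust if the trail was added differently):

ADD REPLICAT rep0, SPECIALRUN, EXTFILE ./dirdat/e0
START REPLICAT rep0
INFO REPLICAT rep0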

After the run completes, the replicat stops automatically:

Check the full-synchronization results


-- View all historical data
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic LHR_OGG --from-beginning



-- List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092


-- Topic details
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic LHR_OGG

There is one topic per table, as shown below:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
ADDRESSES
CARD_DETAILS
CUSTOMERS
EMP
IMAGE_LOB
INVENTORIES
LHR_OGG
LOGON
ORDERENTRY_METADATA
ORDERS
ORDER_ITEMS
PRODUCT_DESCRIPTIONS
PRODUCT_INFORMATION
TCUSTMER
TCUSTORD
TSRSLOB
WAREHOUSES
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092 | wc -l
19
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic WAREHOUSES
Topic: WAREHOUSES       TopicId: HR3273rMTK6JsQt8OTjKNA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: WAREHOUSES       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic WAREHOUSES --from-beginning | wc -l       
^CProcessed a total of 1000 messages

The full data synchronization is complete.

Incremental synchronization

Oracle-side configuration

Data directory: /ogg213c/ogg_deploy/var/lib/data/dirdat


EXTRACT ext1
USERIDALIAS ora12c DOMAIN OGGMA
DDL INCLUDE MAPPED
DDLOPTIONS REPORT
EXTTRAIL ./dirdat/e1
table LHR.*;
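
This capture extract still has to be added, registered with the database, and associated with its local trail before it will run. A minimal Admin Client sketch using the alias created earlier:

DBLOGIN USERIDALIAS ora12c DOMAIN OGGMA
ADD EXTRACT ext1, INTEGRATED TRANLOG, BEGIN NOW
REGISTER EXTRACT ext1 DATABASE
ADD EXTTRAIL ./dirdat/e1, EXTRACT ext1
START EXTRACT ext1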

Configure the distribution service on the source

Log in at: http://192.168.1.35:29002

trail://172.72.7.100:9002/services/v2/sources?trail=./dirdat/e1
ogg://172.72.7.101:9003/services/v2/targets?trail=./dirdat/e1
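
These two URIs define a distribution path from the local trail e1 to the receiver service on 172.72.7.101:9003. The article creates the path in the web UI; a rough Admin Client equivalent (the path name dp_e1 is made up for illustration) would be:

ADD DISTPATH dp_e1 SOURCE trail://172.72.7.100:9002/services/v2/sources?trail=./dirdat/e1 TARGET ogg://172.72.7.101:9003/services/v2/targets?trail=./dirdat/e1
START DISTPATH dp_e1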

At this point, the Big Data side automatically adds the receiver service:

The trail file has been transferred to the target:

[root@lhrogg214mabigdata dirdat]# ll
total 84252
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
-rw-r----- 1 oracle oinstall    13994 Jul 22 15:37 e1000000000
[root@lhrogg214mabigdata dirdat]# 

Kafka-side replicat configuration

There are many target options, including Warehouse, Cassandra, HBase, HDFS, JDBC, Kafka, and Kafka Connect.

REPLICAT rep1
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
map LHR.*, target LHR.*;
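
A minimal Admin Client sketch to create this replicat against the received trail and start it:

ADD REPLICAT rep1, EXTTRAIL ./dirdat/e1
START REPLICAT rep1
INFO REPLICAT rep1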

Incremental test

LHR@lhrsdb> delete from ADDRESSES where rownum<=1;

1 row deleted.

LHR@lhrsdb> commit;

Commit complete.

Source side:

Distribution service:

Kafka side:

Consuming from the command line:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic ADDRESSES
LHR.ADDRESSESD42022-07-22 15:47:58.00481242022-07-22 15:48:01.593000(00000000000000026625&2022-06-28 16:15:0025Street NameTalgarthBerkshireV

As shown, the data is synchronized incrementally.

Viewing Kafka data with Kafka Manager (CMAK)

Reference: https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html

docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6
-- retag so the docker run command below can use the short image name
docker tag registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6 lhrbest/kafkamanager_cmak:3.0.0.6

docker rm -f lhrkafkamanager
docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
  --net=ora-network --ip 172.72.7.45 \
  -p 9100:9000  \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
  /usr/sbin/init

docker exec -it lhrkafkamanager bash

Web login address: http://192.168.1.35:9100/

Summary

1. When configuring the distribution service, pay attention to the location of the dirdat directory.

2. The distribution service uses port 9002 and the receiver service uses port 9003.