
Lakehouse E-commerce Project (14): Real-Time Task Execution Flow


For now, the project is executed locally, in the following order:

1. Prepare the Environment

This step assumes the HDFS, Hive, HBase, and Kafka environments are already up. Start the Maxwell component to capture change data from the MySQL business database:

# Create the required Kafka topics (skip any topic that already exists to avoid duplicate creation)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-USER-LOGIN-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-USER-LOGIN-WIDE-TOPIC --partitions 3 --replication-factor 3
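
# Optionally verify that all topics were created (--list is a standard kafka-topics.sh option)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --list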

# Start Maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin
[root@node3 bin]# ./maxwell --config ../config.properties
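
For reference, here is a minimal sketch of what ../config.properties might look like so that Maxwell publishes MySQL binlog changes to the KAFKA-DB-BUSSINESS-DATA topic. The MySQL host, credentials, and broker list below are illustrative assumptions, not values taken from this project:

# MySQL connection Maxwell uses to read the binlog (assumed values)
host=node2
user=maxwell
password=maxwell

# Produce change events to Kafka (assumed broker list)
producer=kafka
kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
kafka_topic=KAFKA-DB-BUSSINESS-DATA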

# Create the required Iceberg tables for each layer in Hive
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

CREATE TABLE ODS_MEMBER_INFO  (
id string,
user_id string,
member_growth_score string,
member_level string,
balance string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);


CREATE TABLE ODS_MEMBER_ADDRESS  (
id string,
user_id string,
province string,
city string,
area string,
address string,
log string,
lat string,
phone_number string,
consignee_name string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWD_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWS_USER_LOGIN (
user_id string,
ip string,
gmt_create string,
login_tm string,
logout_tm string,
member_level string,
province string,
city string,
area string,
address string,
member_points string,
balance string,
member_growth_score string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWS_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);
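
After the DDL runs, each table can be sanity-checked from the same Hive session. The tables will be empty until the Flink jobs start writing, but a simple query confirms the Iceberg storage handler and HDFS location are wired up correctly, for example:

SHOW CREATE TABLE ODS_USER_LOGIN;
SELECT * FROM ODS_USER_LOGIN LIMIT 10;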


# Start ClickHouse
[root@node1 ~]# service clickhouse-server start

# Create the corresponding table in ClickHouse
create table dm_user_login_info(
 dt String,
 province String,
 city String,
 user_id String,
 login_tm String,
 gmt_create String
) engine = MergeTree() order by dt;
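
Once the DM-layer Flink job begins writing results, the table can be queried directly in ClickHouse. As an illustrative example (not part of the project code), login counts per day and province:

select dt, province, count(user_id) as login_cnt
from dm_user_login_info
group by dt, province
order by dt, login_cnt desc;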

2. Start the Flink Jobs

Start the following Flink jobs in order: "ProduceKafkaDBDataToODS.scala", "DimDataToHBase.scala", "ProduceKafkaODSDataToDWD.scala", "ProduceUserLogInToDWS.scala", and "ProcessUserLoginInfoToDM.scala". In each job, the Kafka connector property "scan.startup.mode" is set to "latest-offset" so that data is consumed from the latest position (see the sketch below).
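
As a minimal, illustrative sketch (not the project's actual source), this is how a Kafka source with "scan.startup.mode" set to "latest-offset" can be declared via Flink SQL from Scala. The table name, consumer group id, broker list, and JSON format are assumptions; the topic and columns match the DWD layer defined above:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object KafkaDwdSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Declare a Kafka-backed table that starts consuming from the latest offset.
    tableEnv.executeSql(
      """
        |CREATE TABLE kafka_dwd_user_login (
        |  id STRING,
        |  user_id STRING,
        |  ip STRING,
        |  login_tm STRING,
        |  logout_tm STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'KAFKA-DWD-USER-LOGIN-TOPIC',
        |  'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
        |  'properties.group.id' = 'dwd-user-login-group',
        |  'scan.startup.mode' = 'latest-offset',
        |  'format' = 'json'
        |)
        |""".stripMargin)

    // Downstream logic would then read from this table, e.g.:
    // tableEnv.executeSql("SELECT * FROM kafka_dwd_user_login").print()
  }
}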

Note: when running the jobs you can limit JVM memory with: -Xmx300m -Xms300m

3. Start the Data Collection Interface Code

Start the "LakeHouseDataPublish" project to publish data.

4. Start the Mock Data Code

In the "LakeHouseMockData" project, run "RTMockDBData.java", which simulates producing data into the MySQL business database.