zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Hudi-Flink SQL实时读取kafka数据写入Hudi表

Kafka实时flinkSQL数据 读取 写入 hudi
2023-09-11 14:14:34 时间

0.进入shell

./sql-client.sh embedded shell
 

1.建表关联kafka

CREATE TABLE order_kafka_source(
    `orderId` STRING,
    `userId` STRING,
    `orderTime` STRING,
    `ip` STRING,
    `orderMoney` DOUBLE,
    `orderStatus` INT
)
WITH(
    'connector' = 'kafka',
    'topic'='order-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'gid-1001',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

 

 

2.建表sink到hudi

CREATE TABLE order_hudi_sink(
    `orderId` STRING PRIMARY KEY NOT ENFORCED,
    `userId` STRING,
    `orderTime` STRING,
    `ip` STRING,
    `orderMoney` DOUBLE,
    `orderStatus` INT,
    `ts` STRING,
    `partition_day` STRING
)
PARTITIONED BY (partition_day)
WITH(
    'connector' = 'hudi',
    'path'='hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',
    'table.type' = 'MERGE_ON_READ',
    'write.operation' = 'upsert',
    'hoodie.datasource.write.recordkey.field' = 'orderId',
    'write.precombine.field' = 'ts',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'compaction.async.enable' = 'true',
    'compaction.trigger.strategy' = 'num_commits',
    'compaction.delta_commits' = '1'
);

 

 

3.写入hudi表

INSERT INTO order_hudi_sink
SELECT 
    orderId,userId,orderTime,ip,orderMoney,orderStatus,
    substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day
FROM order_kafka_source;