Hudi-Flink SQL: Streaming Kafka Data into a Hudi Table in Real Time
2023-09-11 14:14:34
0. Start the SQL client shell
./sql-client.sh embedded shell
1. Create a source table bound to Kafka
CREATE TABLE order_kafka_source (
  `orderId` STRING,
  `userId` STRING,
  `orderTime` STRING,
  `ip` STRING,
  `orderMoney` DOUBLE,
  `orderStatus` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'gid-1001',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
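A message on `order-topic` should be a JSON object whose keys match the source table's columns. The sketch below builds one such event; the concrete field values (the orderId layout, the IP, the amounts) are made-up examples, not part of the original tutorial.

```python
import json

# Hypothetical order event matching the order_kafka_source schema.
# The orderId is assumed to start with a 17-character timestamp prefix,
# since the later INSERT derives `ts` from its first 17 characters.
event = {
    "orderId": "20230911141434000000001",
    "userId": "user-1001",
    "orderTime": "2023-09-11 14:14:34",
    "ip": "192.168.1.10",
    "orderMoney": 99.5,
    "orderStatus": 0,
}

# Serialize to the JSON string that would be produced to Kafka.
message = json.dumps(event)
print(message)
```

A line like this can be fed to the topic with `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic order-topic`.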
2. Create a sink table writing to Hudi
CREATE TABLE order_hudi_sink (
  `orderId` STRING PRIMARY KEY NOT ENFORCED,
  `userId` STRING,
  `orderTime` STRING,
  `ip` STRING,
  `orderMoney` DOUBLE,
  `orderStatus` INT,
  `ts` STRING,
  `partition_day` STRING
) PARTITIONED BY (`partition_day`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'orderId',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1'
);
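One caveat worth noting: the Hudi Flink writer only commits data on Flink checkpoints, so if checkpointing is disabled the job will run but nothing will appear in the table. A minimal setting in the SQL client (the exact `SET` syntax varies slightly across Flink versions):

```sql
-- Run before the INSERT; commits then happen roughly every 30 seconds.
SET 'execution.checkpointing.interval' = '30s';
```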
3. Write into the Hudi table
INSERT INTO order_hudi_sink
SELECT
  orderId, userId, orderTime, ip, orderMoney, orderStatus,
  -- SUBSTRING positions are 1-based in Flink SQL
  substring(orderId, 1, 17) AS ts,
  substring(orderTime, 1, 10) AS partition_day
FROM order_kafka_source;
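The two derived columns are plain prefix cuts: the first 17 characters of `orderId` become the precombine key `ts`, and the first 10 characters of `orderTime` (the `yyyy-MM-dd` part) become the daily partition. A sketch of the same derivation in Python slicing terms, with a hypothetical sample record:

```python
# Mirror of the SELECT's derived columns using Python slices
# (SQL SUBSTRING is 1-based, Python slices are 0-based, so
# SUBSTRING(x, 1, n) corresponds to x[:n]).
order_id = "20230911141434000000001"  # assumed: 17-char timestamp prefix + sequence
order_time = "2023-09-11 14:14:34"

ts = order_id[:17]               # precombine key used by the Hudi upsert
partition_day = order_time[:10]  # "yyyy-MM-dd" -> one partition per day

print(ts, partition_day)
```

After the job has checkpointed at least once, the partitions show up under hdfs://localhost:9000/hudi-warehouse/flink_hudi_order/&lt;partition_day&gt;.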