Hudi-Flink SQL: Reading Hudi Table Data in Real Time
Time: 2023-09-11 14:14:34
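The code is below. (For writing to the Hudi table in real time, see the previous post, [Hudi-Flink: Consuming Kafka and Writing Incremental Data to Hudi in Real Time].)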
package com.zhen.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * @Author FengZhen
 * @Date 3/10/22 8:33 PM
 * @Description Uses the Flink SQL connector to load data from a Hudi table and query it with SQL
 */
public class FlinkSQLReadDemo {

    public static void main(String[] args) {

        // 1. Create the table execution environment in streaming mode
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 2. Create the input table that maps onto the existing Hudi table
        tableEnv.executeSql(
                "CREATE TABLE order_hudi(\n" +
                "  `orderId` STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  `userId` STRING,\n" +
                "  `orderTime` STRING,\n" +
                "  `ip` STRING,\n" +
                "  `orderMoney` DOUBLE,\n" +
                "  `orderStatus` INT,\n" +
                "  `ts` STRING,\n" +
                "  `partition_day` STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path'='hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                // Enable streaming read and check for new commits every 4 seconds
                "  'read.streaming.enabled' = 'true',\n" +
                "  'read.streaming.check-interval' = '4'\n" +
                ")"
        );

        // 3. Run the query; rows are printed continuously as the Hudi table is read as a stream
        tableEnv.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print();
    }
}
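The WITH clause above is what turns the table into a streaming source: 'read.streaming.enabled' switches on streaming read, and 'read.streaming.check-interval' sets how often (in seconds) new commits are polled. As a minimal sketch, the options can be kept in a map and rendered into the clause, so that the path and the check interval become parameters. The class HudiOptionsSketch and its two methods are hypothetical helpers for illustration only; they are not part of Flink, Hudi, or the original post, and only the option keys come from the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not from the original post): renders the WITH clause of the Hudi table DDL. */
public class HudiOptionsSketch {

    /** Collects the streaming-read options used in the post; path and interval are parameters. */
    public static Map<String, String> streamingReadOptions(String path, int checkIntervalSeconds) {
        Map<String, String> options = new LinkedHashMap<>(); // preserves insertion order
        options.put("connector", "hudi");
        options.put("path", path);
        options.put("table.type", "MERGE_ON_READ");
        options.put("read.streaming.enabled", "true");
        options.put("read.streaming.check-interval", String.valueOf(checkIntervalSeconds));
        return options;
    }

    /** Renders the options as a SQL WITH clause, one quoted 'key' = 'value' pair per line. */
    public static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n", "WITH (\n", "\n)"));
    }

    public static void main(String[] args) {
        Map<String, String> options = streamingReadOptions(
                "hdfs://localhost:9000/hudi-warehouse/flink_hudi_order", 4);
        // The rendered clause can be appended to the CREATE TABLE statement
        System.out.println(withClause(options));
    }
}
```

The rendered string could be concatenated after the PARTITIONED BY line of the CREATE TABLE statement before it is passed to tableEnv.executeSql.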