大数据Kudu(十):Flink操作Kudu
2023-02-19 12:17:54 时间
Flink操作Kudu
Flink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。
场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_result:
create table t_flink_result
(
id int,
name string,
age int,
primary key (id)
)
partition by hash partitions 3
stored as kudu
tblproperties(
'kudu.master_address' = 'cm1:7150,cm2:7150'
)
在Maven中导入以下Flink 包依赖:
<!-- Flink 开发Scala需要导入以下依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
Flink 自定义KuduSink 代码如下:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.socketTextStream("cm3",9999)
//自定义KuduSink
ds.addSink(new RichSinkFunction[String] {
//初始化连接Kudu对象
var kuduClient :KuduClient = _
//Kudu 表对象
var kuduTable :KuduTable = _
//创建KuduSession 客户端会话
var session: KuduSession = _
//初始化时调用一次,这里初始化连接Kudu的对象
override def open(parameters: Configuration): Unit = {
kuduClient = new KuduClientBuilder("cm1:7051,cm2:7051").build()
kuduTable = kuduClient.openTable("impala::default.t_flink_result")
session = kuduClient.newSession()
//设置插入数据策略
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
}
//来一条数据这里调用一次invoke方法
override def invoke(one: String, context: SinkFunction.Context[_]): Unit = {
val arr: Array[String] = one.split(",")
val id: Int = arr(0).toInt
val name: String = arr(1)
val age: Int = arr(2).toInt
//准备插入的数据
val insert: Insert = kuduTable.newInsert()
val row: PartialRow = insert.getRow
row.addInt("id",id)
row.addString("name",name)
row.addInt("age",age)
//插入到Kudu表中
session.apply(insert)
}
//当Flink 关闭时调用一次,回收连接对象
override def close(): Unit ={
session.close()
kuduClient.close()
}
})
env.execute()
启动以上Flink 代码,打开Socket 服务器,输入数据,可以在impala 中查询表t_flink_result数据,数据被写入。
相关文章
- Android破解心得——记学习七少月安卓大型安全公开课
- 新特性解读 | MySQL 8.0.31 导入直方图存量数据
- 【Qbot】4.连接mysql/限制使用次数
- MySQL 为什么要使用索引及索引创建的原则有哪些?
- MySQL 6种索引数据结构详解:BTree、B+Tree、红黑树、平衡二叉树、二叉树、Hash
- MySQL 聚集索引(InnoDB)和 非聚集索引(MyISAM) 精讲~两张图彻底搞懂
- MySQL 事务隔离级别 理论+实战分析
- MySQL MVCC 多版本并发控制机制 工作原理
- MySQL : 彻底搞懂一条SQL的执行过程
- 彻底搞懂MySQL主从复制工作原理 2+3+3+4
- MySQL Explain 执行计划详解、写高效SQL、灵活使用索引(实战)
- MySQL 数据库 Schema 设计的性能优化①:高效的模型设计
- 图算法、图数据库在风控场景的应用
- 客快物流大数据项目(九十一):ClickHouse的数据库引擎
- 零基础学SQL注入必练靶场之SQLiLabs(搭建+打靶)
- aws生产实践-33:aurora查看触发死锁的sql
- C/C++ Qt 数据库与Chart实现历史数据展示
- C/C++ Qt 数据库SqlRelationalTable关联表
- C/C++ Qt 数据库与SqlTableModel组件应用
- C/C++ Qt 数据库与TableView多组件联动