zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

大数据Kudu(十):Flink操作Kudu

2023-02-19 12:17:54 时间

Flink操作Kudu

Flink主要应用场景是流式数据处理上,有些公司针对流式数据使用Flink实时分析后将结果存入Kudu,例如快手公司。这里将实时计算的结果存入Kudu需要自定义Flink Kudu Sink。

场景:Flink实时读取Socket数据,将结果存入Kudu表t_flink_result,为了方便操作不再创建Kudu外表,这里在Impala中创建Kudu内表t_flink_result:

create table t_flink_result
(
	id  int,
	name string,
	age int,
	primary key (id)
)
partition by hash partitions 3
stored as kudu
tblproperties(
 'kudu.master_address' = 'cm1:7150,cm2:7150'
)

在Maven中导入以下Flink 包依赖:

<!-- Flink 开发Scala需要导入以下依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

Flink 自定义KuduSink 代码如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val ds: DataStream[String] = env.socketTextStream("cm3",9999)

//自定义KuduSink
ds.addSink(new RichSinkFunction[String] {
  //初始化连接Kudu对象
  var kuduClient :KuduClient = _

  //Kudu 表对象
  var kuduTable :KuduTable = _

  //创建KuduSession 客户端会话
  var session: KuduSession = _

  //初始化时调用一次,这里初始化连接Kudu的对象
  override def open(parameters: Configuration): Unit = {
    kuduClient = new KuduClientBuilder("cm1:7051,cm2:7051").build()
    kuduTable = kuduClient.openTable("impala::default.t_flink_result")
    session = kuduClient.newSession()
    //设置插入数据策略
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
  }

  //来一条数据这里调用一次invoke方法
  override def invoke(one: String, context: SinkFunction.Context[_]): Unit = {
    val arr: Array[String] = one.split(",")
    val id: Int = arr(0).toInt
    val name: String = arr(1)
    val age: Int = arr(2).toInt

    //准备插入的数据
    val insert: Insert = kuduTable.newInsert()
    val row: PartialRow = insert.getRow
    row.addInt("id",id)
    row.addString("name",name)
    row.addInt("age",age)

    //插入到Kudu表中
    session.apply(insert)
  }

  //当Flink 关闭时调用一次,回收连接对象
  override def close(): Unit ={
    session.close()
    kuduClient.close()
  }
})

env.execute()

启动以上Flink 代码,打开Socket 服务器,输入数据,可以在impala 中查询表t_flink_result数据,数据被写入。