大数据Flink进阶(十二):Flink本地模式开启WebUI
2023-06-13 09:18:23 时间
Flink本地模式开启WebUI
在工作中我们一般使用IntelliJ IDEA开发工具进行代码开发,为了能方便快速的调试Flink和了解Flink程序的运行情况,我们希望本地开发工具中运行Flink时能查看到WebUI,这就可以在编写Flink程序时开启本地WebUI。
一、在Flink 项目中添加本地模式 WebUI的依赖
在Flink1.15版本之前根据使用Scala版本在Java Flink项目或Scala Flink项目中添加对应Scala版本的依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
在Flink1.15版本之后,无论是Java Flink项目还是Scala Flink项目,添加如下依赖,不需额外依赖Scala版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
二、在代码中启用本地WebUI
Flink Java 代码启动本地WebUI:
Configuration conf = new Configuration();
//设置WebUI绑定的本地端口
conf.setString(RestOptions.BIND_PORT,"8081");
//使用配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Flink Scala 代码启动本地WebUI:
val configuration = new Configuration()
//设置WebUI绑定的本地端口
configuration.set(RestOptions.BIND_PORT,"8081")
//使用配置
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
三、编写完整代码启动并访问WebUI
Java 代码示例:
//1.使用本地模式
Configuration conf = new Configuration();
//设置WebUI绑定的本地端口
conf.setString(RestOptions.BIND_PORT,"8081");
//使用配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//2.读取Socket数据
DataStreamSource<String> ds = env.socketTextStream("node3", 9999);
//3.准备K,V格式数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
String[] words = line.split(",");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
//4.聚合打印结果
tupleDS.keyBy(tp -> tp.f0).sum(1).print();
//5.execute触发执行
env.execute();
代码运行:
Scala代码示例:
//1.创建本地WebUI环境
val configuration = new Configuration()
//设置绑定的本地端口
configuration.set(RestOptions.BIND_PORT,"80")
//第一种设置方式
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
//2.Scala 流处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.streaming.api.scala._
//3.读取Socket数据
val linesDS: DataStream[String] = env.socketTextStream("node3", 9999)
//4.进行WordCount统计
linesDS.flatMap(line=>{line.split(",")})
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
//5.最后使用execute 方法触发执行
env.execute()
以上代码启动任意一个都可以通过访问:http://localhost来查看WebUI。
注意:启动代码之前在node3首选启动Socket服务,然后再启动代码。在导入flink-runtime-web依赖之后最好重启开发工具,重新加载对应的依赖包,否则可能执行代码之后访问本地WebUI时出现"{"errors":["Not found: /"]}"错误,访问不到WebUI情况。
相关文章
- Flink简介
- 大数据-Flink环境部署(Windows)及Flink编程
- Flink本地模式安装和使用
- 解决hudi hms catalog中flink建表,spark无法写入问题
- flink中文社区_flink demo
- Flink SQL Checkpoint 学习总结
- Flink SQL增量查询Hudi表
- Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面
- Apache Flink 1.16 功能解读
- 大数据Flink进阶(五):Flink开发环境准备
- 大数据Flink进阶(二):数据架构的演变
- 大数据Flink进阶(六):Flink入门案例
- 更快更稳更易用: Flink 自适应批处理能力演进
- Flink创始团队二次创业再被收购,Kafka母公司与阿里“遭遇战”已经开始
- Flink CEP 新特性进展与在实时风控场景的落地
- Linux基础:Flink容错机制之作业执行和守护进程
- Flink技术与Oracle数据库结合,助力数据分析(flink与oracle)