Flink installation and WordCount
Time: 2023-09-14 09:02:30
1. Download
http://mirror.bit.edu.cn/apache/flink/
2. Install
Make sure Java 8 or later is installed.
Extract Flink:
tar zxvf flink-1.8.0-bin-scala_2.11.tgz
Start Flink in local mode:
$ ./bin/start-cluster.sh  # Start Flink
[hadoop@bigdata-senior01 flink-1.8.0]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata-senior01.home.com.
Starting taskexecutor daemon on host bigdata-senior01.home.com.
[hadoop@bigdata-senior01 flink-1.8.0]$ jps
1995 StandaloneSessionClusterEntrypoint
2443 TaskManagerRunner
2526 Jps
3. Access Flink
http://localhost:8081
4. First program: WordCount. It reads strings from a socket stream and counts word frequencies over 10-second windows.
4.1 Add the dependencies
The _2.11 suffix is the Scala version and matches the distribution installed above (flink-1.8.0-bin-scala_2.11).

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
4.2 Code

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println(e.getMessage());
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        // emit (word, 1) for every whitespace-separated token
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
                        // sum the counts of two records that share the same word
                        return new WordWithCount(value1.word, value1.count + value2.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
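To make the flatMap / keyBy / reduce chain easier to follow, here is a minimal sketch of what it computes for the lines that land in a single window. It uses only the JDK and no Flink classes. The class name WindowWordCountSketch and the method countWords are made up for this illustration, and the real job does this incrementally and per key rather than on a finished list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK illustration only; this class is hypothetical and does not use Flink.
final class WindowWordCountSketch {

    // Counts the words in the lines that fell into one window.
    static Map<String, Long> countWords(List<String> linesInWindow) {
        // TreeMap keeps the words sorted so the printed order is stable
        Map<String, Long> counts = new TreeMap<>();
        for (String line : linesInWindow) {
            // flatMap step: split each line on single whitespace characters
            for (String word : line.split("\\s")) {
                // keyBy + reduce step: add 1 to the running count for this word
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello flink", "hello world");
        // Same "word : count" format as WordWithCount.toString()
        countWords(lines).forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

As in the job above, splitting on "\\s" treats every single whitespace character as a separator, so two spaces in a row produce an empty word that is counted too.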
4.3 Build the jar and upload it
First start nc so that it listens and accepts connections:
nc -lk 9000
Start SocketWindowWordCount:
[hadoop@bigdata-senior01 bin]$ ./flink run /home/hadoop/SocketWindowWordCount.jar --port 9000
Watch the output:
[root@bigdata-senior01 log]# tail -f flink-hadoop-taskexecutor-0-bigdata-senior01.home.com.out
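The run command passes only --port 9000, so the program falls back to localhost for the hostname. The sketch below shows the idea of turning "--key value" pairs into a lookup table with a default. It is a simplified stand-in written for this post, not how Flink's ParameterTool is implemented, and the class name ArgsSketch and the method parse are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; this is not Flink's ParameterTool.
final class ArgsSketch {

    // Turns ["--port", "9000"] into {port=9000}. Handles only "--key value" pairs;
    // a trailing key without a value is ignored.
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {"--port", "9000"});
        // hostname is optional and defaults to localhost, as in SocketWindowWordCount
        String hostname = params.getOrDefault("hostname", "localhost");
        int port = Integer.parseInt(params.get("port"));
        System.out.println(hostname + ":" + port);
    }
}
```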
Type strings on the nc side. In the log being tailed, the aggregated counts appear once per 10-second window.
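The 10-second cycle comes from timeWindow(Time.seconds(10)), which creates tumbling (non-overlapping) windows. As a rough sketch, and assuming the windows are aligned to the epoch with no offset (which I believe is the default behavior), the window a line falls into can be worked out from its arrival time like this. TumblingWindowSketch and windowStart are names invented for this example.

```java
// Hypothetical helper for illustration; a simplified view of aligned tumbling windows.
final class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the window that contains the given timestamp.
    // Assumes a non-negative timestamp in milliseconds and no window offset.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long[] arrivals = {1_000L, 9_999L, 10_000L, 25_500L};
        for (long t : arrivals) {
            long start = windowStart(t);
            // Each window covers [start, start + size): the end is exclusive
            System.out.println(t + " ms -> window [" + start + ", " + (start + WINDOW_SIZE_MS) + ")");
        }
    }
}
```

Lines typed at 1,000 ms and 9,999 ms share one window and are counted together, while a line at 10,000 ms already belongs to the next one.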