zl程序教程

您现在的位置是:首页 >  其他

当前栏目

5分钟实现第一个Flink程序

2023-03-15 23:26:36 时间

因为网络上很多资料都过时了,有的是版本太老了,本文针对最新版本的1.13.2快速构建一个WordCount程序

项目介绍

本文创建一个可以从网络上读取输入,然后每5秒钟输出每个单词个数的项目

创建maven项目

mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-quickstart-java 
    -DarchetypeVersion=1.13.2 
    -DgroupId=mflink 
    -DartifactId=mflink 
    -Dversion=0.1 
    -Dpackage=myflink 
    -DinteractiveMode=false

用IDE打开这个项目,里面已经创建了两个类StreamingJob和BatchJob,本文使用StreamingJob来完成一个实时统计单词的任务

可以修改后面一些自定义的参数

编写逻辑

  • 创建StreamExecutionEnvironment: 这是一个入口类,可以用来设置参数和创建数据源以及提交任务
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 从socket读取数据: 从本地端口号 9000 的 socket 中读取数据的数据源
DataStream<String> text = env.socketTextStream("localhost", 9000, "
");

这创建了一个字符串类型的 DataStream。DataStream 是 Flink 中做流处理的核心 API,上面定义了非常多常见的操作(如,过滤、转换、聚合、窗口、关联等)。

  • 拆分单词: 将字符串数据解析成单词和次数(使用Tuple2<String, Integer>表示)(类似于MapReduce中的Map)
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });
  • 统计单词个数(类似于MapReduct中的Reduce)
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy((KeySelector<Tuple2<String, Integer>, Object>) tuple -> tuple.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .sum(1);
  • 输出到标准输出
windowCounts.print().setParallelism(1);
  • 开始执行
// execute program
env.execute("Socket Window WordCount");

最后的 env.execute 调用是启动实际Flink作业所必需的。所有算子操作(例如创建源、聚合、打印)只是构建了内部算子操作的图形。只有在execute()被调用时才会在提交到集群上或本地计算机上执行。

完整代码

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
        DataStream<String> text = env.socketTextStream("localhost", 9000, "
");

        // 解析数据,按 word 分组,开窗,聚合
        DataStream<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });


        DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
                .keyBy((KeySelector<Tuple2<String, Integer>, Object>) tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
        windowCounts.print().setParallelism(1);

        // execute program
        env.execute("Socket Window WordCount");
    }
}

运行程序

使用netcat往端口输入

nc -lk 9000

启动StreamingJob统计

直接在IDE中启动就可以了

常见错误

java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction

解决方法: 把pom.xml文件中的<scope>provided</scope>注释掉

Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

解决方案: .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

参考