zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Flink 实时流Wordcount案例

案例实时flink WordCount
2023-06-13 09:17:20 时间

Scala版本

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCountScala {
 def main(args: Array[String]) : Unit = {
 // 定义一个数据类型保存单词出现的次数
 case class WordWithCount(word: String, count: Long)
 // port 表示需要连接的端口
 val port: Int = try {
   ParameterTool.fromArgs(args).getInt("port")
 } catch {
   case e: Exception => {
     System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
     return
   }
 }
 // 获取运行环境
 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
 // 连接此socket获取输入数据
 val text = env.socketTextStream("node21", port, '\n')
 //需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
 import org.apache.flink.api.scala._
 // 解析数据, 分组, 窗口化, 并且聚合求SUM
 val windowCounts = text
 .flatMap { w => w.split("\\s") }
 .map { w => WordWithCount(w, 1) }
 .keyBy("word")
 .timeWindow(Time.seconds(5), Time.seconds(1))
 .sum("count")
 // 打印输出并设置使用一个并行度
 windowCounts.print().setParallelism(1)
 env.execute("Socket Window WordCount")
 }
}

java版本

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

/**

* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来

*       先在node21机器上执行nc -l 9000

*/

public class StreamingWindowWordCountJava {

 public static void main(String[] args) throws Exception {

 //定义socket的端口号

 int port;

 try{

     ParameterTool parameterTool = ParameterTool.fromArgs(args);

     port = parameterTool.getInt("port");

 }catch (Exception e){

     System.err.println("没有指定port参数,使用默认值9000");

     port = 9000;

 }

 //获取运行环境

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 //连接socket获取输入的数据

 DataStreamSource<String> text = env.socketTextStream("node21", port, "\n");

 //计算数据

 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {

   public void flatMap(String value, Collector<WordWithCount> out) throws Exception {

       String[] splits = value.split("\\s");

       for (String word:splits) {

           out.collect(new WordWithCount(word,1L));

       }

   }

 })//打平操作,把每行的单词转为<word,count>类型的数据

       //针对相同的word数据进行分组

       .keyBy("word")

       //指定计算数据的窗口大小和滑动窗口大小

       .timeWindow(Time.seconds(2),Time.seconds(1))

       .sum("count");

 //把数据打印到控制台,使用一个并行度

 windowCount.print().setParallelism(1);

 //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

 env.execute("streaming word count");

}

/**

* 主要为了存储单词以及单词出现的次数

*/

 static class WordWithCount{

   public String word;

   public long count;

   public WordWithCount(){}

   public WordWithCount(String word, long count) {

       this.word = word;

       this.count = count;

   }

   @Override

   public String toString() {

       return "WordWithCount{" +

               "word='" + word + '\'' +

               ", count=" + count +

               '}';

   }

 }

}