Spark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作
2023-09-14 09:00:24 时间
union(otherStream)
将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count() 通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func) 通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream
val wordCounts=wordMap.reduceByKey(_ + _) val filteredWordCounts=wordCounts.filter(_._2 1) val numOfCount=filteredWordCounts.count() val countByValue=words.countByValue() val union=words.union(word1) val transform=words.transform(x= x.map(x= (x,1))) //显式原文件 lines.print() //打印flatMap结果 words.print() //打印map结果 wordMap.print() //打印reduceByKey结果 wordCounts.print() //打印filter结果 filteredWordCounts.print() //打印count结果 numOfCount.print() //打印countByValue结果 countByValue.print() //打印union结果 union.print() //打印transform结果 transform.print()
if (args.length 2) { System.err.println("Usage: StatefulNetworkWordCount hostname port ") System.exit(1) //函数字面量,输入的当前值与前一次的状态结果进行累加 val updateFunc = (values: Seq[Int], state: Option[Int]) = { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) //输入类型为K,V,S,返回值类型为K,S //V对应为带求和的值,S为前一次的状态 val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) = { iterator.flatMap(t = updateFunc(t._2, t._3).map(s = (t._1, s))) val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]") //每一秒处理一次 val ssc = new StreamingContext(sparkConf, Seconds(1)) //当前目录为checkpoint结果目录,后面会讲checkpoint在Spark Streaming中的应用 ssc.checkpoint(".") //RDD的初始化结果 val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
//使用Socket作为输入源,本例ip为localhost,端口为9999 val lines = ssc.socketTextStream(args(0), args(1).toInt) //flatMap操作 val words = lines.flatMap(_.split(" ")) //map操作 val wordDstream = words.map(x = (x, 1)) //updateStateByKey函数使用 val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) stateDstream.print() ssc.start() ssc.awaitTermination() }
count() 通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func) 通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream
val wordCounts=wordMap.reduceByKey(_ + _) val filteredWordCounts=wordCounts.filter(_._2 1) val numOfCount=filteredWordCounts.count() val countByValue=words.countByValue() val union=words.union(word1) val transform=words.transform(x= x.map(x= (x,1))) //显式原文件 lines.print() //打印flatMap结果 words.print() //打印map结果 wordMap.print() //打印reduceByKey结果 wordCounts.print() //打印filter结果 filteredWordCounts.print() //打印count结果 numOfCount.print() //打印countByValue结果 countByValue.print() //打印union结果 union.print() //打印transform结果 transform.print()
下面的代码是运行时添加的文件内容
root@sparkmaster:~/streaming# echo "A B C D" test12.txt; echo "A B" test12.txt
下面是前面各个函数的结果
示例2:
上节课中演示的WordCount代码并没有只是对输入的单词进行分开计数,没有记录前一次计数的状态,如果想要连续地进行计数,则可以使用updateStateByKey方法来进行。下面的代码主要给大家演示如何updateStateByKey的方法,
if (args.length 2) { System.err.println("Usage: StatefulNetworkWordCount hostname port ") System.exit(1) //函数字面量,输入的当前值与前一次的状态结果进行累加 val updateFunc = (values: Seq[Int], state: Option[Int]) = { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) //输入类型为K,V,S,返回值类型为K,S //V对应为带求和的值,S为前一次的状态 val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) = { iterator.flatMap(t = updateFunc(t._2, t._3).map(s = (t._1, s))) val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]") //每一秒处理一次 val ssc = new StreamingContext(sparkConf, Seconds(1)) //当前目录为checkpoint结果目录,后面会讲checkpoint在Spark Streaming中的应用 ssc.checkpoint(".") //RDD的初始化结果 val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
//使用Socket作为输入源,本例ip为localhost,端口为9999 val lines = ssc.socketTextStream(args(0), args(1).toInt) //flatMap操作 val words = lines.flatMap(_.split(" ")) //map操作 val wordDstream = words.map(x = (x, 1)) //updateStateByKey函数使用 val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) stateDstream.print() ssc.start() ssc.awaitTermination() }
下图是初始时的值:
使用下列命令启动netcat server
相关文章
- 大数据基础之Kudu(4)spark读写kudu
- Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark SQL案例实战(一)
- Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming
- Spark修炼之道(高级篇)——Spark源码阅读:第二节 SparkContext的创建
- Spark修炼之道(基础篇)——Linux大数据开发基础:第十二节:Shell编程入门(四)
- Spark修炼之道(进阶篇)——Spark入门到精通:第四节 Spark编程模型(一)
- Spark修炼之道(进阶篇)——Spark入门到精通:第三节 Spark Intellij IDEA开发环境搭建
- Spark MaxAbsScaler 绝对值最大标准化
- Spark PCA
- 一天学完spark的Scala基础语法教程六、字符串(idea版本)
- Apache CarbonData 2.0 开发实用系列之一:与Spark SQL集成使用
- Spark的Streaming和Spark的SQL简单入门学习
- 执行Spark运行在yarn上的命令报错 spark-shell --master yarn-client
- GIS+=地理信息+云计算技术——Spark集群部署
- Spark任务提交底层原理
- Spark性能调优之广播变量
- 一天学完spark的Scala基础语法教程十、类和对象(idea版本)
- Spark(1):Spark概述