Spark学习之RDD简单算子
collect
返回RDD的所有元素
scala var input=sc.parallelize(Array(-1,0,1,2,2)) input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at console :27 scala var result=input.collect result: Array[Int] = Array(-1, 0, 1, 2, 2)
count,coutByValue
count返回RDD的元素数量,countByValue返回每个值的出现次数
scala var input=sc.parallelize(Array(-1,0,1,2,2)) scala var result=input.count result: Long = 5 scala var result=input.countByValue result: scala.collection.Map[Int,Long] = Map(0 - 1, 1 - 1, 2 - 2, -1 - 1)
take,top,takeOrdered
take返回RDD的前N个元素 takeOrdered默认返回升序排序的前N个元素,可以指定排序算法 Top返回降序排序的前N个元素
var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6)) scala var result=input.take(6) result: Array[Int] = Array(1, 2, 3, 4, 9, 8) scala var result=input.take(20) result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6) scala var result=input.takeOrdered(6) result: Array[Int] = Array(1, 2, 3, 4, 5, 6) scala var result=input.takeOrdered(6)(Ordering[Int].reverse) result: Array[Int] = Array(9, 8, 7, 6, 5, 4) scala var result=input.top(6) result: Array[Int] = Array(9, 8, 7, 6, 5, 4
Filter
传入返回值为boolean的函数,返回改函数结果为true的RDD
scala var input=sc.parallelize(Array(-1,0,1,2)) scala var result=input.filter(_ 0).collect() result: Array[Int] = Array(1, 2)
map,flatmap
map对每个元素执行函数,转换为新的RDD,flatMap和map类似,但会把map的返回结果做flat处理,就是把多个Seq的结果拼接成一个Seq输出
scala var input=sc.parallelize(Array(-1,0,1,2)) scala var result=input.map(_+1).collect result: Array[Int] = Array(0, 1, 2, 3) scala var result=input.map(x= x.to(3)).collect result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3)) scala var result=input.flatMap(x= x.to(3)).collect result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3)
distinct
RDD去重
scala var input=sc.parallelize(Array(-1,0,1,2,2)) scala var result=input.distinct.collect result: Array[Int] = Array(0, 1, 2, -1)
Reduce
通过函数聚集RDD中的所有元素
scala var input=sc.parallelize(Array(-1,0,1,2)) scala var result=input.reduce((x,y)= {println(x,y);x+y}) (-1,1) //处理-1,1,结果为0,RDD剩余元素为{0,2} (0,2) //上面的结果为0,在处理0,2,结果为2,RDD剩余元素为{0} (2,0) //上面结果为2,再处理(2,0),结果为2,RDD剩余元素为{} result: Int = 2
sample,takeSample
sample就是从RDD中抽样,第一个参数withReplacement是指是否有放回的抽样,true为放回,为false为不放回,放回就是抽样结果可能重复,第二个参数是fraction,0到1之间的小数,表明抽样的百分比 takeSample类似,但返回类型是Array,第一个参数是withReplacement,第二个参数是样本个数
var rdd=sc.parallelize(1 to 20) scala rdd.sample(true,0.5).collect res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20) scala rdd.sample(false,0.5).collect res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18) scala rdd.sample(true,1).collect res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20) scala rdd.sample(false,1).collect res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) scala rdd.takeSample(true,3) res1: Array[Int] = Array(1, 15, 19) scala rdd.takeSample(false,3) res2: Array[Int] = Array(7, 16, 6)
collectAsMap,countByKey,lookup
collectAsMap把PairRDD转为Map,如果存在相同的key,后面的会覆盖前面的。 countByKey统计每个key出现的次数 Lookup返回给定key的所有value
scala var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala var result=input.collectAsMap result: scala.collection.Map[Int,String] = Map(2 - two, 4 - four, 1 - one, 3 - three) scala var result=input.countByKey result: scala.collection.Map[Int,Long] = Map(1 - 2, 2 - 1, 3 - 1, 4 - 1) scala var result=input.lookup(1) result: Seq[String] = WrappedArray(1, one) scala var result=input.lookup(2) result: Seq[String] = WrappedArray(two)
groupBy,keyBy
groupBy根据传入的函数产生的key,形成元素为K-V形式的RDD,然后对key相同的元素分组 keyBy对每个value,为它加上key
scala var rdd=sc.parallelize(List("A1","A2","B1","B2","C")) scala var result=rdd.groupBy(_.substring(0,1)).collect result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C))) scala var rdd=sc.parallelize(List("hello","world","spark","is","fun")) scala var result=rdd.keyBy(_.length).collect result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun))
keys,values
scala var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala var result=input.keys.collect result: Array[Int] = Array(1, 1, 2, 3, 4) scala var result=input.values.collect result: Array[String] = Array(1, one, two, three, four) mapvalues mapvalues对K-V形式的RDD的每个Value进行操作 scala var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) scala var result=input.mapValues(_*2).collect result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour))
union,intersection,subtract,cartesian
union合并2个集合,不去重 subtract将第一个集合中的同时存在于第二个集合的元素去掉 intersection返回2个集合的交集 cartesian返回2个集合的笛卡儿积
scala var rdd1=sc.parallelize(Array(-1,1,1,2,3)) scala var rdd2=sc.parallelize(Array(0,1,2,3,4)) scala var result=rdd1.union(rdd2).collect result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4) scala var result=rdd1.intersection(rdd2).collect result: Array[Int] = Array(1, 2, 3) scala var result=rdd1.subtract(rdd2).collect result: Array[Int] = Array(-1) scala var result=rdd1.cartesian(rdd2).collect result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4)) 本文作者:Endless2010 来源:51CTO
Spark快速入门-3-Spark的算子总结 Transformation 变换/转换算子:这类算子操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。这种变换并不触发提交作业,完成作业中间过程处理。 Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark 系统。
相关文章
- spark ml pipeline构建机器学习任务
- Spark2.x学习笔记:Spark SQL的SQL
- Spark2.x学习笔记:Spark SQL快速入门
- Spark机器学习
- 基于Spark的机器学习实践 (九) - 聚类算法
- 【第一章】JAVA之牛客网刷题笔记【点进来花两把游戏的时间学习晚上睡觉都踏实了】
- Spark如何与深度学习框架协作,处理非结构化数据
- 英特尔开源BigDL,可直接在Spark框架下运行深度学习
- Enlitic创始人Jeremy Howard专访:我眼中的深度学习与数据科学
- 用Spark学习矩阵分解推荐算法
- 阿里工作7年,肝到P8就靠这份学习笔记了,已助14个朋友拿到offer
- Intel开源基于Spark的深度学习库BigDL
- Ruby学习笔记之升级ruby的版本
- 2023.1.18,周三【图神经网络 学习记录4】通过新建Anaconda环境,尝试解决昨天的问题(未)(conda基础命令回顾) || GCN核心公式讲解,两层GCN || GCN的通俗理解
- Spark-SparkSQL深入学习系列四(转自OopsOutOfMemory)
- Spark-SparkSQL深入学习系列二(转自OopsOutOfMemory)
- Spark-SparkSQL深入学习系列一(转自OopsOutOfMemory)
- 第67课:Spark SQL下采用Java和Scala实现Join的案例综合实战(巩固前面学习的Spark SQL知识)
- 免费新课程发布!Spark 3.0.0的灵魂:RDD和DataSet,欢迎读者学习!
- Spark机器学习第4课及第5课:深入理解RDD、DataFrame、DataSet、Structured Streaming
- 使用spark ml pipeline进行机器学习
- 《采访中收集程序猿》学习记录8
- Oracle GoldenGate学习之Goldengate介绍
- 初学者该如何快速入门Python?这可能是最详细的学习攻略了!
- 【Spark深入学习 -13】Spark计算引擎剖析
- 【Spark深入学习 -10】基于spark构建企业级流处理系统
- 【spark 深入学习 03】Spark RDD的蛮荒世界
- 【Spark 深入学习 01】 Spark是什么鬼?
- Android C2DM学习——云端推送
- 【PMP】Head First PMP 学习笔记 第十二章 采购管理