zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Spark之RDD的transformation&action(Java&Scala实现)

JAVAampscalaSpark 实现 Action RDD Transformation
2023-09-14 09:00:26 时间

1.3 通过parallelize或makeRDD将单机数据创建为分布式RDD
(区别: A)makeRDD函数比parallelize函数多提供了数据的位置信息。

 B)两者的返回值都是ParallelCollectionRDD,但parallelize函数可以自己指定分区的数量,而 

 makeRDD函数固定为seq参数的size大小)
1.4 基于DB(Mysql)、NoSQL(HBase)、S3(SC3)、数据流创建。 2,action是得到一个值,或者一个结果(直接将RDDcache到内存中)

所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

3,图示:01020304 Java代码的实现之transformation操作实战
import java.util.Arrays;

import java.util.Iterator;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

 * transformation操作实战

 * @author Administrator

@SuppressWarnings(value = {"unused", "unchecked"})

public class TransformationOperation {

 public static void main(String[] args) {

 // map();

 // filter();

 // flatMap();

 // groupByKey();

 // reduceByKey();

 // sortByKey();

 join();

 //cogroup();

 * map算子案例:将集合中每一个元素都乘以2

 private static void map() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("map")

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 构造集合

 List Integer numbers = Arrays.asList(1, 2, 3, 4, 5);

 // 并行化集合,创建初始RDD

 JavaRDD Integer numberRDD = sc.parallelize(numbers);

 // 使用map算子,将集合中的每个元素都乘以2

 // map算子,是对任何类型的RDD,都可以调用的

 // 在java中,map算子接收的参数是Function对象

 // 创建的Function对象,一定会让你设置第二个泛型参数,这个泛型类型,就是返回的新元素的类型

 // 同时call()方法的返回类型,也必须与第二个泛型类型同步

 // 在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素

 // 所有新的元素就会组成一个新的RDD

 JavaRDD Integer multipleNumberRDD = numberRDD.map(

 new Function Integer, Integer () {

 private static final long serialVersionUID = 1L;

 // 传入call()方法的,就是1,2,3,4,5

 // 返回的就是2,4,6,8,10

 @Override

 public Integer call(Integer v1) throws Exception {

 return v1 * 2;

 // 打印新的RDD

 multipleNumberRDD.foreach(new VoidFunction Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Integer t) throws Exception {

 System.out.println(t); 

 // 关闭JavaSparkContext

 sc.close();

 * filter算子案例:过滤集合中的偶数

 private static void filter() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("filter")

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Integer numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 // 并行化集合,创建初始RDD

 JavaRDD Integer numberRDD = sc.parallelize(numbers);

 // 对初始RDD执行filter算子,过滤出其中的偶数

 // filter算子,传入的也是Function,其他的使用注意点,实际上和map是一样的

 // 但是,唯一的不同,就是call()方法的返回类型是Boolean

 // 每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑

 // 来判断这个元素是否是你想要的

 // 如果你想在新的RDD中保留这个元素,那么就返回true;否则,不想保留这个元素,返回false

 JavaRDD Integer evenNumberRDD = numberRDD.filter(

 new Function Integer, Boolean () {

 private static final long serialVersionUID = 1L;

 // 在这里,1到10,都会传入进来

 // 但是根据我们的逻辑,只有2,4,6,8,10这几个偶数,会返回true

 // 所以,只有偶数会保留下来,放在新的RDD中

 @Override

 public Boolean call(Integer v1) throws Exception {

 return v1 % 2 == 0;

 // 打印新的RDD

 evenNumberRDD.foreach(new VoidFunction Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Integer t) throws Exception {

 System.out.println(t);

 // 关闭JavaSparkContext

 sc.close();

 * flatMap案例:将文本行拆分为多个单词

 private static void flatMap() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("flatMap") 

 .setMaster("local"); 

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 构造集合

 List String lineList = Arrays.asList("hello you", "hello me", "hello world"); 

 // 并行化集合,创建RDD

 JavaRDD String lines = sc.parallelize(lineList);

 // 对RDD执行flatMap算子,将每一行文本,拆分为多个单词

 // flatMap算子,在java中,接收的参数是FlatMapFunction

 // 我们需要自己定义FlatMapFunction的第二个泛型类型,即,代表了返回的新元素的类型

 // call()方法,返回的类型,不是U,而是Iterable U ,这里的U也与第二个泛型类型相同

 // flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素

 // 多个元素,即封装在Iterable集合中,可以使用ArrayList等集合

 // 新的RDD中,即封装了所有的新元素;也就是说,新的RDD的大小一定是 = 原始RDD的大小

 JavaRDD String words = lines.flatMap(new FlatMapFunction String, String () {

 private static final long serialVersionUID = 1L;

 // 在这里会,比如,传入第一行,hello you

 // 返回的是一个Iterable String (hello, you)

 @Override

 public Iterable String call(String t) throws Exception {

 return Arrays.asList(t.split(" "));

 // 打印新的RDD

 words.foreach(new VoidFunction String () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(String t) throws Exception {

 System.out.println(t);

 // 关闭JavaSparkContext

 sc.close();

 * groupByKey案例:按照班级对成绩进行分组

 private static void groupByKey() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("groupByKey") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 String, Integer scoreList = Arrays.asList(

 new Tuple2 String, Integer ("class1", 80),

 new Tuple2 String, Integer ("class2", 75),

 new Tuple2 String, Integer ("class1", 90),

 new Tuple2 String, Integer ("class2", 65));

 // 并行化集合,创建JavaPairRDD

 JavaPairRDD String, Integer scores = sc.parallelizePairs(scoreList);

 // 针对scores RDD,执行groupByKey算子,对每个班级的成绩进行分组

 // groupByKey算子,返回的还是JavaPairRDD

 // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型

 // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable

 // 那么接下来,我们是不是就可以通过groupedScores这种JavaPairRDD,很方便地处理某个分组内的数据

 JavaPairRDD String, Iterable Integer groupedScores = scores.groupByKey();

 // 打印groupedScores RDD

 groupedScores.foreach(new VoidFunction Tuple2 String,Iterable Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Tuple2 String, Iterable Integer t)

 throws Exception {

 System.out.println("class: " + t._1); 

 Iterator Integer ite = t._2.iterator();

 while(ite.hasNext()) {

 System.out.println(ite.next()); 

 System.out.println("=============================="); 

 // 关闭JavaSparkContext

 sc.close();

 * reduceByKey案例:统计每个班级的总分

 private static void reduceByKey() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("reduceByKey") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 String, Integer scoreList = Arrays.asList(

 new Tuple2 String, Integer ("class1", 80),

 new Tuple2 String, Integer ("class2", 75),

 new Tuple2 String, Integer ("class1", 90),

 new Tuple2 String, Integer ("class2", 65));

 // 并行化集合,创建JavaPairRDD

 JavaPairRDD String, Integer scores = sc.parallelizePairs(scoreList);

 // 针对scores RDD,执行reduceByKey算子

 // reduceByKey,接收的参数是Function2类型,它有三个泛型参数,实际上代表了三个值

 // 第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型

 // 因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入

 // 因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型

 // 第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的

 // reduceByKey算法返回的RDD,还是JavaPairRDD key, value 

 JavaPairRDD String, Integer totalScores = scores.reduceByKey(

 new Function2 Integer, Integer, Integer () {

 private static final long serialVersionUID = 1L;

 // 对每个key,都会将其value,依次传入call方法

 // 从而聚合出每个key对应的一个value

 // 然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素

 @Override

 public Integer call(Integer v1, Integer v2) throws Exception {

 return v1 + v2;

 // 打印totalScores RDD

 totalScores.foreach(new VoidFunction Tuple2 String,Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Tuple2 String, Integer t) throws Exception {

 System.out.println(t._1 + ": " + t._2); 

 // 关闭JavaSparkContext

 sc.close();

 * sortByKey案例:按照学生分数进行排序

 private static void sortByKey() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("sortByKey") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 Integer, String scoreList = Arrays.asList(

 new Tuple2 Integer, String (65, "leo"),

 new Tuple2 Integer, String (50, "tom"),

 new Tuple2 Integer, String (100, "marry"),

 new Tuple2 Integer, String (80, "jack"));

 // 并行化集合,创建RDD

 JavaPairRDD Integer, String scores = sc.parallelizePairs(scoreList);

 // 对scores RDD执行sortByKey算子

 // sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序

 // 返回的,还是JavaPairRDD,其中的元素内容,都是和原始的RDD一模一样的

 // 但是就是RDD中的元素的顺序,不同了

 JavaPairRDD Integer, String sortedScores = scores.sortByKey(false); 

 // 打印sortedScored RDD

 sortedScores.foreach(new VoidFunction Tuple2 Integer,String () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Tuple2 Integer, String t) throws Exception {

 System.out.println(t._1 + ": " + t._2); 

 // 关闭JavaSparkContext

 sc.close();

 * join案例:打印学生成绩

 private static void join() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("join") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 Integer, String studentList = Arrays.asList(

 new Tuple2 Integer, String (1, "leo"),

 new Tuple2 Integer, String (2, "jack"),

 new Tuple2 Integer, String (3, "tom"));

 List Tuple2 Integer, Integer scoreList = Arrays.asList(

 new Tuple2 Integer, Integer (1, 100),

 new Tuple2 Integer, Integer (2, 90),

 new Tuple2 Integer, Integer (3, 60),

 new Tuple2 Integer, Integer (2, 80),

 new Tuple2 Integer, Integer (2, 70));

 // 并行化两个RDD

 JavaPairRDD Integer, String students = sc.parallelizePairs(studentList);

 JavaPairRDD Integer, Integer scores = sc.parallelizePairs(scoreList);

 // 使用join算子关联两个RDD

 // join以后,还是会根据key进行join,并返回JavaPairRDD

 // 但是JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key的类型,因为是通过key进行join的

 // 第二个泛型类型,是Tuple2 v1, v2 的类型,Tuple2的两个泛型分别为原始RDD的value的类型

 // join,就返回的RDD的每一个元素,就是通过key join上的一个pair

 // 什么意思呢?比如有(1, 1) (1, 2) (1, 3)的一个RDD

 // 还有一个(1, 4) (2, 1) (2, 2)的一个RDD

 // 如果是cogroup的话,会是(1,((1,2,3),(4))) 

 // join以后,实际上会得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4)) 

 JavaPairRDD Integer, Tuple2 String, Integer studentScores = students.join(scores);

 // 打印studnetScores RDD

 studentScores.foreach(

 new VoidFunction Tuple2 Integer,Tuple2 String,Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(Tuple2 Integer, Tuple2 String, Integer t)

 throws Exception {

 System.out.println("student id: " + t._1); 

 System.out.println("student name: " + t._2._1); 

 System.out.println("student score: " + t._2._2);

 System.out.println("==============================="); 

 // 关闭JavaSparkContext

 sc.close();

 * cogroup案例:打印学生成绩

 private static void cogroup() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("cogroup") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 Integer, String studentList = Arrays.asList(

 new Tuple2 Integer, String (1, "leo"),

 new Tuple2 Integer, String (2, "jack"),

 new Tuple2 Integer, String (3, "tom"));

 List Tuple2 Integer, Integer scoreList = Arrays.asList(

 new Tuple2 Integer, Integer (1, 100),

 new Tuple2 Integer, Integer (2, 90),

 new Tuple2 Integer, Integer (3, 60),

 new Tuple2 Integer, Integer (1, 70),

 new Tuple2 Integer, Integer (2, 80),

 new Tuple2 Integer, Integer (3, 50));

 // 并行化两个RDD

 JavaPairRDD Integer, String students = sc.parallelizePairs(studentList);

 JavaPairRDD Integer, Integer scores = sc.parallelizePairs(scoreList);

 // cogroup与join不同

 // 相当于是,一个key join上的所有value,都给放到一个Iterable里面去了 

 // cogroup,不太好讲解,希望大家通过动手编写我们的案例,仔细体会其中的奥妙

 JavaPairRDD Integer, Tuple2 Iterable String , Iterable Integer studentScores = 

 students.cogroup(scores);

 // 打印studnetScores RDD

 studentScores.foreach(

 new VoidFunction Tuple2 Integer,Tuple2 Iterable String ,Iterable Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public void call(

 Tuple2 Integer, Tuple2 Iterable String , Iterable Integer t)

 throws Exception {

 System.out.println("student id: " + t._1); 

 System.out.println("student name: " + t._2._1); 

 System.out.println("student score: " + t._2._2);

 System.out.println("==============================="); 

 // 关闭JavaSparkContext

 sc.close();

}
Java代码的实现之action操作实战
import java.util.Arrays;

import java.util.List;

import java.util.Map;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

 * action操作实战

 * @author Administrator

@SuppressWarnings("unused")

public class ActionOperation {

 public static void main(String[] args) {

 // reduce();

 // collect();

 // count();

 // take();

 // saveAsTextFile();

 countByKey();

 private static void reduce() {

 // 创建SparkConf和JavaSparkContext

 SparkConf conf = new SparkConf()

 .setAppName("reduce")

 .setMaster("local"); 

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

 List Integer numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 JavaRDD Integer numbers = sc.parallelize(numberList);

 // 使用reduce操作对集合中的数字进行累加

 // reduce操作的原理:

 // 首先将第一个和第二个元素,传入call()方法,进行计算,会获取一个结果,比如1 + 2 = 3

 // 接着将该结果与下一个元素传入call()方法,进行计算,比如3 + 3 = 6

 // 以此类推

 // 所以reduce操作的本质,就是聚合,将多个元素聚合成一个元素

 int sum = numbers.reduce(new Function2 Integer, Integer, Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public Integer call(Integer v1, Integer v2) throws Exception {

 return v1 + v2;

 System.out.println(sum); 

 // 关闭JavaSparkContext

 sc.close();

 private static void collect() {

 // 创建SparkConf和JavaSparkContext

 SparkConf conf = new SparkConf()

 .setAppName("collect")

 .setMaster("local"); 

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

 List Integer numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 JavaRDD Integer numbers = sc.parallelize(numberList);

 // 使用map操作将集合中所有数字乘以2

 JavaRDD Integer doubleNumbers = numbers.map(

 new Function Integer, Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public Integer call(Integer v1) throws Exception {

 return v1 * 2;

 // 不用foreach action操作,在远程集群上遍历rdd中的元素

 // 而使用collect操作,将分布在远程集群上的doubleNumbers RDD的数据拉取到本地

 // 这种方式,一般不建议使用,因为如果rdd中的数据量比较大的话,比如超过1万条

 // 那么性能会比较差,因为要从远程走大量的网络传输,将数据获取到本地

 // 此外,除了性能差,还可能在rdd中数据量特别大的情况下,发生oom异常,内存溢出

 // 因此,通常,还是推荐使用foreach action操作,来对最终的rdd元素进行处理

 List Integer doubleNumberList = doubleNumbers.collect();

 for(Integer num : doubleNumberList) {

 System.out.println(num); 

 // 关闭JavaSparkContext

 sc.close();

 private static void count() {

 // 创建SparkConf和JavaSparkContext

 SparkConf conf = new SparkConf()

 .setAppName("count")

 .setMaster("local"); 

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

 List Integer numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 JavaRDD Integer numbers = sc.parallelize(numberList);

 // 对rdd使用count操作,统计它有多少个元素

 long count = numbers.count();

 System.out.println(count); 

 // 关闭JavaSparkContext

 sc.close();

 private static void take() {

 // 创建SparkConf和JavaSparkContext

 SparkConf conf = new SparkConf()

 .setAppName("take")

 .setMaster("local"); 

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

 List Integer numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 JavaRDD Integer numbers = sc.parallelize(numberList);

 // 对rdd使用count操作,统计它有多少个元素

 // take操作,与collect类似,也是从远程集群上,获取rdd的数据

 // 但是collect是获取rdd的所有数据,take只是获取前n个数据

 List Integer top3Numbers = numbers.take(3);

 for(Integer num : top3Numbers) {

 System.out.println(num); 

 // 关闭JavaSparkContext

 sc.close();

 private static void saveAsTextFile() {

 // 创建SparkConf和JavaSparkContext

 SparkConf conf = new SparkConf()

 .setAppName("saveAsTextFile"); 

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 有一个集合,里面有1到10,10个数字,现在要对10个数字进行累加

 List Integer numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 JavaRDD Integer numbers = sc.parallelize(numberList);

 // 使用map操作将集合中所有数字乘以2

 JavaRDD Integer doubleNumbers = numbers.map(

 new Function Integer, Integer () {

 private static final long serialVersionUID = 1L;

 @Override

 public Integer call(Integer v1) throws Exception {

 return v1 * 2;

 // 直接将rdd中的数据,保存在HFDS文件中

 // 但是要注意,我们这里只能指定文件夹,也就是目录

 // 那么实际上,会保存为目录中的/double_number.txt/part-00000文件

 doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt"); 

 // 关闭JavaSparkContext

 sc.close();

 @SuppressWarnings("unchecked")

 private static void countByKey() {

 // 创建SparkConf

 SparkConf conf = new SparkConf()

 .setAppName("countByKey") 

 .setMaster("local");

 // 创建JavaSparkContext

 JavaSparkContext sc = new JavaSparkContext(conf);

 // 模拟集合

 List Tuple2 String, String scoreList = Arrays.asList(

 new Tuple2 String, String ("class1", "leo"),

 new Tuple2 String, String ("class2", "jack"),

 new Tuple2 String, String ("class1", "marry"),

 new Tuple2 String, String ("class2", "tom"),

 new Tuple2 String, String ("class2", "david")); 

 // 并行化集合,创建JavaPairRDD

 JavaPairRDD String, String students = sc.parallelizePairs(scoreList);

 // 对rdd应用countByKey操作,统计每个班级的学生人数,也就是统计每个key对应的元素个数

 // 这就是countByKey的作用

 // countByKey返回的类型,直接就是Map String, Object 

 Map String, Object studentCounts = students.countByKey();

 for(Map.Entry String, Object studentCount : studentCounts.entrySet()) {

 System.out.println(studentCount.getKey() + ": " + studentCount.getValue()); 

 // 关闭JavaSparkContext

 sc.close();

}
Scala代码的实现之transformation操作实战
import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

 * @author Administrator

object TransformationOperation {

 def main(args: Array[String]) {

 // map() 

 // filter() 

 // flatMap() 

 // groupByKey() 

 // reduceByKey() 

 // sortByKey() 

 join() 

 def map() {

 val conf = new SparkConf()

 .setAppName("map")

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val numbers = Array(1, 2, 3, 4, 5)

 val numberRDD = sc.parallelize(numbers, 1) 

 val multipleNumberRDD = numberRDD.map { num = num * 2 } 

 multipleNumberRDD.foreach { num = println(num) } 

 def filter() {

 val conf = new SparkConf()

 .setAppName("filter")

 .setMaster("local")

 val sc = new SparkContext(conf)

 val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 val numberRDD = sc.parallelize(numbers, 1)

 val evenNumberRDD = numberRDD.filter { num = num % 2 == 0 }

 evenNumberRDD.foreach { num = println(num) } 

 def flatMap() {

 val conf = new SparkConf()

 .setAppName("flatMap") 

 .setMaster("local") 

 val sc = new SparkContext(conf) 

 val lineArray = Array("hello you", "hello me", "hello world") 

 val lines = sc.parallelize(lineArray, 1)

 val words = lines.flatMap { line = line.split(" ") } 

 words.foreach { word = println(word) }

 def groupByKey() {

 val conf = new SparkConf()

 .setAppName("groupByKey") 

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),

 Tuple2("class1", 90), Tuple2("class2", 60))

 val scores = sc.parallelize(scoreList, 1) 

 val groupedScores = scores.groupByKey() 

 groupedScores.foreach(score = { 

 println(score._1); 

 score._2.foreach { singleScore = println(singleScore) };

 println("=============================") 

 def reduceByKey() {

 val conf = new SparkConf()

 .setAppName("groupByKey") 

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),

 Tuple2("class1", 90), Tuple2("class2", 60))

 val scores = sc.parallelize(scoreList, 1) 

 val totalScores = scores.reduceByKey(_ + _) 

 totalScores.foreach(classScore = println(classScore._1 + ": " + classScore._2)) 

 def sortByKey() {

 val conf = new SparkConf()

 .setAppName("sortByKey") 

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"), 

 Tuple2(100, "marry"), Tuple2(85, "jack")) 

 val scores = sc.parallelize(scoreList, 1) 

 val sortedScores = scores.sortByKey(false)

 sortedScores.foreach(studentScore = println(studentScore._1 + ": " + studentScore._2)) 

 def join() {

 val conf = new SparkConf()

 .setAppName("join") 

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val studentList = Array(

 Tuple2(1, "leo"),

 Tuple2(2, "jack"),

 Tuple2(3, "tom"));

 val scoreList = Array(

 Tuple2(1, 100),

 Tuple2(2, 90),

 Tuple2(3, 60));

 val students = sc.parallelize(studentList);

 val scores = sc.parallelize(scoreList);

 val studentScores = students.join(scores) 

 studentScores.foreach(studentScore = { 

 println("student id: " + studentScore._1);

 println("student name: " + studentScore._2._1)

 println("student socre: " + studentScore._2._2) 

 println("=======================================") 

 def cogroup() {

}
Scala代码的实现之action操作实战
import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

 * @author Administrator

object ActionOperation {

 def main(args: Array[String]) {

 // reduce() 

 // collect() 

 // count() 

 // take() 

 countByKey() 

 def reduce() {

 val conf = new SparkConf()

 .setAppName("reduce")

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 val numbers = sc.parallelize(numberArray, 1) 

 val sum = numbers.reduce(_ + _) 

 println(sum) 

 def collect() {

 val conf = new SparkConf()

 .setAppName("collect")

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 val numbers = sc.parallelize(numberArray, 1) 

 val doubleNumbers = numbers.map { num = num * 2 } 

 val doubleNumberArray = doubleNumbers.collect()

 for(num - doubleNumberArray) {

 println(num) 

 def count() {

 val conf = new SparkConf()

 .setAppName("count")

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 val numbers = sc.parallelize(numberArray, 1) 

 val count = numbers.count()

 println(count) 

 def take() {

 val conf = new SparkConf()

 .setAppName("take")

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 val numbers = sc.parallelize(numberArray, 1) 

 val top3Numbers = numbers.take(3)

 for(num - top3Numbers) {

 println(num) 

 def saveAsTextFile() {

 def countByKey() {

 val conf = new SparkConf()

 .setAppName("countByKey") 

 .setMaster("local") 

 val sc = new SparkContext(conf)

 val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),

 Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry")) 

 val students = sc.parallelize(studentList, 1) 

 val studentCounts = students.countByKey() 

 println(studentCounts) 

}

Scala/Java - Redis 连接检测与重试 项目实现中需要连接 redis,为了防止因网络抖动或其他原因造成的客户端连接失败,一般需要增加重试机制判断 client 是否连接成功,之前写了一版重连代码发现有 bug,借此机会看下代码 bug 以及如何更好的重连 redis。...
第三方自定义 jar 包可以添加到本地 maven 库中,随后即可 mvn package 打入到最终的项目 jar 包中,该方法最方便。创建 install.sh 文件,jar_path 为第三方自定义 jar 包在设备的位置,groupId、artifactId 和 版本号 version 自己定义,执行脚本后
Scala / Java - 采用 MD5 加盐 实现 id 均匀分组 大量 id 场景下经常需要通过 id 进行 AB Test,最常见的就是使用尾号 hash 进行分组,但是由于 id 生成规则以及其他因素,按照尾号分组往往会造成 id 不匀,从而导致 AB Test 效果受影响,所以下文采用 md5 加盐 Hash 的方式,得到更均匀的分组与 AB Test 效果。......
Flink - 读取 Parquet 文件 By Scala / Java parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。
Scala/Java - shuffle 数组详解 本地使用 spark paralize 数组 rdd 时需要构造一个随机数组,分别使用 java.util 和 scala.util 实现,下面记录下不同的 shuffle 方法以及踩到的坑。