《Spark 官方文档》Spark快速入门
本教程是对Spark的一个快速简介。首先,我们通过Spark的交互式shell介绍一下API(主要是Python或Scala),然后展示一下如何用Java、Scala、Python写一个Spark应用。更完整参考看这里:programming guide
首先,请到Spark website下载一个Spark发布版本,以便后续方便学习。我们暂时还不会用到HDFS,所以你可以使用任何版本的Hadoop。
使用Spark shell交互式分析
利用Spark shell 很容易学习Spark API,同时也Spark shell也是强大的交互式数据分析工具。Spark shell既支持Scala(Scala版本的shell在Java虚拟机中运行,所以在这个shell中可以引用现有的Java库),也支持Python。在Spark目录下运行下面的命令可以启动一个Spark shell:
Scala PythonSpark最主要的抽象概念是个分布式集合,也叫作弹性分布式数据集(Resilient Distributed Dataset – RDD)。RDD可以由Hadoop InputFormats读取HDFS文件创建得来,或者从其他RDD转换得到。下面我们就先利用Spark源代码目录下的README文件来新建一个RDD:
scala val textFile = sc.textFile("README.md") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDD有两种算子,action算子(actions)返回结果,transformation算子(transformations)返回一个新RDD。我们先来看一下action算子:
scala textFile.count() // Number of items in this RDD res0: Long = 126 scala textFile.first() // First item in this RDD res1: String = # Apache Spark
再来看下如何使用transformation算子。我们利用filter这个transformation算子返回一个只包含原始文件子集的新RDD。
scala val linesWithSpark = textFile.filter(line = line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
把这两个例子串起来,我们可以这样写:
scala textFile.filter(line = line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15更多RDD算子
RDD action 和 transformation 算子可以做更加复杂的计算。下面的代码中,我们将找出文件中包含单词数最多的行有多少个单词:
Scala Pythonscala textFile.map(line = line.split(" ").size).reduce((a, b) = if (a b) a else b) res4: Long = 15
首先,用一个map算子将每一行映射为一个整数,返回一个新RDD。然后,用reduce算子找出这个RDD中最大的单词数。map和reduce算组的参数都是scala 函数体(闭包),且函数体内可以使用任意的语言特性,或引用scala/java库。例如,我们可以调用其他函数。为了好理解,下面我们用Math.max作为例子:
scala import java.lang.Math import java.lang.Math scala textFile.map(line = line.split(" ").size).reduce((a, b) = Math.max(a, b)) res5: Int = 15
Hadoop上的MapReduce是大家耳熟能详的一种通用数据流模式。而Spark能够轻松地实现MapReduce流程:
scala val wordCounts = textFile.flatMap(line = line.split(" ")).map(word = (word, 1)).reduceByKey((a, b) = a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
这个例子里,我使用了flatMap, map, and reduceByKey 这几个transformation算子,把每个单词及其在文件中出现的次数转成一个包含(String,int)键值对的RDD,计算出每个单词在文件中出现的次数
scala wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Spark同样支持把数据集拉到集群范围的内存缓存中。这对于需要重复访问的数据非常有用,比如:查询一些小而”热“(频繁访问)的数据集 或者 运行一些迭代算法(如 PageRank)。作为一个简单的示例,我们把 linesWithSpark 这个数据集缓存一下:
Scala Python首先用Scala新建一个简单的Spark应用 – 简单到连名字都叫SimpleApp.scala
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line = line.contains("a")).count() val numBs = logData.filter(line = line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) }
注意,应用程序需要定义一个main方法,而不是继承scala.App。scala.App的子类可能不能正常工作。
这个程序,统计了Spark README文件中分别包含‘a’和’b’的行数。注意,你需要把YOUR_SPARK_HOME换成你的Spark安装目录。与之前用spark-shell不同,这个程序有一个单独的SparkContext对象,我们初始化了这个SparkContext对象并将其作为程序的一部分。
我们把一个 SparkConf 对象传给SparkContext的构造函数,SparkConf对象包含了我们这个应用程序的基本信息和配置。
我们的程序需要依赖Spark API,所以我们需要包含一个sbt配置文件,simple.sbt,在这个文件里,我们可以配置Spark依赖项。这个文件同时也添加了Spark本身的依赖库:
name := "Simple Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
为了让sbt能正常工作,我们需要一个典型的目录结构来放SimpleApp.scala和simple.sbt程序。一旦建立好目录,我们就可以创建一个jar包,然后用spark-submit脚本运行我们的代码。
# Your directory layout should look like this $ find . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.10/simple-project_2.10-1.0.jar Lines with a: 46, Lines with b: 23
恭喜你!你的首个Spark应用已经跑起来了!
进一步的API参考,请看这里:Spark programming guide,或者在其他页面上点击 “Programming Guides”菜单 如果想了解集群上运行应用程序,请前往:deployment overview 最后,Spark examples子目录下包含了多个示例,你可以这样来运行这些例子:# For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R
转自:http://ifeve.com/%E3%80%8Aspark-%E5%AE%98%E6%96%B9%E6%96%87%E6%A1%A3%E3%80%8Bspark%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97/
相关文章
- Python fire官方文档教学(自动生成命令行,个人觉得意义不大,不如argparse)
- Word控件Spire.Doc 【文本】教程(12) ;新方法在 C# 中获取 Word 文档中内容控件的别名、标签和 ID
- Aspose.Words for .NET使用文档教程(4):如何比较两个Word文档
- Mac Docker下安装与使用ShowDoc在线文档
- Flink官方文档目录索引
- 《Log4j2官方文档》从Log4j 1.x迁移
- 《HttpClient官方文档》2.6 连接维持存活策略
- 《Spark 官方文档》机器学习库(MLlib)指南
- 《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)
- 《Spark 官方文档》硬件配置
- 《maven官方文档》5分钟开始Maven
- 《Spark 官方文档》在YARN上运行Spark
- 《Spark官方文档》提交Spark应用
- Apache Storm 官方文档 —— 使用非 JVM 语言开发
- Apache Storm 官方文档 —— 常用模式
- Apache Storm 官方文档 —— 序列化
- [转]Commons IO 官方文档
- Spring Authorization Server 0.3.0 发布,官方文档正式上线
- MKCoordinateSpan 缩放层级 MapKit (SwiftUI 中文手册文档)
- ASP.NET Aries DataGrid 配置表头说明文档
- Gradle Import Wizard--官方文档
- cas 官方文档
- 快速掌握openstack命令结合使用场景及官方文档大整理
- [Work Summary] Python将PDF转换成Word文档