zl程序教程

您现在的位置是:首页 >  其他

当前栏目

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(一)

2023-09-11 14:16:09 时间

Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。

val sc: SparkContext // 假设已经有一个 SparkContext 对象

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隐式转换操作

import sqlContext.implicits._

除了SQLContext之外,你也可以创建HiveContext,HiveContext是SQLContext 的超集。

除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不需要安装Hive,而且SQLContext能用的数据源,HiveContext也一样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含所有的Hive依赖。如果这些依赖对你来说不是问题(不会造成依赖冲突等),建议你在Spark-1.3之前使用HiveContext。而后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差不多的状态。

spark.sql.dialect选项可以指定不同的SQL变种(或者叫SQL方言)。这个参数可以在SparkContext.setConf里指定,也可以通过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前唯一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,当然你也可以将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,所以大部分情况下,推荐使用HiveContext。

创建DataFrame

Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD),或者Hive表,或者其他数据源(data sources.)

以下是一个从JSON文件创建DataFrame的小栗子:

Scala Java Python R
val sc: SparkContext // 已有的 SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout

df.show()
DataFrame操作

DataFrame提供了结构化数据的领域专用语言支持,包括ScalaJavaPython and R.

这里我们给出一个结构化数据处理的基本示例:

Scala Java Python R
val sc: SparkContext // 已有的 SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个 DataFrame

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展示 DataFrame 的内容

df.show()

// age name

// null Michael

// 30 Andy

// 19 Justin

// 打印数据树形结构

df.printSchema()

// root

// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

// select "name" 字段

df.select("name").show()

// name

// Michael

// Andy

// Justin

// 展示所有人,但所有人的 age 都加1

df.select(df("name"), df("age") + 1).show()

// name (age + 1)

// Michael null

// Andy 31

// Justin 20

// 筛选出年龄大于21的人

df.filter(df("age") 21).show()

// age name

// 30 Andy

// 计算各个年龄的人数

df.groupBy("age").count().show()

// age count

// null 1

// 19 1

// 30 1

DataFrame的完整API列表请参考这里:API Documentation

除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.


Scala Java Python R
val sqlContext = ... // 已有一个 SQLContext 对象

val df = sqlContext.sql("SELECT * FROM table")
创建Dataset

Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。

Scala Java
// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的

val ds = Seq(1, 2, 3).toDS()

ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 以下这行不仅定义了case class,同时也自动为其创建了Encoder

case class Person(name: String, age: Long)

val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。

val path = "examples/src/main/resources/people.json"

val people = sqlContext.read.json(path).as[Person]
和RDD互操作

Spark SQL有两种方法将RDD转为DataFrame。

1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;

2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

利用反射推导schema

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

// sc 是已有的 SparkContext 对象

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 为了支持RDD到DataFrame的隐式转换

import sqlContext.implicits._

// 定义一个case class.

// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,

// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema

case class Person(name: String, age: Int)

// 创建一个包含Person对象的RDD,并将其注册成table

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p = Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age = 13 AND age = 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子

// 查询结果中每行的字段可以按字段索引访问:

teenagers.map(t = "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:

teenagers.map(t = "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型

teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

// 返回结果: Map("name" - "Justin", "age" - 19)

Spark—GraphX编程指南 GraphX 是新的图形和图像并行计算的Spark API。从整理上看,GraphX 通过引入 弹性分布式属性图(Resilient Distributed Property Graph)继承了Spark RDD:一个将有效信息放在顶点和边的有向多重图。为了支持图形计算,GraphX 公开了一组基本的运算(例如,subgraph,joinVertices和mapReduceTriplets),以及在一个优化后的 PregelAPI的变形。此外,GraphX 包括越来越多的图算法和 builder 构造器,以简化图形分析任务。
Spark 2.4.0编程指南--Spark DataSources parquet、orc、csv、json、text、avro格式文件的读、写 spark.sql直接运行文件 BucketyBy,PartitionBy 读写文件 mergining dataSet jdbc(mysql)读写操作 Hive操作(create drop database ,...
Spark 2.4.0编程指南--Spark SQL UDF和UDAF ## 技能标签 - 了解UDF 用户定义函数(User-defined functions, UDFs) - 了解UDAF (user-defined aggregate function), 用户定义的聚合函数 - UDF示例(统计行数据字符长度) - UDF示例(统计行数据字符转大写)...
Spark 2.4.0编程指南--spark dataSet action ## 技能标签 - Spark session 创建 - 在Spark 2.0之后,RDD被数据集(Dataset)取代 ,保留RDD旧api - 数据集数据集介绍 - 读取本地文件(txt,json),HDFS文件 - 对txt格式文件数据遍历(行数据转成对象) - 对json格式文件...
Spark 2.4.0 编程指南--快速入门 ## 技能标签 - Spark 2.4.0 Spark session available as spark - 在Spark 2.0之后,RDD被数据集(Dataset)取代 - Spark session 读取HDFS文件做为数据集 - 数据集函数,count(),first(),...
转自:http://ifeve.com/%E3%80%8Aspark-%E5%AE%98%E6%96%B9%E6%96%87%E6%A1%A3%E3%80%8Bspark%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97/