DataFrames与RDDs的相互转换
转换 相互
2023-09-27 14:19:38 时间
Spark SQL支持两种RDDs转换为DataFrames的方式
使用反射获取RDD内的Schema
当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。
使用反射获取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object InferringSchema {
def main(args: Array[String]) {
//创建SparkConf()并设置App名称
val conf = new SparkConf().setAppName("SQL-intsmaze")
//SQLContext要依赖SparkContext
val sc = new SparkContext(conf)
//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))
//创建case class
//将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//导入隐式转换,如果不导入无法将RDD转换成DataFrame
//将RDD转换成DataFrame
import sqlContext.implicits._
val personDF = personRDD.toDF
//注册表
personDF.registerTempTable("intsmaze")
//传入SQL
val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")
//将结果以JSON的方式存储到指定位置
df.write.json("hdfs://192.168.19.131:9000/personresult")
//停止Spark Context
sc.stop()
}
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar
通过编程接口指定Schema(Programmatically Specifying the Schema)
当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:
从原来的RDD创建一个Row格式的RDD.
创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.
通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
object SpecifyingSchema {
def main(args: Array[String]) {
//创建SparkConf()并设置App名称
val conf = new SparkConf().setAppName("SQL-intsmaze")
//SQLContext要依赖SparkContext
val sc = new SparkContext(conf)
//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val personRDD = sc.textFile(args(0)).map(_.split(","))
//通过StructType直接指定每个字段的schema
val schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
)
)
//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//注册表
personDataFrame.registerTempTable("intsmaze")
//执行SQL
val df = sqlContext.sql("select * from intsmaze order by age desc ")
//将结果以JSON的方式存储到指定位置
df.write.json(args(1))
//停止Spark Context
sc.stop()
}
}
将程序打成jar包,上传到spark集群,提交Spark任务
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
在maven项目的pom.xml中添加Spark SQL的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>
相关文章
- Matlab实现二进制矩阵转换为十进制
- Thinkphp5 日期与时间戳相互转换
- JMeter:将提取的时间戳值转换为日期格式
- 06不同进制数之间的相互转换
- CSS中内联元素的盒子模型特点?行内元素,块元素,行内块元素之间的相互转换
- VC中CString和WPARAM之间的相互转换
- 【Serde】枚举和数字转换
- fastjson中List和JSONArray的相互转换
- python String - Image 相互转换
- python实现全角半角的相互转换
- 字符串和byte相互转换,以及pack和unpack处理数据和二进制转化
- Java 基础——List 与数组、Map 相互转换
- 进制转换_全
- Byte[]、Image、Bitmap_之间的相互转换
- Java 数据类型及转换
- java项目编码格式转换(如GBK转UTF-8)
- 《挑战30天C++入门极限》C++运算符重载转换运算符
- c# 字节数组和类相互转换
- android 路径地址与Uri的相互转换 uri转string
- lmdb转换
- 如何将时间序列问题用 Python 转换成为监督学习问题
- 转换CString->char*
- 在C#中调用格式工厂进行任意视频格式到FLV的转换
- 纵表和横表的概念及其相互转换
- (转)java 中unsigned类型的转换
- 把非线程安全的集合转换为线程安全
- C# 使用转换语义版本号