Spark入门实战系列–6.SparkSQL(中)–深入了解SparkSQL运行计划及调优详解大数据
l 主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存
l 虚拟软件:VMware® Workstation 9.0.0 build-812388
l 虚拟机操作系统:CentOS6.5 64位,单核
l 虚拟机运行环境:
Ø JDK:1.7.0_55 64位
Ø Hadoop:2.2.0(需要编译为64位)
Ø Scala:2.10.4
Ø Spark:1.1.0(需要编译)
Ø Hive:0.13.1(源代码编译,参见1.2)
1.1.2 集群网络环境本次实验环境只需要hadoop1一台机器即可,网络环境配置如下:
1.2 编译Hive 1.2.1 下载Hive源代码包
这里选择下载的版本为hive-0.13.1,这个版本需要到apache的归档服务器下载,下载地址:http://archive.apache.org/dist/hive/hive-0.13.1/,选择apache-hive-0.13.1-src.tar.gz文件进行下载:
把下载的hive-0.13.0.tar.gz安装包,使用SSH Secure File Transfer工具(参见第2课《Spark编译与部署(上) 基础环境搭建》1.3.1介绍)上传到/home/hadoop/upload 目录下。
1.2.3 解压缩并移动到编译目录到上传目录下,用如下命令解压缩hive安装文件:
$cd /home/hadoop/upload
$tar -zxf apache-hive-0.13.1-src.tar.gz
改名并移动到/app/complied目录下:
$sudo mv apache-hive-0.13.1-src /app/complied/hive-0.13.1-src
$ll /app/complied
1.2.4 编译Hive编译Hive源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译执行如下脚本:
$cd /app/complied/hive-0.13.1-src/
$export MAVEN_OPTS= -Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
$mvn -Phadoop-2,dist -Dmaven.test.skip=true clean package
在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关,网速较快的情况需要1个小时左右(上图的时间是多次编译后最后成功的界面)。最终编译的结果为$HIVE_HOME/packaging/target/apache-hive-0.13.1-bin.tar.gz
通过如下命令查看最终编译完成整个目录大小,可以看到大小为353.6M左右
$du -s /app/complied/hive-0.13.1-src
【注】已经编译好的Hive包在本系列配套资源/install/6.hive-0.13.1-src.tar.gz,读者直接使用
1.3 首次运行hive-console 1.3.1 获取Spark源代码由于首次运行hive-console需要在Spark源代码进行编译,关于Spark源代码的获取可以参考第二课《Spark编译与部署(下) Spark编译安装》方式进行获取,连接地址为 http://spark.apache.org/downloads.html,获取源代码后把Spark源代码移动到/app/complied目录,并命名为spark-1.1.0-hive
1.3.2 配置/etc/profile环境变量第一步 使用如下命令打开/etc/profile文件:
$sudo vi /etc/profile
第二步 设置如下参数:
export HADOOP_HOME=/app/hadoop/hadoop-2.2.0
export HIVE_HOME=/app/complied/hive-0.13.1-src
export HIVE_DEV_HOME=/app/complied/hive-0.13.1-src
第三步 生效配置并验证
$sudo vi /etc/profile
$echo $HIVE_DEV_HOME
1.3.3 运行sbt进行编译运行hive/console不需要启动Spark,需要进入到Spark根目录下使用sbt/sbt hive/console进行首次运行编译,编译以后下次可以直接启动。编译Spark源代码的时候,需要从网上下载依赖包,所以整个编译过程机器必须保证在联网状态。编译命令如下:
$cd /app/complied/spark-1.1.0-hive
$sbt/sbt hive/console
编译时间会很长,在编译过程中可能出现速度慢或者中断,可以再次启动编译,编译程序会在上次的编译中断处继续进行编译,整个编译过程耗时与网速紧密相关。
通过如下命令查看最终编译完成整个目录大小,可以看到大小为267.9M左右
$du -s /app/complied/spark-1.1.0-hive
【注】已经编译好的Spark for hive-console包在本系列配套资源/install/6.spark-1.1.0-hive.tar.gz,可直接使用
1.4 使用hive-console 1.4.1 启动hive-console进入到spark根目录下,使用如下命令启动hive-console
$cd /app/complied/spark-1.1.0-hive
$sbt/sbt hive/console
可以使用:help查看帮助内容
scala :help
可以使用tab键查看所有可使用命令、函数
首先定义Person类,在该类中定义name、age和state三个列,然后把该类注册为people表并装载数据,最后通过查询到数据存放到query中
scala case class Person(name:String, age:Int, state:String)
scala sparkContext.parallelize(Person( Michael ,29, CA )::Person( Andy ,30, NY )::Person( Justin ,19, CA )::Person( Justin ,25, CA )::Nil).registerTempTable( people )
scala val query= sql( select * from people )
scala query.printSchema
scala query.collect()
scala query.queryExecution
scala query.queryExecution.logical
scala query.queryExecution.analyzed
scala query.queryExecution.optimizedPlan
scala query.queryExecution.sparkPlan
scala query.toDebugString
上面常用操作里介绍了源自RDD的数据, SparkSQL也可以源自多个数据源:jsonFile、parquetFile和Hive等。
1.4.5.1 读取Json格式数据第一步 Json测试数据
Json文件支持嵌套表,SparkSQL也可以读入嵌套表,如下面形式的Json数据,可以使用jsonFile读入SparkSQL。该文件可以在配套资源/data/class6中找到,在以下测试中把文件放到 /home/hadoop/upload/class6 路径中
{
fullname : Sean Kelly ,
org : SK Consulting ,
emailaddrs : [
{ type : work , value : [email protected] },
{ type : home , pref : 1, value : [email protected] }
],
telephones : [
{ type : work , pref : 1, value : +1 214 555 1212 },
{ type : fax , value : +1 214 555 1213 },
{ type : mobile , value : +1 214 555 1214 }
],
addresses : [
{ type : work , format : us ,
value : 1234 Main StnSpringfield, TX 78080-1216 },
{ type : home , format : us ,
value : 5678 Main StnSpringfield, TX 78080-1316 }
],
urls : [
{ type : work , value : http://seankelly.biz/ },
{ type : home , value : http://seankelly.tv/ }
]
}
第二步 读入Json数据
使用jsonFile读入数据并注册成表jsonPerson,然后定义一个查询jsonQuery
scala jsonFile( /home/hadoop/upload/class6/nestjson.json ).registerTempTable( jsonPerson )
scala val jsonQuery = sql( select * from jsonPerson )
第三步 查看jsonQuery的schema
scala jsonQuery.printSchema
第四步 查看jsonQuery的整个运行计划
scala jsonQuery.queryExecution
Parquet数据放在配套资源/data/class6/wiki_parquet中,在以下测试中把文件放到 /home/hadoop/upload/class6 路径下
第一步 读入Parquet数据
parquet文件读入并注册成表parquetWiki,然后定义一个查询parquetQuery
scala parquetFile( /home/hadoop/upload/class6/wiki_parquet ).registerTempTable( parquetWiki )
scala val parquetQuery = sql( select * from parquetWiki )
有报错但不影响使用
第二步 查询parquetQuery的schema
scala parquetQuery.printSchema
第三步 查询parquetQuery的整个运行计划
scala parquetQuery.queryExecution
第四步 查询取样
scala parquetQuery.takeSample(false,10,2)
在TestHive类中已经定义了大量的hive0.13的测试数据的表格式,如src、sales等等,在hive-console中可以直接使用;第一次使用的时候,hive-console会装载一次。下面我们使用sales表看看其schema和整个运行计划。
第一步 读入测试数据并定义一个查询hiveQuery
scala val hiveQuery = sql( select * from sales )
第二步 查看hiveQuery的schema
scala hiveQuery.printSchema
第三步 查看hiveQuery的整个运行计划
scala hiveQuery.queryExecution
第四步 其他SQL语句的运行计划
scala val hiveQuery = sql( select * from (select * from src limit 5) a limit 3 )
scala val hiveQuery = sql( select * FROM (select * FROM src) a )
scala hiveQuery.where( key === 100).queryExecution.toRdd.collect
scala sql( select name, age,state as location from people ).queryExecution
scala sql( select name from (select name,state as location from people) a where location= CA ).queryExecution
scala sql( select sum(age) from people ).queryExecution
scala sql( select sum(age) from people ).toDebugString
scala sql( select state,avg(age) from people group by state ).queryExecution
scala sql( select state,avg(age) from people group by state ).toDebugString
1.4.6.2 Join操作
scala sql( select a.name,b.name from people a join people b where a.name=b.name ).queryExecution
scala sql( select a.name,b.name from people a join people b where a.name=b.name ).toDebugString
scala sql( select distinct a.name,b.name from people a join people b where a.name=b.name ).queryExecution
scala sql( select distinct a.name,b.name from people a join people b where a.name=b.name ).toDebugString
CombineFilters就是合并Filter,在含有多个Filter时发生,如下查询:
sql( select name from (select * from people where age =19) a where a.age 30 ).queryExecution
上面的查询,在Optimized的过程中,将age =19和age 30这两个Filter合并了,合并成((age =19) (age 30))。其实上面还做了一个其他的优化,就是project的下推,子查询使用了表的所有列,而主查询使用了列name,在查询数据的时候子查询优化成只查列name。
1.4.7.2 PushPredicateThroughProjectPushPredicateThroughProject就是project下推,和上面例子中的project一样
sql( select name from (select name,state as location from people) a where location= CA ).queryExecution
ConstantFolding是常量叠加,用于表达式。如下面的例子:
sql( select name,1+2 from people ).queryExecution
Spark是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。
木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。
Amdahl定理,一个计算机科学界的经验法则,因吉恩·阿姆达尔而得名。它代表了处理器平行运算之后效率提升的能力。并行计算中的加速比是用并行前的执行速度和并行后的执行速度之比来表示的,它表示了在并行化之后的效率提升情况。阿姆达尔定律是固定负载(计算总量不变时)时的量化标准。可用公式:来表示。式中
分别表示问题规模的串行分量(问题中不能并行化的那一部分)和并行分量,p表示处理器数量。当
时,上式的极限是
,其中
。这意味着无论我们如何增大处理器数目,加速比是无法高于这个数的。
SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。
2.1 并行性SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度。比如前面第六章和第七章查询数据时,shuffle的时候使用了缺省的spark.sql.shuffle.partitions,即200个partition,也就是200个Task:
而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running:
这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。
l对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;
l集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的 total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。
综上所述,SparkSQL的查询并行度主要和集群的core数量相关,合理配置每个节点的core可以提高集群的并行度,提高查询的效率。
2.2 高效的数据格式高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:
2.2.1 数据本地性分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
下面是Spark webUI监控Stage的一个图:
lPROCESS_LOCAL是指读取缓存在本地节点的数据
lNODE_LOCAL是指读取本地节点硬盘数据
lANY是指读取非本地节点数据
l通常读取数据PROCESS_LOCAL NODE_LOCAL ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。
2.2.2 合适的数据类型
对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。
2.2.3 合适的数据列对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。
2.2.4 优的数据存储格式在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看SparkSQL的Stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持 textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高SparkSQL的查询效率。
2.3 内存的使用spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
lspark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例
在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:
l对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
l对于join操作,优先缓存较小的表;
l要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的监控,多思考如何才能Fraction cached的比例更多
对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。
2.5 其他的一些建议优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提:
l 想要获取更好的表达式查询速度,可以将spark.sql.codegen设置为Ture;
l 对于大数据集的计算结果,不要使用collect() ,collect()就结果返回给driver,很容易撑爆driver的内存;一般直接输出到分布式文件系统中;
l 对于Worker倾斜,设置spark.speculation=true 将持续不给力的节点去掉;
l 对于数据倾斜,采用加入部分中间步骤,如聚合后cache,具体情况具体分析;
l 适当的使用序化方案以及压缩方案;
l 善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态;
l 善于解决重点矛盾,多观察Stage中的Task,查看最耗时的Task,查找原因并改善;
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/8912.html
分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集相关文章
- Yelp 的 Spark 数据血缘建设实践!
- Spark集群基础概念 与 spark架构原理详解大数据
- spark性能调优(二) 彻底解密spark的Hash Shuffle详解大数据
- Spark入门实战系列–2.Spark编译与部署(上)–基础环境搭建详解大数据
- Spark入门实战系列–2.Spark编译与部署(下)–Spark编译安装详解大数据
- Spark入门实战系列–9.Spark图计算GraphX介绍及实例详解大数据
- Spark项目之电商用户行为分析大数据平台之(五)实时数据采集详解大数据
- Spark项目之电商用户行为分析大数据平台之(一)项目介绍详解大数据
- Spark源码分析之spark-submit详解大数据
- Spark SQL讲解详解大数据
- Spark内存管理机制详解大数据
- Spark编程模型(RDD编程模型)详解大数据
- Spark算子执行流程详解之八大数据
- Spark算子执行流程详解之七大数据
- Spark-Sql源码解析之六 PrepareForExecution: spark plan -> executed Plan详解大数据
- Spark-Sql源码解析之五 Spark Planner:optimized logical plan –> spark plan详解大数据
- Tuning-java-garbage-collection-for-spark-applications详解大数据
- 如何利用Spark提高批量插入Solr的效率详解大数据
- MySQL数据比较:简化数据比对任务(mysql数据比对工具)
- 大数据工具比较:R 语言和 Spark 谁更胜一筹?
- 使用mysql管理同一天内的数据(mysql同一天)
- Aerith与MySQL让数据动起来(aerith mysql)
- 作为缓存Spark利用Redis缓冲数据的应用(spark需要redis)
- Spark构建Redis数据按照高效实时处理(spark连接redis)
- Spark构建实时应用存储分析引擎Redis(spark存储redis)
- arkOracle与Spark实现企业大数据运维的创新融合(oracle 与sp)
- 数据Redis操作获取并移除数据(redis 获取并移除)