Spark SQL Source Code Analysis, Part 7: Execute (executedPlan -> RDD[Row])
Starting from the user code, `collect()` is the action that triggers execution:

```java
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.parquetFile(parquetPath);
dataFrame.registerTempTable(source);
String sql = " select SUM(id) from test group by dev_chnid ";
DataFrame result = sqlContext.sql(sql);
log.info("Result:" + result.collect()); // collect triggers the action
```

`DataFrame.collect` runs executeCollect on the executedPlan:

```scala
override def collect(): Array[Row] = {
  val ret = queryExecution.executedPlan.executeCollect()
  ret
}
```

`executeCollect` ultimately calls the executedPlan's `execute`, i.e. `SparkPlan.execute`, and collects the resulting RDD[Row]:

```scala
def executeCollect(): Array[Row] = {
  execute().mapPartitions { iter =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    iter.map(converter(_).asInstanceOf[Row])
  }.collect()
}
```

`RDD.collect` then runs the job and concatenates the per-partition results:

```scala
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
```
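If you want to see the plan that `collect()` will run, Spark 1.4 exposes the query execution as a developer API. A minimal spark-shell style sketch (not part of the original program; it assumes the same Parquet data is registered as table `test`):

```scala
// Hedged sketch: inspect the plans behind the query before triggering any action.
val result = sqlContext.sql("select SUM(id) from test group by dev_chnid")

// Prints the parsed, analyzed, optimized and physical plans.
result.explain(true)

// queryExecution is a @DeveloperApi field on DataFrame; executedPlan is the tree examined below.
println(result.queryExecution.executedPlan)
```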
Let's look at SparkPlan's execute function:
```scala
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  final def execute(): RDD[Row] = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      doExecute() // each concrete SparkPlan supplies its own doExecute
    }
  }
}
```
As you can see, every concrete SparkPlan implements a doExecute function whose output is an RDD[Row]. Taking `select SUM(id) from test group by dev_chnid` as an example, its executedPlan is:
```
Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Exchange (HashPartitioning 200)
  Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
   PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD
```
The first field of Aggregate is its `partial` flag, so `Aggregate true` is the map-side (partial) aggregation and `Aggregate false` the final one. Let's start with the doExecute function of `Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]`:
```scala
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
  if (groupingExpressions.isEmpty) { // no GROUP BY: a single global aggregate
    child.execute().mapPartitions { iter => // run the child's execute first
      val buffer = newAggregateBuffer()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        var i = 0
        while (i < buffer.length) { // accumulate the global aggregate values
          buffer(i).update(currentRow)
          i += 1
        }
      }
      val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
      val aggregateResults = new GenericMutableRow(computedAggregates.length)

      var i = 0
      while (i < buffer.length) {
        aggregateResults(i) = buffer(i).eval(EmptyRow)
        i += 1
      }

      Iterator(resultProjection(aggregateResults))
    }
  } else {
    child.execute().mapPartitions { iter => // run the child's execute first
      val hashTable = new HashMap[Row, Array[AggregateFunction]]
      // groupingExpressions = [dev_chnid#0]
      // child.output = [dev_chnid#0, id#17L]
      val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)

      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        val currentGroup = groupingProjection(currentRow)
        var currentBuffer = hashTable.get(currentGroup)
        if (currentBuffer == null) {
          currentBuffer = newAggregateBuffer()
          hashTable.put(currentGroup.copy(), currentBuffer)
        }

        var i = 0
        while (i < currentBuffer.length) {
          currentBuffer(i).update(currentRow) // update the aggregate buffer of this row's group
          i += 1
        }
      }

      new Iterator[Row] {
        private[this] val hashTableIter = hashTable.entrySet().iterator()
        private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
        private[this] val resultProjection =
          new InterpretedMutableProjection(
            resultExpressions, computedSchema ++ namedGroups.map(_._2))
        private[this] val joinedRow = new JoinedRow4

        override final def hasNext: Boolean = hashTableIter.hasNext

        override final def next(): Row = {
          val currentEntry = hashTableIter.next()
          val currentGroup = currentEntry.getKey
          val currentBuffer = currentEntry.getValue

          var i = 0
          while (i < currentBuffer.length) {
            // Evaluating an aggregate buffer returns the result. No row is required since we
            // already added all rows in the group using update.
            aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
            i += 1
          }
          resultProjection(joinedRow(aggregateResults, currentGroup)) // one result row per group
        }
      }
    }
  }
}
```
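The group-by branch above is a per-partition hash aggregation: one mutable buffer per group key, updated row by row, then one output row per key. A self-contained sketch of the same idea in plain Scala (not Spark source; the rows and channel ids are made up), computing SUM(id) per dev_chnid:

```scala
import scala.collection.mutable

object GroupSumSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the child's output rows of (dev_chnid, id).
    val rows = Seq(("chn-1", 3L), ("chn-2", 5L), ("chn-1", 7L))

    // Plays the role of HashMap[Row, Array[AggregateFunction]]; here the
    // "aggregate buffer" is simply the running sum for each key.
    val hashTable = mutable.HashMap.empty[String, Long]
    for ((key, id) <- rows) {
      val buffer = hashTable.getOrElse(key, 0L) // newAggregateBuffer() on first sight of a key
      hashTable.update(key, buffer + id)        // buffer(i).update(currentRow)
    }

    // The final iterator emits one row per group, like resultProjection(joinedRow(...)).
    hashTable.foreach { case (key, sum) => println(s"$key -> SUM(id) = $sum") }
  }
}
```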
The child of `Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]` is `Exchange (HashPartitioning 200)`:
```scala
case class Exchange(
    newPartitioning: Partitioning,
    newOrdering: Seq[SortOrder],
    child: SparkPlan) extends UnaryNode {

  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
    newPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
        val keySchema = expressions.map(_.dataType).toArray
        val valueSchema = child.output.map(_.dataType).toArray
        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
        val part = new HashPartitioner(numPartitions)

        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            iter.map(r => (hashExpressions(r).copy(), r.copy()))
          }
        } else {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            val mutablePair = new MutablePair[Row, Row]()
            iter.map(r => mutablePair.update(hashExpressions(r), r))
          }
        }
        // rdd is the child's output; part decides which reduce partition each key goes to
        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
        if (newOrdering.nonEmpty) {
          shuffled.setKeyOrdering(keyOrdering)
        }
        shuffled.setSerializer(serializer)
        shuffled.map(_._2)

      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.
    }
  }
}
```
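HashPartitioner is what turns "HashPartitioning 200" into an actual partition number for each grouping key. A tiny hedged sketch (HashPartitioner is a real spark-core class; the keys are invented) showing why every row of a given dev_chnid ends up in front of the same final Aggregate task:

```scala
import org.apache.spark.HashPartitioner

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val part = new HashPartitioner(200) // matches "Exchange (HashPartitioning 200)"
    Seq("chn-1", "chn-2", "chn-3").foreach { key =>
      // Equal keys always hash to the same partition, so the final Aggregate
      // sees all partial sums of a group together.
      println(s"$key -> partition ${part.getPartition(key)}")
    }
  }
}
```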
The ShuffledRDD returned by Exchange takes its child's output as its input, namely `Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]`, whose doExecute works just like the final Aggregate shown above, only with partial = true. Continuing down the tree, let's look at the last SparkPlan, PhysicalRDD:
```scala
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  protected override def doExecute(): RDD[Row] = rdd // simply returns the rdd passed to the constructor
}
```
PhysicalRDD's doExecute simply returns the rdd it was constructed with, which tells us little by itself, so let's trace where that rdd actually comes from. It is built by DataSourceStrategy when the logical plan is turned into a physical one:
```scala
private[sql] object DataSourceStrategy extends Strategy with Logging {
  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
      val sharedHadoopConf = SparkHadoopUtil.get.conf
      val confBroadcast =
        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
      pruneFilterProject(
        l,
        projectList,
        filters,
        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
    // ... other cases elided ...
  }

  protected def pruneFilterProject(
      relation: LogicalRelation,
      projectList: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
    pruneFilterProjectRaw(
      relation,
      projectList,
      filterPredicates,
      (requestedColumns, pushedFilters) => {
        scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
      })
  }

  protected def pruneFilterProjectRaw(
      relation: LogicalRelation,
      projectList: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {

    val projectSet = AttributeSet(projectList.flatMap(_.references))
    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
    val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

    val pushedFilters = filterPredicates.map { _ transform {
      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
    }}

    if (projectList.map(_.toAttribute) == projectList &&
        projectSet.size == projectList.size &&
        filterSet.subsetOf(projectSet)) {
      // When it is possible to just use column pruning to get the right projection and
      // when the columns of this projection are enough to evaluate all filter conditions,
      // just do a scan followed by a filter, with no extra project.
      val requestedColumns =
        projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
          .map(relation.attributeMap)            // Match original case of attributes.

      val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
        scanBuilder(requestedColumns, pushedFilters))
      filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
    } else {
      val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq

      val scan = createPhysicalRDD(relation.relation, requestedColumns,
        scanBuilder(requestedColumns, pushedFilters))
      execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
    }
  }

  private[this] def createPhysicalRDD(
      relation: BaseRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): SparkPlan = {
    val converted = if (relation.needConversion) {
      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
    } else {
      rdd
    }
    execution.PhysicalRDD(output, converted)
  }
}
```
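A large part of the if condition in pruneFilterProjectRaw is the check `filterSet.subsetOf(projectSet)`: if the SELECT list already covers every column the filters reference, the strategy emits only a scan (plus an optional Filter) with no extra Project node. A tiny sketch of that decision with plain Scala sets (column names taken from our example query, which has no WHERE clause):

```scala
object PruneDecisionSketch {
  def main(args: Array[String]): Unit = {
    val projectSet = Set("dev_chnid", "id") // columns referenced by the SELECT list
    val filterSet  = Set.empty[String]      // columns referenced by the filters (none here)

    // Mirrors filterSet.subsetOf(projectSet) in pruneFilterProjectRaw.
    val noExtraProjectNeeded = filterSet.subsetOf(projectSet)

    println(s"columns requested from the scan: ${(projectSet ++ filterSet).mkString(", ")}")
    println(s"extra Project node needed: ${!noExtraProjectNeeded}")
  }
}
```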
So the rdd is essentially the return value of buildScan. Each HadoopFsRelation implements its own buildScan; in Spark 1.4.0 the HadoopFsRelation backing Parquet files is ParquetRelation2:
```scala
private[sql] class ParquetRelation2(
    override val paths: Array[String],
    private val maybeDataSchema: Option[StructType],
    // This is for metastore conversion.
    private val maybePartitionSpec: Option[PartitionSpec],
    override val userDefinedPartitionColumns: Option[StructType],
    parameters: Map[String, String])(
    val sqlContext: SQLContext)
  extends HadoopFsRelation(maybePartitionSpec)
  with Logging {

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown

    // Create the function to set variable Parquet confs at both driver and executor side.
    val initLocalJobFuncOpt =
      ParquetRelation2.initializeLocalJobFunc(
        requiredColumns,
        filters,
        dataSchema,
        useMetadataCache,
        parquetFilterPushDown) _

    // Create the function to set input paths at the driver side.
    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

    Utils.withDummyCallSite(sqlContext.sparkContext) {
      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
      // and footers. Especially when a global arbitrative schema (either from metastore or data
      // source DDL) is available.
      new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = classOf[FilteringParquetRowInputFormat],
        keyClass = classOf[Void],
        valueClass = classOf[Row]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as /
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
        }.toSeq

        @transient val cachedFooters = footers.map { f =>
          // In order to encode the authority of a Path containing special characters such as /,
          // we need to use the string returned by the URI of the path to create a new Path.
          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
        }.toSeq

        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))
        }

        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = if (cacheMetadata) {
            new FilteringParquetRowInputFormat {
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
            }
          } else {
            new FilteringParquetRowInputFormat
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }
      }.values
    }
  }
}
```
buildScan returns a SqlNewHadoopRDD. At this point the SQL statement has been fully translated into Spark's computation model, RDDs. What remains is the usual RDD machinery: the lineage is cut into stages and the tasks of each stage are submitted for execution. Finally, let's look at how this query actually runs.
Because the query first does a partial aggregation and then a global one, job 0 is split into two stages, stage 0 and stage 1.
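One way to see that boundary yourself is to print the lineage of the RDD produced by the physical plan (a hedged sketch, reusing the `result` DataFrame from the earlier snippet; queryExecution.toRdd is simply executedPlan.execute()):

```scala
// The ShuffledRDD introduced by Exchange is the stage boundary: the Parquet scan plus the
// partial Aggregate run in stage 0, and the final Aggregate runs in stage 1.
println(result.queryExecution.toRdd.toDebugString)
```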