
Spark SQL Source Code Analysis, Part 7. Execute: from executedPlan to RDD[Row]

This part traces how the executedPlan is turned into an RDD[Row] once an action is triggered. Start from a simple user program:

SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.parquetFile(parquetPath);
dataFrame.registerTempTable(source);                 // source == "test", matching the FROM clause below
String sql = " select SUM(id) from test group by dev_chnid ";
DataFrame result = sqlContext.sql(sql);
log.info("Result:" + result.collect());              // collect() triggers the action

collect() on the DataFrame delegates to the executed physical plan:

override def collect(): Array[Row] = {
  val ret = queryExecution.executedPlan.executeCollect() // run executeCollect of the executedPlan
  ret
}

SparkPlan.executeCollect runs the plan and converts the Catalyst rows back to Scala rows:

def executeCollect(): Array[Row] = {
  execute().mapPartitions { iter =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    iter.map(converter(_).asInstanceOf[Row])
  }.collect() // what ultimately runs is executedPlan.execute(), i.e. SparkPlan.execute
}

RDD.collect then submits the job:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
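The runJob plus Array.concat pattern in RDD.collect is easy to reproduce directly; a minimal sketch, assuming an existing SparkContext named sc:

// Mirror of what RDD.collect does: run a job that materializes each partition
// into an array, then concatenate the per-partition arrays on the driver.
val rdd = sc.parallelize(1 to 10, 3)
val perPartition: Array[Array[Int]] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.toArray)
val all: Array[Int] = Array.concat(perPartition: _*)   // same result as rdd.collect()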

Now look at SparkPlan's execute function:

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  final def execute(): RDD[Row] = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      doExecute() // dispatches to the doExecute of the concrete SparkPlan
    }
  }
}

As you can see, every concrete SparkPlan implements a doExecute function whose output is an RDD[Row]. Taking the statement select SUM(id) from test group by dev_chnid as an example, its executedPlan is:

Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]
 Exchange (HashPartitioning 200)
  Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]
   PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD
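Incidentally, this tree can be printed straight from the API; a small sketch, assuming result is the DataFrame built in the snippet above:

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// the physical plan is the executedPlan shown above.
result.explain(true)
// Or print only the physical plan:
println(result.queryExecution.executedPlan)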

First, look at the doExecute function of Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]:

protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
  if (groupingExpressions.isEmpty) { // no GROUP BY expressions
    child.execute().mapPartitions { iter => // run the child's execute()
      val buffer = newAggregateBuffer()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        var i = 0
        while (i < buffer.length) { // update every aggregate with the current row (global aggregation)
          buffer(i).update(currentRow)
          i += 1
        }
      }
      val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
      val aggregateResults = new GenericMutableRow(computedAggregates.length)

      var i = 0
      while (i < buffer.length) {
        aggregateResults(i) = buffer(i).eval(EmptyRow)
        i += 1
      }

      Iterator(resultProjection(aggregateResults))
    }
  } else {
    child.execute().mapPartitions { iter => // run the child's execute()
      val hashTable = new HashMap[Row, Array[AggregateFunction]]
      // groupingExpressions = [dev_chnid#0]
      // child.output = [dev_chnid#0,id#17L]
      val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)

      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        val currentGroup = groupingProjection(currentRow)
        var currentBuffer = hashTable.get(currentGroup)
        if (currentBuffer == null) {
          currentBuffer = newAggregateBuffer()
          hashTable.put(currentGroup.copy(), currentBuffer)
        }

        var i = 0
        while (i < currentBuffer.length) {
          currentBuffer(i).update(currentRow) // update the aggregate buffer of the current row's group
          i += 1
        }
      }

      new Iterator[Row] {
        private[this] val hashTableIter = hashTable.entrySet().iterator()
        private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
        private[this] val resultProjection =
          new InterpretedMutableProjection(
            resultExpressions, computedSchema ++ namedGroups.map(_._2))
        private[this] val joinedRow = new JoinedRow4

        override final def hasNext: Boolean = hashTableIter.hasNext

        override final def next(): Row = {
          val currentEntry = hashTableIter.next()
          val currentGroup = currentEntry.getKey
          val currentBuffer = currentEntry.getValue

          var i = 0
          while (i < currentBuffer.length) {
            // Evaluating an aggregate buffer returns the result. No row is required since we
            // already added all rows in the group using update.
            aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
            i += 1
          }
          resultProjection(joinedRow(aggregateResults, currentGroup)) // emit one result row per group
        }
      }
    }
  }
}
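To see the pattern of the else branch in isolation, here is a minimal standalone sketch, plain Scala rather than Spark code, of hash-based aggregation: one buffer per group key, updated row by row, then one output row per group:

import scala.collection.mutable

// Hash-based SUM per group over one partition's iterator (for illustration only).
def hashAggregate(rows: Iterator[(String, Long)]): Iterator[(String, Long)] = {
  val hashTable = mutable.HashMap.empty[String, Long] // group key -> running sum (the "aggregate buffer")
  for ((key, value) <- rows) {
    hashTable(key) = hashTable.getOrElse(key, 0L) + value // buffer.update(currentRow)
  }
  hashTable.iterator // emit one result row per group
}

// hashAggregate(Iterator(("ch-0", 1L), ("ch-1", 2L), ("ch-0", 3L))) yields ("ch-0", 4L) and ("ch-1", 2L)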

The child of Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L] is Exchange (HashPartitioning 200), i.e.:

case class Exchange(
    newPartitioning: Partitioning,
    newOrdering: Seq[SortOrder],
    child: SparkPlan)
  extends UnaryNode {

  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
    newPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
        val keySchema = expressions.map(_.dataType).toArray
        val valueSchema = child.output.map(_.dataType).toArray
        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
        val part = new HashPartitioner(numPartitions)

        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            iter.map(r => (hashExpressions(r).copy(), r.copy()))
          }
        } else {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            val mutablePair = new MutablePair[Row, Row]()
            iter.map(r => mutablePair.update(hashExpressions(r), r))
          }
        }
        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) // rdd is the child's output, part decides how rows are repartitioned
        if (newOrdering.nonEmpty) {
          shuffled.setKeyOrdering(keyOrdering)
        }
        shuffled.setSerializer(serializer)
        shuffled.map(_._2)

      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.
    }
  }
}
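Exchange (HashPartitioning 200) routes each row by the hash of its grouping key, so every row with the same dev_chnid lands in the same reduce-side partition. A tiny sketch, with a made-up key value:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(200)   // same as Exchange (HashPartitioning 200)
val p1 = part.getPartition("ch-0")    // target partition index in [0, 200)
val p2 = part.getPartition("ch-0")
assert(p1 == p2)                      // identical keys always map to the same partition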

The ShuffledRDD returned by Exchange takes as its input the output of its child, i.e. Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L], and so on down the tree. Finally, let's look at the last SparkPlan: PhysicalRDD.

private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  protected override def doExecute(): RDD[Row] = rdd // simply returns the rdd passed to the constructor
}

PhysicalRDD's doExecute simply returns the rdd passed to its constructor, which is rather opaque, so let's trace where that rdd actually comes from.
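As background, in the public data sources API any relation supplies exactly such an RDD[Row] through buildScan. The relation below is a hypothetical minimal sketch of that contract (DummyRelation and its schema are invented for illustration; this is not the Parquet code path):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical relation: the RDD[Row] returned by buildScan is what eventually
// gets wrapped in a PhysicalRDD (possibly after a row-format conversion).
class DummyRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(
    StructField("dev_chnid", StringType) ::
    StructField("id", LongType) :: Nil)

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("ch-0", 1L), Row("ch-1", 2L)))
}

The real code path for our Parquet table starts in DataSourceStrategy: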

private[sql] object DataSourceStrategy extends Strategy with Logging {
  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
      val sharedHadoopConf = SparkHadoopUtil.get.conf
      val confBroadcast =
        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
      pruneFilterProject(
        l,
        projectList,
        filters,
        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
    // ... other cases omitted
  }

  protected def pruneFilterProject(
      relation: LogicalRelation,
      projectList: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
    pruneFilterProjectRaw(
      relation,
      projectList,
      filterPredicates,
      (requestedColumns, pushedFilters) => {
        scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
      })
  }

  protected def pruneFilterProjectRaw(
      relation: LogicalRelation,
      projectList: Seq[NamedExpression],
      filterPredicates: Seq[Expression],
      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {

    val projectSet = AttributeSet(projectList.flatMap(_.references))
    val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
    val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

    val pushedFilters = filterPredicates.map { _ transform {
      case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
    }}

    if (projectList.map(_.toAttribute) == projectList &&
        projectSet.size == projectList.size &&
        filterSet.subsetOf(projectSet)) {
      // When it is possible to just use column pruning to get the right projection and
      // when the columns of this projection are enough to evaluate all filter conditions,
      // just do a scan followed by a filter, with no extra project.
      val requestedColumns =
        projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
          .map(relation.attributeMap)            // Match original case of attributes.

      val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
        scanBuilder(requestedColumns, pushedFilters))
      filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
    } else {
      val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq

      val scan = createPhysicalRDD(relation.relation, requestedColumns,
        scanBuilder(requestedColumns, pushedFilters))
      execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
    }
  }

  private[this] def createPhysicalRDD(
      relation: BaseRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): SparkPlan = {
    val converted = if (relation.needConversion) {
      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
    } else {
      rdd
    }
    execution.PhysicalRDD(output, converted) // wraps the scan RDD in a PhysicalRDD
  }
}

So the rdd is essentially the return value of buildScan. Spark implements a different buildScan for each HadoopFsRelation; in Spark 1.4.0 the HadoopFsRelation for Parquet files is ParquetRelation2, i.e.:

private[sql] class ParquetRelation2(
    override val paths: Array[String],
    private val maybeDataSchema: Option[StructType],
    // This is for metastore conversion.
    private val maybePartitionSpec: Option[PartitionSpec],
    override val userDefinedPartitionColumns: Option[StructType],
    parameters: Map[String, String])(
    val sqlContext: SQLContext)
  extends HadoopFsRelation(maybePartitionSpec)
  with Logging {

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown

    // Create the function to set variable Parquet confs at both driver and executor side.
    val initLocalJobFuncOpt =
      ParquetRelation2.initializeLocalJobFunc(
        requiredColumns,
        filters,
        dataSchema,
        useMetadataCache,
        parquetFilterPushDown) _
    // Create the function to set input paths at the driver side.
    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

    Utils.withDummyCallSite(sqlContext.sparkContext) {
      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
      // and footers. Especially when a global arbitrative schema (either from metastore or data
      // source DDL) is available.
      new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = classOf[FilteringParquetRowInputFormat],
        keyClass = classOf[Void],
        valueClass = classOf[Row]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as /
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
        }.toSeq

        @transient val cachedFooters = footers.map { f =>
          // In order to encode the authority of a Path containing special characters such as /,
          // we need to use the string returned by the URI of the path to create a new Path.
          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
        }.toSeq

        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))
        }

        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = if (cacheMetadata) {
            new FilteringParquetRowInputFormat {
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
            }
          } else {
            new FilteringParquetRowInputFormat
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }
      }.values
    }
  }
}
buildScan returns a SqlNewHadoopRDD (the .values projection extracts the Row values from its key/value records). At this point the SQL statement has been fully translated into Spark's computation model, an RDD lineage. What remains is to split that lineage into stages at the shuffle boundaries and submit the tasks of each stage for execution. Finally, let's look at how the job actually runs:

[Figure: Spark UI view of the job triggered by collect()]

Because the data is aggregated partially within each partition first and then globally after the shuffle, job 0 is split into two stages, stage 0 and stage 1, as shown below:

[Figures: Spark UI views of stage 0 and stage 1]
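For intuition, a hand-written RDD job of the same shape (the channel ids and values below are made up) shows the same two-stage split, because reduceByKey introduces exactly one shuffle boundary, just like the Exchange between the partial and the final Aggregate:

// Rough RDD-level equivalent of "select SUM(id) from test group by dev_chnid".
val rows = sc.parallelize(Seq(("ch-0", 1L), ("ch-1", 2L), ("ch-0", 3L)), 4)
val sums = rows.reduceByKey(_ + _) // map-side (partial) combine, shuffle, then final combine
sums.collect()                     // one job, two stages: before and after the shuffle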
