时间: 2023-06-13 09:20:26
SQLContext sqlContext = new SQLContext(jsc); 

DataFrame dataFrame = sqlContext.parquetFile(parquetPath); 


String sql = " select SUM(id) from test group by dev_chnid "; 

DataFrame result = sqlContext.sql(sql); 


// collect() on the DataFrame (excerpt): obtains the result rows by calling executeCollect()
// on the query's executed physical plan; the rest of the method is not shown here.
override def collect(): Array[Row] = { 

 val ret = queryExecution.executedPlan.executeCollect()// run executeCollect() on the executedPlan


// SparkPlan.executeCollect (excerpt): executes this plan and processes each partition of the
// resulting RDD[Row]. Each partition starts by building a Catalyst-to-Scala row converter
// from the plan's schema. The rest of the method is not shown in this excerpt.
def executeCollect(): Array[Row] = {
  execute().mapPartitions { iter =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)



// RDD.collect: runs one job over all partitions of this RDD, turning each partition's
// iterator into an array. The per-partition arrays returned by runJob are then
// concatenated into a single result array.
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}



// SparkPlan (excerpt): abstract base of the physical operators (Aggregate, Exchange,
// PhysicalRDD, ...). execute() is final, so concrete operators supply their behaviour by
// overriding doExecute(), which is presumably invoked from within execute(). That call is
// cut off in this excerpt.
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { 

// Entry point for running this operator. Its work is wrapped in an RDDOperationScope
// labelled with this node's name; the rest of the body is not shown in this excerpt.
final def execute(): RDD[Row] = { 

 RDDOperationScope.withScope(sparkContext, nodeName, false, true) { 



可以看到,每个具体的SparkPlan都会封装一个doExecute函数,其输出为RDD[Row]。就拿select SUM(id) from test group by dev_chnid语句来说,其executedPlan为:

Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L] 

 Exchange (HashPartitioning 200) 

 Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L] 

 PhysicalRDD [dev_chnid#0,id#17L], MapPartitionsRDD

先看下Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]的doExecute函数:

// Aggregate.doExecute (excerpt): evaluates the aggregate functions over the child's rows,
// one partition at a time (each mapPartitions call below works on a single partition).
//  - No grouping expressions: every row of the partition is fed to one aggregate buffer,
//    and the buffer's evaluated values are projected into the result.
//  - Grouping expressions present: rows are bucketed in a HashMap keyed by the projected
//    grouping values, and the returned iterator emits one result row per group.
// In the plan printed above, this operator appears twice:
//  - "Aggregate true" computes partial sums (PartialSum) before the Exchange.
//  - "Aggregate false" combines them (CombineSum) after it.
// Some lines of the original method are absent from this excerpt and are not
// reconstructed here: the buffer updates, the no-grouping return value and the closing braces.
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
  if (groupingExpressions.isEmpty) { // no GROUP BY keys: aggregate the whole partition
    child.execute().mapPartitions { iter => // run the child's execute() and consume its rows
      val buffer = newAggregateBuffer()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        var i = 0
        while (i < buffer.length) { // accumulate the global (ungrouped) aggregate values
          // NOTE(review): the statement feeding currentRow into buffer(i) is missing from
          // this excerpt.
          i += 1

      val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
      val aggregateResults = new GenericMutableRow(computedAggregates.length)
      var i = 0
      while (i < buffer.length) {
        aggregateResults(i) = buffer(i).eval(EmptyRow)
        i += 1
      // NOTE(review): the line returning this partition's result (presumably built from
      // resultProjection and aggregateResults) is missing from this excerpt.

  } else {
    child.execute().mapPartitions { iter => // run the child's execute() and consume its rows
      val hashTable = new HashMap[Row, Array[AggregateFunction]]
      // groupingExpressions = [dev_chnid#0]
      // child.output = [dev_chnid#0,id#17L]
      val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        val currentGroup = groupingProjection(currentRow)
        var currentBuffer = hashTable.get(currentGroup)
        if (currentBuffer == null) {
          // First row of a new group: allocate its buffers. The key is stored as a copy,
          // presumably because the mutable projection reuses its output row.
          currentBuffer = newAggregateBuffer()
          hashTable.put(currentGroup.copy(), currentBuffer)
        var i = 0
        while (i < currentBuffer.length) {
          // NOTE(review): the statement feeding currentRow into currentBuffer(i) is missing
          // from this excerpt.
          i += 1

      // Emit one row per group: the group's evaluated aggregates joined with its key.
      new Iterator[Row] {
        private[this] val hashTableIter = hashTable.entrySet().iterator()
        private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
        private[this] val resultProjection =
          new InterpretedMutableProjection(
            resultExpressions, computedSchema ++ namedGroups.map(_._2))
        private[this] val joinedRow = new JoinedRow4

        override final def hasNext: Boolean = hashTableIter.hasNext

        override final def next(): Row = {
          val currentEntry = hashTableIter.next()
          val currentGroup = currentEntry.getKey
          val currentBuffer = currentEntry.getValue
          var i = 0
          while (i < currentBuffer.length) {
            // Evaluating an aggregate buffer returns the result. No row is required since we
            // already added all rows in the group using update.
            aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
            i += 1
          resultProjection(joinedRow(aggregateResults, currentGroup)) // the aggregate values for this group


Aggregate false, [dev_chnid#0], [CombineSum(PartialSum#45L) AS c0#43L]的child为Exchange (HashPartitioning 200),即:

// Exchange (excerpt): physical operator that redistributes the child's rows according to
// `newPartitioning`; `newOrdering`, when non-empty, is also taken into account. Only the
// HashPartitioning case and the fallback error are shown in this excerpt.
case class Exchange(
    newPartitioning: Partitioning,
    newOrdering: Seq[SortOrder],
    child: SparkPlan)
  extends UnaryNode {

  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
    newPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
        val keySchema = expressions.map(_.dataType).toArray
        val valueSchema = child.output.map(_.dataType).toArray
        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
        val part = new HashPartitioner(numPartitions)
        // Pair every row with its partitioning key (the projected `expressions`).
        //  - If needToCopyObjectsBeforeShuffle(part, serializer) holds, both key and row
        //    are copied for each record.
        //  - Otherwise a single MutablePair is updated and reused for every record.
        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            iter.map(r => (hashExpressions(r).copy(), r.copy()))
        } else {
          child.execute().mapPartitions { iter =>
            val hashExpressions = newMutableProjection(expressions, child.output)()
            val mutablePair = new MutablePair[Row, Row]()
            iter.map(r => mutablePair.update(hashExpressions(r), r))
        // rdd is the child's (key, row) pairs; part decides which output partition each key goes to.
        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
        if (newOrdering.nonEmpty) {
          // NOTE(review): the body of this branch and the rest of the HashPartitioning case
          // are missing from this excerpt.

      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.



Aggregate true, [dev_chnid#0], [dev_chnid#0,SUM(id#17L) AS PartialSum#45L]的执行过程与上面的Aggregate类似,以此类推。接下来我们看下最后那个SparkPlan:PhysicalRDD

// PhysicalRDD: leaf physical operator that wraps an already-built RDD[Row]. `output` is the
// list of attributes describing that RDD's columns.
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { 

 protected override def doExecute(): RDD[Row] = rdd// returns the RDD passed to the constructor unchanged



// DataSourceStrategy (excerpt): planning strategy that turns logical plans over data-source
// relations into physical SparkPlans. Only the HadoopFsRelation case is shown here.
private[sql] object DataSourceStrategy extends Strategy with Logging {
  def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
      // Broadcast the driver-side Hadoop configuration; the broadcast is passed to every
      // buildScan call made by the scan builder below.
      val sharedHadoopConf = SparkHadoopUtil.get.conf
      val confBroadcast =
        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
      // NOTE(review): the beginning of the call that consumes the scan builder below
      // (presumably pruneFilterProject(l, projectList, filters, ...)) is missing from this excerpt.
        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil

 // Adapts a scan builder that takes column names and data-source Filters to the
 // attribute/expression-based planning path. The requested attributes are reduced to their
 // names, and the pushed predicates are converted with selectFilters before scanBuilder is called.
 protected def pruneFilterProject(
     relation: LogicalRelation,
     projectList: Seq[NamedExpression],
     filterPredicates: Seq[Expression],
     scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
   // NOTE(review): the head of the delegating call (presumably
   // pruneFilterProjectRaw(relation, projectList, filterPredicates, ...)) is missing from
   // this excerpt; only the adapter lambda passed to it is visible.
     (requestedColumns, pushedFilters) => {
       scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)

 // Plans a scan over `relation` that evaluates `filterPredicates` and produces `projectList`.
 // All predicates are handed to scanBuilder, with attribute references rewritten to the
 // relation's own attributes, and they are still re-applied on top of the scan by a Filter.
 //  - If projectList is just a list of distinct attributes that also covers every filter
 //    column, the scan reads exactly those columns and no Project is added.
 //  - Otherwise the scan reads the union of projected and filtered columns, and a Project
 //    computes projectList.
 protected def pruneFilterProjectRaw(
     relation: LogicalRelation,
     projectList: Seq[NamedExpression],
     filterPredicates: Seq[Expression],
     scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {
   val projectSet = AttributeSet(projectList.flatMap(_.references))
   val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
   // All predicates AND-ed together; None when there are no predicates.
   val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

   val pushedFilters = filterPredicates.map { _ transform {
     case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.

   if (projectList.map(_.toAttribute) == projectList &&
       projectSet.size == projectList.size &&
       filterSet.subsetOf(projectSet)) {
     // When it is possible to just use column pruning to get the right projection and
     // when the columns of this projection are enough to evaluate all filter conditions,
     // just do a scan followed by a filter, with no extra project.
     val requestedColumns =
       projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
         .map(relation.attributeMap) // Match original case of attributes.
     val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
       scanBuilder(requestedColumns, pushedFilters))
     filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
   } else {
     // Read every column needed by either the projection or the filters, then project.
     val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
     val scan = createPhysicalRDD(relation.relation, requestedColumns,
       scanBuilder(requestedColumns, pushedFilters))
     execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))

// Wraps the RDD produced by a data-source scan in a PhysicalRDD leaf operator with the given
// output attributes. If the relation reports needConversion, the rows are first passed
// through execution.RDDConversions.rowToRowRdd using the output columns' data types.
private[this] def createPhysicalRDD( 

 relation: BaseRelation, 

 output: Seq[Attribute], 

 rdd: RDD[Row]): SparkPlan = { 

 val converted = if (relation.needConversion) { 

 execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType)) 

 } else { 

 // NOTE(review): this branch's value is missing from the excerpt; presumably `rdd` is used as-is.

 execution.PhysicalRDD(output, converted) 



// ParquetRelation2 (excerpt): a HadoopFsRelation over Parquet files. Only the constructor and
// buildScan are shown; the rest of the class is not part of this excerpt.
private[sql] class ParquetRelation2(
    override val paths: Array[String],
    private val maybeDataSchema: Option[StructType],
    // This is for metastore conversion.
    private val maybePartitionSpec: Option[PartitionSpec],
    override val userDefinedPartitionColumns: Option[StructType],
    parameters: Map[String, String])(
    val sqlContext: SQLContext)
  extends HadoopFsRelation(maybePartitionSpec)
  with Logging {

  // Builds the RDD[Row] that scans `inputFiles`: a SqlNewHadoopRDD reading through
  // FilteringParquetRowInputFormat, with two job-initialisation functions:
  //  - a driver-side one that sets the input paths;
  //  - a driver- and executor-side one that sets the Parquet confs.
  // When PARQUET_CACHE_METADATA is true ("true" is the default passed to getConf below),
  // getPartitions serves the already-known file statuses and footers instead of letting the
  // input format list them again.
  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
    val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
    // Create the function to set variable Parquet confs at both driver and executor side.
    val initLocalJobFuncOpt =
      // NOTE(review): most of this right-hand side is missing from the excerpt; only its last
      // argument and the trailing `_` (turning the method call into a function value) are visible.
        parquetFilterPushDown) _
    // Create the function to set input paths at the driver side.
    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
    // Footer of every input file, looked up in metadataCache by path.
    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

    Utils.withDummyCallSite(sqlContext.sparkContext) {
      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
      // and footers. Especially when a global arbitrative schema (either from metastore or data
      // source DDL) is available.
      new SqlNewHadoopRDD(
        sc = sqlContext.sparkContext,
        broadcastedConf = broadcastedConf,
        initDriverSideJobFuncOpt = Some(setInputPaths),
        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
        inputFormatClass = classOf[FilteringParquetRowInputFormat],
        keyClass = classOf[Void],
        valueClass = classOf[Row]) {

        val cacheMetadata = useMetadataCache

        @transient val cachedStatuses = inputFiles.map { f =>
          // In order to encode the authority of a Path containing special characters such as /
          // (which does happen in some S3N credentials), we need to use the string returned by the
          // URI of the path to create a new Path.
          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
          new FileStatus(
            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)

        @transient val cachedFooters = footers.map { f =>
          // In order to encode the authority of a Path containing special characters such as /,
          // we need to use the string returned by the URI of the path to create a new Path.
          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)

        // Rebuilds `path` from the components of its URI, passing the user-info part in its
        // raw (still-escaped) form.
        private def escapePathUserInfo(path: Path): Path = {
          val uri = path.toUri
          new Path(new URI(
            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
            uri.getQuery, uri.getFragment))

        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = if (cacheMetadata) {
            new FilteringParquetRowInputFormat {
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
          } else {
            new FilteringParquetRowInputFormat
          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)
          // One SqlNewHadoopPartition per input split, indexed by the split's position.
          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])



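由于先在各分区内做局部聚合(Aggregate true),再经Exchange按dev_chnid做HashPartitioning的shuffle(即ShuffledRDD),最后做全局聚合(Aggregate false),因此job 0 在Exchange处被切分为2个Stage:stage 0 和stage 1,如下: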

Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row] 详解 大数据

