
您现在的位置是:首页 >  其他



流程执行数据Spark 详解 之三 算子
2023-06-13 09:20:26 时间


/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * the given combine functions and a neutral "zero value".
 *
 * Because `zeroValue` seeds every partition and also seeds the driver-side merge, it should be a
 * neutral element for `combOp`; otherwise it is counted more than once.
 *
 * @param zeroValue the initial value for each partition's accumulation with `seqOp`, and the
 *                  initial value of the driver-side merge with `combOp`
 * @param seqOp an operator used to accumulate elements within a partition
 * @param combOp an operator used to combine results from different partitions
 * @return the combined result of all partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)

  // zeroValue is the per-partition initial value; aggregatePartition runs on the executors.
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

  // jobResult (a clone of zeroValue) is the accumulator; merging each partition's result into
  // it is done on the driver.
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)

  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}

  |  {(x : Int,y : Int) => x + y},

  |  {(a : Int,b : Int) => a + b}

  |  )

res17: Int = 58

 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative and commutative function and a neutral zero value . The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 * This behaves somewhat differently from fold operations implemented for non-distributed
 * collections in functional languages like Scala. This fold operation may be applied to
 * partitions individually, and then fold those results into the final result, rather than
 * apply the fold to each element sequentially in some defined ordering. For functions
 * that are not commutative, the result may differ from that of a fold applied to a
 * non-distributed collection.
def fold(zeroValue: T)(op: (T, T) = T): T= withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)

  val foldPartition = (iter: Iterator[T]) = iter.fold(zeroValue)(cleanOp)

  val mergeResult = (index: Int, taskResult:T) = jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)



 * Aggregates the elements of this RDD in a multi-level tree pattern.
 * @param depth suggested depth of the tree (default: 2)
 * @see [[org.apache.spark.rdd.RDD#aggregate]]
def treeAggregate[U: ClassTag](zeroValue:U)(
  seqOp: (U, T) = U,
  combOp: (U, U) = U,
  depth: Int = 2): U = withScope {
  require(depth = 1, s Depth must be greater than or equal to 1 but got$depth. )
  if (partitions.length == 0) {
  Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
  val cleanSeqOp = context.clean(seqOp)
  val cleanCombOp = context.clean(combOp)

  val aggregatePartition =
  (it: Iterator[T]) = it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

  var partiallyAggregated = mapPartitions(it = Iterator(aggregatePartition(it)))
  var numPartitions = partiallyAggregated.partitions.length

  val scale = math.max(math.ceil(math.pow(numPartitions,1.0 / depth)).toInt, 2)
  // If creating an extra level doesn t help reduce
  // the wall-clock time, we stop tree aggregation.
  while (numPartitions scale + numPartitions / scale) {//计算迭代的程度
  numPartitions /= scale
  val curNumPartitions = numPartitions

  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
  (i, iter) = iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values


  comb: (a: Int, b: Int)Int

  val z =sc.parallelize(List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18),9)

  scala> z.treeAggregate(0)(seq,comb,2)

  res1: Int = 171


 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
def reduce(f: (T,T) = T): T = withScope {
  val cleanF = sc.clean(f)

  val reducePartition: Iterator[T] = Option[T] = iter = {
  if (iter.hasNext) {

  } else {
  var jobResult: Option[T] = None

  val mergeResult = (index: Int, taskResult: Option[T]) = {
  if (taskResult.isDefined) {
  jobResult = jobResult match {
  case Some(value) = Some(f(value, taskResult.get))
  case None = taskResult
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty

  jobResult.getOrElse(throw new UnsupportedOperationException( empty collection ))

 * Returns the max of this RDD as defined by the implicit Ordering[T].
 * @return the maximum element of the RDD
 * */
def max()(implicitord: Ordering[T]):T = withScope {


 * Returns the min of this RDD as defined by the implicit Ordering[T].
 * @return the maximum element of the RDD
 * */
def min()(implicitord: Ordering[T]):T = withScope {


 * Reduces the elements of this RDD in a multi-level tree pattern.
 * @param depth suggested depth of the tree (default: 2)
 * @see [[org.apache.spark.rdd.RDD#reduce]]
def treeReduce(f: (T,T) = T, depth: Int =2): T = withScope {
  require(depth = 1, s Depth must be greater than or equal to 1 but got$depth. )
  val cleanF = context.clean(f)

  val reducePartition: Iterator[T] = Option[T] = iter = {
  if (iter.hasNext) {
  } else {

  val partiallyReduced = mapPartitions(it = Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) = Option[T] = (c, x) = {
  if (c.isDefined x.isDefined) {
  Some(cleanF(c.get, x.get))
  } else if (c.isDefined) {
  } else if (x.isDefined) {
  } else {

  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException( empty collection ))

