Spark Operator Execution Flow in Detail, Part 5 (Big Data)

Time: 2023-06-13 09:20:26

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    // Same partitioner: no shuffle is needed, a single mapPartitions pass suffices
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Otherwise a shuffle is required
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}


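A look at its parameters:

combineByKey takes three main function arguments: createCombiner, mergeValue, and mergeCombiners. Together they define what the operator does, so understanding these three functions is the key to understanding combineByKey.

To understand combineByKey(), first consider how it handles each element. As combineByKey() iterates over the elements of a partition, each element's key is either one it has not seen yet or one it has already encountered. The processing flow is as follows:

If the key is new, createCombiner() creates the initial value of the accumulator for that key. (Note: this happens the first time each key appears in each partition, not the first time the key appears in the whole RDD.)

If the key has already been seen while processing the current partition, combineByKey() uses mergeValue() to merge the accumulator's current value for that key with the new value.

Because each partition is processed independently, the same key can have several accumulators. If two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges the per-partition results. If mapSideCombine is true, values are combined once in advance when the map output is written, and the reduce side only has to merge the resulting combiners; if mapSideCombine is false, all combining happens on the reduce side. Combining in advance reduces the amount of data transferred during shuffle read and therefore speeds up shuffle read.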
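The three-function flow described above can be imitated with plain Scala collections. This is only a sketch, not Spark's implementation: `CombineByKeySketch`, `combinePartition`, `demo`, and the way the sample scores are split across two partitions are all assumptions made for illustration.

```scala
object CombineByKeySketch {
  // One partition: createCombiner on the first occurrence of a key in this
  // partition, mergeValue for every later occurrence of the same key.
  def combinePartition[K, V, C](
      partition: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    partition.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))
        case None    => acc.updated(k, createCombiner(v))
      }
    }

  // All partitions: combine each one independently, then merge the
  // per-partition accumulators of the same key with mergeCombiners.
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] =
    partitions
      .map(p => combinePartition(p, createCombiner, mergeValue))
      .foldLeft(Map.empty[K, C]) { (merged, partial) =>
        partial.foldLeft(merged) { case (acc, (k, c)) =>
          acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
        }
      }

  // (count, sum) per name over an assumed two-partition layout.
  def demo(): Map[String, (Int, Double)] = {
    val partitions = Seq(
      Seq("Fred" -> 88.0, "Wilma" -> 93.0),
      Seq("Fred" -> 95.0, "Fred" -> 91.0))
    combineByKey[String, Double, (Int, Double)](
      partitions,
      v => (1, v),
      (c, v) => (c._1 + 1, c._2 + v),
      (a, b) => (a._1 + b._1, a._2 + b._2))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this layout "Fred" appears in both partitions, so createCombiner runs for "Fred" twice (once per partition) and mergeCombiners joins the two partial accumulators.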

First, look at the dependency inside ShuffledRDD:


class ShuffledRDD[K, V, C](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  …
  // The shuffle write and read sides are both driven by this ShuffleDependency
  override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }
  …

}


The shuffle read/write handle is obtained by registering with the shuffleManager. The default ShuffleManager is SortShuffleManager.


class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  // Register the shuffle and obtain its handle
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}


private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
  private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    // We currently use the same block store shuffle fetcher as the hash-based shuffle.
    new HashShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

  /** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
      : ShuffleWriter[K, V] = {
    val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
    shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
    new SortShuffleWriter(
      shuffleBlockResolver, baseShuffleHandle, mapId, context)
  }

  /** Remove a shuffle's metadata from the ShuffleManager. */
  override def unregisterShuffle(shuffleId: Int): Boolean = {
    if (shuffleMapNumber.containsKey(shuffleId)) {
      val numMaps = shuffleMapNumber.remove(shuffleId)
      (0 until numMaps).map { mapId =>
        shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
      }
    }
    true
  }

  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
    indexShuffleBlockResolver
  }

  /** Shut down this ShuffleManager. */
  override def stop(): Unit = {
    shuffleBlockResolver.stop()
  }

}


private[spark] class SortShuffleWriter[K, V, C](
    shuffleBlockResolver: IndexShuffleBlockResolver,
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val dep = handle.dependency

  private val blockManager = SparkEnv.get.blockManager

  private var sorter: ExternalSorter[K, V, _] = null

  // Are we in the process of stopping? Because map tasks can call stop() with success = true
  // and then call stop() with success = false if they get an exception, we want to make sure
  // we don't try deleting files, etc twice.
  private var stopping = false

  private var mapStatus: MapStatus = null

  private val writeMetrics = new ShuffleWriteMetrics()
  context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      // Aggregate on the map side
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // No map-side aggregation: neither dep.aggregator nor dep.keyOrdering is passed down,
      // only dep.partitioner, so the map side just splits records by partition and nothing else.
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

}


def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Map-side aggregation: uses createCombiner and mergeValue
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else if (bypassMergeSort) {
    // Otherwise, when there are few partitions, write one file per partition
    // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
    if (records.hasNext) {
      spillToPartitionFiles(
        WritablePartitionedIterator.fromIterator(records.map { kv =>
          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        })
      )
    }
  } else {
    // Otherwise, when there are many partitions, buffer the records and write a single file
    // to avoid creating too many temporary files
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}


private[spark] class HashShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext)
  extends ShuffleReader[K, C]
{
  require(endPartition == startPartition + 1,
    "Hash shuffle currently only supports fetching one partition")

  private val dep = handle.dependency

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Already combined on the map side: mergeCombiners alone completes the aggregation
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        // Otherwise aggregate with createCombiner and mergeValue
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }

}


Now consider the following scenario, in which one combineByKey call sets mapSideCombine to false and the other leaves it at true:


import org.apache.spark.RangePartitioner

val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) // tuple type: (subject counter, score)

// Case 1: mapSideCombine = false
d1.combineByKey(
  (score: Double) => (1, score),
  (c1: MVType, newScore: Double) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),
  // The original names only RangePartitioner; the 2 partitions here are an assumed value
  new RangePartitioner(2, d1),
  false
).map { case (name, (num, score)) => (name, score / num) }.collect

// Case 2: mapSideCombine keeps its default value, true
d1.combineByKey(
  (score: Double) => (1, score),
  (c1: MVType, newScore: Double) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, score)) => (name, score / num) }.collect


[Figure: execution flow diagram]

The execution flow when mapSideCombine is true is as follows:

[Figure: execution flow with mapSideCombine = true]

As shown, aggregating on the map side in advance reduces the amount of data produced during the shuffle.
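To put a rough number on that saving, here is a small sketch that counts how many records each map task would hand to the shuffle in both modes. It uses plain collections rather than Spark, and `MapSideCombineSketch`, `mapOutputRecords`, and the two-partition split of the six sample scores are assumptions made for illustration.

```scala
object MapSideCombineSketch {
  // Number of records one map task writes into the shuffle for its partition.
  def mapOutputRecords(partition: Seq[(String, Double)], mapSideCombine: Boolean): Int =
    if (mapSideCombine) partition.map(_._1).distinct.size // one combiner per distinct key
    else partition.size                                   // every raw record is written

  // Assumed split of the six example scores across two partitions.
  val partitions: Seq[Seq[(String, Double)]] = Seq(
    Seq("Fred" -> 88.0, "Fred" -> 95.0, "Wilma" -> 93.0),
    Seq("Fred" -> 91.0, "Wilma" -> 95.0, "Wilma" -> 98.0))

  // Total records crossing the shuffle boundary.
  def shuffled(mapSideCombine: Boolean): Int =
    partitions.map(p => mapOutputRecords(p, mapSideCombine)).sum

  def main(args: Array[String]): Unit = {
    println(s"mapSideCombine = false: ${shuffled(false)} records")
    println(s"mapSideCombine = true:  ${shuffled(true)} records")
  }
}
```

With this layout the shuffle carries six records without map-side combining and four with it. The gap widens as each key repeats more often within a partition.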

23. distinct()

Deduplication: removes duplicate elements from the RDD.


def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
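The map, reduceByKey, map chain used by distinct can be imitated on an ordinary Seq. This is a sketch with plain collections, not the RDD code: `DistinctSketch` and its local `reduceByKey` are stand-ins written for illustration.

```scala
object DistinctSketch {
  // Stand-in for reduceByKey: group the pairs by key, then reduce each key's values.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }

  // distinct: pair every element with a dummy null value, keep one value per key,
  // then drop the dummy and keep only the keys.
  def distinct[T](xs: Seq[T]): Set[T] =
    reduceByKey(xs.map(x => (x, null)))((x, y) => x).keySet

  def main(args: Array[String]): Unit =
    println(distinct(Seq(1, 2, 2, 3, 1)))
}
```

The value is never used, which is why the source pairs each element with null: only the key-level deduplication performed by the reduce step matters.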


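The call ultimately ends up in combineByKey, because the core computation of every aggregation-style RDD operation is carried out by combineByKey. The detailed execution flow is as follows: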

[Figure: execution flow of distinct]

24. groupByKey

Groups together the records that share the same key.


/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKey[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}


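In essence it uses combineByKey to gather the values of identical keys. Note, however, that groupByKey does not aggregate on the map side: doing so would not reduce the amount of data sent over the network, and would instead increase the JVM memory consumed by the map tasks, leading to GC that slows down the whole computation.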
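A small sketch with plain collections shows why a map-side combine does not help a grouping: the combiner is a buffer that still holds every value, whereas a sum collapses each key to a single number. `GroupByKeySketch`, `groupCombine`, and `sumCombine` are names made up for this illustration and are not part of Spark.

```scala
object GroupByKeySketch {
  // Map-side "combine" for a grouping: each key's values are collected into a buffer.
  def groupCombine(partition: Seq[(String, Int)]): Map[String, Seq[Int]] =
    partition.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Map-side combine for a sum: each key's values collapse into one number.
  def sumCombine(partition: Seq[(String, Int)]): Map[String, Int] =
    partition.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val partition = Seq("a" -> 1, "a" -> 2, "b" -> 3, "a" -> 4)
    // Count the values that would still have to travel through the shuffle.
    val afterGrouping = groupCombine(partition).values.map(_.size).sum
    val afterSumming = sumCombine(partition).size
    println(s"values to shuffle after grouping: $afterGrouping")
    println(s"values to shuffle after summing:  $afterSumming")
  }
}
```

For the four sample records, grouping still leaves four values to ship while summing leaves two, which matches the source comment's advice to prefer reduceByKey or aggregateByKey when the goal is an aggregate.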

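The groupByKey execution flow for the two cases, same partitioner and different partitioner, is shown below.

Same partitioner:

[Figure: groupByKey execution flow when the partitioner is the same]

Different partitioner:

[Figure: groupByKey execution flow when the partitioner differs]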

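25. aggregateByKey

An aggregation operator: it aggregates the values of identical keys, much like the aggregate functions in SQL, and can be used to compute max, min, avg, and so on.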


/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
}


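First, distinguish it from the aggregate operation: in aggregate, both seqOp and combOp use zeroValue, whereas in aggregateByKey zeroValue is used only in seqOp. Its mapSideCombine is true, so it aggregates on the map side. Suppose aggregateByKey is used to compute the average temperature of each month as follows: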


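val rdd = sc.textFile("weather data") // placeholder path to the weather data

// Hypothetical helper: the original leaves the substring indices as a placeholder
// ("extract the month from the date"). Indices 4 to 6 assume dates such as yyyyMMdd.
def extractMonth(date: String): String = date.substring(4, 6)

// The split delimiter was lost in the original; a single space is assumed here
val rdd2 = rdd.map(x => x.split(" ")).map(x => (extractMonth(x(0)), x(1).toInt))

val zeroValue = (0, 0) // (number of readings, sum of temperatures)

// Fold one temperature reading into the per-partition accumulator
val seqOp = (u: (Int, Int), v: Int) => (u._1 + 1, u._2 + v)

// Merge the accumulators of two partitions
val compOp = (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)

val rdd3 = rdd2.aggregateByKey(zeroValue)(seqOp, compOp)

// Convert to Double before dividing to avoid integer division
rdd3.collect().foreach(x => println(x._1 + ": average temperature is " + x._2._2.toDouble / x._2._1))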
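The way zeroValue enters only through seqOp can be traced with a plain-collections sketch: each partition folds its values per key starting from the zero value, and combOp then merges the per-partition results without touching the zero value again. `AggregateByKeySketch`, its `aggregateByKey`, `demo`, and the sample readings are assumptions for illustration, not Spark code.

```scala
object AggregateByKeySketch {
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Within each partition: fold every key's values, starting from zeroValue.
    val perPartition: Seq[Map[K, U]] = partitions.map { p =>
      p.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zeroValue)(seqOp)
      }
    }
    // Across partitions: merge the partial results with combOp only.
    perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (k, us) =>
      k -> us.map(_._2).reduce(combOp)
    }
  }

  // (count, sum) of made-up temperature readings keyed by month.
  def demo(): Map[String, (Int, Int)] = {
    val partitions = Seq(
      Seq("01" -> 10, "01" -> 14, "02" -> 20),
      Seq("01" -> 12, "02" -> 22))
    aggregateByKey(partitions, (0, 0))(
      (u, v) => (u._1 + 1, u._2 + v),
      (a, b) => (a._1 + b._1, a._2 + b._2))
  }

  def main(args: Array[String]): Unit =
    demo().foreach { case (month, (n, sum)) =>
      println(s"$month: average temperature is ${sum.toDouble / n}")
    }
}
```

Because the zero value is applied once per key in every partition that contains the key, it should be neutral for seqOp (here `(0, 0)`); a non-neutral value would be counted once per partition.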