
其他



流程执行数据Spark 详解 之一 算子
2023-06-13 09:20:26

/**
 * Take the first `num` elements of the RDD.
 *
 * Partitions are scanned incrementally: the first pass reads a single partition, and each
 * later pass widens the scan based on how many elements have been collected so far, until
 * either `num` elements are buffered or every partition has been scanned.
 *
 * @param num the number of elements to return; `0` yields an empty array without running a job
 * @return an array holding at most `num` elements
 */
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because the range `p` below is capped at totalParts.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.size == 0) {
          // Nothing collected so far: widen the scan to 4x the partitions already scanned.
          numPartsToTry = partsScanned * 4
        } else {
          // Some elements are still missing: estimate the partitions needed from the hit
          // rate so far, but never more than 4x the partitions already scanned.
          // The left side of max is >= 1 whenever partsScanned >= 2.
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }

      // Number of elements still needed; each task returns at most this many.
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

      // Append results partition by partition, never exceeding `num` in total.
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }

    buf.toArray
  }
}


/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  // `func` only receives the partition iterator; `partitions` lists the partition indices
  // to compute. The function is passed through `clean` before use.
  val cleanedFunc = clean(func)
  // Adapt to the (TaskContext, Iterator[T]) => U overload by ignoring the task context.
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
}


其中partitions: Seq[Int]代表需要计算的分区,可以计算某个分区,也可以计算多个分区,是待计算的分区集合。







/**
 * Return the first element in this RDD.
 */
def first(): T = withScope {
  // Delegates to take(1); any result other than a one-element array is reported as an
  // empty collection.
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}





/**
 * Build an RDD that is range-partitioned by key and carries a key ordering.
 *
 * @param ascending     whether keys are ordered ascending (default) or descending
 * @param numPartitions number of partitions handed to the [[RangePartitioner]]
 * @return a `ShuffledRDD` using the range partitioner and the matching key ordering
 */
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}




/**
 * A [[Partitioner]] that assigns keys to partitions by comparing them against a sorted array
 * of range bounds. The bounds are derived from a sample of the keys of `rdd`.
 *
 * @param partitions requested number of partitions; must not be negative
 * @param rdd        the pair RDD whose keys are sampled to derive the bounds
 * @param ascending  whether partition indices follow ascending (default) or descending key order
 */
class RangePartitioner[K: Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions.
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // numItems is the total number of elements in the RDD. sketched has type
      // Array[(Int, Int, Array[K])]: partition index, number of elements in that partition,
      // and the keys sampled from that partition.
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  /** One more than the number of range bounds. */
  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  /**
   * Locate the partition for `key` by comparing it against the range bounds.
   *
   * @param key the key to place; it is cast to `K`
   * @return the partition index, mirrored as `rangeBounds.length - index` when not ascending
   */
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }
}
private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K: ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // Derive a per-partition seed from the partition index and the RDD id.
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    // Weight that each partition should receive.
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight > target) {
        // Skip duplicate values.
        if (previousBound.forall(prev => ordering.gt(key, prev))) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}



  首先抛出个数学算法,即 Reservoir Sampling(水塘抽样),目的在于从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。





for ( int i = 1; i i++) 

  reservoir[i] = stream[i]; 

for (i = k; stream != null; i++) { 

  p = random(0, i); 

  if (p k) reservoir[p] = stream[i]; 

return reservoir; 

// An array of upper bounds for the first (partitions - 1) partitions, computed by sampling
// the keys of `rdd`. Empty when at most one partition is requested or the RDD has no items.
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
    // numItems is the total number of elements in the RDD. sketched has type
    // Array[(Int, Int, Array[K])]: partition index, number of elements in that partition,
    // and the keys sampled from that partition.
    // sampleSizePerPartition is the sample size per partition; the keys to be sorted are
    // sampled with it by the sketch function.
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }

      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }

      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}

/**
 * Sketches the input RDD via reservoir sampling on each partition.
 *
 * @param rdd the input RDD to sketch
 * @param sampleSizePerPartition max sample size per partition
 * @return (total number of items, an array of (partitionId, number of items, sample))
 */
def sketch[K: ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    // Derive a per-partition seed from the partition index and the RDD id.
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  // Sum the per-partition counts as Long.
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}


/**
 * Reservoir sampling that also returns the input size.
 *
 * @param input the iterator to sample from; it is fully consumed
 * @param k     the reservoir size
 * @param seed  the random seed
 * @return (samples, input size); the sample array has `min(k, input size)` elements
 */
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Int) = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) {
    // The input held fewer than k elements: trim the array to return only an array of
    // input size.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // The input may hold more than k elements: walk the rest with reservoir sampling.
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      // Count the item before drawing, so the i-th item is kept with probability k / i.
      // Drawing from the previous count would always accept the (k + 1)-th item and
      // would call nextInt(0) when k == 0.
      i += 1
      val replacementIndex = rand.nextInt(i)
      if (replacementIndex < k) {
        reservoir(replacementIndex) = item
      }
    }
    (reservoir, i)
  }
}

/**
 * Determines the bounds for range partitioning from candidates with weights indicating how many
 * items each represents. Usually this is 1 over the probability used to sample this candidate.
 *
 * @param candidates unordered candidates with weights
 * @param partitions number of partitions
 * @return selected bounds, strictly increasing, at most `partitions - 1` of them
 */
def determineBounds[K: Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  // Weight that each partition should receive.
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    // Place a boundary whenever the accumulated weight passes the next target.
    if (cumWeight > target) {
      // Skip duplicate values.
      if (previousBound.forall(prev => ordering.gt(key, prev))) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
