Spark Operator Execution Flow Explained, Part 1
1.take
Returns the first num elements of the RDD.
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.size == 0) { // buf is still empty: try 4x as many partitions
          numPartsToTry = partsScanned * 4
        } else {
          // Some values are still missing: try Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          // partitions, but no more than 4x the number of partitions scanned so far.
          // the left side of max is >=1 whenever partsScanned >= 2
          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    buf.toArray
  }
}
/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
}
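Here partitions: Seq[Int] is the set of partitions to compute; it can hold a single partition or several.
Now consider the first iteration: partsScanned is 0 and numPartsToTry is 1, so only the first partition is computed. If that partition yields num values, the loop ends; otherwise the next iteration widens the range of partitions, quite possibly scanning several partitions at once.
The execution process is shown in the figure below:
[Figure: execution flow of take]
take avoids computing the whole RDD, so it usually finishes quickly, but it may launch several jobs (one runJob call per loop iteration).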
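The growth rule for numPartsToTry is easier to see without a cluster. The following is a minimal sketch, assuming a local IndexedSeq of partitions stands in for the RDD and each "job" is just the range of partition indices scanned in one iteration; TakeSketch and simulateTake are hypothetical names, not Spark APIs. The loop body mirrors the take code above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical local model of the take loop: `parts` stands in for the RDD's partitions,
// and each "job" is recorded as the range of partition indices it scans.
object TakeSketch {
  def simulateTake[T](parts: IndexedSeq[Seq[T]], num: Int): (List[T], List[Range]) = {
    val buf = ArrayBuffer.empty[T]
    val jobs = ArrayBuffer.empty[Range]
    val totalParts = parts.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      var numPartsToTry = 1
      if (partsScanned > 0) {
        if (buf.size == 0) {
          // Nothing found yet: quadruple the number of partitions.
          numPartsToTry = partsScanned * 4
        } else {
          // Interpolate with a 50% overestimate, capped at 4x the partitions scanned so far.
          numPartsToTry = math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      jobs += p
      // Each "task" returns at most `left` elements from its partition.
      val res = p.map(i => parts(i).take(left))
      res.foreach(r => buf ++= r.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    (buf.toList, jobs.toList)
  }
}
```

With three empty partitions at the front and num = 3, the first job scans only partition 0 and finds nothing, so the second job quadruples to partitions 1 to 4 and the loop stops there. With one element per partition and num = 5, the interpolation asks for 6 more partitions, which the 4x cap cuts down to 4.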
2.first
Returns the first element of the RDD.
/**
* Return the first element in this RDD.
*/
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
first is implemented simply by calling take; for how take proceeds, see the explanation of take above.
3.sortByKey
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
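sortByKey essentially builds a ShuffledRDD from the parent RDD, using the range partitioner RangePartitioner as its partitioning function. It executes as follows:
Each partition of the parent RDD splits its data into several output partitions according to RangePartitioner, and each partition of the ShuffledRDD then fetches the data that belongs to it. The key thing to understand about sortByKey, however, is how RangePartitioner works: how it keeps the number of records in each ShuffledRDD partition roughly the same, in other words how it chooses the partition boundaries. Here is the code: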
class RangePartitioner[K: Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {
  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  private var ordering = implicitly[Ordering[K]]
  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // numItems is the total number of elements in the RDD.
      // sketched has type Array[(Int, Int, Array[K])]: for each parent partition, its index,
      // its total number of elements, and the keys sampled from it.
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }
  def numPartitions: Int = rangeBounds.length + 1
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }
  // (remaining members omitted)
}
private[spark] object RangePartitioner {
  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K: ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      // Reservoir sampling
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }
  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight > target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}
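The lookup in getPartition can be tried on its own. Below is a minimal sketch, assuming Int keys so that java.util.Arrays.binarySearch can stand in for CollectionsUtils.makeBinarySearch; GetPartitionSketch is a hypothetical name. Partition p receives the keys greater than rangeBounds(p - 1) and at most rangeBounds(p), so with strictly increasing bounds both search paths return the number of bounds strictly below the key.

```scala
// Hypothetical standalone model of RangePartitioner.getPartition for Int keys.
// rangeBounds: strictly increasing upper bounds of the first rangeBounds.length partitions.
object GetPartitionSketch {
  def getPartition(rangeBounds: Array[Int], k: Int, ascending: Boolean = true): Int = {
    var partition = 0
    if (rangeBounds.length <= 128) {
      // Few bounds: walk forward while the key is above the current bound.
      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
        partition += 1
      }
    } else {
      // Many bounds: binary search returns the match index or -(insertion point) - 1.
      partition = java.util.Arrays.binarySearch(rangeBounds, k)
      if (partition < 0) partition = -partition - 1
      if (partition > rangeBounds.length) partition = rangeBounds.length
    }
    // Descending order mirrors the partition index.
    if (ascending) partition else rangeBounds.length - partition
  }
}
```

With rangeBounds = Array(4, 7) there are three partitions: keys up to 4 go to partition 0, keys 5 to 7 to partition 1, and larger keys to partition 2; with ascending = false the indices are simply mirrored.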
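As the code shows, the partition bounds are kept in rangeBounds, and getPartition returns the partition index corresponding to a given key. The remaining question is how to draw a uniform random sample without knowing in advance how many elements each parent partition holds. Only a uniform random sample guarantees that the partitions cut from it afterwards are roughly equal in size. (Sampling this way needs only a single scan; otherwise one would first scan to count the elements and then sample based on that count, i.e. two scans.)
The tool for this is an algorithm called Reservoir Sampling. Its goal is to select k samples from a set S of n items, where n is very large or unknown; it is especially suited to cases where the n items cannot all be held in main memory.
The derivation is not spelled out here; the conclusion is:
When the n-th item arrives, generate a random number p between 0 and 1. If p is less than k/n, replace a randomly chosen item in the pool with the n-th item; otherwise keep the pool unchanged. When the stream ends, return the k items in the pool. To avoid floating-point inaccuracy, implementations usually draw a random integer in [0, n) instead and, if it is less than k, replace the pool slot with that index.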
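The omitted derivation is short. Number the items 1, ..., n and take an item i > k: it enters the pool with probability k/i, and at each later step j it is evicted only if item j enters (probability k/j) and happens to pick its slot (probability 1/k). The product telescopes:

$$
P(i \in \text{pool}) = \frac{k}{i}\prod_{j=i+1}^{n}\left(1 - \frac{k}{j}\cdot\frac{1}{k}\right) = \frac{k}{i}\prod_{j=i+1}^{n}\frac{j-1}{j} = \frac{k}{i}\cdot\frac{i}{n} = \frac{k}{n}
$$

An item i ≤ k starts in the pool with probability 1 and survives steps k+1 through n with probability $\prod_{j=k+1}^{n}\frac{j-1}{j} = \frac{k}{n}$, so every item ends up in the sample with the same probability k/n.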
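// reservoir is the pool of size k that is returned
// put the first k items of stream into reservoir (0-based indices)
for (i = 0; i < k; i++)
    reservoir[i] = stream[i];
for (i = k; stream[i] != null; i++) {
    p = random(0, i);   // uniform integer in [0, i]: i + 1 possible values for the (i + 1)-th item
    if (p < k) reservoir[p] = stream[i];
}
return reservoir;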
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
    // numItems is the total number of elements in the RDD.
    // sketched has type Array[(Int, Int, Array[K])]: for each parent partition, its index,
    // its total number of elements, and the keys sampled from it.
    // sampleSizePerPartition is the number of keys sampled per partition; the keys to be
    // sorted are sampled by the sketch function.
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      // With skewed data, a partition holding far more data than the others gets too few
      // samples, so more samples must be drawn from it.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      // Sample the imbalanced partitions again.
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Compute the bounds from the keys sampled from all partitions. A key's weight is the
      // number of items it stands for: the larger the weight, the more such keys there are.
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}
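The sample-size arithmetic above can be checked with plain numbers. This is a minimal sketch, assuming a list of per-partition element counts plays the role of the n values returned by sketch; SampleSizeSketch and plan are hypothetical names, and only the formulas are taken from the code above. A partition is flagged for re-sampling when fraction * n, the number of keys it should contribute at the global sampling rate, exceeds the sampleSizePerPartition keys its reservoir can hold.

```scala
// Hypothetical helper reproducing only the sample-size arithmetic of rangeBounds.
// partitionSizes(i) plays the role of n, the element count of input partition i.
object SampleSizeSketch {
  final case class Plan(sampleSize: Double, sampleSizePerPartition: Int, fraction: Double, imbalanced: Set[Int])

  def plan(outPartitions: Int, partitionSizes: Seq[Long]): Plan = {
    // Target total sample: 20 keys per output partition, capped at 1M.
    val sampleSize = math.min(20.0 * outPartitions, 1e6)
    // Reservoir size per input partition, over-sampled 3x.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / partitionSizes.size).toInt
    val numItems = partitionSizes.sum
    // Global sampling rate needed to reach sampleSize keys in total.
    val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
    // Flag partitions that should contribute more keys than their reservoir holds.
    val imbalanced = partitionSizes.zipWithIndex.collect {
      case (n, idx) if fraction * n > sampleSizePerPartition => idx
    }.toSet
    Plan(sampleSize, sampleSizePerPartition, fraction, imbalanced)
  }
}
```

For four input partitions of 100, 100, 100 and 10,000 elements sorted into 10 output partitions, sampleSize is 200, each reservoir holds 150 keys and fraction is about 0.0194; only the last partition has fraction * n (about 194) above 150, so only it is re-sampled.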
def sketch[K: ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    // Reservoir sampling
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Int) = {
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }
  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) { // the partition holds fewer than k values, so return them directly
    // If input size < k, trim the array to return only an array of input size.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else { // otherwise apply reservoir sampling to the remaining values
    // If input size > k, continue the sampling process.
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      val replacementIndex = rand.nextInt(i)
      if (replacementIndex < k) {
        reservoir(replacementIndex) = item
      }
      i += 1
    }
    (reservoir, i)
  }
}
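A standalone version of the same idea is easy to run locally. The sketch below assumes scala.util.Random in place of Spark's XORShiftRandom, and ReservoirSketch and sampleAndCount are hypothetical names. One deliberate difference: in the listing above, once the reservoir is full i equals k, so the first extra element draws rand.nextInt(k), which is always below k, and is therefore always kept. The sketch increments the count before drawing, so the l-th element is kept with probability exactly k/l, matching the derivation.

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Hypothetical standalone reservoir sampler modelled on reservoirSampleAndCount;
// scala.util.Random replaces Spark's XORShiftRandom. Not a Spark API.
object ReservoirSketch {
  def sampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Int) = {
    val reservoir = new Array[T](k)
    var i = 0
    // Fill the reservoir with the first k elements.
    while (i < k && input.hasNext) {
      reservoir(i) = input.next()
      i += 1
    }
    if (i < k) {
      // The input had fewer than k elements: return exactly those.
      (reservoir.take(i), i)
    } else {
      val rand = new Random(seed)
      var l = i // number of elements consumed so far (an Int, as in the listing above)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // Keep the l-th element with probability k/l: draw uniformly from [0, l).
        val replacementIndex = rand.nextInt(l)
        if (replacementIndex < k) reservoir(replacementIndex) = item
      }
      (reservoir, l)
    }
  }
}
```

Sampling 3 of the values 0 to 9 many times with different seeds should select each value in roughly 30% of the runs.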
def determineBounds[K: Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight > target) { // place bounds by cumulative weight so each partition gets an even share
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
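To see how the weights steer the bounds, here is the determineBounds loop copied into a standalone function over a plain Seq, a minimal sketch that runs without Spark; DetermineBoundsSketch is a hypothetical name and j is replaced by bounds.size. With ten keys of weight 1 and three partitions, step = 10/3 ≈ 3.33, so bounds land on the 4th and 7th keys; heavier weights pull a bound toward them; and repeated keys can produce fewer than partitions - 1 bounds.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical local copy of the determineBounds loop above, taking a plain Seq.
// Candidates are (key, weight) pairs; a weight is the number of items the sample stands for.
object DetermineBoundsSketch {
  def determineBounds[K](candidates: Seq[(K, Float)], partitions: Int)(implicit ordering: Ordering[K]): List[K] = {
    val ordered = candidates.sortBy(_._1).toIndexedSeq
    // Each output partition should receive about `step` units of weight.
    val step = ordered.map(_._2.toDouble).sum / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var previousBound = Option.empty[K]
    var i = 0
    while (i < ordered.size && bounds.size < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      // Emit a bound once the running weight passes the next target, skipping duplicate keys.
      if (cumWeight > target && (previousBound.isEmpty || ordering.gt(key, previousBound.get))) {
        bounds += key
        target += step
        previousBound = Some(key)
      }
      i += 1
    }
    bounds.toList
  }
}
```

Feeding the bounds List(4, 7) from the first case into getPartition gives partitions of 4, 3 and 3 keys for the values 1 to 10.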