Detailed Execution Flow of Spark Operators, Part 2
count computes the total number of records in the RDD: each partition counts its own records, the per-partition counts are sent back to the driver, and the driver sums them to obtain the RDD's total size.
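For reference, count itself is essentially a one-liner in RDD.scala (quoted as in the Spark 1.x source; Utils.getIteratorSize simply walks an iterator and returns the number of elements it saw):

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

sc.runJob produces one Long per partition, and .sum adds them up on the driver.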
5.countApprox
countApprox returns the number of elements in the RDD within a given timeout: the estimate of the total is modeled with a normal distribution whose confidence level is given by the confidence parameter, and if the timeout expires before all tasks finish, an incomplete (approximate) result is returned.
/**
* :: Experimental ::
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
@Experimental
def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
  // The per-partition counting function that runs on the executor side
  val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }
  // Driver-side evaluator: each completed task triggers its merge; when the
  // timeout fires (or the job finishes early) its current state, i.e.
  // currentResult, is returned
  val evaluator = new CountEvaluator(partitions.length, confidence)
  // Submit the job
  sc.runApproximateJob(this, countElements, evaluator, timeout)
}
def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long): PartialResult[R] = {
  assertNotStopped()
  val callSite = getCallSite
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  val cleanedFunc = clean(func)
  // cleanedFunc is countElements, evaluator is the CountEvaluator, and the
  // timeout is passed straight through
  val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
    localProperties.get)
  logInfo(
    "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  result
}
def runApproximateJob[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    callSite: CallSite,
    timeout: Long,
    properties: Properties): PartialResult[R] = {
  // A listener that fires taskSucceeded whenever a task finishes, and returns
  // the CountEvaluator's current value once the timeout expires
  val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val partitions = (0 until rdd.partitions.size).toArray
  val jobId = nextJobId.getAndIncrement()
  // Submit the job
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
    SerializationUtils.clone(properties)))
  // Wait for the result
  listener.awaitResult() // Will throw an exception if the job fails
}
The timeout-bounded counting logic therefore lives mainly in ApproximateActionListener:
private[spark] class ApproximateActionListener[T, U, R](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    evaluator: ApproximateEvaluator[U, R],
    timeout: Long)
  extends JobListener {

  val startTime = System.currentTimeMillis()
  val totalTasks = rdd.partitions.size
  var finishedTasks = 0
  var failure: Option[Exception] = None // Set if the job has failed (permanently)
  var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult

  // Invoked whenever a partition's task completes
  override def taskSucceeded(index: Int, result: Any) {
    synchronized {
      // Fold the task's count into the CountEvaluator's running state
      evaluator.merge(index, result.asInstanceOf[U])
      finishedTasks += 1
      if (finishedTasks == totalTasks) { // all partitions done: stop waiting and return the result
        // If we had already returned a PartialResult, set its final value
        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
        // Notify any waiting thread that may have called awaitResult
        this.notifyAll()
      }
    }
  }
……
  /**
   * Waits for up to timeout milliseconds since the listener was created and then returns a
   * PartialResult with the result so far. This may be complete if the whole job is done.
   */
  // Wait for the result
  def awaitResult(): PartialResult[R] = synchronized {
    val finishTime = startTime + timeout
    while (true) {
      val time = System.currentTimeMillis()
      if (failure.isDefined) {
        throw failure.get
      } else if (finishedTasks == totalTasks) { // finished within the timeout: return the complete result
        return new PartialResult(evaluator.currentResult(), true)
      } else if (time >= finishTime) { // timed out: return a partial result
        resultObject = Some(new PartialResult(evaluator.currentResult(), false))
        return resultObject.get
      } else { // timeout not reached yet: keep sleeping
        this.wait(finishTime - time)
      }
    }
    // Should never be reached, but required to keep the compiler happy
    return null
  }
}
If the job has not finished within the timeout, evaluator.currentResult() returns an approximation whose estimated total follows a normal distribution; interested readers can dig further:
private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[Long, BoundedDouble] {

  var outputsMerged = 0
  var sum: Long = 0

  override def merge(outputId: Int, taskResult: Long) {
    outputsMerged += 1
    sum += taskResult
  }

  override def currentResult(): BoundedDouble = {
    if (outputsMerged == totalOutputs) { // every partition reported
      new BoundedDouble(sum, 1.0, sum, sum)
    } else if (outputsMerged == 0) { // no task finished yet
      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
    } else { // partially done: derive the normal-distribution parameters of the estimated total
      val p = outputsMerged.toDouble / totalOutputs
      val mean = (sum + 1 - p) / p
      val variance = (sum + 1) * (1 - p) / (p * p)
      val stdev = math.sqrt(variance)
      val confFactor = new NormalDistribution().
        inverseCumulativeProbability(1 - (1 - confidence) / 2)
      val low = mean - confFactor * stdev
      val high = mean + confFactor * stdev
      new BoundedDouble(mean, confidence, low, high)
    }
  }
}
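To make the estimator concrete, here is a small worked example (the numbers are hypothetical, chosen only for illustration):

// Suppose 4 of 10 partitions have reported, and their counts sum to 400
val p = 4.0 / 10                               // 0.4
val mean = (400 + 1 - p) / p                   // 1001.5
val variance = (400 + 1) * (1 - p) / (p * p)   // 1503.75
val stdev = math.sqrt(variance)                // ~38.8
// With confidence = 0.95 the factor is ~1.96, so currentResult() would report
// roughly mean = 1001.5 with the interval [925.5, 1077.5]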
The countApprox flow is therefore roughly: 1) each executor counts the elements of its partitions and reports the counts to the driver; 2) the driver merges the reported counts; if not every partition reports within the timeout, it stops waiting and returns an estimate whose total is modeled by a normal distribution, and if the job finishes within the timeout, it returns the exact count.
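A minimal usage sketch (the RDD and timeout here are made up for illustration; PartialResult.initialValue holds whatever estimate was available when countApprox returned):

val rdd = sc.parallelize(1 to 1000000, 100)
// Block for at most 500 ms, then take the estimate gathered so far
val partial = rdd.countApprox(timeout = 500, confidence = 0.95)
val estimate = partial.initialValue // a BoundedDouble
println(s"count ~ ${estimate.mean}, 95% interval [${estimate.low}, ${estimate.high}]")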
6.countApproxDistinct
countApproxDistinct performs an approximate distinct count over the RDD's elements. The parameter relativeSD controls the accuracy of the estimate: the smaller relativeSD is, the more accurate the result.
/**
 * Return approximate number of distinct elements in the RDD.
 *
 * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
 * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
 * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
 *
 * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
 *                   It must be greater than 0.000017.
 */
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
  require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  countApproxDistinct(if (p < 4) 4 else p, 0)
}
It is based on the "HyperLogLog in Practice" algorithm; the theory behind it is fairly deep, and interested readers can explore it further.
An example:
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
  val sc = new SparkContext(conf)
  /**
   * Build a collection split into 20 partitions
   */
  val a = sc.parallelize(1 to 10000, 20)
  // Concatenate RDD a with itself 5 times: b holds 50000 elements, 10000 distinct
  val b = a ++ a ++ a ++ a ++ a
  // Prints 9760; with no argument relativeSD defaults to 0.05
  println(b.countApproxDistinct())
  // Prints 9760
  println(b.countApproxDistinct(0.05))
  // Prints 8224
  println(b.countApproxDistinct(0.1))
  // Prints 10000
  println(b.countApproxDistinct(0.001))
}
7.collect
def collect(): Array[T] = withScope {
  // Run a job that turns each partition into an array, then concatenate them on the driver
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
collect fetches all of the RDD's records and buffers them on the driver. If the RDD is large, use it with caution, because the entire dataset will be materialized in driver memory.
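A minimal sketch of the trade-off (the sizes are illustrative):

val rdd = sc.parallelize(1 to 100, 4)
val all: Array[Int] = rdd.collect() // all 100 elements now live in driver memory
// For a large RDD prefer take(n) or toLocalIterator, which bound driver memory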
8.toLocalIterator
toLocalIterator returns an iterator that contains all of the RDD's records.
/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*
* Note: this results in multiple Spark jobs, and if the input RDD is the result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input RDD should be cached first.
*/
def toLocalIterator: Iterator[T] = withScope {
  // Trigger one action (one job) per partition
  def collectPartition(p: Int): Array[T] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
  }
  // Chain the per-partition results into a single iterator with flatMap
  (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}
scala> val rdd = sc.parallelize(1 to 10, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val it = rdd.toLocalIterator
it: Iterator[Int] = non-empty iterator
scala> while(it.hasNext){
     |   println(it.next)
     | }
1
2
3
4
5
6
7
8
9
10
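Because each partition becomes its own job, the note in the scaladoc matters in practice: if the RDD is the result of an expensive computation, cache it before iterating so it is not recomputed once per partition. A short sketch (the RDD here is illustrative):

val cached = sc.parallelize(1 to 100000, 10).cache() // computed once, reused by each per-partition job
cached.toLocalIterator.take(5).foreach(println)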
9.takeOrdered
takeOrdered returns the first num elements of the RDD according to the default (ascending) ordering or a user-specified Ordering.
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Sort on the executor side first: each partition keeps only its top num
      // elements, according to ord, in a bounded priority queue.
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      // Merge the per-partition queues on the driver, then sort the surviving
      // elements and return the first num as an array
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
List<Integer> data = Arrays.asList(1, 4, 3, 2, 5, 6);
JavaRDD<Integer> javaRDD = jsc.parallelize(data, 2);
for (Integer integer : javaRDD.takeOrdered(2)) {
    System.out.println(integer);
}
Prints:
1
2
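For comparison, a short Scala sketch of the same call, plus a custom descending ordering that makes takeOrdered behave like top:

val rdd = sc.parallelize(Seq(1, 4, 3, 2, 5, 6), 2)
println(rdd.takeOrdered(2).toList)                        // List(1, 2)
println(rdd.takeOrdered(2)(Ordering[Int].reverse).toList) // List(6, 5)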