Spark Operator Execution Flow in Detail, Part 5 — Big Data
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) { // same partitioner: no shuffle is needed, a single mapPartitions is enough
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else { // different partitioner: a shuffle is required
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
Let's look at its parameters:
combineByKey takes three functions as its key arguments: createCombiner, mergeValue, and mergeCombiners. These three functions are enough to describe what the operator actually does; once you understand them, you understand combineByKey.
To understand combineByKey(), you first need to understand how it handles each element. As combineByKey() walks through the elements of a partition, each element's key has either not been seen yet or matches a key seen earlier. The processing goes as follows (see the sketch after this list):
If the element carries a new key, createCombiner() is called to create the initial value of that key's accumulator. (Note: this happens the first time the key appears in each partition, not just the first time it appears in the whole RDD.)
If the key has already been seen while processing the current partition, combineByKey() calls mergeValue() to merge the key's current accumulator with the new value.
Because every partition is processed independently, the same key can end up with several accumulators. When two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges them. If mapSideCombine is true, a first round of merging is already applied to the map output; if it is false, merging only happens when the reduce side builds its result. Combining early reduces the amount of data transferred when the shuffle is read and therefore speeds up shuffle read.
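To make the three functions concrete, here is a minimal sketch (illustrative only, not taken from the Spark source) that collects the distinct values of each key into a Set:
// Illustrative sketch: build a Set of values per key with combineByKey.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("a", 1)))
val sets = pairs.combineByKey(
  (v: Int) => Set(v),                        // createCombiner: first value of a key within a partition
  (s: Set[Int], v: Int) => s + v,            // mergeValue: fold further values of the same key into the per-partition combiner
  (s1: Set[Int], s2: Set[Int]) => s1 ++ s2   // mergeCombiners: merge combiners produced by different partitions
)
// sets.collect() would yield something like Array(("a", Set(1, 2)), ("b", Set(3)))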
Let's first look at the dependency inside ShuffledRDD:
class ShuffledRDD[K, V, C](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
…
override def getDependencies: Seq[Dependency[_]] = { // the shuffle write and read are driven by the ShuffleDependency created here
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
…
}
The shuffle read and write handles are obtained by registering the shuffle with the ShuffleManager; the default ShuffleManager is SortShuffleManager.
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
val shuffleId: Int = _rdd.context.newShuffleId()
// register the shuffle and obtain its handle
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
// We currently use the same block store shuffle fetcher as the hash-based shuffle.
new HashShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
new SortShuffleWriter(
shuffleBlockResolver, baseShuffleHandle, mapId, context)
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
if (shuffleMapNumber.containsKey(shuffleId)) {
val numMaps = shuffleMapNumber.remove(shuffleId)
(0 until numMaps).map { mapId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}
override val shuffleBlockResolver: IndexShuffleBlockResolver = {
indexShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
shuffleBlockResolver.stop()
}
}
private[spark] class SortShuffleWriter[K, V, C](
shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
extends ShuffleWriter[K, V] with Logging {
private val dep = handle.dependency
private val blockManager = SparkEnv.get.blockManager
private var sorter: ExternalSorter[K, V, _] = null
// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
// we don't try deleting files, etc. twice.
private var stopping = false
private var mapStatus: MapStatus = null
private val writeMetrics = new ShuffleWriteMetrics()
context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
if (dep.mapSideCombine) { // aggregate on the map side
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else { // no map-side aggregation: the aggregator (and any key ordering) is not passed to the sorter; the map side merely partitions records by the partition function and does nothing else
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
}
The sorter.insertAll call above lands in ExternalSorter.insertAll:
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) { // combining on the map side requires mergeValue and createCombiner
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else if (bypassMergeSort) { // few partitions: bypass merge-sort and write one file per partition
    // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
    if (records.hasNext) {
      spillToPartitionFiles(
        WritablePartitionedIterator.fromIterator(records.map { kv =>
          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        })
      )
    }
  } else { // many partitions: buffer the records and write a single file, to avoid creating too many temporary files
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
private[spark] class HashShuffleReader[K, C](
handle: BaseShuffleHandle[K, _, C],
startPartition: Int,
endPartition: Int,
context: TaskContext)
extends ShuffleReader[K, C]
{
require(endPartition == startPartition + 1,
"Hash shuffle currently only supports fetching one partition")
private val dep = handle.dependency
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val ser = Serializer.getSerializer(dep.serializer)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) { // already combined on the map side, so mergeCombiners alone finishes the aggregation
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
} else { // otherwise aggregate with createCombiner and mergeValue
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
// Sort the output if there is a sort ordering defined.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator
case None =>
aggregatedIter
}
}
}
Now consider a scenario where combineByKey is invoked twice, once with mapSideCombine set to false and once with it left at the default true:
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
val d1 = sc.parallelize(initialScores)
type MVType = (Int, Double) // a tuple of (subject count, total score)
d1.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),
  new RangePartitioner(d1.partitions.length, d1), // a RangePartitioner needs the target partition count and the RDD to sample
  mapSideCombine = false
).map { case (name, (num, score)) => (name, score / num) }.collect
d1.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, score)) => (name, score / num) }.collect
The execution flow with mapSideCombine = true is as follows:
As can be seen, combining ahead of time on the map side reduces the amount of data generated during the shuffle.
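For intuition, assume the six records above land in two map partitions of three records each; then roughly the following crosses the shuffle boundary (an illustrative trace under that assumption, not actual Spark output):
// Assumed layout: partition 0 holds the three "Fred" records, partition 1 the three "Wilma" records.
// mapSideCombine = true: each map task emits one pre-combined record per key it saw,
// so only 2 records are shuffled:
//   ("Fred", (3, 274.0)) and ("Wilma", (3, 286.0))
// mapSideCombine = false: all 6 raw records are shuffled unchanged:
//   ("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)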
23. distinct(): remove duplicate elements from an RDD
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
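In other words, distinct behaves like the hand-written pipeline below (a minimal sketch for illustration):
// Sketch: distinct is just map + reduceByKey + map under the hood.
val nums = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
val viaDistinct = nums.distinct()                                              // Array(1, 2, 3) in some order
val viaCombine  = nums.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)  // equivalent result via the same shuffle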
Ultimately this still calls combineByKey, because the aggregation-style operations on an RDD are all carried out by combineByKey at their core. Its concrete execution flow is as follows:
24. groupByKey: group the records that share the same key
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
In essence it relies on combineByKey to gather the values of the same key, but note that groupByKey does not combine on the map side: a map-side combine would not reduce the amount of data shipped over the network, yet it would force every map-side record into a hash table, increasing the Java memory consumed by the map tasks and slowing the whole computation down through GC.
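To see why reduceByKey is usually preferable when the goal is an aggregation, compare the two pipelines below (a sketch assuming simple word-count style data):
// Both produce per-key sums, but reduceByKey combines on the map side,
// while groupByKey ships every raw value across the network first.
val words = sc.parallelize(Seq(("spark", 1), ("rdd", 1), ("spark", 1)))
val viaGroup  = words.groupByKey().mapValues(_.sum)   // shuffles all three records
val viaReduce = words.reduceByKey(_ + _)              // shuffles at most one record per key per map task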
Now consider the execution flow of groupByKey when the partitioner passed in matches the RDD's existing one and when it does not:
Same partitioner:
Different partitioner:
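Whether the shuffle happens at all can be checked directly, since the same-partitioner case falls into the mapPartitions branch of combineByKey shown earlier (a sketch; the dependency remark is an assumption based on that branch):
// Sketch: pre-partitioning with the same partitioner lets groupByKey skip the shuffle.
val kvPairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val prePartitioned = kvPairs.partitionBy(new HashPartitioner(4))
// Same partitioner: self.partitioner == Some(partitioner), so no ShuffledRDD is created;
// the resulting RDD should have a narrow OneToOneDependency.
val grouped = prePartitioned.groupByKey(new HashPartitioner(4))
// Different partitioner (different partition count): a ShuffledRDD, i.e. a ShuffleDependency, is created.
val regrouped = prePartitioned.groupByKey(new HashPartitioner(8))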
25.aggregateByKey
An aggregation operation: it combines the values of the same key, much like an SQL aggregate function, and can be used to implement max, min, avg and so on.
/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}
/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
}
First, note the difference from the aggregate operation: in aggregate, both seqOp and combOp use the zeroValue, whereas in aggregateByKey the zeroValue is only used inside seqOp. Moreover, its mapSideCombine is true, so aggregation happens on the map side. Suppose we use aggregateByKey to compute the average temperature of each month:
val rdd = sc.textFile("weather-data")  // placeholder path for the raw weather data
val rdd2 = rdd.map(_.split(" ")).map(x => (x(0).substring(4, 6), x(1).toInt))  // extract the month from the date field (assuming a yyyyMMdd layout)
val zeroValue = (0, 0)  // (record count, accumulated temperature)
val seqOp = (u: (Int, Int), v: Int) => {
  (u._1 + 1, u._2 + v)
}
val compOp = (c1: (Int, Int), c2: (Int, Int)) => {
  (c1._1 + c2._1, c1._2 + c2._2)
}
val rdd3 = rdd2.aggregateByKey(zeroValue)(seqOp, compOp)
rdd3.foreach(x => println(x._1 + ": average temperature is " + x._2._2 / x._2._1))
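To highlight how the zero value is treated differently by aggregate and aggregateByKey, here is a small contrast (a sketch; the numeric results assume the 2-partition layouts shown in the comments):
// aggregate: zeroValue (here 10) seeds seqOp in EACH partition AND seeds combOp once more on the driver,
// so with partitions [1, 2] and [3, 4] the result is (10+1+2) + (10+3+4) + 10 = 40.
val nums = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)
val total = nums.aggregate(10)((acc, v) => acc + v, (a, b) => a + b)
// aggregateByKey: the zero value only seeds seqOp the first time a key appears in a partition;
// combOp never sees it, so with ("k", 1) and ("k", 2) in separate partitions the result is ("k", (10+1) + (10+2)) = ("k", 23).
val kv = sc.parallelize(Seq(("k", 1), ("k", 2)), numSlices = 2)
val agg = kv.aggregateByKey(10)((acc, v) => acc + v, (a, b) => a + b)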