zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Spark Streaming 数据接收优化

数据Spark 优化 接收 Streaming
2023-09-14 09:00:24 时间
看这篇文章前,请先移步Spark Streaming 数据产生与导入相关的内存分析, 文章重点讲的是从Kafka消费到数据进入BlockManager的这条线路的分析。 这篇内容是个人的一些经验,大家用的时候还是建议好好理解内部的原理,不可照搬 让Receiver均匀的分布到你的Executor上 在Spark Streaming 数据产生与导入相关的内存分析中我说了这么一句话: 我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了。 建议Spark Streaming团队最好是能将数据写入到多个BlockManager上。 从现在的API来看,是没有提供这种途径的。但是Spark Streaming 提供了同时读多个topic的功能,每个topic是一个InputStream。 我们可以复用这个功能,具体代码如下:
val kafkaDStreams = (1 to kafkaDStreamsNum).map { _ = KafkaUtils.createStream(

ssc, 

zookeeper, 

groupId, 

Map("your topic" - 1), 

if (memoryOnly) StorageLevel.MEMORY_ONLY else StorageLevel.MEMORY_AND_DISK_SER_2)}

val unionDStream = ssc.union(kafkaDStreams)

unionDStream
kafkaDStreamsNum 是你自己定义的,希望有多少个Executor 启动Receiver 去接收kafka数据。我的经验值是 1/4 个Executors 数目。因为数据还要做replication 一般,所以这样内存最大可以占到  1/2 的storage.
另外,务必给你系统设置 spark.streaming.receiver.maxRate。假设你启动了 N个 Receiver,那么你系统实际会接受到的数据不会超过 N*MaxRate,也就是说,maxRate参数是针对每个 Receiver 设置的。 减少非Storage 内存的占用 也就是我们尽量让数据都占用Spark 的Storage 内存。方法是把spark.streaming.blockInterval 调小点。当然也会造成一个副作用,就是input-block 会多。每个Receiver 产生的的input-block数为: batchInterval* 1000/blockInterval。 这里假设你的batchInterval 是以秒为单位的。 blockInterval 其实我也不知道会有啥影响。其实说白了,就是为了防止GC的压力。实时计算有一个很大问题是GC。 减少单个Executor的内存 一般在Spark Streaming中不建议把 Executor 的内存调的太大。对GC是个压力,大内存一FullGC比较可怕,很可能会拖垮整个计算。 多Executor的容错性也会更好些。
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】 今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。 java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。
客户流失?来看看大厂如何基于spark+机器学习构建千万数据规模上的用户留存模型 ⛵ 如何在海量用户中精准预测哪些客户即将流失?本文结合音乐流媒体平台 Sparkify 数据,详细讲解一个客户流失建模预测案例的全流程:探索性数据分析 EDA、数据处理、进一步数据探索、建模优化、结果评估。【代码与数据集亲测可运行】
Spark如何对源端数据做切分? 典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。