zl程序教程

您现在的位置是:首页 >  Python

当前栏目

《Spark 官方文档》Spark配置(二)

2023-03-14 22:33:59 时间

内存管理

属性名 默认值 含义
spark.memory.fraction 0.75 堆内存中用于执行、混洗和存储(缓存)的比例。这个值越低,则执行中溢出到磁盘越频繁,同时缓存被逐出内存也更频繁。这个配置的目的,是为了留出用户自定义数据结构、内部元数据使用的内存。推荐使用默认值。请参考this description.
spark.memory.storageFraction 0.5 不会被逐出内存的总量,表示一个相对于 spark.memory.fraction的比例。这个越高,那么执行混洗等操作用的内存就越少,从而溢出磁盘就越频繁。推荐使用默认值。更详细请参考 this description.
spark.memory.offHeap.enabled true 如果true,Spark会尝试使用堆外内存。启用 后,spark.memory.offHeap.size必须为正数。
spark.memory.offHeap.size 0 堆外内存分配的大小(绝对值)。这个设置不会影响堆内存的使用,所以你的执行器总内存必须适应JVM的堆内存大小。必须要设为正数。并且前提是 spark.memory.offHeap.enabled=true.
spark.memory.useLegacyMode false 是否使用老式的内存管理模式(1.5以及之前)。老模式在堆内存管理上更死板,使用固定划分的区域做不同功能,潜在的会导致过多的数据溢出到磁盘(如果不小心调整性能)。必须启用本参数,以下选项才可用:
spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。
混洗阶段用于聚合和协同分组的JVM堆内存比例。在任何指定的时间,所有用于混洗的内存总和不会超过这个上限,超出的部分会溢出到磁盘上。如果溢出台频繁,考虑增加spark.storage.memoryFraction的大小。
spark.storage.memoryFraction 0.6 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。
Spark用于缓存数据的对内存比例。这个值不应该比JVM 老生代(old generation)对象所占用的内存大,默认是60%的堆内存,当然你可以增加这个值,同时配置你所用的老生代对象占用内存大小。
spark.storage.unrollFraction 0.2 (废弃)必须先启用spark.memory.useLegacyMode这个才有用。
Spark块展开的内存占用比例。如果没有足够的内存来完整展开新的块,那么老的块将被抛弃。

执行行为

属性名 默认值 含义
spark.broadcast.blockSize 4m TorrentBroadcastFactory每个分片大小。太大会减少广播时候的并发数(更慢了);如果太小,BlockManager可能会给出性能提示。
spark.broadcast.factory org.apache.spark.broadcast.
TorrentBroadcastFactory
广播算法的实现。
spark.cleaner.ttl (infinite) Spark记住任意元数据的保留时间(秒)。周期性的清理能保证比这个更老的元数据将被遗忘(删除)。这对于长期运行的Spark作业非常有用(如,一些7*24运行)。注意,RDD持久化到内存中后,过了这么长时间以后,也会被清理掉(这。。。是不是有点坑!)。
spark.executor.cores YARN模式下默认1;如果是独立部署,则是worker节点上所有可用的core。 单个执行器可用的core数。仅针对YARN和独立部署模式。独立部署时,单个worker节点上会运行多个执行器(executor),只要worker上有足够的core。否则,每个应用在单个worker上只会启动一个执行器。
spark.default.parallelism 对于reduceByKey和join这样的分布式混洗(shuffle)算子,等于父RDD中最大的分区。对于parallelize这样没有父RDD的算子,则取决于集群管理器:

  • Local mode: number of cores on the local machine — 本地模式:机器的core数
  • Mesos fine grained mode: 8 — Mesos细粒度模式:8
  • Others: total number of cores on all executor nodes or 2, whichever is larger — 其他:所有执行器节点上core的数量 或者 2,这两数取较大的
如果用户没有在参数里指定,这个属性是默认的RDD transformation算子分区数,如:join,reduceByKey,parallelize等。
spark.executor.heartbeatInterval 10s 执行器心跳间隔(报告心跳给驱动器)。心跳机制使驱动器了解哪些执行器还活着,并且可以从心跳数据中获得执行器的度量数据。
spark.files.fetchTimeout 60s 获取文件的通讯超时,所获取的文件是通过在驱动器上调用SparkContext.addFile()添加的。
spark.files.useFetchCache true 如果设为true(默认),则同一个spark应用的不同执行器之间,会使用一二共享缓存来拉取文件,这样可以提升同一主机上运行多个执行器时候,任务启动的性能。如果设为false,这个优化就被禁用,各个执行器将使用自己独有的缓存,他们拉取的文件也是各自有一份拷贝。如果在NFS文件系统上使用本地文件系统,可以禁用掉这个优化(参考SPARK-6313
spark.files.overwrite false SparkContext.addFile()添加的文件已经存在,且内容不匹配的情况下,是否覆盖。
spark.hadoop.cloneConf false 如设为true,对每个任务复制一份Hadoop Configuration对象。启用这个可以绕过Configuration线程安全问题(SPARK-2546 )。默认这个是禁用的,很多job并不会受这个issue的影响。
spark.hadoop.validateOutputSpecs true 如设为true,在saveAsHadoopFile及其变体的时候,将会验证输出(例如,检查输出目录是否存在)。对于已经验证过或确认存在输出目录的情况,可以禁用这个。我们建议不要禁用,除非你确定需要和之前的spark版本兼容。可以简单的利用Hadoop 文件系统API手动删掉已存在的输出目录。这个设置会被Spark Streaming StreamingContext生成的job忽略,因为Streaming需要在回复检查点的时候,覆盖已有的输出目录。
spark.storage.memoryMapThreshold 2m spark从磁盘上读取一个块后,映射到内存块的最小大小。这阻止了spark映射过小的内存块。通常,内存映射块是有开销的,应该比接近或小于操作系统的页大小。
spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager 用于存储RDD的外部块管理器(文件系统)的实现。
文件系统URL由spark.externalBlockStore.url决定。
spark.externalBlockStore.baseDir System.getProperty(“java.io.tmpdir”) 外部块存储存放RDD的目录。文件系统URL由spark.externalBlockStore.url决定。也可以是逗号分隔的目录列表(Tachyon文件系统)
spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon 所使用的外部块存储文件系统URL。

网络

属性名 默认值 含义
spark.akka.frameSize 128 “control plane” 通讯中所允许的最大消息大小(MB)。通常,只应用于map输出数据的大小信息,这些信息会在执行器和驱动器之间传递。如果你的job包含几千个map和reduce任务,你可能需要增大这个设置。
spark.akka.heartbeat.interval 1000s 设这么大的值,是为了禁用Akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。设成较大的值可以减少网络开销,而较小的值(1秒左右)可能会对Akka的失败检测更有用。如有需要,可以调整这个值和spark.akka.heartbeat.pauses的组合。一种可能需要使用失败检测的情形是:用一个敏感的失败检测,可以快速识别并逐出不稳定的执行器。然而,在真实的spark集群中,这通常不是GC暂停或网络延迟造成的。除此之外,启用这个还会导致过多的心跳数据交换,从而造成网络洪峰。
spark.akka.heartbeat.pauses 6000s 设这么大的值,是为了禁用Akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。这个是可接受的Akka心跳暂停时间。这个可以用来控制对GC暂停敏感程度。如有需要,可以调整这个值和spark.akka.heartbeat.interval的组合。
spark.akka.threads 4 用于通讯的actor线程数。如果驱动器机器上有很多CPU core,你可以适当增大这个值。
spark.akka.timeout 100s Spark节点之间通讯超时。
spark.blockManager.port (random) 块管理器(block manager)监听端口。在驱动器和执行器上都有。
spark.broadcast.port (random) 驱动器HTTP广播server监听端口。这和torrent广播没有关系。
spark.driver.host (local hostname) 驱动器主机名。用于和执行器以及独立部署时集群master通讯。
spark.driver.port (random) 驱动器端口。用于和执行器以及独立部署时集群master通讯。
spark.executor.port (random) 执行器端口。用于和驱动器通讯。
spark.fileserver.port (random) 驱动器HTTP文件server监听端口。
spark.network.timeout 120s 所有网络交互的默认超时。这个配置是以下属性的默认值:
spark.core.connection.ack.wait.timeout,
spark.akka.timeout,
spark.storage.blockManagerSlaveTimeoutMs,
spark.shuffle.io.connectionTimeout,spark.rpc.askTimeout or
spark.rpc.lookupTimeout
spark.port.maxRetries 16 绑定一个端口的最大重试次数。如果指定了一个端口(非0),每个后续重试会在之前尝试的端口基础上加1,然后再重试绑定。本质上,这确定了一个绑定端口的范围,就是 [start port, start port + maxRetries]
spark.replClassServer.port (random) 驱动器HTTP class server的监听端口。只和spark shell相关。
spark.rpc.numRetries 3 RPC任务最大重试次数。RPC任务最多重试这么多次。
spark.rpc.retry.wait 3s RPC请求操作重试前等待时间。
spark.rpc.askTimeout 120s RPC请求操作超时等待时间。
spark.rpc.lookupTimeout 120s RPC远程端点查询超时。

调度

属性名 默认值 含义
spark.cores.max (not set) 如果运行在独立部署集群模式(standalone deploy cluster)或者Mesos集群粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),这个值决定了spark应用可以使用的最大CPU总数(应用在整个集群中可用CPU总数,而不是单个机器)。如果不设置,那么独立部署时默认为spark.deploy.defaultCores,Mesos集群则默认无限制(即所有可用的CPU)。
spark.locality.wait 3s 为了数据本地性最长等待时间(spark会根据数据所在位置,尽量让任务也启动于相同的节点,然而可能因为该节点上资源不足等原因,无法满足这个任务分配,spark最多等待这么多时间,然后放弃数据本地性)。数据本地性有多个级别,每一级别都是等待这么多时间(同一进程、同一节点、同一机架、任意)。你也可以为每个级别定义不同的等待时间,需要设置spark.locality.wait.node等。如果你发现任务数据本地性不佳,可以增加这个值,但通常默认值是ok的。
spark.locality.wait.node spark.locality.wait 单独定义同一节点数据本地性任务等待时间。你可以设为0,表示忽略节点本地性,直接跳到下一级别,即机架本地性(如果你的集群有机架信息)。
spark.locality.wait.process spark.locality.wait 单独定义同一进程数据本地性任务等待时间。这个参数影响试图访问特定执行器上的缓存数据的任务。
spark.locality.wait.rack spark.locality.wait 单独定义同一机架数据本地性等待时间。
spark.scheduler.maxRegisteredResourcesWaitingTime 30s 调度开始前,向集群管理器注册使用资源的最大等待时间。
spark.scheduler.minRegisteredResourcesRatio 0.8 for YARN mode;
0.0 for standalone mode and Mesos coarse-grained mode
调度启动前,需要注册得到资源的最小比例(注册到的资源数 / 需要资源总数)(YARN模式下,资源是执行器;独立部署和Mesos粗粒度模式下时资源是CPU core【spark.cores.max是期望得到的资源总数】)。可以设为0.0~1.0的一个浮点数。不管job是否得到了最小资源比例,最大等待时间都是由spark.scheduler.maxRegisteredResourcesWaitingTime控制的。
spark.scheduler.mode FIFO 提交到同一个SparkContext上job的调度模式(scheduling mode)。另一个可接受的值是FAIR,而FIFO只是简单的把job按先来后到排队。对于多用户服务很有用。
spark.scheduler.revive.interval 1s 调度器复活worker的间隔时间。
spark.speculation false 如果设为true,将会启动推测执行任务。这意味着,如果stage中有任务执行较慢,他们会被重新调度到别的节点上执行。
spark.speculation.interval 100ms Spark检查慢任务的时间间隔。
spark.speculation.multiplier 1.5 比任务平均执行时间慢多少倍的任务会被认为是慢任务。
spark.speculation.quantile 0.75 对于一个stage来说,完成多少百分比才开始检查慢任务,并启动推测执行任务。
spark.task.cpus 1 每个任务分配的CPU core。
spark.task.maxFailures 4 单个任务最大失败次数。应该>=1。最大重试次数 = spark.task.maxFailures – 1

动态分配

属性名 默认值 含义
spark.dynamicAllocation.enabled false 是否启用动态资源分配特性,启用后,执行器的个数会根据工作负载动态的调整(增加或减少)。注意,目前在YARN模式下不用。更详细信息,请参考:here该特性依赖于 spark.shuffle.service.enabled 的启用。同时还和以下配置相关:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及 spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation
.executorIdleTimeout
60s 动态分配特性启用后,空闲时间超过该配置时间的执行器都会被移除。更详细请参考这里:description
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 动态分配特性启用后,包含缓存数据的执行器如果空闲时间超过该配置设置的时间,则被移除。更详细请参考:description
spark.dynamicAllocation.initialExecutors spark
.dynamicAllocation
.minExecutors
动态分配开启后,执行器的初始个数
spark.dynamicAllocation.maxExecutors infinity 动态分配开启后,执行器个数的上限
spark.dynamicAllocation.minExecutors 0 动态分配开启后,执行器个数的下限
spark.dynamicAllocation.schedulerBacklogTimeout 1s 动态分配启用后,如果有任务积压的持续时间长于该配置设置的时间,则申请新的执行器。更详细请参考:description
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 和spark.dynamicAllocation.schedulerBacklogTimeout类似,只不过该配置对应于随后持续的执行器申请。更详细请参考: description

安全

属性名 默认值 含义
spark.acls.enable false 是否启用Spark acls(访问控制列表)。如果启用,那么将会检查用户是否有权限查看或修改某个作业(job)。注意,检查的前提是需要知道用户是谁,所以如果用户是null,则不会做任何检查。你可以在Spark UI上设置过滤器(Filters)来做用户认证,并设置用户名。
spark.admin.acls Empty 逗号分隔的用户列表,在该列表中的用户/管理员将能够访问和修改所有的Spark作业(job)。如果你的集群是共享的,并且有集群管理员,还有需要调试的开发人员,那么这个配置会很有用。如果想让所有人都有管理员权限,只需把该配置设置为”*”
spark.authenticate false 设置Spark是否认证集群内部连接。如果不是在YARN上运行,请参考 spark.authenticate.secret
spark.authenticate.secret None 设置Spark用于内部组件认证的秘钥。如果不是在YARN上运行,且启用了 spark.authenticate,那么该配置必须设置
spark.authenticate.enableSaslEncryption false 是否对Spark内部组件认证使用加密通信。该配置目前只有 block transfer service 使用。
spark.network.sasl.serverAlwaysEncrypt false 是否对支持SASL认证的service禁用非加密通信。该配置目前只有 external shuffle service 支持。
spark.core.connection.ack.wait.timeout 60s 网络连接等待应答信号的超时时间。为了避免由于GC等导致的意外超时,你可以设置一个较大的值。
spark.core.connection.auth.wait.timeout 30s 网络连接等待认证的超时时间。
spark.modify.acls Empty 逗号分隔的用户列表,在改列表中的用户可以修改Spark作业。默认情况下,只有启动该Spark作业的用户可以修改之(比如杀死该作业)。如果想要任何用户都可以修改作业,请将该配置设置为”*”
spark.ui.filters None 逗号分隔的过滤器class列表,这些过滤器将用于Spark web UI。这里的过滤器应该是一个标准的 javax servlet Filter 。每个过滤器的参数可以通过java系统属性来设置,如下:
spark.<class name of filer>.params=’param1=value1,param2=value2’例如:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params=’param1=foo,param2=testing’
spark.ui.view.acls Empty 逗号分隔的用户列表,在该列表中的用户可以查看Spark web UI。默认,只有启动该Spark作业的用户可以查看之。如果需要让所有用户都能查看,只需将该配置设为”*”

加密

属性名 默认值 含义
spark.ssl.enabled false 是否启用SSL连接(在所有所支持的协议上)。所有SSL相关配置(spark.ssl.xxx,其中xxx是一个特定的配置属性),都是全局的。如果需要在某些协议上覆盖全局设置,那么需要在该协议命名空间上进行单独配置。使用 spark.ssl.YYY.XXX 来为协议YYY覆盖全局配置XXX。目前YYY的可选值有 akka(用于基于AKKA框架的网络连接) 和 fs(用于应广播和文件服务器)
spark.ssl.enabledAlgorithms Empty 逗号分隔的加密算法列表。这些加密算法必须是JVM所支持的。这里有个可用加密算法参考列表: this
spark.ssl.keyPassword None 在key-store中私匙对应的密码。
spark.ssl.keyStore None key-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。
spark.ssl.keyStorePassword None key-store的密码。
spark.ssl.protocol None 协议名称。该协议必须是JVM所支持的。这里有JVM支持的协议参考列表:this
spark.ssl.trustStore None trust-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。
spark.ssl.trustStorePassword None trust-store的密码

Spark Streaming [流式]

属性名 默认值 含义
spark.streaming.backpressure.enabled false 是否启用Spark Streaming 的内部反压机制(spark 1.5以上支持)。启用后,Spark Streaming会根据当前批次的调度延迟和处理时长来控制接收速率,这样一来,系统的接收速度会和处理速度相匹配。该特性会在内部动态地设置接收速率。该速率的上限将由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 决定(如果它们设置了的话)。
spark.streaming.blockInterval 200ms 在将数据保存到Spark之前,Spark Streaming接收器组装数据块的时间间隔。建议不少于50ms。关于Spark Streaming编程指南细节,请参考 performance tuning 这一节。
spark.streaming.receiver.maxRate not set 接收速度的最大速率(每秒记录条数)。实际上,每个流每秒将消费这么多条记录。设置为0或者负数表示不限制速率。更多细节请参考: deployment guide
spark.streaming.receiver.writeAheadLog.enable false 是否启用接收器预写日志。所有的输入数据都会保存到预写日志中,这样在驱动器失败后,可以基于预写日志来恢复数据。更详细请参考:deployment guide
spark.streaming.unpersist true 是否强制Spark Streaming 自动从内存中清理掉所生成并持久化的RDD。同时,Spark Streaming收到的原始数据也将会被自动清理掉。如果设置为false,那么原始数据以及持久化的RDD将不会被自动清理,以便外部程序可以访问这些数据。当然,这将导致Spark消耗更多的内存。
spark.streaming.stopGracefullyOnShutdown false 如果设为true,Spark将会在JVM关闭时,优雅地关停StreamingContext,而不是立即关闭之。
spark.streaming.kafka.maxRatePerPartition not set 在使用Kafka direct stream API时,从每个Kafka数据分区读取数据的最大速率(每秒记录条数)。更详细请参考:Kafka Integration guide
spark.streaming.kafka.maxRetries 1 驱动器连续重试的最大次数,这个配置是为了让驱动器找出每个Kafka分区上的最大offset(默认值为1,意味着驱动器将最多尝试2次)。只对新的Kafka direct stream API有效。
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 以及 status API 中保留的最大批次个数。

SparkR

属性名 默认值 含义
spark.r.numRBackendThreads 2 SparkR RBackEnd处理RPC调用的后台线程数
spark.r.command Rscript 集群模式下,驱动器和worker上执行的R脚本可执行文件
spark.r.driver.command spark.r.command client模式的驱动器执行的R脚本。集群模式下会忽略

集群管理器

每个集群管理器都有一些额外的配置选项。详细请参考这里:

YARN
Mesos
Standalone Mode

环境变量

有些Spark设置需要通过环境变量来设定,这些环境变量可以在${SPARK_HOME}/conf/spark-env.sh脚本(Windows下是conf/spark-env.cmd)中设置。如果是独立部署或者Mesos模式,这个文件可以指定机器相关信息(如hostname)。运行本地Spark应用或者submit脚本时,也会引用这个文件。

注意,conf/spark-env.sh默认是不存在的。你需要复制conf/spark-env.sh.template这个模板来创建,还有注意给这个文件附上可执行权限。

以下变量可以在spark-env.sh中设置:

环境变量 含义
JAVA_HOME Java安装目录(如果没有在PATH变量中指定)
PYSPARK_PYTHON 驱动器和worker上使用的Python二进制可执行文件(默认是python)
PYSPARK_DRIVER_PYTHON 仅在驱动上使用的Python二进制可执行文件(默认同PYSPARK_PYTHON)
SPARKR_DRIVER_R SparkR shell使用的R二进制可执行文件(默认是R)
SPARK_LOCAL_IP 本地绑定的IP
SPARK_PUBLIC_DNS Spark程序公布给其他机器的hostname

另外,还有一些选项需要在Spark standalone cluster scripts里设置,如:每台机器上使用的core数量,和最大内存占用量。

spark-env.sh是一个shell脚本,因此一些参数可以通过编程方式来设定 – 例如,你可以获取本机IP来设置SPARK_LOCAL_IP。

日志配置

Spark使用log4j 打日志。你可以在conf目录下用log4j.properties来配置。复制该目录下已有的log4j.properties.template并改名为log4j.properties即可。

覆盖配置目录

默认Spark配置目录是”${SPARK_HOME}/conf”,你也可以通过 ${SPARK_CONF_DIR}指定其他目录。Spark会从这个目录下读取配置文件(spark-defaults.conf,spark-env.sh,log4j.properties等)

继承Hadoop集群配置

如果你打算用Spark从HDFS读取数据,那么有2个Hadoop配置文件必须放到Spark的classpath下:

  • hdfs-site.xml,配置HDFS客户端的默认行为
  • core-site.xml,默认文件系统名

这些配置文件的路径在不同发布版本中不太一样(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目录下找到。一些工具,如Cloudera Manager,可以动态修改配置,而且提供了下载一份拷贝的机制。

要想让这些配置对Spark可见,请在${SPARK_HOME}/spark-env.sh中设置HADOOP_CONF_DIR变量。

 转载自 并发编程网 - ifeve.com