Flink Checkpoint所有配置解读
2023-09-11 14:15:40 时间
配置类在:org.apache.flink.configuration.CheckpointingOptions
配置解析:
配置 | 类型 | 默认值 | 描述 |
---|---|---|---|
state.backend | String | 无 | 检查点存储,用于在执行过程中存储操作符的本地状态 |
state.checkpoint-storage | String | 无 | 用于恢复检查点状态的检查点存储。 |
state.backend.changelog.enabled | Boolean | false | 是否开启状态变更日志 |
state.checkpoints.num-retained | Integer | 1 | 保留的已完成检查点的最大数目 |
state.backend.async | Boolean | true | 已弃用,所有状态快照都是异步的。 |
state.backend.incremental | Boolean | false | 状态后端是否应该创建增量检查点,如果允许,对于增量检查点,存储的只是与前一个检查点不同的部分,而不是完整的检查点状态。 |
state.backend.local-recovery | Boolean | false | 状态后端配置本地恢复,默认本地恢复处于去激活状态。 |
taskmanager.state.local.root-dirs | String | 无 | config参数定义根目录,用于存储基于文件的状态,用于本地恢复。 |
state.savepoints.dir | String | 无 | 保存点的默认目录。由状态后端使用,向文件系统写入保存点(HashMapStateBackend, EmbeddedRocksDBStateBackend) |
state.checkpoints.dir | String | 无 | 用于在Flink支持的文件系统中存储数据文件和检查点元数据的默认目录。存储路径必须能够访问所有参与的进程/节点(即:所有TaskManagers和JobManagers)。 |
state.storage.fs.memory-threshold | MemorySize | 20kb | 状态数据文件的最小大小。所有小于此值的状态块都内联存储在根检查点元数据文件中。 |
state.storage.fs.write-buffer-size | Integer | 4 * 1024 | 写入文件系统的检查点流的写入缓冲区的默认大小。 |
配置代码如下:
/** A collection of all configuration options that relate to checkpoints and savepoints. */
public class CheckpointingOptions {
// ------------------------------------------------------------------------
// general checkpoint options
// ------------------------------------------------------------------------
/**
* The checkpoint storage used to store operator state locally within the cluster during
* execution.
*
* <p>The implementation can be specified either via their shortcut name, or via the class name
* of a {@code StateBackendFactory}. If a StateBackendFactory class name is specified, the
* factory is instantiated (via its zero-argument constructor) and its {@code
* StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
*
* <p>Recognized shortcut names are 'hashmap' and 'rocksdb'.
*
* @deprecated Use {@link StateBackendOptions#STATE_BACKEND}.
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
@Deprecated
public static final ConfigOption<String> STATE_BACKEND =
ConfigOptions.key("state.backend")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text("The state backend to be used to store state.")
.linebreak()
.text(
"The implementation can be specified either via their shortcut "
+ " name, or via the class name of a %s. "
+ "If a factory is specified it is instantiated via its "
+ "zero argument constructor and its %s "
+ "method is called.",
TextElement.code("StateBackendFactory"),
TextElement.code(
"StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
.linebreak()
.text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
.build());
/**
* The checkpoint storage used to checkpoint state for recovery.
*
* <p>The implementation can be specified either via their shortcut name, or via the class name
* of a {@code CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
* the factory is instantiated (via its zero-argument constructor) and its {@code
* CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
*
* <p>Recognized shortcut names are 'jobmanager' and 'filesystem'.
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
public static final ConfigOption<String> CHECKPOINT_STORAGE =
ConfigOptions.key("state.checkpoint-storage")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The checkpoint storage implementation to be used to checkpoint state.")
.linebreak()
.text(
"The implementation can be specified either via their shortcut "
+ " name, or via the class name of a %s. "
+ "If a factory is specified it is instantiated via its "
+ "zero argument constructor and its %s "
+ " method is called.",
TextElement.code("CheckpointStorageFactory"),
TextElement.code(
"CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)"))
.linebreak()
.text(
"Recognized shortcut names are 'jobmanager' and 'filesystem'.")
.build());
/** Whether to enable state change log. */
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
@Documentation.ExcludeFromDocumentation("Hidden for now")
public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
ConfigOptions.key("state.backend.changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog.");
/** The maximum number of completed checkpoints to retain. */
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS =
ConfigOptions.key("state.checkpoints.num-retained")
.defaultValue(1)
.withDescription("The maximum number of completed checkpoints to retain.");
/** @deprecated Checkpoints are aways asynchronous. */
@Deprecated
public static final ConfigOption<Boolean> ASYNC_SNAPSHOTS =
ConfigOptions.key("state.backend.async")
.booleanType()
.defaultValue(true)
.withDescription("Deprecated option. All state snapshots are asynchronous.");
/**
* Option whether the state backend should create incremental checkpoints, if possible. For an
* incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the
* complete checkpoint state.
*
* <p>Once enabled, the state size shown in web UI or fetched from rest API only represents the
* delta checkpoint size instead of full checkpoint size.
*
* <p>Some state backends may not support incremental checkpoints and ignore this option.
*/
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Boolean> INCREMENTAL_CHECKPOINTS =
ConfigOptions.key("state.backend.incremental")
.defaultValue(false)
.withDescription(
"Option whether the state backend should create incremental checkpoints, if possible. For"
+ " an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the"
+ " complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API"
+ " only represents the delta checkpoint size instead of full checkpoint size."
+ " Some state backends may not support incremental checkpoints and ignore this option.");
/**
* This option configures local recovery for this state backend. By default, local recovery is
* deactivated.
*
* <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend
* and HashMapStateBackend do not support local recovery and ignore this option.
*/
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Boolean> LOCAL_RECOVERY =
ConfigOptions.key("state.backend.local-recovery")
.defaultValue(false)
.withDescription(
"This option configures local recovery for this state backend. By default, local recovery is "
+ "deactivated. Local recovery currently only covers keyed state backends. Currently, the MemoryStateBackend "
+ "does not support local recovery and ignores this option.");
/**
* The config parameter defining the root directories for storing file-based state for local
* recovery.
*
* <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend
* does not support local recovery and ignore this option.
*/
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS =
ConfigOptions.key("taskmanager.state.local.root-dirs")
.noDefaultValue()
.withDescription(
"The config parameter defining the root directories for storing file-based state for local "
+ "recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does "
+ "not support local recovery and ignore this option");
// ------------------------------------------------------------------------
// Options specific to the file-system-based state backends
// ------------------------------------------------------------------------
/**
* The default directory for savepoints. Used by the state backends that write savepoints to
* file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 3)
public static final ConfigOption<String> SAVEPOINT_DIRECTORY =
ConfigOptions.key("state.savepoints.dir")
.noDefaultValue()
.withDeprecatedKeys("savepoints.state.backend.fs.dir")
.withDescription(
"The default directory for savepoints. Used by the state backends that write savepoints to"
+ " file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).");
/**
* The default directory used for storing the data files and meta data of checkpoints in a Flink
* supported filesystem. The storage path must be accessible from all participating
* processes/nodes(i.e. all TaskManagers and JobManagers).
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
public static final ConfigOption<String> CHECKPOINTS_DIRECTORY =
ConfigOptions.key("state.checkpoints.dir")
.stringType()
.noDefaultValue()
.withDeprecatedKeys("state.backend.fs.checkpointdir")
.withDescription(
"The default directory used for storing the data files and meta data of checkpoints "
+ "in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes"
+ "(i.e. all TaskManagers and JobManagers).");
/**
* The minimum size of state data files. All state chunks smaller than that are stored inline in
* the root checkpoint metadata file.
*/
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD =
ConfigOptions.key("state.storage.fs.memory-threshold")
.memoryType()
.defaultValue(MemorySize.parse("20kb"))
.withDescription(
"The minimum size of state data files. All state chunks smaller than that are stored"
+ " inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.")
.withDeprecatedKeys("state.backend.fs.memory-threshold");
/**
* The default size of the write buffer for the checkpoint streams that write to file systems.
*/
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<Integer> FS_WRITE_BUFFER_SIZE =
ConfigOptions.key("state.storage.fs.write-buffer-size")
.intType()
.defaultValue(4 * 1024)
.withDescription(
String.format(
"The default size of the write buffer for the checkpoint streams that write to file systems. "
+ "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.",
FS_SMALL_FILE_THRESHOLD.key()))
.withDeprecatedKeys("state.backend.fs.write-buffer-size");
}
相关文章
- CentOS 7下Samba服务安装与配置详解
- 在 SAP Fiori Launchpad 里给需要执行的 SAPGUI 事物码配置系统别名
- 打开服务器上的 IncludeExceptionDetailInFaults (从 ServiceBehaviorAttribute 或从 <serviceDebug> 配置行为)以便将异常信息发送回
- Atitit.面向接口的web 原理与设计重写 路由启动绑定配置url router rewriting urlpage mvc mvp的 java c#.net php js
- 【Android Gradle 插件】Module 目录下 build.gradle 配置文件 ( android 闭包块配置 | AppExtension 扩展类型参考文档 )
- 玩转华为ENSP模拟器系列 | 配置基于VLAN的VLAN Mapping示例(2 to 1)
- 华为运营商级路由器配置示例 | IPv4静态路由
- 配置
- IPSec设备冗余实验配置