Flink State Management: Snapshot Strategies
Snapshot Strategy (SnapshotStrategy)
Flink's checkpoint mechanism is built on distributed consistent snapshots, which is how it achieves exactly-once processing semantics. Both keyed state backends (HeapKeyedStateBackend, RocksDBKeyedStateBackend) and the operator state backend (DefaultOperatorStateBackend) receive snapshot requests through their snapshot method, while the actual snapshot work is delegated to a concrete snapshot strategy.
Below is the UML of Flink's snapshot strategies. For keyed state, HeapSnapshotStrategy and RocksDBSnapshotStrategyBase correspond to the snapshot strategies of the heap-memory and RocksDB backends respectively (RocksDB is further split into full and incremental snapshots), while DefaultOperatorStateBackendSnapshotStrategy is the snapshot strategy for the operator state backend. Beyond keyed state and operator state, a savepoint is essentially a special kind of snapshot, so its strategy, SavepointSnapshotStrategy, also implements the SnapshotStrategy interface.
Below is the SnapshotStrategy interface definition, which splits taking a snapshot into two steps:
- A synchronous part that prepares the resources needed to take the snapshot, getting everything ready for writing the snapshot data.
- An asynchronous part that writes the snapshot data to the provided CheckpointStreamFactory.
```java
public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {

    // Synchronous part: prepares the resources required to take the snapshot.
    SR syncPrepareResources(long checkpointId) throws Exception;

    // Asynchronous part: writes the snapshot data to the CheckpointStreamFactory.
    SnapshotResultSupplier<S> asyncSnapshot(
            SR syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions);

    // Supplier that executes the asynchronous part of the snapshot.
    @FunctionalInterface
    interface SnapshotResultSupplier<S extends StateObject> {
        // Performs the asynchronous part of a checkpoint and returns the snapshot result.
        SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
    }
}
```
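The two-phase contract can be sketched with a toy example (all names here are simplified stand-ins, not Flink's actual types): the synchronous part captures a point-in-time view of the state on the task thread, and the asynchronous part later turns that view into snapshot bytes without being affected by further state mutations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Toy two-phase snapshot; the names are illustrative, not Flink's real API.
class TwoPhaseSnapshotDemo {

    // "Resources" captured synchronously: a point-in-time copy of the state.
    static List<String> syncPrepareResources(List<String> liveState) {
        return new ArrayList<>(liveState); // cheap copy while the task thread is paused
    }

    // Asynchronous part: turn the captured resources into the snapshot bytes.
    static Callable<String> asyncSnapshot(List<String> captured) {
        return () -> String.join(",", captured); // Flink would write to a CheckpointStreamFactory here
    }

    public static void main(String[] args) throws Exception {
        List<String> state = new ArrayList<>(List.of("a", "b"));
        List<String> captured = syncPrepareResources(state);
        state.add("c"); // a mutation after the sync phase must not leak into the snapshot
        System.out.println(asyncSnapshot(captured).call()); // a,b
    }
}
```

The key property the sketch demonstrates is that the async part only ever sees what the sync part captured, which is exactly why Flink keeps the resource-preparation step synchronous.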
Below is the UML for SnapshotResources:
- Under the full snapshot interface FullSnapshotResources sit the heap snapshot resources HeapSnapshotResources and the RocksDB full snapshot implementation RocksDBFullSnapshotResources.
- The RocksDB incremental snapshot resources are implemented by IncrementalRocksDBSnapshotResources.
- The operator state snapshot resources are implemented by DefaultOperatorStateBackendSnapshotResources.
The SnapshotResources interface is defined as follows. It declares a single release method, used to clean up resources once the asynchronous snapshot part has finished.
```java
@Internal
public interface SnapshotResources {

    /** Cleans up the resources after the asynchronous part is done. */
    void release();
}
```
We will look at the concrete resource implementations together with their corresponding snapshot strategies.
Heap Snapshot Strategy (HeapSnapshotStrategy)
Before looking at the heap snapshot strategy itself, let's first look at the resource class it uses, HeapSnapshotResources. From the UML above we can see that both the heap snapshot and the RocksDB full snapshot implement FullSnapshotResources, which also tells us that the heap backend has no incremental snapshot implementation. FullSnapshotResources defines the backend-agnostic resources for taking a full snapshot; both backends write their snapshot data through FullSnapshotAsyncWriter. The FullSnapshotResources interface is defined as follows, where the type parameter K is the type of the stored keys.
```java
public interface FullSnapshotResources<K> extends SnapshotResources {

    // Returns the list of metadata for this state snapshot. StateMetaInfoSnapshot records
    // each state's snapshot metadata, e.g. state name, backend type, serializers.
    List<StateMetaInfoSnapshot> getMetaInfoSnapshots();

    // Creates an iterator over the current snapshot.
    KeyValueStateIterator createKVStateIterator() throws IOException;

    // The KeyGroupRange covered by this snapshot.
    KeyGroupRange getKeyGroupRange();

    /** Returns the key {@link TypeSerializer}. */
    TypeSerializer<K> getKeySerializer();

    /** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
    StreamCompressionDecorator getStreamCompressionDecorator();
}
```
Now let's look at the two core methods of HeapSnapshotStrategy: syncPrepareResources and asyncSnapshot.
```java
class HeapSnapshotStrategy<K>
        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    ...
    // Prepare the snapshot resources (HeapSnapshotResources).
    @Override
    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
        return HeapSnapshotResources.create(
                registeredKVStates,
                registeredPQStates,
                keyGroupCompressionDecorator,
                keyGroupRange,
                getKeySerializer(),
                totalKeyGroups);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            HeapSnapshotResources<K> syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        ......
        // SupplierWithException is a Java Supplier that may throw: the first type parameter
        // is the supplier's return type, the second the exception type its function may throw.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        localRecoveryConfig.isLocalRecoveryEnabled() // local recovery enabled?
                                        && !checkpointOptions.getCheckpointType().isSavepoint()
                                ? () -> createDuplicatingStream( // local recovery and not a savepoint: duplicating stream
                                        checkpointId,
                                        CheckpointedStateScope.EXCLUSIVE,
                                        streamFactory,
                                        localRecoveryConfig.getLocalStateDirectoryProvider())
                                : () -> createSimpleStream( // no local recovery, or a savepoint: simple stream
                                        CheckpointedStateScope.EXCLUSIVE, streamFactory);

        return (snapshotCloseableRegistry) -> {
            ......
            // The output stream for the snapshot data.
            final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
                    streamWithResultProvider.getCheckpointOutputStream();
            // Write the checkpoint data via KeyedBackendSerializationProxy.
            final DataOutputViewStreamWrapper outView =
                    new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);
            ......
        };
    }
}
```
The asyncSnapshot method above creates the snapshot output stream through CheckpointStreamWithResultProvider. At its core, this class encapsulates obtaining the output streams: if local state recovery is not configured, only one output stream is created and the snapshot data is written to the job's configured checkpoint storage. If local recovery is configured, the state data must also be written locally (for local recovery), so in that case two output streams are obtained: one writing to the configured checkpoint storage and one writing locally.
```java
public interface CheckpointStreamWithResultProvider extends Closeable {

    // Closes the output stream and returns a snapshot result carrying the stream's state handle.
    @Nonnull
    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    // Returns the snapshot output stream.
    @Nonnull
    CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();

    @Override
    default void close() throws IOException {
        getCheckpointOutputStream().close();
    }
    ...
}
```
The two inner implementations of CheckpointStreamWithResultProvider correspond to the simple stream (PrimaryStreamOnly, which creates only one output stream writing to the configured checkpoint storage, e.g. remote HDFS or the JobManager) and the duplicating stream (PrimaryAndSecondaryStream, which creates two output streams: the first is the same as in PrimaryStreamOnly, while the second writes locally on the TaskManager for local recovery).
Creating the simple stream: as shown below, only a primary stream is created.
```java
static CheckpointStreamWithResultProvider createSimpleStream(
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory)
        throws IOException {
    // Create the primary output stream.
    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```
Creating the duplicating stream: besides the primary stream, a secondary stream writing to a local file is created.
```java
@Nonnull
static CheckpointStreamWithResultProvider createDuplicatingStream(
        @Nonnegative long checkpointId,
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory,
        @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
        throws IOException {

    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    try {
        // The local target path for the checkpoint data.
        File outFile =
                new File(
                        secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
                        String.valueOf(UUID.randomUUID()));
        Path outPath = new Path(outFile.toURI());
        // Build the file-based secondary output stream.
        CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
        return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                primaryOut, secondaryOut);
    } catch (IOException secondaryEx) {
        LOG.warn(
                "Exception when opening secondary/local checkpoint output stream. "
                        + "Continue only with the primary stream.",
                secondaryEx);
    }
    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```
The output streams above are created by a CheckpointStreamFactory. Such a stream writes the checkpoint data to external storage, e.g. FsCheckpointStreamFactory writes checkpoint data to an external file system.
```java
public interface CheckpointStreamFactory {

    // Creates a new state output stream; CheckpointStateOutputStream is a static abstract
    // class nested in CheckpointStreamFactory.
    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
            throws IOException;

    // Base class of checkpoint output streams; the implementations live in
    // CheckpointStreamFactory's subclasses.
    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        // Closes the stream and returns its state handle.
        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        // Closes the stream.
        @Override
        public abstract void close() throws IOException;
    }
}
```
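The factory/stream/handle pattern above can be sketched with an in-memory toy analogue (simplified names, not Flink's API): the factory hands out an output stream, and closing it yields a handle that later points back at the written snapshot data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Toy analogue of factory -> stream -> handle; names are illustrative only.
class StreamFactorySketch {

    // Stand-in for StreamStateHandle: something that can later re-locate the data.
    static class StateHandle {
        final byte[] data;
        StateHandle(byte[] data) { this.data = data; }
    }

    // Stand-in for CheckpointStateOutputStream, backed by memory instead of a file system.
    static class CheckpointOutputStream extends ByteArrayOutputStream {
        // Counterpart of closeAndGetHandle(): finish the stream, return a handle to the data.
        StateHandle closeAndGetHandle() throws IOException {
            close();
            return new StateHandle(toByteArray());
        }
    }

    // Stand-in for CheckpointStreamFactory.
    interface StreamFactory {
        CheckpointOutputStream createCheckpointStateOutputStream();
    }

    public static void main(String[] args) throws IOException {
        StreamFactory factory = CheckpointOutputStream::new; // in-memory "storage"
        CheckpointOutputStream out = factory.createCheckpointStateOutputStream();
        out.write("snapshot-bytes".getBytes());
        StateHandle handle = out.closeAndGetHandle();
        System.out.println(handle.data.length);
    }
}
```

In Flink the handle returned by closeAndGetHandle is what ends up in the checkpoint metadata, so that recovery can find the written state.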
RocksDB Snapshot Strategies
From the UML above we know the RocksDB snapshot strategies revolve around three core classes: the abstract base class RocksDBSnapshotStrategyBase, the full snapshot strategy RocksFullSnapshotStrategy, and the incremental snapshot strategy RocksIncrementalSnapshotStrategy. RocksDBSnapshotStrategyBase defines RocksDB- and state-related member fields, while the concrete logic lives in the subclasses.
Full Snapshot
The full snapshot strategy RocksFullSnapshotStrategy creates full snapshots of a RocksDBKeyedStateBackend; each checkpoint ships the complete state data to the remote storage (JobManager or HDFS).
Again, let's look at the two core methods: syncPrepareResources and asyncSnapshot.
```java
public class RocksFullSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    ......
    @Override
    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        // Build the RocksDB full snapshot resources. Compared with HeapSnapshotResources,
        // RocksDBFullSnapshotResources additionally holds the RocksDB instance and its snapshot.
        return RocksDBFullSnapshotResources.create(
                kvStateInformation,
                registeredPQStates,
                db,
                rocksDBResourceGuard,
                keyGroupRange,
                keySerializer,
                keyGroupPrefixBytes,
                keyGroupCompressionDecorator);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            FullSnapshotResources<K> fullRocksDBSnapshotResources,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {

        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
                        timestamp);
            }
            return registry -> SnapshotResult.empty();
        }

        // As in the heap backend, createCheckpointStreamSupplier creates a duplicating or
        // simple stream depending on whether local recovery is enabled.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        createCheckpointStreamSupplier(
                                checkpointId, checkpointStreamFactory, checkpointOptions);

        // Create the asynchronous full snapshot writer.
        return new FullSnapshotAsyncWriter<>(
                checkpointOptions.getCheckpointType(),
                checkpointStreamSupplier,
                fullRocksDBSnapshotResources);
    }
    ......
}
```
FullSnapshotAsyncWriter is also a supplier; it asynchronously writes the full snapshot data to the given output stream.
```java
public class FullSnapshotAsyncWriter<K>
        implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {

    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ......
        // Obtain the output stream(s).
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
                checkpointStreamSupplier.get();
        snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        // Write the snapshot data to the output stream(s).
        writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        ......
    }

    private void writeSnapshotToOutputStream(
            @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
            throws IOException, InterruptedException {

        // Write the snapshot data through an output view; note that
        // checkpointStreamWithResultProvider may write the data twice (primary + local).
        final DataOutputView outputView =
                new DataOutputViewStreamWrapper(
                        checkpointStreamWithResultProvider.getCheckpointOutputStream());
        // Write the metadata.
        writeKVStateMetaData(outputView);
        // Write the state data of every state instance.
        try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
            writeKVStateData(
                    kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        }
    }
}
```
Now the crucial part, writeKVStateData: how is the full state data actually written out? Setting the details aside, the essence is simply iterating a KeyValueStateIterator.
```java
private void writeKVStateData(
        final KeyValueStateIterator mergeIterator,
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
        final KeyGroupRangeOffsets keyGroupRangeOffsets)
        throws IOException, InterruptedException {
    ......
    try {
        ......
        // Simply walk the KeyValueStateIterator.
        // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
        // key-group offsets.
        while (mergeIterator.isValid()) {
            ......
            writeKeyValuePair(previousKey, previousValue, kgOutView);
            ......
            // request next k/v pair
            previousKey = mergeIterator.key();
            previousValue = mergeIterator.value();
            mergeIterator.next();
        }
        ......
    } finally {
        // this will just close the outer stream
        IOUtils.closeQuietly(kgOutStream);
    }
}
```
A KeyValueStateIterator walks all key-value entries of the current snapshot; RocksDB and the heap backend each have their own iterator implementation.
Let's see how a RocksStatesPerKeyGroupMergeIterator is created. When we looked at the FullSnapshotResources interface above, we saw the abstract method createKVStateIterator, which exists precisely to create this iterator. HeapSnapshotResources and RocksDBFullSnapshotResources implement it to create the heap and RocksDB iterators respectively. Below is the RocksDBFullSnapshotResources.createKVStateIterator implementation.
```java
@Override
public KeyValueStateIterator createKVStateIterator() throws IOException {
    ......
    try {
        // Create RocksDB ReadOptions pinned to the RocksDB snapshot taken during the
        // synchronous checkpoint phase.
        ReadOptions readOptions = new ReadOptions();
        closeableRegistry.registerCloseable(readOptions::close);
        readOptions.setSnapshot(snapshot);
        // RocksIteratorWrapper is a thin wrapper around a RocksIterator.
        List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                createKVStateIterators(closeableRegistry, readOptions);
        .......
        // RocksStatesPerKeyGroupMergeIterator merges the iterators of multiple state
        // instances (column families) into a single iterator.
        return new RocksStatesPerKeyGroupMergeIterator(
                closeableRegistry,
                kvStateIterators,
                heapPriorityQueueIterators,
                keyGroupPrefixBytes);
    } catch (Throwable t) {
        IOUtils.closeQuietly(closeableRegistry);
        throw new IOException("Error creating merge iterator", t);
    }
}

private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
        CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {

    final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
            new ArrayList<>(metaData.size());
    int kvStateId = 0;
    // Each state, i.e. each RocksDB column family, gets its own iterator.
    for (MetaData metaDataEntry : metaData) {
        RocksIteratorWrapper rocksIteratorWrapper =
                createRocksIteratorWrapper(
                        db,
                        metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
                        metaDataEntry.stateSnapshotTransformer,
                        readOptions);
        kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
        closeableRegistry.registerCloseable(rocksIteratorWrapper);
        ++kvStateId;
    }
    return kvStateIterators;
}

private static RocksIteratorWrapper createRocksIteratorWrapper(
        RocksDB db,
        ColumnFamilyHandle columnFamilyHandle,
        StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
        ReadOptions readOptions) {
    // Create the native RocksDB iterator and wrap it in Flink's RocksIteratorWrapper.
    RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
    return stateSnapshotTransformer == null
            ? new RocksIteratorWrapper(rocksIterator)
            : new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
```
As the code shows, the iterators are essentially RocksDB's own iterators (pinned to the snapshot being read), wrapped in Flink's RocksIteratorWrapper (the RocksDB documentation on iterator error handling explains why the wrapper is needed). Since there may be multiple state instances, each with its own iterator, Flink finally merges them all into a single iterator, the RocksStatesPerKeyGroupMergeIterator.
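The merging idea can be sketched with a small k-way merge over sorted sources (Flink's RocksStatesPerKeyGroupMergeIterator orders by key-group first; the class and method names below are simplified stand-ins merging plain sorted lists):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Simplified sketch of merging several sorted per-state iterators into one
// globally ordered stream, the way a per-key-group merge iterator does.
class MergeIteratorSketch {

    static List<String> merge(List<List<String>> sortedSources) {
        // Min-heap of (current element, its source iterator), ordered by element.
        PriorityQueue<Map.Entry<String, Iterator<String>>> heap =
                new PriorityQueue<>(Map.Entry.comparingByKey());
        for (List<String> src : sortedSources) {
            Iterator<String> it = src.iterator();
            if (it.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            // Emit the globally smallest element, then refill from its source.
            Map.Entry<String, Iterator<String>> e = heap.poll();
            out.add(e.getKey());
            if (e.getValue().hasNext())
                heap.add(new AbstractMap.SimpleEntry<>(e.getValue().next(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two "column families", each already sorted, merged into one ordered stream.
        System.out.println(merge(List.of(List.of("a", "d"), List.of("b", "c"))));
        // → [a, b, c, d]
    }
}
```

The point of this shape is that writeKVStateData can then consume a single ordered iterator and track key-group offsets, regardless of how many state instances contributed entries.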
Incremental Snapshot
RocksIncrementalSnapshotStrategy is the incremental snapshot strategy for RocksDBKeyedStateBackend; it builds incremental snapshots on top of RocksDB's native checkpoint feature. Before looking at its syncPrepareResources and asyncSnapshot, let's first look at some key member fields used by incremental snapshots.
```java
// The incremental snapshot resources are the inner class IncrementalRocksDBSnapshotResources.
public class RocksIncrementalSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<
                K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {

    // Base directory of the RocksDB instance.
    @Nonnull private final File instanceBasePath;

    @Nonnull private final UUID backendUID;

    /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
    // Maps each checkpoint id to the set of sst files of that checkpoint.
    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

    // Id of the last completed checkpoint.
    private long lastCompletedCheckpointId;

    // Uploads the snapshot files (e.g. the sst files produced by the RocksDB checkpoint).
    private final RocksDBStateUploader stateUploader;
    ...
}
```
Next, the synchronous resource-preparation phase, which mainly does two things:
- Look up the sst files produced by the most recent checkpoint via materializedSstFiles, to be used for the incremental file comparison.
- Create a RocksDB checkpoint.
```java
@Override
public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
        throws Exception {

    // Prepare the directory: a permanent one if local recovery is enabled, otherwise temporary.
    final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
    LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

    final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
            new ArrayList<>(kvStateInformation.size());
    // The sst files produced by the last completed checkpoint, used for the incremental diff.
    final Set<StateHandleID> baseSstFiles =
            snapshotMetaData(checkpointId, stateMetaInfoSnapshots);

    // Create the RocksDB checkpoint.
    takeDBNativeCheckpoint(snapshotDirectory);

    return new IncrementalRocksDBSnapshotResources(
            snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
}
```
takeDBNativeCheckpoint synchronously creates a RocksDB checkpoint; the checkpoint data (sst files, misc files) is produced in the given directory.
```java
private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory) throws Exception {
    try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
            Checkpoint checkpoint = Checkpoint.create(db)) {
        checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
    } catch (Exception ex) {
        ......
    }
}
```
asyncSnapshot itself is simple: it mainly creates a RocksDBIncrementalSnapshotOperation supplier that performs the incremental snapshot.
```java
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
        IncrementalRocksDBSnapshotResources snapshotResources,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory checkpointStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ...
    return new RocksDBIncrementalSnapshotOperation(
            checkpointId,
            checkpointStreamFactory,
            snapshotResources.snapshotDirectory, // directory of the RocksDB checkpoint
            snapshotResources.baseSstFiles, // sst files of the last completed checkpoint
            snapshotResources.stateMetaInfoSnapshots);
}
```
Now let's look at the heart of the incremental snapshot, RocksDBIncrementalSnapshotOperation.
```java
private final class RocksDBIncrementalSnapshotOperation
        implements SnapshotResultSupplier<KeyedStateHandle> {
    ...
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ...
        // The sst files produced by the current RocksDB checkpoint.
        final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
        // The misc (metadata) files of the current RocksDB checkpoint.
        final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
        ......
        // Upload the incremental sst files and the misc files; internally, uploadSstFiles
        // walks the RocksDB checkpoint directory and picks out the newly added sst files.
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        // Record the sst files belonging to this checkpoint.
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles.keySet());
        }
        ......
    }
}
```
Next, the uploadSstFiles implementation:
```java
private void uploadSstFiles(
        @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
        @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
        @Nonnull CloseableRegistry snapshotCloseableRegistry)
        throws Exception {

    // Local paths of the incremental sst files.
    Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
    // Paths of the misc files.
    Map<StateHandleID, Path> miscFilePaths = new HashMap<>();

    // The current RocksDB checkpoint directory.
    Path[] files = localBackupDirectory.listDirectory();
    if (files != null) {
        // Find the incremental files.
        createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);

        // Upload the incremental sst files via stateUploader.
        sstFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
        // Upload the misc files.
        miscFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
    }
}
```
The createUploadFilePaths method above performs the incremental comparison, producing the sets of sst files and misc files to be uploaded.
```java
private void createUploadFilePaths(
        Path[] files,
        Map<StateHandleID, StreamStateHandle> sstFiles,
        Map<StateHandleID, Path> sstFilePaths,
        Map<StateHandleID, Path> miscFilePaths) {

    for (Path filePath : files) {
        final String fileName = filePath.getFileName().toString();
        // The file's handle id.
        final StateHandleID stateHandleID = new StateHandleID(fileName);

        // Compare sst files against those of the last checkpoint to find the increment.
        if (fileName.endsWith(SST_FILE_SUFFIX)) {
            final boolean existsAlready =
                    baseSstFiles != null && baseSstFiles.contains(stateHandleID);
            if (existsAlready) {
                // An sst file that already exists is replaced by a placeholder indicating
                // it was uploaded before; the file lives in the shared directory.
                sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
            } else {
                // A new file that will be uploaded.
                sstFilePaths.put(stateHandleID, filePath);
            }
        } else {
            // Misc files are always uploaded.
            miscFilePaths.put(stateHandleID, filePath);
        }
    }
}
```
So the incremental snapshot logic boils down to:
- Use a RocksDB checkpoint to produce the sst files of the current snapshot (thanks to the LSM design, sst files are immutable).
- For every checkpoint, Flink records the mapping from the checkpoint id to its snapshot's sst files.
- Upload the sst files and misc files belonging to the current checkpoint.
- If a later checkpoint still contains sst files from an earlier one, those files need not be re-uploaded to HDFS.
In short, Flink's incremental checkpoint cleverly exploits the LSM property that sst files, once written, never change.
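The upload decision above can be condensed into a few lines (the class, method, and label names below are simplified stand-ins; Flink's createUploadFilePaths works on Path objects and state handles):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the incremental-upload decision: sst files already present in the
// previous checkpoint become placeholders; only new sst files and all misc
// files are uploaded.
class IncrementalDiffSketch {

    static Map<String, String> planUpload(Set<String> baseSstFiles, List<String> currentFiles) {
        Map<String, String> plan = new LinkedHashMap<>();
        for (String file : currentFiles) {
            if (file.endsWith(".sst")) {
                // Immutable sst file: reuse it if the last checkpoint already uploaded it.
                plan.put(file, baseSstFiles.contains(file) ? "PLACEHOLDER" : "UPLOAD");
            } else {
                // Misc files (MANIFEST, CURRENT, ...) always go up.
                plan.put(file, "UPLOAD");
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Set<String> base = Set.of("000001.sst");
        List<String> current = List.of("000001.sst", "000002.sst", "MANIFEST-000004");
        System.out.println(planUpload(base, current));
        // {000001.sst=PLACEHOLDER, 000002.sst=UPLOAD, MANIFEST-000004=UPLOAD}
    }
}
```

The placeholder is what keeps a checkpoint self-describing: it still lists every file the checkpoint needs, while only the genuinely new bytes travel over the network.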
Operator State Snapshot Strategy
Operator state has a single snapshot strategy, DefaultOperatorStateBackendSnapshotStrategy, which writes the snapshot data of the operator state's ListState and BroadcastState out to the snapshot storage.
```java
class DefaultOperatorStateBackendSnapshotStrategy
        implements SnapshotStrategy<
                OperatorStateHandle,
                DefaultOperatorStateBackendSnapshotStrategy
                        .DefaultOperatorStateBackendSnapshotResources> {

    private final ClassLoader userClassLoader;
    // Operator state has only two kinds of state: ListState and BroadcastState.
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    protected DefaultOperatorStateBackendSnapshotStrategy(
            ClassLoader userClassLoader,
            Map<String, PartitionableListState<?>> registeredOperatorStates,
            Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
        this.userClassLoader = userClassLoader;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
    }
    ......
}
```
In the synchronous preparation phase, DefaultOperatorStateBackendSnapshotStrategy does exactly one thing: it deep-copies the ListState and BroadcastState. The deep copy synchronously captures the state as of this instant, which is what guarantees exactly-once.
```java
@Override
public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {

    // Holds the copied operator state.
    final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
    final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(userClassLoader);
    try {
        // Deep-copy the registered ListState for later use.
        if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry :
                    registeredOperatorStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    listState = listState.deepCopy();
                }
                registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
        }
        // Deep-copy the broadcast state.
        if (!registeredBroadcastStates.isEmpty()) {
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                    registeredBroadcastStates.entrySet()) {
                BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                if (null != broadcastState) {
                    broadcastState = broadcastState.deepCopy();
                }
                registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
            }
        }
    } finally {
        Thread.currentThread().setContextClassLoader(snapshotClassLoader);
    }

    return new DefaultOperatorStateBackendSnapshotResources(
            registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
}
```
Once the operator state has been deep-copied, asyncSnapshot asynchronously writes the snapshot data to the CheckpointStreamFactory.
```java
@Override
public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
        DefaultOperatorStateBackendSnapshotResources syncPartResource,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ......
    return (snapshotCloseableRegistry) -> {
        // Create the output stream.
        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        snapshotCloseableRegistry.registerCloseable(localOut);
        ......
        // Write the snapshot data to the output stream via OperatorBackendSerializationProxy.
        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
        OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(
                        operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
        backendSerializationProxy.write(dov);
        ......
            return SnapshotResult.of(retValue);
        } else {
            throw new IOException("Stream was already unregistered.");
        }
    };
}
```