zl程序教程


Flink State Management: Snapshot Strategies

Published: 2023-02-26 12:28:19

Snapshot strategies (SnapshotStrategy)

Flink's checkpoint mechanism is built on distributed consistent snapshots, which is how it achieves exactly-once processing semantics. Both keyed state (HeapKeyedStateBackend, RocksDBKeyedStateBackend) and operator state (DefaultOperatorStateBackend) receive snapshot requests through their snapshot method, while the actual snapshot work is delegated to a concrete snapshot strategy.

Below is the UML of Flink's snapshot strategies. For keyed state, HeapSnapshotStrategy and RocksDBSnapshotStrategyBase are the snapshot strategies of the heap and RocksDB backends respectively (RocksDB further splits into full and incremental snapshots), while DefaultOperatorStateBackendSnapshotStrategy is the snapshot strategy of the operator-state backend.
Besides keyed state and operator state, a savepoint is essentially a special kind of snapshot, so its strategy, SavepointSnapshotStrategy, also implements the SnapshotStrategy interface.


[Figure: SnapshotStrategy UML class diagram]

Below is the SnapshotStrategy interface, which defines the steps required to take a snapshot:

  1. A synchronous part that prepares the resources needed for the snapshot, getting everything ready for the subsequent write of the snapshot data.
  2. An asynchronous part that writes the snapshot data to the provided CheckpointStreamFactory.
```java
public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {

    // Synchronous part: prepares the resources needed to take the snapshot.
    SR syncPrepareResources(long checkpointId) throws Exception;

    // Asynchronous part: writes the snapshot data to the CheckpointStreamFactory.
    SnapshotResultSupplier<S> asyncSnapshot(
            SR syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions);

    // Supplier that performs the asynchronous part of the snapshot.
    @FunctionalInterface
    interface SnapshotResultSupplier<S extends StateObject> {
        // Performs the asynchronous part of a checkpoint and returns the snapshot result.
        SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
    }
}
```
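The two-phase split can be sketched outside of Flink roughly as follows. This is a hypothetical, Flink-free illustration (all class and method names are simplified stand-ins, not Flink's API): the synchronous part copies the state on the task thread, and the returned supplier performs the slow write on a snapshot thread while processing continues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPhaseSnapshotDemo {

    // Stand-in for SnapshotResources: holds the synchronously captured view.
    static class Resources {
        final Map<String, String> view;
        Resources(Map<String, String> view) { this.view = view; }
        void release() { view.clear(); } // cleanup after the async part is done
    }

    // Stand-in for syncPrepareResources: copy the state under the task thread.
    static Resources syncPrepareResources(Map<String, String> liveState) {
        return new Resources(new HashMap<>(liveState));
    }

    // Stand-in for asyncSnapshot: returns a supplier that serializes the captured view.
    static Callable<List<String>> asyncSnapshot(Resources resources) {
        return () -> {
            List<String> out = new ArrayList<>();
            resources.view.forEach((k, v) -> out.add(k + "=" + v));
            resources.release();
            return out;
        };
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> state = new HashMap<>();
        state.put("a", "1");

        Resources res = syncPrepareResources(state); // synchronous phase
        state.put("b", "2"); // mutations after the sync phase are NOT in the snapshot

        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<List<String>> result = pool.submit(asyncSnapshot(res));
        System.out.println(result.get()); // only "a=1" was captured
        pool.shutdown();
    }
}
```

The key property this mimics is that the expensive write never blocks the task thread; only the (cheap) copy does.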

Below is the UML of SnapshotResources:

  • Full snapshots (FullSnapshotResources) have two implementations: HeapSnapshotResources for the heap backend and RocksDBFullSnapshotResources for RocksDB full snapshots.
  • IncrementalRocksDBSnapshotResources is the resource class for RocksDB incremental snapshots.
  • DefaultOperatorStateBackendSnapshotResources is the resource class for operator-state snapshots.

[Figure: SnapshotResources UML class diagram]

The SnapshotResources interface is defined below; it has a single release method, used to clean up the resources once the asynchronous snapshot phase has finished.

```java
@Internal
public interface SnapshotResources {
    /** Cleans up the resources after the asynchronous part is done. */
    void release();
}
```

We will look at the concrete resource classes together with their corresponding snapshot strategies.

Heap snapshot strategy (HeapSnapshotStrategy)

Before looking at the heap snapshot strategy itself, let's look at its resource class, HeapSnapshotResources. In the UML above, both the heap snapshot and the RocksDB full snapshot implement FullSnapshotResources, which also tells us that the heap backend has no incremental-snapshot implementation.

FullSnapshotResources defines the backend-independent resources for taking a full snapshot; both implementations write their snapshot data through FullSnapshotAsyncWriter.

The FullSnapshotResources interface is defined as follows; the type parameter K is the data type of the stored key.

```java
public interface FullSnapshotResources<K> extends SnapshotResources {

    // Returns the metadata list for this state snapshot. A StateMetaInfoSnapshot records
    // the snapshot metadata for each state: state name, backend type, serializers, etc.
    List<StateMetaInfoSnapshot> getMetaInfoSnapshots();

    // Creates an iterator over the current snapshot.
    KeyValueStateIterator createKVStateIterator() throws IOException;

    // The KeyGroupRange covered by this snapshot.
    KeyGroupRange getKeyGroupRange();

    /** Returns the key {@link TypeSerializer}. */
    TypeSerializer<K> getKeySerializer();

    /** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
    StreamCompressionDecorator getStreamCompressionDecorator();
}
```

Now let's look at the two core methods of HeapSnapshotStrategy: syncPrepareResources and asyncSnapshot.

```java
class HeapSnapshotStrategy<K>
        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    ...
    // Prepare the snapshot resources (HeapSnapshotResources).
    @Override
    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
        return HeapSnapshotResources.create(
                registeredKVStates,
                registeredPQStates,
                keyGroupCompressionDecorator,
                keyGroupRange,
                getKeySerializer(),
                totalKeyGroups);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            HeapSnapshotResources<K> syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        ......
        // SupplierWithException is a Supplier that may throw: the first type parameter
        // is the supplier's return type, the second is the exception it may throw.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        localRecoveryConfig.isLocalRecoveryEnabled() // local recovery enabled?
                                        && !checkpointOptions.getCheckpointType().isSavepoint()
                                ? () ->
                                        // local recovery and not a savepoint: duplicating stream
                                        createDuplicatingStream(
                                                checkpointId,
                                                CheckpointedStateScope.EXCLUSIVE,
                                                streamFactory,
                                                localRecoveryConfig
                                                        .getLocalStateDirectoryProvider())
                                : () ->
                                        // no local recovery, or a savepoint: simple stream
                                        createSimpleStream(
                                                CheckpointedStateScope.EXCLUSIVE, streamFactory);

        return (snapshotCloseableRegistry) -> {
            ......
            // The output stream for the data.
            final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
                    streamWithResultProvider.getCheckpointOutputStream();
            // Write the checkpoint data via KeyedBackendSerializationProxy.
            final DataOutputViewStreamWrapper outView =
                    new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);
            ......
        };
    }
}
```

The asyncSnapshot method above obtains its snapshot output stream through CheckpointStreamWithResultProvider, whose core job is to encapsulate stream creation. If local state recovery is not configured, only one output stream is created, writing the snapshot data to the checkpoint storage configured for the job. If local recovery is configured, the state data must additionally be written locally (for local recovery), so in that case two output streams are obtained: one writing to the configured checkpoint storage and one writing locally.

```java
public interface CheckpointStreamWithResultProvider extends Closeable {

    // Closes the streams and returns a snapshot result carrying the stream handles.
    @Nonnull
    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    // Returns the snapshot output stream.
    @Nonnull
    CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();

    @Override
    default void close() throws IOException {
        getCheckpointOutputStream().close();
    }
    ...
}
```

The two inner implementations of CheckpointStreamWithResultProvider correspond to the simple stream (PrimaryStreamOnly: a single output stream writing to the configured checkpoint storage, e.g. remote HDFS or the JobManager) and the duplicating stream (PrimaryAndSecondaryStream: two output streams, where the first is the same as in PrimaryStreamOnly and the second writes locally on the TaskManager for local recovery).
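The duplicating-stream idea can be sketched with a hypothetical, Flink-free stand-in (class names here are made up for illustration): every byte goes to the primary (checkpoint storage) stream, while the secondary (local recovery) stream is best-effort, so a failure there never fails the checkpoint itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DuplicatingStreamDemo {

    static class DuplicatingStream extends OutputStream {
        private final OutputStream primary;
        private OutputStream secondary; // nulled out on first failure

        DuplicatingStream(OutputStream primary, OutputStream secondary) {
            this.primary = primary;
            this.secondary = secondary;
        }

        @Override
        public void write(int b) throws IOException {
            primary.write(b); // primary failures propagate: the checkpoint fails
            if (secondary != null) {
                try {
                    secondary.write(b);
                } catch (IOException e) {
                    secondary = null; // drop the local copy, keep the primary going
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream remote = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        try (OutputStream out = new DuplicatingStream(remote, local)) {
            out.write("snapshot".getBytes());
        }
        System.out.println(remote + " / " + local); // both copies hold the data
    }
}
```

This mirrors the fallback in createDuplicatingStream below, where a failure opening the secondary stream degrades to PrimaryStreamOnly.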

[Figure: CheckpointStreamWithResultProvider UML class diagram]

Creating the simple stream: as shown below, only a primary stream is created.

```java
static CheckpointStreamWithResultProvider createSimpleStream(
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory)
        throws IOException {
    // Create the primary output stream.
    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```

Creating the duplicating stream: besides the primary stream, a file-backed secondary stream is created.

```java
@Nonnull
static CheckpointStreamWithResultProvider createDuplicatingStream(
        @Nonnegative long checkpointId,
        @Nonnull CheckpointedStateScope checkpointedStateScope,
        @Nonnull CheckpointStreamFactory primaryStreamFactory,
        @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
        throws IOException {

    CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

    try {
        // Output path for the checkpoint data.
        File outFile =
                new File(
                        secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(
                                checkpointId),
                        String.valueOf(UUID.randomUUID()));
        Path outPath = new Path(outFile.toURI());

        // Build the file-backed output stream.
        CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);

        return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                primaryOut, secondaryOut);
    } catch (IOException secondaryEx) {
        LOG.warn(
                "Exception when opening secondary/local checkpoint output stream. "
                        + "Continue only with the primary stream.",
                secondaryEx);
    }

    return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
```

The output streams above are created by a CheckpointStreamFactory; the stream writes the checkpoint data out to external storage, for example FsCheckpointStreamFactory writes it to an external file system.

[Figure: CheckpointStreamFactory UML class diagram]

```java
public interface CheckpointStreamFactory {

    // Creates a new state output stream; CheckpointStateOutputStream is a static
    // abstract class nested inside CheckpointStreamFactory.
    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
            throws IOException;

    // Base class of CheckpointStateOutputStream; the concrete implementations live
    // in the subclasses of CheckpointStreamFactory.
    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        // Closes the stream and returns a state handle.
        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        // Closes the stream.
        @Override
        public abstract void close() throws IOException;
    }
}
```
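The closeAndGetHandle contract can be illustrated with a minimal hypothetical stand-in (the class below is not Flink code): closing the checkpoint output stream finalizes the data and hands back a handle from which the state can later be re-read.

```java
import java.io.ByteArrayOutputStream;

public class StateHandleDemo {

    static class InMemoryCheckpointStream extends ByteArrayOutputStream {
        byte[] closeAndGetHandle() {
            // Real implementations flush to a file system and return a path-based
            // handle; here the "handle" is simply the finished byte array.
            return toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        InMemoryCheckpointStream out = new InMemoryCheckpointStream();
        out.write("state-bytes".getBytes());
        byte[] handle = out.closeAndGetHandle();
        System.out.println(new String(handle)); // state-bytes
    }
}
```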

RocksDB snapshot strategies

From the UML above we know the RocksDB snapshot strategies consist of three core classes: the abstract base class RocksDBSnapshotStrategyBase, the full snapshot strategy RocksFullSnapshotStrategy, and the incremental snapshot strategy RocksIncrementalSnapshotStrategy.
RocksDBSnapshotStrategyBase defines some RocksDB- and state-related members; the concrete logic lives in the subclasses.

Full snapshots

The full snapshot strategy RocksFullSnapshotStrategy creates full snapshots of a RocksDBKeyedStateBackend; every checkpoint ships the complete state to the remote storage (JobManager or HDFS).

Again, let's look at the two core methods: syncPrepareResources and asyncSnapshot.

```java
public class RocksFullSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    ......

    @Override
    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        // Build the RocksDB full-snapshot resources. Compared with HeapSnapshotResources,
        // RocksDBFullSnapshotResources also holds the RocksDB instance and its Snapshot.
        return RocksDBFullSnapshotResources.create(
                kvStateInformation,
                registeredPQStates,
                db,
                rocksDBResourceGuard,
                keyGroupRange,
                keySerializer,
                keyGroupPrefixBytes,
                keyGroupCompressionDecorator);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            FullSnapshotResources<K> fullRocksDBSnapshotResources,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {

        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
                        timestamp);
            }
            return registry -> SnapshotResult.empty();
        }

        // As in the heap backend, createCheckpointStreamSupplier creates a duplicating
        // or simple stream depending on whether local recovery is enabled.
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        createCheckpointStreamSupplier(
                                checkpointId, checkpointStreamFactory, checkpointOptions);

        // Create the asynchronous full-snapshot writer.
        return new FullSnapshotAsyncWriter<>(
                checkpointOptions.getCheckpointType(),
                checkpointStreamSupplier,
                fullRocksDBSnapshotResources);
    }
    ......
}
```

FullSnapshotAsyncWriter is itself a SnapshotResultSupplier; it asynchronously writes the full snapshot data to the given output stream.

```java
public class FullSnapshotAsyncWriter<K>
        implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {

    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ......
        // Obtain the output stream.
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
                checkpointStreamSupplier.get();

        snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        // Write the snapshot data to the output stream.
        writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        ......
    }

    private void writeSnapshotToOutputStream(
            @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
            throws IOException, InterruptedException {
        // Write the snapshot data to the given stream via an output view; note that
        // checkpointStreamWithResultProvider may be writing two copies of the data.
        final DataOutputView outputView =
                new DataOutputViewStreamWrapper(
                        checkpointStreamWithResultProvider.getCheckpointOutputStream());
        // Write the metadata.
        writeKVStateMetaData(outputView);
        // Write the state data of every state instance.
        try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
            writeKVStateData(
                    kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        }
    }
}
```

Now for the key part, writeKVStateData: how does the full state actually get written out? Setting the details aside, the core is simply iterating a KeyValueStateIterator.

```java
private void writeKVStateData(
        final KeyValueStateIterator mergeIterator,
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
        final KeyGroupRangeOffsets keyGroupRangeOffsets)
        throws IOException, InterruptedException {
    ......
    try {
        ......
        // Simply walk the KeyValueStateIterator.
        // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
        // key-group offsets.
        while (mergeIterator.isValid()) {
            ......
            writeKeyValuePair(previousKey, previousValue, kgOutView);
            ......
            // request next k/v pair
            previousKey = mergeIterator.key();
            previousValue = mergeIterator.value();
            mergeIterator.next();
        }
        ......
    } finally {
        // this will just close the outer stream
        IOUtils.closeQuietly(kgOutStream);
    }
}
```
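The "write ordered by key group while tracking offsets" idea in the loop above can be sketched as a hypothetical, Flink-free example (names and types are simplified stand-ins): entries are written in key-group order, and the byte offset at which each group starts is recorded, so that restore can seek straight to the groups a subtask owns.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupWriteDemo {

    // Returns keyGroup -> start offset within the written stream.
    static Map<Integer, Integer> writeOrdered(
            TreeMap<Integer, TreeMap<String, String>> stateByKeyGroup,
            ByteArrayOutputStream out) throws IOException {
        Map<Integer, Integer> offsets = new LinkedHashMap<>();
        for (Map.Entry<Integer, TreeMap<String, String>> group : stateByKeyGroup.entrySet()) {
            offsets.put(group.getKey(), out.size()); // offset where this group starts
            for (Map.Entry<String, String> kv : group.getValue().entrySet()) {
                out.write((kv.getKey() + "=" + kv.getValue() + ";").getBytes());
            }
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        TreeMap<Integer, TreeMap<String, String>> state = new TreeMap<>();
        state.computeIfAbsent(0, g -> new TreeMap<>()).put("a", "1");
        state.computeIfAbsent(1, g -> new TreeMap<>()).put("b", "2");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(writeOrdered(state, out)); // {0=0, 1=4}
    }
}
```

Key group 0's entry "a=1;" occupies 4 bytes, so key group 1 starts at offset 4; in Flink these offsets end up in KeyGroupRangeOffsets.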

A KeyValueStateIterator represents all key-value entries of the current snapshot; RocksDB and the heap backend each have their own iterator implementation.

[Figure: KeyValueStateIterator UML class diagram]

Let's see how a RocksStatesPerKeyGroupMergeIterator is created. In the FullSnapshotResources interface above we already saw the abstract createKVStateIterator method, which exists precisely to create this iterator; HeapSnapshotResources and RocksDBFullSnapshotResources implement it to create the heap and RocksDB iterators respectively. Below is the RocksDBFullSnapshotResources.createKVStateIterator implementation.

```java
@Override
public KeyValueStateIterator createKVStateIterator() throws IOException {
    ......
    try {
        // Create RocksDB ReadOptions pinned to the RocksDB snapshot above, which was
        // taken during the synchronous checkpoint phase.
        ReadOptions readOptions = new ReadOptions();
        closeableRegistry.registerCloseable(readOptions::close);
        readOptions.setSnapshot(snapshot);

        // RocksIteratorWrapper is a thin wrapper around RocksIterator.
        List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                createKVStateIterators(closeableRegistry, readOptions);
        ......

        // RocksStatesPerKeyGroupMergeIterator merges the iterators of all state
        // instances (column families) into a single iterator.
        return new RocksStatesPerKeyGroupMergeIterator(
                closeableRegistry,
                kvStateIterators,
                heapPriorityQueueIterators,
                keyGroupPrefixBytes);
    } catch (Throwable t) {
        IOUtils.closeQuietly(closeableRegistry);
        throw new IOException("Error creating merge iterator", t);
    }
}

private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
        CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
    final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
            new ArrayList<>(metaData.size());
    int kvStateId = 0;
    // Every state, i.e. every RocksDB column family, gets its own iterator.
    for (MetaData metaDataEntry : metaData) {
        RocksIteratorWrapper rocksIteratorWrapper =
                createRocksIteratorWrapper(
                        db,
                        metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
                        metaDataEntry.stateSnapshotTransformer,
                        readOptions);
        kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
        closeableRegistry.registerCloseable(rocksIteratorWrapper);
        ++kvStateId;
    }
    return kvStateIterators;
}

private static RocksIteratorWrapper createRocksIteratorWrapper(
        RocksDB db,
        ColumnFamilyHandle columnFamilyHandle,
        StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
        ReadOptions readOptions) {
    // Create the RocksDB iterator and wrap it in Flink's RocksIteratorWrapper.
    RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
    return stateSnapshotTransformer == null
            ? new RocksIteratorWrapper(rocksIterator)
            : new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
}
```

As the code shows, the iterators are ultimately RocksDB's own iterators (pinned to the given snapshot), which Flink wraps in RocksIteratorWrapper (the RocksDB documentation on iterator error handling explains why the wrapper is needed). Since there may be multiple state instances, each with its own iterator, Flink finally merges them all into a single iterator, the RocksStatesPerKeyGroupMergeIterator.
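The merge idea behind RocksStatesPerKeyGroupMergeIterator can be sketched with a hypothetical, Flink-free k-way merge (names and the Entry shape are simplified stand-ins): each state (column family) contributes a sorted iterator, and a priority queue yields all entries globally ordered by (keyGroup, key).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeIteratorDemo {

    record Entry(int keyGroup, String key, int stateId) {}

    static List<Entry> mergeSorted(List<Iterator<Entry>> perStateIterators) {
        Comparator<Entry> order = Comparator
                .comparingInt(Entry::keyGroup)
                .thenComparing(Entry::key)
                .thenComparingInt(Entry::stateId);
        // The heap holds the head of each iterator plus the iterator it came from.
        record Head(Entry entry, Iterator<Entry> source) {}
        PriorityQueue<Head> heap =
                new PriorityQueue<>(Comparator.comparing(Head::entry, order));
        for (Iterator<Entry> it : perStateIterators) {
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Entry> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head head = heap.poll();
            merged.add(head.entry());
            // Refill the heap from the iterator that just yielded an entry.
            if (head.source().hasNext()) heap.add(new Head(head.source().next(), head.source()));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Entry> stateA = List.of(new Entry(0, "a", 0), new Entry(1, "c", 0));
        List<Entry> stateB = List.of(new Entry(0, "b", 1), new Entry(1, "a", 1));
        System.out.println(mergeSorted(List.of(stateA.iterator(), stateB.iterator())));
    }
}
```

The global (keyGroup, key) order is exactly what allows writeKVStateData to record one contiguous offset range per key group.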

Incremental snapshots

RocksIncrementalSnapshotStrategyRocksDBKeyedStateBackend增量快照策略,它是基于RocksDB的native Checkpoint来实现增量快照的。

Before looking at syncPrepareResources and asyncSnapshot of RocksIncrementalSnapshotStrategy, let's look at some key member fields used by incremental snapshots.

```java
// The incremental-snapshot resources are the inner class IncrementalRocksDBSnapshotResources.
public class RocksIncrementalSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<
                K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {

    // Base path of the RocksDB instance.
    @Nonnull private final File instanceBasePath;

    /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
    @Nonnull private final UUID backendUID;

    // Maps each checkpoint id to the set of sst files materialized for that checkpoint.
    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

    // Id of the last completed checkpoint.
    private long lastCompletedCheckpointId;

    // Uploads the snapshot files (the sst files etc. produced by the RocksDB checkpoint).
    private final RocksDBStateUploader stateUploader;
    ...
}
```

The synchronous preparation phase mainly does two things:

  1. Look up the sst files produced by the most recent checkpoint, via materializedSstFiles; these are used for the incremental file comparison.
  2. Create a RocksDB checkpoint.
```java
@Override
public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
        throws Exception {

    // Prepare the directory: permanent if local recovery is enabled, temporary otherwise.
    final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
    LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

    final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
            new ArrayList<>(kvStateInformation.size());
    // The sst files of the last completed checkpoint, used for the incremental comparison.
    final Set<StateHandleID> baseSstFiles =
            snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
    // Create the RocksDB checkpoint.
    takeDBNativeCheckpoint(snapshotDirectory);

    return new IncrementalRocksDBSnapshotResources(
            snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
}
```

takeDBNativeCheckpoint synchronously creates a RocksDB checkpoint; the checkpoint data (sst files and misc files) is produced in the given directory.

```java
private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
        throws Exception {
    try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
            Checkpoint checkpoint = Checkpoint.create(db)) {
        checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
    } catch (Exception ex) {
        ......
    }
}
```

asyncSnapshot itself is straightforward: it mainly creates a RocksDBIncrementalSnapshotOperation supplier that performs the incremental snapshot.

```java
@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
        IncrementalRocksDBSnapshotResources snapshotResources,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory checkpointStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ...
    return new RocksDBIncrementalSnapshotOperation(
            checkpointId,
            checkpointStreamFactory,
            snapshotResources.snapshotDirectory, // directory of the RocksDB checkpoint
            snapshotResources.baseSstFiles, // sst files of the last completed checkpoint
            snapshotResources.stateMetaInfoSnapshots);
}
```

Now for the core of the incremental snapshot, RocksDBIncrementalSnapshotOperation.

```java
private final class RocksDBIncrementalSnapshotOperation
        implements SnapshotResultSupplier<KeyedStateHandle> {
    ...

    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ...
        // sst files produced by the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
        // misc (metadata) files of the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
        ......
        // Upload the incremental sst files and the misc files. Internally, uploadSstFiles
        // walks the RocksDB checkpoint directory and determines the newly added sst files.
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        // Record the sst files belonging to this checkpoint.
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles.keySet());
        }
        ......
    }
}
```

Next, the uploadSstFiles implementation used above:

```java
private void uploadSstFiles(
        @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
        @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
        @Nonnull CloseableRegistry snapshotCloseableRegistry)
        throws Exception {
    // Local paths of the incremental sst files.
    Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
    // Paths of the misc files.
    Map<StateHandleID, Path> miscFilePaths = new HashMap<>();

    // Contents of the current RocksDB checkpoint directory.
    Path[] files = localBackupDirectory.listDirectory();
    if (files != null) {
        // Determine the incremental files.
        createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
        // Upload the incremental sst files via the stateUploader.
        sstFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
        // Upload the misc files.
        miscFiles.putAll(
                stateUploader.uploadFilesToCheckpointFs(
                        miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
    }
}
```

The createUploadFilePaths method above compares the current files against the previous checkpoint to find the incremental sst files, and builds the sets of sst and misc files to be uploaded.

```java
private void createUploadFilePaths(
        Path[] files,
        Map<StateHandleID, StreamStateHandle> sstFiles,
        Map<StateHandleID, Path> sstFilePaths,
        Map<StateHandleID, Path> miscFilePaths) {
    for (Path filePath : files) {
        final String fileName = filePath.getFileName().toString();
        // The file's handle id.
        final StateHandleID stateHandleID = new StateHandleID(fileName);
        // Compare sst files against those of the last checkpoint to find the increment.
        if (fileName.endsWith(SST_FILE_SUFFIX)) {
            final boolean existsAlready =
                    baseSstFiles != null && baseSstFiles.contains(stateHandleID);

            if (existsAlready) {
                // An sst file that already exists only gets a placeholder, indicating it
                // was uploaded before and lives in the shared directory.
                sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
            } else {
                // A new file, to be uploaded.
                sstFilePaths.put(stateHandleID, filePath);
            }
        } else {
            // All misc files are uploaded.
            miscFilePaths.put(stateHandleID, filePath);
        }
    }
}
```

So the logic of an incremental snapshot is:

  1. Produce the sst files of the current snapshot via a RocksDB checkpoint (thanks to the LSM design, sst files are immutable).
  2. For every checkpoint, record the mapping from the checkpoint id to its snapshot sst files.
  3. Upload the sst files and misc files belonging to the current checkpoint.
  4. If a later checkpoint still contains sst files from an earlier one, those files need not be uploaded to HDFS again.

In short, Flink's incremental checkpoint cleverly exploits the LSM property that sst files only accumulate and never change.
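The bookkeeping in steps 2 to 4 can be sketched as a hypothetical, Flink-free example (method and label names are made up for illustration): sst files already shipped by the previous checkpoint become placeholders, while only new sst files, plus all misc files, are uploaded.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class IncrementalUploadDemo {

    // Decide, per file of the current RocksDB checkpoint, whether to upload it
    // or to reference the copy already in the shared checkpoint directory.
    static Map<String, String> planUpload(List<String> checkpointFiles, Set<String> baseSstFiles) {
        Map<String, String> plan = new LinkedHashMap<>();
        for (String file : checkpointFiles) {
            if (file.endsWith(".sst")) {
                plan.put(file, baseSstFiles.contains(file) ? "placeholder" : "upload");
            } else {
                plan.put(file, "upload"); // misc files (MANIFEST, CURRENT, ...) always go up
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> files = List.of("000007.sst", "000009.sst", "MANIFEST-000004");
        Set<String> previous = Set.of("000007.sst"); // shipped by the last checkpoint
        System.out.println(planUpload(files, previous));
        // 000007.sst was shipped before -> placeholder; the others -> upload
    }
}
```

Because sst files are immutable, a file name seen in an earlier checkpoint is guaranteed to have identical content, which is what makes the placeholder safe.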

Operator state snapshot strategy

Operator state has a single snapshot strategy, DefaultOperatorStateBackendSnapshotStrategy, which writes the snapshot data of the operator state's ListState and BroadcastState to the snapshot storage.

```java
class DefaultOperatorStateBackendSnapshotStrategy
        implements SnapshotStrategy<
                OperatorStateHandle,
                DefaultOperatorStateBackendSnapshotStrategy
                        .DefaultOperatorStateBackendSnapshotResources> {

    private final ClassLoader userClassLoader;
    // Operator state has only two kinds of state: ListState and BroadcastState.
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    protected DefaultOperatorStateBackendSnapshotStrategy(
            ClassLoader userClassLoader,
            Map<String, PartitionableListState<?>> registeredOperatorStates,
            Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
        this.userClassLoader = userClassLoader;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
    }
    ......
}
```

In the synchronous preparation phase, DefaultOperatorStateBackendSnapshotStrategy does exactly one thing: it deep-copies the ListState and BroadcastState. The deep copy synchronously captures the state at this instant, which is what guarantees exactly-once.

```java
@Override
public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {

    // Holds the copied operator state.
    final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
    final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(userClassLoader);
    try {
        // Deep-copy the ListState and BroadcastState for later use.
        if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry :
                    registeredOperatorStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    listState = listState.deepCopy();
                }
                registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
        }
        // Copy the broadcast state.
        if (!registeredBroadcastStates.isEmpty()) {
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                    registeredBroadcastStates.entrySet()) {
                BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                if (null != broadcastState) {
                    broadcastState = broadcastState.deepCopy();
                }
                registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
            }
        }
    } finally {
        Thread.currentThread().setContextClassLoader(snapshotClassLoader);
    }

    return new DefaultOperatorStateBackendSnapshotResources(
            registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
}
```
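Why the deep copy matters can be shown with a tiny hypothetical example (not Flink code; the method name is a made-up stand-in): the async phase later serializes the copy, so mutations made by the processing thread after the checkpoint barrier do not leak into the snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class DeepCopyDemo {

    // Stand-in for the sync phase: deep-copy the list state
    // (a plain copy suffices here because the elements are immutable strings).
    static List<String> syncPrepare(List<String> liveListState) {
        return new ArrayList<>(liveListState);
    }

    public static void main(String[] args) {
        List<String> live = new ArrayList<>(List.of("e1", "e2"));
        List<String> snapshot = syncPrepare(live);
        live.add("e3"); // processing continues after the sync phase
        System.out.println(snapshot); // still [e1, e2]
    }
}
```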

Once the operator state has been deep-copied, asyncSnapshot asynchronously writes the snapshot data to the CheckpointStreamFactory.

```java
@Override
public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
        DefaultOperatorStateBackendSnapshotResources syncPartResource,
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) {
    ......
    return (snapshotCloseableRegistry) -> {
        // Create the output stream.
        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                streamFactory.createCheckpointStateOutputStream(
                        CheckpointedStateScope.EXCLUSIVE);
        snapshotCloseableRegistry.registerCloseable(localOut);
        ......

        // Write the snapshot data to the stream via OperatorBackendSerializationProxy.
        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
        OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(
                        operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
        backendSerializationProxy.write(dov);

        ......
            return SnapshotResult.of(retValue);
        } else {
            throw new IOException("Stream was already unregistered.");
        }
    };
}
```


