Flink(59):Flink之FlinkCDC(下)
目录
0. 相关文章链接
1. FlinkCDC1.x中存在的痛点
2. FlinkCDC2.x的设计目标
3. FlinkCDC2.x的设计实现
3.1. 整体概览
在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:
- Chunk 切分;
- Chunk 分配; (实现并行读取数据&CheckPoint)
- Chunk 读取; (实现无锁读取)
- Chunk 汇报;
- Chunk 分配。
3.2. Chunk切分
根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。
3.3. Chunk分配
将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据, 实现了并行读取的目标。
同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。
若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。
3.4. Chunk读取
读取可以分为 5 个阶段:
- SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;
- SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;
- 查询完成之后记录当前的 Binlog 位置信息记为高位点;
- 在增量部分消费从低位点到高位点的 Binlog;
- 根据主键,对 buffer 中的数据进行修正并输出。
通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个 Chunk 中的数据一致性。
3.5. Chunk汇报
在 Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。
3.6. Chunk分配
FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 SnapshotChunk 完成信息之后,还有一个消费增量数据(Binlog) 的任务,此时是通过下发 Binlog Chunk给任意一个 SourceReader 进行单并发读取来实现的。
4. FlinkCDC2.x的核心原理分析
4.1. Binlog Chunk 中开始读取位置源码
MySqlHybridSplitAssigner
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSnapshotSplit> assignedSnapshotSplit = snapshotSplitAssigner
.getAssignedSplits()
.values()
.stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (binlogOffset.compareTo(minBinlogOffset) < 0) {
minBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
split.splitId(),
split.getSplitStart(),
split.getSplitEnd(),
binlogOffset));
tableSchemas.putAll(split.getTableSchemas());
}
final MySqlSnapshotSplit lastSnapshotSplit = assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
lastSnapshotSplit.getSplitKeyType(),
minBinlogOffset,
BinlogOffset.NO_STOPPING_OFFSET,
finishedSnapshotSplitInfos,
tableSchemas
);
}
4.2. 读取低位点到高位点之间的Binlog
BinlogSplitReader
/** * Returns the record should emit or not. * * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event * that * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid * since the offset is after its high watermark. * * <pre> E.g: the data input is : * snapshot-split-0 info : [0, 1024) highWatermark0 * snapshot-split-1 info : [1024, 2048) highWatermark1 * the data output is: * only the binlog event belong to [0, 1024) and offset is after highWatermark0 * should send, * only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should * send. * </pre> */ private boolean shouldEmit(SourceRecord sourceRecord) { if (isDataChangeRecord(sourceRecord)) { TableId tableId = getTableId(sourceRecord); BinlogOffset position = getBinlogPosition(sourceRecord); // aligned, all snapshot splits of the table has reached max highWatermark if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) { return true; } Object[] key = getSplitKey( currentBinlogSplit.getSplitKeyType(), sourceRecord, statefulTaskContext.getSchemaNameAdjuster() ); for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) { if (RecordUtils.splitKeyRangeContains( key, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) && position.isAtOrBefore(splitInfo.getHighWatermark())) { return true; } } // not in the monitored splits scope, do not emit return false; } // always send the schema change event and signal event // we need record them to state of Flink return true; }
注:此博客根据某马2020年贺岁视频改编而来 -> B站网址
注:其他相关文章链接由此进 -> Flink文章汇总
注:此博文为介绍FlinkCDC2.x的相关知识,对应FlinkCDC的详细使用可以查看Flink(58):Flink之FlinkCDC(上)
注:此博文中的相关内容和图片截取至网上Flink相关公开内容
相关文章
- mvn创建flink项目
- Flink窗口Window机制详解
- Flink(61):Flink中通用KafkaUtil工具类
- Flink(50):Flink之综合练习(二)
- Flink(23):Flink之Window案例二(基于数量的滚动和滑动窗口)
- Flink(13):Flink之Transformation算子(上)
- Flink(6):Flink安装部署之Flink On Yarn模式
- Hudi(14):Hudi集成Flink之核心参数设置
- 巨鲸任务调度平台:spark flink任务调度
- 一文带你彻底了解大数据处理引擎Flink内存管理
- 用beam实现连接kafka和elasticSearch示例 在flink平台运行
- 大数据Hadoop之——实时计算流计算引擎Flink(Flink环境部署)