如何快速实现 BitSail Connector?
实现 如何 快速 Connector
2023-06-13 09:16:13 时间
1. 目录结构
首先开发者需要通过 git 下载最新代码到本地,并导入到 IDE 中。同时创建自己的工作分支,使用该分支开发自己的 Connector。
项目结构如下:
2. 开发流程
BitSail 是一款基于分布式架构的数据集成引擎,Connector 会并发执行。并由 BitSail 框架来负责任务的调度、并发执行、脏数据处理等,开发者只需要实现对应接口即可,具体开发流程如下:
- 工程配置,开发者需要在 bitsail/bitsail-connectors/pom.xml 模块中注册自己的 Connector,同时在 bitsail/bitsail-dist/pom.xml 增加自己的 Connector 模块,同时为你的连接器注册配置文件,来使得框架可以在运行时动态发现它。
- Connector 开发,实现 Source、Sink 提供的抽象方法,具体细节参考后续介绍。
- 数据输出类型,目前支持的数据类型为 BitSail Row 类型,无论是 Source 在 Reader 中传递给下游的数据类型,还是 Sink 从上游消费的数据类型,都应该是 BitSail Row 类型。
3. Architecture
当前 Source API 的设计同时兼容了流批一批的场景,换言之就是同时支持 pull & push 的场景。在此之前,我们需要首先再过一遍传统流批场景中各组件的交互模型。
3.1 Batch Model
传统批式场景中,数据的读取一般分为如下几步:
- createSplits:一般在 client 端或者中心节点执行,目的是将完整的数据按照指定的规则尽可能拆分为较多的 rangeSplits,createSplits 在作业生命周期内有且执行一次。
- runWithSplit: 一般在执行节点节点执行,执行节点启动后会向中心节点请求存在的 rangeSplit,然后再本地进行执行;执行完成后会再次向中心节点请求直到所有 splits 执行完成。
- commit:全部的 split 的执行完成后,一般会在中心节点执行 commit 的操作,用于将数据对外可见。
3.2 Stream Model
传统流式场景中,数据的读取一般分为如下几步:
- createSplits:一般在 client 端或者中心节点执行,目的是根据滑动窗口或者滚动窗口的策略将数据流划分为 rangeSplits,createSplits 在流式作业的生命周期中按照划分窗口的会一直执行。
- runWithSplit: 一般在执行节点节点执行,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
- commit:全部的 split 的执行完成后,一般会向目标数据源发送 retract message,实时动态展现结果。
3.3 BitSail Model
- createSplits:BitSail 通过 SplitCoordinator 模块划分 rangeSplits,在流式作业中的生命周期中 createSplits 会周期性执行,而在批式作业中仅仅会执行一次。
- runWithSplit: 在执行节点节点执行,BitSail 中执行节点包括 Reader 和 Writer 模块,中心节点会向可执行节点发送 rangeSplit,然后在可执行节点本地进行执行;执行完成后会将处理完的 splits 数据向下游发送。
- commit:writer 在完成数据写入后,committer 来完成提交。在不开启 checkpoint 时,commit 会在所有 writer 都结束后执行一次;在开启 checkpoint 时,commit 会在每次 checkpoint 的时候都会执行一次。
4. Source Connector
- Source: 数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。
- SourceSplit: 数据读取分片;大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split。
- State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。
- SplitCoordinator: 既然提到了 Split,就需要有相应的组件去创建、管理 Split;SplitCoordinator 承担了这样的角色。
- SourceReader: 真正负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。
Source Connector 开发流程如下
- 首先需要创建 Source 类,需要实现 Source 和 ParallelismComputable 接口,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
- BitSail 的 Source 采用流批一体的设计思想,通过 getSourceBoundedness 方法设置作业的处理方式,通过 configure 方法定义 readerConfiguration 的配置,通过 createTypeInfoConverter 方法来进行数据类型转换,可以通过 FileMappingTypeInfoConverter 得到用户在 yaml 文件中自定义的数据源类型和 BitSail 类型的转换,实现自定义化的类型转换。
- 最后,定义数据源的数据分片格式 SourceSplit 类和闯将管理 Split 的角色 SourceSplitCoordinator 类。
- 最后完成 SourceReader 实现从 Split 中进行数据的读取。
- 每个 SourceReader 都在独立的线程中执行,并保证 SourceSplitCoordinator 分配给不同 SourceReader 的切片没有交集。
- 在 SourceReader 的执行周期中,开发者只需要关注如何从构造好的切片中去读取数据,之后完成数据类型对转换,将外部数据类型转换成 BitSail 的 Row 类型传递给下游即可。
4.1 Reader 示例
public class FakeSourceReader extends SimpleSourceReaderBase<Row> {
private final BitSailConfiguration readerConfiguration;
private final TypeInfo<?>[] typeInfos;
private final transient int totalCount;
private final transient RateLimiter fakeGenerateRate;
private final transient AtomicLong counter;
private final FakeRowGenerator fakeRowGenerator;
public FakeSourceReader(BitSailConfiguration readerConfiguration, Context context) {
this.readerConfiguration = readerConfiguration;
this.typeInfos = context.getTypeInfos();
this.totalCount = readerConfiguration.get(FakeReaderOptions.TOTAL_COUNT);
this.fakeGenerateRate = RateLimiter.create(readerConfiguration.get(FakeReaderOptions.RATE));
this.counter = new AtomicLong();
this.fakeRowGenerator = new FakeRowGenerator(readerConfiguration, context.getIndexOfSubtask());
}
@Override
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
fakeGenerateRate.acquire();
pipeline.output(fakeRowGenerator.fakeOneRecord(typeInfos));
}
@Override
public boolean hasMoreElements() {
return counter.incrementAndGet() <= totalCount;
}
}
相关文章
- 解决方案分享:数商云S2B2C系统如何赋能医药企业实现深度营销数字化
- 拒绝躺平,如何使用AOP的环绕通知实现分布式锁
- IIS 7中如何实现http重定向https
- 词MySQL:如何实现快速名词查询(mysql实例名)
- 实现碳达峰、碳中和,气象变化工作如何定坐标?
- Linux网络连接指南:快速实现安全上网!(linux如何连接网络)
- 查看Redis版本:快速实现(如何查看redis版本)
- MySQL查询技巧:如何实现结果排名(mysql查询排名)
- 深度学习实践:如何使用Tensorflow实现快速风格迁移?
- 红帽linux密码丢失,如何找回?快速实现密码重置的方法!(红帽linux忘记密码)
- Oracle 数据库技巧:如何使用截取函数实现精准月份数据的提取?(oracle截取月份)
- 如何实现MySQL数据库的远程访问?快速学习指南(mysql数据库远程访问)
- 快速实现!Oracle如何提取单个数据(oracle获取一条数据)
- 一键排序:Linux如何实现文件快速排序(sortlinux排序)
- MySQL:如何实现快速更新查询结果(mysql更新查询结果)
- 使用MySQL计算数据的平均值 一步步教你如何实现(mysql平均值)
- Linux系统如何实现快速连接网络(linux系统连接网络)
- 如何在MongoDB数据库中实现数据加密?(mongodb数据库加密)
- VB程序如何实现远程连接Oracle数据库(vb远程连接oracle)
- SQL Server如何快速实现数据分隔(sqlserver 分隔)
- SQL Server中如何实现快速数据写入(sqlserver 写入)
- Redis如何实现对象存储(redis怎么存对象)
- 海量数据Oracle如何实现几亿数据的快速关联(oracle几亿数据关联)
- MySQL数据库如何实现ID重新排序(mysql中id重新排序)
- MySQL 数据匹配如何通过两个表格实现快速匹配和查询(mysql两表格数据匹配)
- 语句如何使用CMD实现MySQL语句执行(cmd怎么执行mysql)
- 单机Redis快速实现分区(单机redis如何分区)
- 功能如何快速取消Redis实现的点赞功能(如何取消redis点赞)
- Redis队列如何实现消息派发(redis队列怎么执行)
- 如何应用C#实现UDP的分包组包