湖仓一体电商项目(二十四):合并Iceberg小文件
合并Iceberg小文件
Iceberg表每次commit都会生成一个parquet数据文件,有可能一张Iceberg表对应的数据文件非常多,那么我们通过Java Api 方式对Iceberg表可以进行数据文件合并,数据文件合并之后,会生成新的Snapshot且原有Snap快照数据并不会被删除,如果要删除对应的数据文件需要通过“Expire Snapshots来实现”。
编辑我们可以通过Java Api 删除历史快照Snap-*.avro,可以通过指定时间戳,当前时间戳之前的所有快照都会被删除,如果指定时间比最后一个快照时间还大,会保留最新快照数据。
在删除快照时,数据data目录中过期的数据parquet文件也会被删除(例如:快照回滚后不再需要的文件),到底哪些parquet文件数据被删除决定于最后的“snap-xx.avro”中对应的manifest list数据对应的parquet数据。随着不断删除snapshot,在Iceberg表不再有manifest文件对应的parquet文件也会被删除。
每次Commit生成对应的Snapshot之外,还会有一份元数据文件“vX-metadata.json”文件产生,我们可以在创建Iceberg表时执行对应的属性决定Iceberg表保留几个元数据文件,属性如下:
Property | Description |
---|---|
write.metadata.delete-after-commit.enabled | 每次表提交后是否删除旧的元数据文件 |
write.metadata.previous-versions-max | 要保留旧的元数据文件数量 |
例如,在Spark中创建表 test ,指定以上两个属性,建表语句如下:
CREATE TABLE ${CataLog名称}.${库名}.${表名} (
id bigint,
name string
) using iceberg
PARTITIONED BY (
loc string
) TBLPROPERTIES (
'write.metadata.delete-after-commit.enabled'= true,
'write.metadata.previous-versions-max' = 3
)
此项目中我们可以定期执行如下代码来删除Iceberg中过多的快照文件和数据文件,代码如下:
object CombinSnapAndRemoveOldSnap {
def main(args: Array[String]): Unit = {
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://mycluster/lakehousedata")
/**
* 1.准备Iceberg表
*/
val table1: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_BROWSELOG"))
val table2: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_MEMBER_ADDRESS"))
val table3: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_MEMBER_INFO"))
val table4: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_PRODUCT_CATEGORY"))
val table5: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_PRODUCT_INFO"))
val table6: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_USER_LOGIN"))
val table7: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWD_BROWSELOG"))
val table8: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWD_USER_LOGIN"))
val table9: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWS_BROWSE_INFO"))
val table10: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWS_USER_LOGIN"))
/**
* 2.合并小文件数据,Iceberg合并小文件时并不会删除被合并的文件,Compact是将小文件合并成大文件并创建新的Snapshot。
* 如果要删除文件需要通过Expire Snapshots来实现,targetSizeInBytes 指定合并后的每个文件大小
*/
Actions.forTable(table1).rewriteDataFiles().execute()
Actions.forTable(table2).rewriteDataFiles().execute()
Actions.forTable(table3).rewriteDataFiles().execute()
Actions.forTable(table4).rewriteDataFiles().execute()
Actions.forTable(table5).rewriteDataFiles().execute()
Actions.forTable(table6).rewriteDataFiles().execute()
Actions.forTable(table7).rewriteDataFiles().execute()
Actions.forTable(table8).rewriteDataFiles().execute()
Actions.forTable(table9).rewriteDataFiles().execute()
Actions.forTable(table10).rewriteDataFiles().execute()
/**
* 3.删除历史快照,历史快照是通过ExpireSnapshot来实现的,设置需要删除多久的历史快照 snap-*.avro文件
*/
table1.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table2.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table3.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table4.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table5.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table6.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table7.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table8.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table9.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
table10.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
}
}
相关文章
- kkFileView在线文件预览项目学习&搭建
- 【Linux-vim】Linux 修改文件内容
- jar命令更新SpringBoot项目jar包里的补丁文件
- 使用IDM从Google 云端硬盘链接上下载超大文件
- ASP.NET WEB——项目创建与文件上传操作
- 银行应用系统日志文件敏感信息脱敏处理方法
- Node 项目通过 .npmrc 文件指定依赖安装源
- compile 时只保存项目内的文件
- 【实用的开源项目】使用服务器部署Sharry:真的很好用的文件分享程序!
- 【好玩儿的Docker项目】Pingvin Share——一个专注于文件分享的高颜值轻量小工具!
- 如何在 C# 项目中链接一个文件夹下的所有文件
- Gradle 项目的生命周期和settings文件
- 【iOS 开发】iOS 开发 简介 (IOS项目文件 | MVC 模式 | 事件响应机制 | Storyboard 控制界面 | 代码控制界面 | Retina 屏幕图片适配)
- 【Android 安全】DEX 加密 ( 代理 Application 开发 | 项目中配置 OpenSSL 开源库 | 使用 OpenSSL 开源库解密 dex 文件 )
- 记一次授权项目中绕过某服WAF成功文件上传
- 重启eclips后启动项目出现监听文件找不到详解程序员
- iOS项目生成静态库文件(.a)详解手机开发
- Eclipse将引用了第三方jar包的Java项目打包成jar文件的两种方法详解编程语言
- MyEclipse项目名有红叉,但文件中没有红叉的解决方法详解编程语言
- bbs项目富文本编辑器实现上传文件到media目录详解编程语言
- [开源项目] 系统或软件找不到特定的dll文件?试试批量安装VC运行库
- 【Linux重建分区:挽救你的文件】(linux重建分区)
- Linux下清空文件内容的方法(linux文件内容清空)
- 使用Linux Tailq命令快速查看日志文件末尾信息(linuxtailq)
- 深入分析MySQL Dump文件及其用法(mysqldump文件)
- Linux文件传输:简单易行的文件拷贝方法(linux之间拷贝文件)
- 输入Linux重定向文件输入操作详细指南(linux重定向文件)
- MySQL 扩展文件知多少?(mysql 文件扩展)
- AspAccess创建静态文件/HTML
- firefox中JS读取XML文件