Flink中DataStream和Table互相转换
2023-02-25 18:17:32 时间
前言
Flink 为处理一列转多列的场景提供了两种返回类型 Tuple 和 Row
- Tuple 只支持1~25个字段,且不能为null,不支持拓展
- Row 支持null同时也无限制字段数,但如果需要使用Row,必须重载实现
getResultType
方法
DataStream=>Table
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.List;
public class TCS002 {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
List<Row> list = new ArrayList<Row>();
list.add(Row.of("张三",18,"男"));
list.add(Row.of("xiaohong",16,"女"));
RowTypeInfo rowTypeInfo = getRowTypeInfo(list.get(0));
DataStream<Row> ds = env.fromCollection(list,rowTypeInfo);
tabEnv.registerDataStream("table01",ds);
tabEnv.from("table01").execute().print();
}
private static RowTypeInfo getRowTypeInfo(Row row) {
TypeInformation[] types = new TypeInformation[row.getArity()];
String[] fieldNames = new String[row.getArity()];
for (int i = 0; i < row.getArity(); i++) {
Object field = row.getField(i);
if(field instanceof Integer){
types[i] = BasicTypeInfo.INT_TYPE_INFO;
}else{
types[i] = BasicTypeInfo.STRING_TYPE_INFO;
}
fieldNames[i] = "f"+i;
}
return new RowTypeInfo(types, fieldNames);
}
}
Table=>DataStream
DataStream<Row> ds02 = tabEnv.toAppendStream(tb01, rowTypeInfo);
相关文章
- Jgit的使用笔记
- 利用Github Action实现Tornadofx/JavaFx打包
- 叹息!GitHub Trending 即将成为历史!
- 微软软了?开源社区讨论炸锅,GitHub CEO 亲自来答
- GitHub Trending 列表频现重复项,前后端都没去重?
- Photoshop Elements 2021版本软件安装教程(mac+windows全版本都有)
- (ps全版本)Photoshop 2020的安装与破解教程(mac+windows全版本都有)
- (ps全版本)Photoshop cc2018的安装与破解教程(mac+windows全版本,包括2023
- 环境搭建:Oracle GoldenGate 大数据迁移到 Redshift/Flat file/Flume/Kafka测试流程
- 每个开发人员都要掌握的:最小 Linux 基础课
- 来撸羊毛了!Windows 环境下 Hexo 博客搭建,并部署到 GitHub Pages
- 超实用!手把手入门 MongoDB:这些坑点请一定远离
- 【GitHub日报】22-10-09 zustand、neovim、webtorrent、express 等4款App今日上新
- 【GitHub日报】22-10-10 brew、minio、vite、seaweedfs、dbeaver 等8款App今日上新
- 【GitHub日报】22-10-11 cobra、grafana、vue、ToolJet、redwood 等13款App今日上新
- Photoshop 2018 下载及安装教程(mac+windows全版本都有,包括最新的2023)
- Photoshop 2017 下载及安装教程(mac+windows全版本都有,包括最新的2023)
- Photoshop 2020 下载及安装教程(mac+windows全版本都有,包括最新的2023)
- Photoshop 2023 资源免费下载(mac+windows全版本都有,包括最新的2023)
- 最新版本Photoshop CC2018软件安装教程(mac+windows全版本都有,包括2023