Flink Tutorial (10): keyBy on Tuples and POJOs with KeySelector and Multiple Fields
Time: 2023-09-27 14:26:50
I. Tuples
Suppose we have the following stream:
DataStream<Tuple2<String, Integer>> wordAndOne = ....
1. keyBy on a single field
By field position (this index-based overload is deprecated in newer Flink releases):
wordAndOne.keyBy(0)
Using a KeySelector lambda:
wordAndOne.keyBy(v -> v.f0)
2. keyBy on multiple fields
By field positions:
wordAndOne.keyBy(0, 1)
Using a KeySelector:
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});
The above can be simplified with a lambda:
wordAndOne.keyBy(
    (KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>) value ->
        Tuple2.of(value.f0, value.f1)
);
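To see why a Tuple2 works as a composite key, it helps to look at grouping without Flink at all. The following is a minimal plain-Java sketch, not Flink code: it stands in `Map.Entry` for `Tuple2`, and the class name `CompositeKeySketch` and the helpers `pair` and `countByBothFields` are hypothetical names introduced only for illustration. It shows that elements land in the same group only when both fields are equal, which relies on the key comparing by value.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy (no Flink dependency): groups (word, count) pairs by a
// composite key made of both fields.
class CompositeKeySketch {

    // Hypothetical helper standing in for Tuple2.of(f0, f1).
    static Map.Entry<String, Integer> pair(String word, int count) {
        return new SimpleImmutableEntry<String, Integer>(word, count);
    }

    // Counts how many elements share the same (f0, f1) key.
    static Map<Map.Entry<String, Integer>, Integer> countByBothFields(
            List<Map.Entry<String, Integer>> elements) {
        Map<Map.Entry<String, Integer>, Integer> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : elements) {
            // The key compares by value (equals/hashCode over both fields),
            // so equal pairs end up in the same group.
            Map.Entry<String, Integer> key = pair(e.getKey(), e.getValue());
            groups.merge(key, 1, Integer::sum);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = Arrays.asList(
                pair("flink", 1), pair("flink", 1), pair("flink", 2), pair("spark", 1));
        System.out.println(countByBothFields(input));
    }
}
```

Here ("flink", 1) appears twice and forms one group, while ("flink", 2) forms its own group because the second field differs. Flink similarly expects key types to have value-based `hashCode()` and `equals()`, which is why a tuple is a convenient composite key and an array is not.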
II. POJOs
Suppose we have the following stream:
DataStream<PeopleCount> source = ...
where PeopleCount is defined as:
public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;

    public PeopleCount() {
    }

    // other code omitted...
}
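The omitted part matters for the keyBy calls that follow, since they call `getProvince()` and `getCity()`. As a sketch of what that omitted code might look like (an assumption, since the original does not show it), the class below adds conventional getters and setters plus a hypothetical convenience constructor. It is wrapped in an outer class named `PeopleCountPojoSketch` only to keep the example in one file. Flink's POJO rules ask for a public class, a public no-argument constructor, and fields that are either public or reachable through getters and setters.

```java
// Sketch only: the accessors below are assumptions about the omitted code.
class PeopleCountPojoSketch {

    // Nested to keep the sketch in one file; a public static nested class
    // also satisfies Flink's POJO rules.
    public static class PeopleCount {
        private String province;
        private String city;
        private Integer counts;

        // Public no-argument constructor, as in the original class.
        public PeopleCount() {
        }

        // Convenience constructor (assumed, not shown in the original).
        public PeopleCount(String province, String city, Integer counts) {
            this.province = province;
            this.city = city;
            this.counts = counts;
        }

        public String getProvince() { return province; }
        public void setProvince(String province) { this.province = province; }

        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }

        public Integer getCounts() { return counts; }
        public void setCounts(Integer counts) { this.counts = counts; }
    }

    public static void main(String[] args) {
        PeopleCount pc = new PeopleCount("Zhejiang", "Hangzhou", 10);
        System.out.println(pc.getProvince() + "/" + pc.getCity() + ": " + pc.getCounts());
    }
}
```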
1. keyBy on a single field
Either a lambda or a method reference works:
source.keyBy(a -> a.getProvince())
source.keyBy(PeopleCount::getProvince)
2. keyBy on multiple fields
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});
The above can be simplified with a lambda:
source.keyBy(
    (KeySelector<PeopleCount, Tuple2<String, String>>) value ->
        Tuple2.of(value.getProvince(), value.getCity())
);
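As a rough picture of what keying by (province, city) leads to once an aggregation follows, here is a plain-Java sketch with no Flink dependency. The names `PeopleCountKeySketch` and `sumByProvinceAndCity` are hypothetical, the nested `PeopleCount` is a trimmed stand-in with assumed getters, and a `List<String>` plays the role of the `Tuple2<String, String>` key. It sums `counts` per distinct (province, city) pair, roughly what a keyed sum would track per key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy of keyBy(province, city) followed by a sum over counts.
class PeopleCountKeySketch {

    // Trimmed stand-in for the article's PeopleCount (getters are assumed).
    public static class PeopleCount {
        private final String province;
        private final String city;
        private final Integer counts;

        public PeopleCount(String province, String city, Integer counts) {
            this.province = province;
            this.city = city;
            this.counts = counts;
        }

        public String getProvince() { return province; }
        public String getCity() { return city; }
        public Integer getCounts() { return counts; }
    }

    // Sums counts per (province, city) composite key.
    public static Map<List<String>, Integer> sumByProvinceAndCity(List<PeopleCount> records) {
        Map<List<String>, Integer> totals = new LinkedHashMap<>();
        for (PeopleCount r : records) {
            // Lists compare element by element, so this behaves like a tuple key.
            List<String> key = Arrays.asList(r.getProvince(), r.getCity());
            totals.merge(key, r.getCounts(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<PeopleCount> records = Arrays.asList(
                new PeopleCount("Zhejiang", "Hangzhou", 3),
                new PeopleCount("Zhejiang", "Ningbo", 2),
                new PeopleCount("Zhejiang", "Hangzhou", 5));
        System.out.println(sumByProvinceAndCity(records));
    }
}
```

Keying on province alone would merge Hangzhou and Ningbo into one group; including the city in the key keeps them separate.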