Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机
2023-03-31 11:03:17 时间
案例来源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md
案例介绍
基于出租车付费事件流计算出每小时赚取最多小费的司机,最简单的方法是通过两个步骤来解决这个问题:首先使用一个小时长的窗口来计算每个司机在一小时内的总小费,然后从该窗口结果流中找到每小时总小费最多的司机。
结果输出:
每小时产生一个 HourlyTip对象 记录的数据流。 这个记录应包含该小时结束时的时间戳、 该小时内获得小费最多的司机的 driverId 以及他的实际小费总数。
public class HourlyTip {
/**
* 小时结束时的时间戳
*/
private Long eventTime;
/**
* 司机id driverId
*/
private Long driverId;
/**
* 该小时获得的小费总数
*/
private Float tips;
}
核心代码
// 初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare") // 避免kafka clientId重复
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();
DataStreamSource<TaxiFare> fareStream = env.fromSource(fareSource, WatermarkStrategy.<TaxiFare>forMonotonousTimestamps().withTimestampAssigner((fare, t) -> fare.getStartTime()), "fare source");
// 按司机分组,对每小时内的数据进行统计,求出每个司机每小时的总小费
SingleOutputStreamOperator<HourlyTip> hourlyTipsStream = fareStream.keyBy(TaxiFare::getDriverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTipsFunction());
/**
* window和windowAll的区别
*
* keyBy后数据分流,window是把不同的key分开聚合成窗口
* 而windowAll是把所有的key都聚合起来,所以windowAll的并行度只能为1,而window可以有多个并行度
*
*/
// 把所有key汇总起来,找出每个小时总小费最多的司机
SingleOutputStreamOperator<HourlyTip> hourlyMaxStream = hourlyTipsStream.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).max("tips");
hourlyMaxStream.addSink(new PrintSinkFunction<>());
env.execute("Hourly Tips");
完整代码
https://github.com/Mr-LuXiaoHua/study-flink
代码入口 com.example.datastream.hourlytips.HourlyTipsJob
相关文章
- iOS开发一定要尝试的 Texture(ASDK)
- css权重的计算规则
- gson解析json数据的方法
- angularjs双向绑定原理是什么?
- fastjson格式化
- fastjson和jackson区别
- angularjs ng-options设置多个默认选项
- eclipse json格式化
- js switch语句计算指定日期是今年的第几天
- javascript substr截取字符串
- 引入RabbitMQ后,你如何保证全链路数据100%不丢失?
- MySQL 定时备份数据库(非常全),值得收藏!
- 如何批量制作奇数流水号条形码
- TDSQL演进与突破:把企业级分布式数据库做到极致
- 《这么多MergeTree 表引擎,我该怎么选?》- part 2
- access怎样连接eclipse
- 全新池化方法AdaPool | 让ResNet、DenseNet、ResNeXt等在所有下游任务轻松涨点
- 为什么从 MongoDB 转向 Couchbase ?
- 全新数据增强 | TransMix 超越Mix-up、Cut-mix方法让模型更加鲁棒、精度更高
- Xcode控制台输出json数据乱码转为中文