Flink 自定义触发器实现带超时时间的 CountWindow
flink 实现 时间 自定义 触发器 超时
2023-09-11 14:18:40 时间
Flink 的 window 有两个基本款,TimeWindow 和 CountWindow。
TimeWindow 是到时间就触发窗口,CountWindow 是到数量就触发。
TimeWindow、CountWindow 都可以分为滚动窗口、滑动窗口
如果我需要到时间就触发,并且到时间之前如果已经积累了足够数量的数据;或者在限定时间内没有积累足够数量的数据,我依然希望触发窗口业务,那么就需要自定义触发器。
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 带超时的计数窗口触发器 */ public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> { private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class); /** * 窗口最大数据量 */ private int maxCount; /** * event time / process time */ private TimeCharacteristic timeType; /** * 用于储存窗口当前数据量的状态对象 */ private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE); public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) { this.maxCount = maxCount; this.timeType = timeType; } private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; } @Override public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); countState.add(1L); if (countState.get() >= maxCount) { LOG.info("fire with count: " + countState.get()); return fireAndPurge(window, ctx); } if (timestamp >= window.getEnd()) { LOG.info("fire with tiem: " + timestamp); return fireAndPurge(window, ctx); } else { return TriggerResult.CONTINUE; } } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.ProcessingTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { LOG.info("fire with process tiem: " + time); return fireAndPurge(window, ctx); } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.EventTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { LOG.info("fire with event tiem: " + time); return fireAndPurge(window, ctx); } } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); countState.clear(); } /** * 计数方法 */ class Sum implements ReduceFunction<Long> { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }
使用示例(超时时间 10 秒,数据量上限 1000):
stream .timeWindowAll(Time.seconds(10)) .trigger( new CountTriggerWithTimeout(1000, TimeCharacteristic.ProcessingTime) ) .process(new XxxxWindowProcessFunction()) .addSink(new XxxSinkFunction()) .name("Xxx");
相关文章
- Hudi-集成Flink(Flink操作hudi表)
- 阶段学习资料整理(prometheus监控、Flink、k8s)
- flink实现protobuf format(超详细)
- 《Flink官方文档》Python 编程指南测试版(一)
- 《Apache Flink 官方文档》前言
- Flink--Streaming Warehouse 流式数仓的概念、目标及实现路径
- flink-- Window Top-N使用讲解及源码示例
- flink sql 使用Table Function或Array Expansion 实现列转行(附源码实现)
- An Overview of End-to-End Exactly-Once Processing in Apache Flink (with Apache Kafka, too!)
- Apache Flink中的广播状态实用指南
- Flink 的Window 操作(基于flink 1.3描述)
- 大数据Flink教程之 在 Minikube 上部署 Flink Native Kubernetes
- Apache Flink X Apache Doris 构建极速易用的实时数仓架构
- Flink大数据计算的机遇与挑战
- Apache Flink 1.9重磅发布!首次合并阿里内部版本Blink重要功能
- 如何正确使用 Flink Connector?
- Flink udf的小问题:无参数的udf函数会被优化成常量表达式
- Flink源码解析二之Network Stack实现原理
- flink DataStream API使用及原理