KafkaStream时间戳问题CreateTime = -1引起的程序中断
程序 时间 中断 引起 问题
2023-09-14 08:57:20 时间
Exception in thread "app-8835188a-e0a0-46da-ac2a-6820ec197628-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = raw_103, partition = 1, offset = 7032668, CreateTime = -1, serialized key size = -1, serialized value size = 111, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "key1": [ 103, "4113471085724846255", "--", "2018-04-17 21:33:53" ], "key2": [ [ 213309, "--", 20128, 1 ] ] }) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:73)
at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:48)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:98)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:560)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:896)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
之前直接改了源码。后来从度娘中找到解决方法:
新增时间异常捕获类MyEventTimeExtractor.class, 直接返回0
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
public class MyEventTimeExtractor implements TimestampExtractor{
@Override
public long extract(ConsumerRecord<Object, Object> record,
long previousTimestamp) {
return 0;
}
}
然后在属性添加下面配置:
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
编译执行,ok
相关文章
- 如何让程序能进入c库调试
- windows/mfc程序中使用OpenGL的多重采样功能
- 自动关机程序[通俗易懂]
- 【愚公系列】2022年09月 微信小程序-three.js绘制正方体
- “政务App + 小程序”提升数字化服务效率及体验
- 自定义Appfabric Cache 配置提供程序「建议收藏」
- 解析小程序双线程技术,助力移动应用体验提升
- 微信小程序返回上一页传值方法
- LyScript 验证PE程序开启的保护
- 小程序快速搭建生态,助力智能电视发展新思路
- 如何固化ZYNQ PL端程序到FLASH
- 数据可视化编辑平台上线,小程序也能拥有可视化图层!
- Linux C程序调整时间的操作(linuxc修改时间)
- 分析Linux程序时间性能分析(linux程序执行时间)
- GDB运行程序
- Linux查找程序:快速定位文件(linux查找程序)
- 微软Win11升级体检程序更新 显示计算机不兼容的简要原因
- 时间解决 Java 程序中 Redis 超时时间的问题(redisjava过期)
- 时间设置 Java 程序中 Redis Key 的过期时间(redisjava过期)
- 策略解锁Java程序的Redis过期时间管理策略(redisjava过期)
- 时间设置Java程序设置Redis过期时间(redisjava过期)
- 设置Java程序设置Redis数据过期时间(redisjava过期)
- 时间解决Java程序中Redis过期时间问题(redisjava过期)
- 时间Java程序中Redis配置缓存过期时间(redisjava过期)
- 时间解决Java程序中Redis过期时间设置问题(redisjava过期)
- 花上一二十分钟的时间,读懂什么是程序集(assembly)!
- 花上一二十分钟的时间,读懂什么是程序集(assembly)!
- Linux C语言中实现延时程序的方法(linux c 延时)
- Linux下编译C程序:极速体验(linux下编译c程序)
- 让你的.NET程序兼容不同版本的Dll文件
- 不用GD库生成当前时间的PNG格式图象的程序
- 全局记录程序片段的运行时间正确找到程序逻辑耗时多的断点
- C#截图程序类似腾讯QQ截图实现代码