Binary search for the offset at a given timestamp across multiple Kafka partitions
2023-03-15 23:28:12
When consuming Kafka from Python, the consumer can only seek to an offset, not to a timestamp, so to start consuming from a point in time you first have to find the offset corresponding to that timestamp. The articles I could find all cover the single-partition case; with multiple partitions the search needs a few changes. Recording the code here:
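As an aside: on broker versions 0.10.1 and later, kafka-python itself exposes `KafkaConsumer.offsets_for_times`, which resolves a timestamp to an offset server-side. A minimal sketch, with the consumer passed in as a parameter and the helper name being my own invention:

```python
def seek_to_timestamp(consumer, tps, target_ms):
    """Seek every TopicPartition in tps to the first record whose
    timestamp is >= target_ms, using the broker-side lookup."""
    lookup = consumer.offsets_for_times({tp: target_ms for tp in tps})
    for tp, result in lookup.items():
        if result is not None:  # None: partition has no record at/after target_ms
            consumer.seek(tp, result.offset)
```

With a real consumer this would be called as `seek_to_timestamp(consumer, tp_tuple, base_timestamp)` after `consumer.assign(...)`. When that API is available the manual binary search below is unnecessary, but the search still works on older broker/client combinations.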
import time
from kafka import KafkaConsumer, TopicPartition


def from_timestamp(timestamp):
    """
    Convert a Unix timestamp (in seconds) into a formatted time string.
    :param timestamp: seconds since the epoch
    :return: a 'YYYY-mm-dd HH:MM:SS' string
    """
    time_array = time.localtime(timestamp)
    return time.strftime("%Y-%m-%d %H:%M:%S", time_array)
# Create the consumer ('oldest' is not a valid auto_offset_reset value;
# it must be 'earliest' or 'latest')
consumer = KafkaConsumer(bootstrap_servers=['xxx.xxx.xxx.xxx:9092'],
                         auto_offset_reset='earliest',
                         max_poll_records=10000)
# The topic's partitions
tp0 = TopicPartition('tpc_bd_hu_track', 0)
tp1 = TopicPartition('tpc_bd_hu_track', 1)
tp2 = TopicPartition('tpc_bd_hu_track', 2)
tp_tuple = (tp0, tp1, tp2)
consumer.assign(tp_tuple)
# Binary search for the offset at the target timestamp:
# fetch every partition's start and end offsets in one call each, then bisect.
start_offsets = consumer.beginning_offsets(tp_tuple)
end_offsets = consumer.end_offsets(tp_tuple)
tp0_start_offset, tp0_end_offset = start_offsets[tp0], end_offsets[tp0]
tp1_start_offset, tp1_end_offset = start_offsets[tp1], end_offsets[tp1]
tp2_start_offset, tp2_end_offset = start_offsets[tp2], end_offsets[tp2]
tp0_nos = (tp0_start_offset + tp0_end_offset) // 2
tp1_nos = (tp1_start_offset + tp1_end_offset) // 2
tp2_nos = (tp2_start_offset + tp2_end_offset) // 2
# Target timestamp, in milliseconds since the epoch (record.timestamp is in ms)
base_timestamp = 1635609600000
print("Partition-0:", tp0_start_offset, tp0_nos, tp0_end_offset)
# Each partition tracks a [low, mid, high] search range over its offsets.
bounds = {
    tp0: [tp0_start_offset, tp0_nos, tp0_end_offset],
    tp1: [tp1_start_offset, tp1_nos, tp1_end_offset],
    tp2: [tp2_start_offset, tp2_nos, tp2_end_offset],
}
while True:
    for tp, (low, mid, high) in bounds.items():
        consumer.seek(tp, mid)
    # poll returns {TopicPartition: [records]}; max_records=1 keeps it to one record
    res = consumer.poll(timeout_ms=10000, max_records=1)
    for tp, records in res.items():
        record = records[0]
        low, mid, high = bounds[tp]
        if record.timestamp < base_timestamp:
            # Record is before the target: search the upper half.
            bounds[tp] = [mid, (mid + high) // 2, high]
        elif record.timestamp > base_timestamp:
            # Record is after the target: search the lower half.
            bounds[tp] = [low, (low + mid) // 2, mid]
        else:
            # Exact hit: collapse the range so this partition counts as converged.
            bounds[tp] = [mid, mid, mid]
        print(f"Partition-{tp.partition}:", *bounds[tp],
              from_timestamp(record.timestamp / 1000),
              'offset:', record.offset)
    # Stop once every partition's search range has collapsed.
    if all(mid == high or low == mid for low, mid, high in bounds.values()):
        break
for tp, (low, mid, high) in bounds.items():
    print(f"Partition-{tp.partition}:", low, mid, high)
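Stripped of the Kafka plumbing, the update rule the loop applies to each partition can be sketched against an in-memory sorted list of record timestamps, where the list index plays the role of the offset:

```python
def bisect_log(timestamps, target):
    """Binary-search a sorted list of record timestamps, mirroring the
    low/mid/high update rule of the consumer loop above.
    Returns the index (offset) the search converges on."""
    low, high = 0, len(timestamps)
    mid = (low + high) // 2
    while not (mid == high or low == mid):
        if timestamps[mid] < target:
            low, mid = mid, (mid + high) // 2   # target is in the upper half
        elif timestamps[mid] > target:
            mid, high = (low + mid) // 2, mid   # target is in the lower half
        else:
            break                               # exact hit
    return mid

# e.g. ten records, one per second, starting at t=1000 ms
log = [1000 + 1000 * i for i in range(10)]
bisect_log(log, 4000)  # → 3 (log[3] == 4000)
```

Note that this converges on a record near the target rather than guaranteeing the first record at or after it, as `bisect.bisect_left` would; for picking a starting offset that is usually close enough, and you can always step forward a few records after seeking.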