zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Kafka多分区下二分法查找指定时间戳的offset

2023-03-15 23:28:12 时间

python消费Kafka的时候,不能指定时间戳开始消费,只能指定offset,因此需要先找到指定时间戳所在的offset再消费。百度找到的文章都是单分区下的查找方法,多分区时需要做一定的修改,记录下代码:

import time

from kafka import KafkaConsumer, TopicPartition


def from_timestamp(timestamp):
    """
    将long型的时间戳转换为格式话的限制方式
    :param timestamp:
    :return:
    """
    timeArray = time.localtime(timestamp)
    otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return otherStyleTime


# 目标时间戳
timestamp = 1635696000000

# 指定消费者
consumer = KafkaConsumer(bootstrap_servers=['xxx.xxx.xxx.xxx:9092'], 
                         auto_offset_reset='oldest',
                         max_poll_records=10000,
                         max_poll_interval_ms=500
                         )
# 指定partition信息
tp0 = TopicPartition('tpc_bd_hu_track', 0)
tp1 = TopicPartition('tpc_bd_hu_track', 1)
tp2 = TopicPartition('tpc_bd_hu_track', 2)

tp_tuple = (tp0, tp1, tp2)

consumer.assign(tp_tuple)

# 二分法找到指定时间戳啊的offset
tp0_start_offset = consumer.beginning_offsets(tp_tuple).get(tp0)
tp1_start_offset = consumer.beginning_offsets(tp_tuple).get(tp1)
tp2_start_offset = consumer.beginning_offsets(tp_tuple).get(tp2)

tp0_end_offset = consumer.end_offsets(tp_tuple).get(tp0)
tp1_end_offset = consumer.end_offsets(tp_tuple).get(tp1)
tp2_end_offset = consumer.end_offsets(tp_tuple).get(tp2)

tp0_nos = int((tp0_start_offset + tp0_end_offset) / 2)
tp1_nos = int((tp1_start_offset + tp1_end_offset) / 2)
tp2_nos = int((tp2_start_offset + tp2_end_offset) / 2)

# 目标时间戳
base_timestamp = 1635609600000

print("Partition-0: ", tp0_start_offset, tp0_nos, tp0_end_offset)
while True:
    consumer.seek(tp0, tp0_nos)
    consumer.seek(tp1, tp1_nos)
    consumer.seek(tp2, tp2_nos)
    res = consumer.poll(timeout_ms=10000, max_records=1)
    if tp0 in res:
        res_record = res[tp0][0]
        the_time = res_record.timestamp
        if the_time < base_timestamp:
            tp0_start_offset, tp0_nos = tp0_nos, int((tp0_nos + tp0_end_offset) / 2)
            print("Partition-0: ", tp0_start_offset, tp0_nos, tp0_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:',
                  res_record.offset)
        elif the_time > base_timestamp:
            tp0_nos, tp0_end_offset = int((tp0_nos + tp0_start_offset) / 2), tp0_nos
            print("Partition-0: ", tp0_start_offset, tp0_nos, tp0_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:',
                  res_record.offset)
        else:
            continue
    elif tp1 in res:
        res_record = res[tp1][0]
        the_time = res_record.timestamp
        if the_time < base_timestamp:
            tp1_start_offset, tp1_nos = tp1_nos, int((tp1_nos + tp1_end_offset) / 2)
            print("Partition-1: ", tp1_start_offset, tp1_nos, tp1_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:',
                  res_record.offset)
        elif the_time > base_timestamp:
            tp1_nos, tp1_end_offset = int((tp1_nos + tp1_start_offset) / 2), tp1_nos
            print("Partition-1: ", tp1_start_offset, tp1_nos, tp1_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:',
                  res_record.offset)
        else:
            continue
    elif tp2 in res:
        res_record = res[tp2][0]
        the_time = res_record.timestamp
        if the_time < base_timestamp:
            tp2_start_offset, tp2_nos = tp2_nos, int((tp2_nos + tp2_end_offset) / 2)
            print("Partition-2: ", tp2_start_offset, tp2_nos, tp2_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:', res_record.offset)
        elif the_time > base_timestamp:
            tp2_nos, tp2_end_offset = int((tp2_nos + tp2_start_offset) / 2), tp2_nos
            print("Partition-2: ", tp2_start_offset, tp2_nos, tp2_end_offset,
                  from_timestamp(res_record.timestamp / 1000),
                  'offset:', res_record.offset)
        else
            continue
    if (tp0_nos == tp0_end_offset or tp0_start_offset == tp0_nos) 
            and (tp1_nos == tp1_end_offset or tp1_start_offset == tp1_nos) 
            and (tp2_nos == tp2_end_offset or tp2_start_offset == tp2_nos):
        break

print('Partition-0: ', tp0_start_offset, tp0_nos, tp0_end_offset)
print('Partition-1: ', tp1_start_offset, tp1_nos, tp1_end_offset)
print('Partition-2: ', tp2_start_offset, tp2_nos, tp2_end_offset)