zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python kafka操作实例(kafka-python)

2023-09-14 09:13:28 时间

一、基本概念

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
  • Consumer:消费者,获取数据,可消费指定的Topic;
  • Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
  • Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。

二、本地安装与启动(基于Docker)

  1. 下载zookeeper镜像与kafka镜像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2. 本地启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper  

3. 本地启动kafka

docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=localhost \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka:latest 

注意:上述代码,将kafka启动在9092端口

4. 进入kafka bash

docker exec -it kafka bash
cd /opt/kafka/bin

5. 创建Topic,分区为2,Topic name为'kafka_demo'

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

6. 查看当前所有topic

kafka-topics.sh --zookeeper zookeeper:2181 --list

7. 安装kafka-python

pip install kafka-python

三、生产者(Producer)与消费者(Consumer)

个人封装

生产者和消费者的简易Demo,这里一起演示:

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import time
import json
import logging
import traceback

from kafka import KafkaConsumer, KafkaProducer, TopicPartition

log = logging.getLogger(__name__)


"""
kafka 生产者

KafkaProducer是发布消息到Kafka集群的客户端,它是线程安全的并且共享单一生产者实例。生产者包含一个带有缓冲区的池,
用于保存还没有传送到Kafka集群的消息记录以及一个后台IO线程,该线程将这些留在缓冲区的消息记录发送到Kafka集群中。

KafkaProducer构造函数参数解释
    - acks 0表示发送不理睬发送是否成功;1表示需要等待leader成功写入日志才返回;all表示所有副本都写入日志才返回
    - buffer_memory 默认33554432也就是32M,该参数用于设置producer用于缓存消息的缓冲区大小,如果采用异步发送消息,那么
                    生产者启动后会创建一个内存缓冲区用于存放待发送的消息,然后由专属线程来把放在缓冲区的消息进行真正发送,
                    如果要给生产者要给很多分区发消息那么就需要考虑这个参数的大小防止过小降低吞吐量
    - compression_type 是否启用压缩,默认是none,可选类型为gzip、lz4、snappy三种。压缩会降低网络IO但是会增加生产者端的CPU
                       消耗。另外如果broker端的压缩设置和生产者不同那么也会给broker带来重新解压缩和重新压缩的CPU负担。
    - retries 重试次数,当消息发送失败后会尝试几次重发。默认为0,一般考虑到网络抖动或者分区的leader切换,而不是服务端
              真的故障所以可以设置重试3次。
    - retry_backoff_ms 每次重试间隔多少毫秒,默认100毫秒。
    - max_in_flight_requests_per_connection 生产者会将多个发送请求缓存在内存中,默认是5个,如果你开启了重试,也就是设置了
                                            retries参数,那么将可能导致针对于同一分区的消息出现顺序错乱。为了防止这种情况
                                            需要把该参数设置为1,来保障同分区的消息顺序。
    - batch_size 对于调优生产者吞吐量和延迟性能指标有重要的作用。buffer_memeory可以看做池子,而这个batch_size可以看做池子里
                 装有消息的小盒子。这个值默认16384也就是16K,其实不大。生产者会把发往同一个分区的消息放在一个batch中,当batch
                 满了就会发送里面的消息,但是也不一定非要等到满了才会发。这个数值大那么生产者吞吐量高但是性能低因为盒子太大占用内存
                 发送的时候这个数据量也就大。如果你设置成1M,那么显然生产者的吞吐量要比16K高的多。
    - linger_ms 上面说batch没有填满也可以发送,那显然有一个时间控制,就是这个参数,默认是0毫秒,这个参数就是用于控制消息发送延迟
                多久的。默认是立即发送,无需关系batch是否填满。大多数场景我们希望立即发送,但是这也降低了吞吐量。
    - max_request_size 最大请求大小,可以理解为一条消息记录的最大大小,默认是1048576字节。
    - request_timeout_ms  生产者发送消息后,broker需要在规定时间内将处理结果返回给生产者,那个这个时间长度就是这个参数
                          控制的,默认30000,也就是30秒。如果broker在30秒内没有给生产者响应,那么生产者就会认为请求超时,并在回调函数
                          中进行特殊处理,或者进行重试。
 
"""

class KProducer(object):
    def __init__(self, host, port, topic):
        self.host = host
        self.port = port
        self.bootstrap_servers = "{0}:{1}".format(self.host, self.port)
        self.topic = topic
        self.producer = None
        self.connect()

        log.debug(
            "Kafka producer info {0} {1}".format(
                self.bootstrap_servers,
                self.topic
            )
        )

    def connect(self):
        """
        :param bootstrap_servers: 地址
        """
        while True:
            try:
                # json 格式化发送的内容
                self.producer = KafkaProducer(
                    bootstrap_servers = self.bootstrap_servers,
                    buffer_memory = 33554432,
                    batch_size = 1048576,
                    max_request_size = 1048576,
                    value_serializer = lambda m: json.dumps(m).encode("ascii"),
                    compression_type = "gzip"    # 压缩消息发送
                )
                break
            except Exception as e:
                log.error("Connect kafka fail, {}.".format(e))
                time.sleep(5)
                continue

    def sync_producer(self, data):
        """
        同步发送 数据
        :param data_li:  发送数据
        :return:
        """
        while True:
            try:
                future = self.producer.send(self.topic, data)
                record_metadata = future.get(timeout=10)  # 同步确认消费
                partition = record_metadata.partition     # 数据所在的分区
                offset = record_metadata.offset           # 数据所在分区的位置
                log.debug("save success, partition: {}, offset: {}".format(partition, offset))
                break
            except Exception as e:
                log.error("Kafka sync send fail, {}.".format(e))
                self.connect()

    def asyn_producer(self, data):
        """
        异步发送数据
        :param data_li:发送数据
        :return:
        """
        while True:
            try:
                self.producer.send(self.topic, data)
                #self.producer.flush()  # 批量提交
                break
            except Exception as e:
                log.error("Kafka asyn send fail, {}.".format(e))
                self.connect()

    def asyn_producer_callback(self, data):
        """
        异步发送数据 + 发送状态处理
        :param data_li:发送数据
        :return:
        """
        while True:
            try:
                for item in data:
                    self.producer.send(self.topic, item).add_callback(self.send_success).add_errback(self.send_error)
                #self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
                self.producer.flush()  # 批量提交
                break
            except Exception as e:
                log.error("Kafka asyn send fail, {}.".format(e))
                self.connect()

    def send_success(self, *args, **kwargs):
        """异步发送成功回调函数"""
        log.debug("save success")
        return

    def send_error(self, *args, **kwargs):
        """异步发送错误回调函数"""
        log.debug("save error")
        return

    def close_producer(self):
        try:
            self.producer.close()
        except:
            pass


"""
kafka 消费商
"""
class PConsumers(object):
    def __init__(self, host, port, topic, group_id):
        """
        :param bootstrap_servers: 地址
        """
        self.host = host
        self.port = port
        self.bootstrap_servers = "{0}:{1}".format(self.host, self.port)
        self.topic = topic
        self.group_id = group_id
        self.consumer = None
        self.set_conn()

        log.info(
            "Kafka consumers info {0} {1} {2}".format(
                self.bootstrap_servers,
                self.topic,
                self.group_id
            )
        )

    def set_conn(self):
        while True:
            try:
                self.consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers = self.bootstrap_servers,
                    group_id = self.group_id,
                    enable_auto_commit = False,
                    #auto_commit_interval_ms = 1000,
                    session_timeout_ms = 30000,
                    #max_poll_records=50,
                    max_poll_records = 1,
                    #max_poll_interval_ms=30000,
                    max_poll_interval_ms = 86400000,
                    #metadata_max_age_ms = 3000,
                    auto_offset_reset = "latest",
                    #auto_offset_reset = "earliest"
                )
                break
            except Exception as e:
                log.error("Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.print_exc()))
                time.sleep(5)
                continue

    def get_message(self, count=1):
        """
        :param count: 取的条数
        :return: msg
        """
        counter = 0
        msg = []

        while True:
            try:
                consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers = self.bootstrap_servers,
                    group_id = self.group_id,
                    #auto_offset_reset = "latest"
                    auto_offset_reset = "earliest"
                )

                for message in consumer:
                    log.debug(
                        "%s:%d:%d: key=%s value=%s header=%s" % (
                            message.topic, message.partition,
                            message.offset, message.key, message.value, message.headers
                        )
                    )

                    msg.append(json.loads(message.value))

                    counter += 1
                    if count == counter:
                        break
                    else:
                        continue

                consumer.close()
                return msg
            except Exception as e:
                log.error("Kafka get message fail, {0}, {1}".format(e, traceback.print_exc()))
                time.sleep(5)
                continue

    def get_count(self):
        """
        :return: count
        """
        left_sum = 0
        test_num = 5

        while test_num:
            try:
                consumer = KafkaConsumer(
                    self.topic,
                    bootstrap_servers = self.bootstrap_servers,
                    group_id = self.group_id
                )
                partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
 
                #print("start to cal offset:")
 
                # total
                toff = consumer.end_offsets(partitions)
                toff = [(key.partition, toff[key]) for key in toff.keys()]
                toff.sort()
                #print("total offset: {}".format(str(toff)))
    
                # current
                coff = [(x.partition, consumer.committed(x)) for x in partitions]
                coff.sort()
                #print("current offset: {}".format(str(coff)))
 
                # cal sum and left
                toff_sum = sum([x[1] for x in toff])
                cur_sum = sum([x[1] for x in coff if x[1] is not None])
                left_sum = toff_sum - cur_sum
                #print("kafka left: {}".format(left_sum))

                consumer.close()
                return left_sum
            except Exception as e:
                log.error("Kafka get count fail, {0}, {1}".format(e, traceback.print_exc()))
                time.sleep(5)
                test_num -= 1
                continue
        return left_sum
        



if __name__ == "__main__":

    send_data_li = {"test": 1}
    #kp = KProducer(topic="test", bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')
    kp = KProducer(bootstrap_servers="1.1.1.1:9092")

    # 同步发送
    #kp.sync_producer(send_data_li)

    # 异步发送
    # kp.asyn_producer(send_data_li)

    # 异步+回调
    kp.asyn_producer_callback(topic="test", data=send_data_li)

    #kp.close_producer()

    #cp = PConsumers(bootstrap_servers="1.1.1.1:9092", topic="detect-file")
    cp = PConsumers(bootstrap_servers="1.1.1.1:9092", group_id = "boxer")
    #cp = PConsumers(bootstrap_servers="1.1.1.1:9092", topic="custom-event")

    #print(cp.get_count(topic="test"))
    print(cp.get_message(topic="test"))