Kafka入门篇学习笔记整理
Kafka入门篇学习笔记整理
该文为Kafka入门学习笔记整理:
- 参考视频: kafka一小时入门精讲课程
- 参考文档: 极客时间: Kafka核心技术与实战 和 Kafka权威指南
本文只包含Kafka入门使用导学,后续会继续整理Kafka进阶知识与底层原理剖析。
本文大部分图为个人手绘补充,绘图软件采用: draw.io
完整代码Demo工程仓库链接: https://gitee.com/DaHuYuXiXi/kafak-demo
Kafka是什么
Kafka 是由Linkedin公司开发的一款开源的用于实时流式数据处理的平台,也可以说是一款具有分布式、多分区、多副本、多生产者及消费者的消息队列中间件。
消息引擎系统需要设置具体的传输协议,即用何种方法将消息传输出去,常见的方法有:
- 点对点模型
- 发布订阅模型
Kafka同时支持这两种消息引擎模型。
消息引擎作用之一是为了 “削峰填谷” , 弥补上下游系统生产和消费速度不一致的问题。
Kafka的特性
- 高吞吐,低延迟: kafka处理数据的速度可以达到每秒几百万条,数据处理速度主要受每条数据的大小影响,数据传递的延迟最低可以达到几毫秒。kafka之所以能够有那么高的性能,原因如下:
- 采用零拷贝,避免了从内核空间到用户空间的数据拷贝
- 数据顺序写盘,避免随机写磁盘产生的寻址耗时,同时也因为顺序写盘,kafka不提供数据寻址删除的能力
- 对数据进行压缩,从而减少IO延迟
- 采用批量发送数据的方式,而非逐条发送
- 持久性,可靠性: kafka会将接收到的数据持久化到磁盘保存,并且存在多副本的备份机制,所以一定程度上保证了数据的持久性和可靠性
- 高可用,容错性: kafka将消息分成多个主题(Topic),每个主题由多个分区(partition)构成,每个分区存在多个副本,分区副本分布在不同的服务器(Broker)中。正因为kafka这种分布式架构,所以集群中某个节点宕机,整个集区依然能够正常对外提供服务。
应用场景
- 业务逻辑处理中,将非核心业务逻辑转换为异步,提升请求处理速度,同时完成应用间解耦
1.对于下单请求而言,必须先有订单用户才能进行支付操作,而库存和积分的变动时效性并没有要求很高,可以异步处理。
2.同步模型中,通常会将各个模块聚合于一个单体应用之中,每次修改都需要重新打包部署,更适合小团队开发。异步模型,可以将各个模块拆分出来,形成一个个单独的微服务,独立代码,独立部署,彼此间通过消息队列或者RPC进行互相访问,适合大型复杂项目开发。
- 对于高并发处理,通常有两种解决思路:
- 水平扩展,增加应用实例数,也就是弹性扩容,配合负载均衡处理高并发需求,但是服务器资源是有限的,服务器数量需要的越多,money need more
- 将消息处理按照时效性进行分类:
- 对于高时效性的消息进行实时同步处理,如上面的订单创建
- 对于低时效性的消息,并且并发度又很高的消息,先用消息队列缓冲起来,目的是降低后端服务处理瓶颈。根据后端服务的处理能力来拉取消息进行处理,而不是一下子把请求都打到后端服务,从而打垮后端服务,如; 库存和积分管理。
- 通过批处理提升性能: 既然消息队列已经缓冲了数据,那么就为我们对消息进行批处理创造了条件,数据批量接收,批量处理,批量入库的操作,一定是比我们一条一条数据的处理操作性能更高的。
- 以数据库为核心的数据采集分析系统: 记录系统中各个微服务运行状态,采集相关运行指标并保存到数据库中,同时第三方系统查询数据库,并进行可视化展示,该过程又涉及: 指标超过阈值自动告警,对各个指标数据进行周期性聚合计算,如: 最大值,最小值,平均值
问题:
- 交互以数据库为中心,DB压力山大
- 由于以数据库为中心进行交互,导致数据延时很大,因为数据库查询统计需要时间,可能等我们发现阈值告警时,应用服务已经宕机了
- 数据库查询无法做到实时统计
- 改为以Kafka为核心的流数据处理系统:
采集器采集到指标后,发送给消息队列:
- 系统管理应用,监听该数据主题,接收到数据后保存到DB
- 数据监控应用,接收到数据后判断数据是否超过阈值,如果超过发出告警短信
- 第三方系统B,直接监听消息队列指定主题,获取实时数据
- 统计每5,15,30,60分钟指标的最大值,最小值,平均值,Kafka提供了相关的时间窗口能力,能够有效进行统计,这是Kafka,Flink之类的流处理系统给我们提供的流式数据统计功能,如果不会使用,自己编码同样可以实现,使用相关变量记录,每次接收到消息后,更新变量值。
好处:
- Kafka作为消息队列的消息延迟很低,可以满足实时性要求
- Kafka提供的Kafka Connect可以标准化的将各种数据从各种数据源中移入Kafka,并提供标准化的Sink将数据移入到某种数据存储或数据库中,并且对于常见的数据库或者大数据应用存储都有很好的支持,如: mysql,HDFS等
- 用户行为跟踪: 比如电商购物,当你打开一个电商购物平台,你的登录用户信息,登录时间地点等信息;当你浏览商品的时候,你浏览的商品的分类,价格,店铺等信息都可以通过Kafka消息的方式传递给Kafka,通过实时的流式计算,根据您的喜好向您做出商品推荐。
- 度量指标: 比如kafka可以用来做服务器指标监控数据的消息传递,在服务器发生异常时可以实时的通过分析服务器监控指标数据,并据此产生告警。
- 流式处理: 流式数据处理,是相对于批量数据处理而言的,数据以数据流的方式进行处理,注重数据的时效性,而不是将数据入库之后,再进行处理,这种通常被称为批处理
- 限流削峰: 在面对高并发用户请求冲击应用系统的场景下,kafka能有效地作为数据缓冲区,避免大量并发直接面对服务程序及数据库,避免后端程序及数据库压力过大导致服务奔溃。
Kafka的安装
单机版部署
集群部署环境准备
- 配置好java环境
- /etc/hosts文件中配置ip和主机名映射关系
hostnamectl set-hostname hostname命令可以用来设置主机名,修改完毕后,重新打开一个终端会看到修改生效,或者通过hostname命令查看修改后的主机名称
- /etc/hosts文件是Linux系统中负责Ip与域名快速解析的文件,该文件包含IP地址和主机名之间的映射关系,还包括主机名的别名,在没有DNS域名服务器的情况下,系统上的所有网络程序都通过查询该文件来解析对应于某个主机名的IP地址,否则就需要使用DNS服务完成解析。
- /etc/hosts文件的优先级高于DNS服务程序
/etc/hosts文件通常内容如下:
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
172.23.57.29 dhy.com dhy
每行一条映射关系,由三部分组成:IP地址,域名或主机名,主机名
主机名和域名的区别: 主机名通常在局域网内使用,通过hosts文件,主机名就被解析到对应IP,域名通常在互联网上使用,使用DNS解析,如果本机不想使用DNS上的域名解析,可以更改hosts文件,加入自己的域名解析。
假设我们有三台服务器,在三台服务器的/etc/hosts文件中加入如下映射关系:
192.168.110.92 kafka-0
192.168.110.93 kafka-1
192.168.110.94 kafka-2
此时如果我们在192.168.110.92,想要通过ssh远程登录192.168.110.93,只需要通过主机名访问,然后输入密码进行登录即可:
ssh root@kafka-1
- 创建用户 – 例如: 创建一个新用户,名为kafka
- 防火墙相关端口开放(云服务器需要去网页操作开放)
- 集群间免密登录设置
非对称加密:
- 发布方创建一个秘钥对,发布方自己保存的密钥被称为私钥,公开发布给其他人使用的密钥称为公钥
- 公钥加密结果,私钥能够解密;私钥加密结果,公钥也能够解密。
SSH免密登录原理图:
- 如果host1希望免密登录host2,那么秘钥对是host1发布的
- 让host2信任host1的公钥,host1即可免密登录host2,所以host1需要将自己的公钥,在host2服务器上保存一份,即复制秘钥
在被SSH免密登录的主机中,有一个存储来登录的主机的公钥的文件,名字叫做authorized_keys,它的位置存在于/登录用户根目录/.ssh
目录中:
如果当前主机还没有被设置任何免密登录,这个文件默认是不存在的
在authorized_keys文件中,存储着能够登录本地主机的其他各个主机的身份信息,如果使用rsa算法生成的密钥,文件的存储格式都是以ssh-rsa开头的一组字符串。
具体步骤:
- 主机环境
ip 主机名 用户
192.168.110.92 kafka-0 kafka
192.168.110.93 kafka-1 kafka
192.168.110.94 kafka-2 kafka
- 生成密钥对
ssh-keygen -t rsa
//执行完毕后,会在/home/kafka/.ssh目录下看到如下两个文件,通常认为前者是公钥,后者是私钥:
id_rsa.pub
id_rsa
- 密钥处理
//1.将公钥保存到authorized_keys文件中
cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
//2.将公钥分发给kakfa-1、kakfa-2主机。按提示输入kafka登陆密码
ssh-copy-id -i ~/.ssh/id_rsa.pub -p22 kafka@kakfa-1;
ssh-copy-id -i ~/.ssh/id_rsa.pub -p22 kafka@kakfa-2;
//3.我们现在就可以通过kakfa-0主机免密登录kakfa-1和kakfa-2了
- 免密登录测试: 在kakfa-0主机上登录kakfa-1(或kakfa-2)主机,输入如下命令,不需要输入密码。你会发现登录主机的切换如下,不需要密码就完成登陆了
ssh kafka@kakfa-1
在kakfa-1、kakfa-2服务器上重复以上步骤,就可以完全实现三台服务器之间ssh免密登录
Kafka 2.x集群部署
Kafka 3.x集群部署
kafka 3.0中已经将zk移除,使用kraft机制实现controller主控制器的选举:
- 2.0: 一个集群所有节点都是Broker,从中选举出一个作为Controller控制器,控制器将集群元数据信息,如: 主题分类,消费进度等数据保存到zk,用于集群各节点之间的分布式交互
- 3.0: 假设一个集群由四个Broker,我们人为指定其中三个作为Controller,从三个Controller中选举出来一个Controller作为主控制器,其他两个备用。zk不再需用,所有集群元数据都存储在Kafka内部主题中,由kafka自行管理,不再依赖zookeeper
- 在搭建kafka集群前,我们需要先做好kafka实例角色规划
ip 主机名 角色 node.id
192.168.110.92 kafka-0 broker,controller 1
192.168.110.93 kafka-1 broker,controller 2
192.168.110.94 kafka-2 broker,controller 3
192.168.110.94 kafka-3 broker 4
Controller需要奇数个,这和zk是一样的
- 下载Kafak安装包: 国内阿里镜像地址下载
kafka 3.0不再支持JDK8,建议安装JDK11或JDK17
- 在kafka用户家目录下新建一个kafka安装目录,并将安装包解压缩到该目录下
tar -zxvf kafka_2.12-3.1.2.tgz
- 新建一个目录用于保存kafka的持久化日志数据
mkdir -p /home/kafka/data
,并保证安装kafka的用户具有该目录的读写权限 - 所有安装kafka服务器实例开放9092,9093端口,使用该端口作为controller之间的通信端口,该端口作用与zk的2181端口类似
- 修改Kraft协议配置文件: kafka 3.0版本中,使用Kraft协议代替zk进行集群的controller选举,所以要针对它进行配置,配置文件在kraft目录下,这与kafka 2.0版本依赖zk安装方式的配置文件是不同的
- 具体配置参数如下: (每个节点都进行配置)
node.id=1
process.roles=broker,controller
listeners=PLAINTEXT://kafka-0:9092,CONTROLLER://kafka-0:9093
advertised.listeners = PLAINTEXT://:9092
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
log.dirs=/home/kafka/data/
- node.id:这将作为集群中的节点 ID。对于kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色,也有可能是controller角色,所以改名叫做node节点。
- process.roles:一个节点可以充当broker或controller或两者兼而有之。
- listeners:broker 将使用 9092 端口,而 kraft controller控制器将使用 9093端口。
- advertised.listeners: kafka通过代理暴漏的地址,如果都是局域网使用,就配置PLAINTEXT://:9092即可。
- controller.quorum.voters:指定controller主控选举的投票节点,所有process.roles包含controller角色的规划节点都要参与,即:kafka-0、kafka-1、kafka-2。其配置格式为:node.id1@host1:9093,node.id2@host2:9093
- log.dirs:kafka 将存储数据的日志目录。
- 格式化存储目录(来到kafka安装bin目录下)
//1.生成一个唯一的集群ID(在一台kafka服务器上执行一次即可),这一个步骤是在安装kafka2.0版本的时候不存在的。
./kafka-storage.sh random-uuid
//2.使用生成的集群ID+配置文件格式化存储目录log.dirs,所以这一步确认配置及路径确实存在,并且kafka用户有访问权限(检查准备工作是否做对)。每一台主机服务器都要执行这个命令
kafka-storage.sh format \
-t 集群ID \
-c /home/kafka/kafka_2.12-3.1.2/config/kraft/server.properties
//3.格式化操作完成之后,你会发现在我们定义的log.dirs目录下多出一个meta.properties文件。meta.properties文件中存储了当前的kafka节点的id(node.id),当前节点属于哪个集群(cluster.id)
- 启动集群进行测试
//依次在每台机器上执行下面这条启动命令
./kafka-server-start.sh /home/kafka/kafka_2.12-3.1.2/config/kraft/server.properties
- 创建主题进行测试 (注意: Kafka 3.0版本和之前旧版本创建主题命令的不同,由于不依赖zk启动,所以此处填写Broker地址)
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test1
新版本的kafka,已经不需要依赖zookeeper来创建topic,已经不需要zookeeper来保存元数据信息
Kafka 2.8版本引入一个重大改进:KRaft模式。这个功能一直处于实验阶段。 2022年10月3日,Kafka 3.3.1发布,正式宣告KRaft模式可以用于生产环境。 在KRaft模式下,所有集群元数据都存储在Kafka内部主题中,由kafka自行管理,不再依赖zookeeper
许多旧的kafka版本中只用–zookeeper ip:2181来连接zookeeper进而控制broker服务执行命令,在kafka较新的版本中虽然仍然支持该参数,但是已经不建议使用,因为在kafka的发展路线图中zookeeper会逐步被剔除。所以建议大家采用–bootstrap-server ip:9097方式进行服务连接。
- 查看主题的状态
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test1
- 使用生产者和消费者进行测试
#消费者窗口监听主题中的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1
#生产者窗口往主题中投递消息
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
监听器和内外网络
- 监听器: 指定Broker实例启动时本机的监听名称和端口号,给服务器端使用
listeners = listener_name://host_name:port,listener_name2://host_name2:port2
例如:
listeners = PLAINTEXT://:9092
listeners = PLAINTEXT://192.168.1.10:9092
listeners = PLAINTEXT://hostname:9092
#监听所有网卡
listeners = PLAINTEXT://0.0.0.0:9092
- 监听器的名称和端口必须是唯一的,端口相同,就冲突了
- host_name如果为空,则会绑定所有的网卡, 也就是说不管从哪个网卡进入的请求都会被接受处理。
- listener_name 是监听名,唯一值, 它并不是安全协议,因为默认的4个安全协议已经做好了映射:
格式为:
监听名称1:安全协议1,监听名称2:安全协议2
默认四个映射:
PLAINTEXT => PLAINTEXT 不需要授权,非加密通道
SSL => SSL 使用SSL加密通道
SASL_PLAINTEXT => SASL_PLAINTEXT 使用SASL认证非加密通道
SASL_SSL => SASL_SSL 使用SASL认证并且SSL加密通道
- advertised.listeners: 对外发布的访问IP和端口,这些信息会注册到zk保存(2.x版本)或者kafka内部主题中(3.x版本),给客户端使用
- avertised.listeners如果没有配置,默认采用listeners的配置
- 下图中Broker通过avertised.listeners配置,对外暴露的是内网IP或者主机名,这要求客户端必须和Broker实例处于同一网段才可以访问((下图展示的是kafka 2.x版本,将元数据信息保存在zk中))
- Broker实例之间可以通过内部网络或者主机名进行访问
- listeners配置多个监听器,内外网分流,这样Broker直接就会通过内网互相连接, 客户端除了可以通过内网连接(如果在内网环境的话),也可以通过外网连接。
例子:
listeners=INTERNAL://:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://kafka-0:9092,EXTERNAL://公网IP:9093
#设置监听器名称和安全协议之间的映射关系集合。
#注意:自定义了监听器,则必须要配置inter.broker.listener.name
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
#用于Broker之间通信的listener的名称,如果未设置,则listener名称由 security.inter.broker.protocol定义,默认为PLAINTEXT
inter.broker.listener.name=INTERNAL
详细可参考: 【云原生】一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置
KRaft模型
在2.8版本之前,kafka都是强依赖zk这个分布式服务协调管理工具的,在kafka2.8版本开始尝试从服务架构中去掉zk,到了3.0版本这个工作基本完成。
- kafka 2.x版本中zk保存了哪些元数据信息呢?
- kafak 2.x版本中,zk负责保存kafka集群运行的元数据信息,主要是一些集群节点运行状态信息,配置信息等。
/admin : 用于保存kafka集群管理相关的信息,如已经被删除的topic。
/brokers : 用于保存当前集群的所有broker的id,和已经创建未被删除的topic
/cluster : 用于保存kafka集群的id,kafka的集群存在一个唯一的id及版本信息保存在这里
/config : 集群运行过程中的客户端、服务端、主题、用户等配置信息
/controller : 用于保存kafka集群控制器组件的信息,如:版本号、控制器在哪个broker上、时间戳信息。
/controller_epoch :用于记录controller选举的次数,每完成一次controller选举,该数据就加1。
/isr_change_notification : ISR列表发生变更时候,发出的通知信息。
/latest_producer_id_block :每个 Producer 在初始化时都会被分配一个唯一的 PID,pid开始结束范围以及申请结果保存在这里。
/log_dir_event_notification :如果broker在向磁盘写入数据的时候出现异常,信息保存在这里。 controller监听到该目录的变化之后会进行相应的处理。
- Kafka强依赖zk产生的问题有哪些呢?
- zk作为外部服务组件保存kafka集群相关元数据信息,意味着所有操作都需要通过大量的网络交互实现,网络IO的开销降低了集群的性能
- 需要额外维护zk集群的稳定性
- zk目录节点过多会影响性能,同时每一个znode节点都有1M的数据限制,随着kafka集群变大,zk本身数据存储能力也会影响性能。
- Kafka 2.8版本引入一个重大改进:KRaft模式。这个功能一直处于实验阶段。 2022年10月3日,Kafka 3.3.1发布,正式宣告KRaft模式可以用于生产环境。 在KRaft模式下,所有集群元数据都存储在Kafka内部主题中,由kafka自行管理,不再依赖zookeeper。
- kafka2.0的服务架构,黄色图标代表controller,黑色图标代表broker,所有的broker依赖于一个被选举出来的controller对其进行控制管理。controller服务实例实际上是三个broker其中的一个,其中的一个broker被选举出来被赋予controller的角色。
- 在kafka3.0(KIP - 500)服务架构下,上图一共四个broker,其中三个被赋予controller的角色。在三个controller中选举其中一个作为主控制器。
zookeeper的分布式数据服务协调能力,在kafka3.0版本中被raft协议所替代,从而使leader controller的选举和分区副本一致性得到保证
Kafka在去掉zk后,部署运维更加easy,同时监控实现更加方便,性能也得到很大提升。
Docker安装
Topic管理与集群容错测试
topic的创建与状态查询
- 创建Topic
# 创建名为topic的主题,该主题下只有一个分区,每个分区两个副本的topic
./kafka-topics.sh --create \
--bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 \
--replication-factor 3 --partitions 1 \
--topic test2
bootstrap.servers只是用于客户端启动(bootstrap)的时候有一个可以热启动的一个连接者,一旦启动完毕,客户端就应该可以得知当前集群的所有节点的信息,日后集群扩展的时候客户端也能够自动实时的得到新节点的信息,即使bootstrap.servers里面的挂掉了也应该是能正常运行的,除非节点挂掉后客户端也重启了。
- 查看topic详细状态
./kafka-topics.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --describe -topic test2
该命令返回结果格式如下所示:
Topic: test2 TopicId: Sf8chBRzRoWs2oBSoXNsbQ PartitionCount: 1 ReplicationFactor: 3 Configs: cleanup.policy=delete,flush.ms=1000,segment.bytes=1073741824,flush.messages=10000
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
- Topic:名称test2
- PartitionCount:分区数量1
- ReplicationFactor:分区副本共3个,含1个Leader分区副本2个Follower分区副本
Partition: 0
:这一条记录对应的分区编号0(一个分区编号从0开始),如果有多个分区依次为:Partition: 1 、Partition: 2
,以此类推。Leader: 1
: 表示主分区在broker.id=1的节点上Replicas: 1,2,3
: 表示分区的3个副本分别在broker.id=1\2\3的节点上Isr: 1,2,3
: 表示3个分区副本都在Isr集合中,2个Follower与1个Leader数据处于同步状态。
容错测试
我们需要测试以下几点:
- 集群所有节点都正常时,保证生产和消费正常
//1.在某个节点上生产者发送数据
./kafka-console-producer.sh --topic test --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092
//2.在另一个节点上消费者接收数据
//参数--from-beginning的作用是使consumer从kafka最早的消息开始消费
./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092
//3.检查消息收发是否正常
能够正常的收发数据,证明集群所有节点都正常状态下,生产消费正常。如果不能正常检查自己的安装过程配置、集群是否启动、kafka服务日志等信息。
- 停掉部分follower副本,生产和消费依然要保证正常
- 停掉leader副本,并且存在可用follower副本的情况下,生产和消费依然要保证正常
我们如果把Follwer副本broker.id=2所在服务器上的kafka进程停掉:
./kafka-server-stop.sh
此时查看test2 topic的分区情况,会发现ISR集合中只剩下1,2号分区副本:
Topic: test2 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
此时再去生产者继续发送数据,会出现两种可能结果:
- 生产方正常发送,消费者正常消费
- 生产法正常发送,消费者不能消费到数据
Why?
- kafka存在一个特殊的主题
__consumer_offsets
,该主题用于保存其他主题生产数据及消费数据的进度offsets,这个主题有50个分区。核心问题默认情况下是它的分区副本因子是1,也就是说每个分区只有一个Leader副本
。
通过下列命令我们可以看到:
./kafka-topics.sh --bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 --describe -topic __consumer_offsets
假设主题test2的数据生产进度偏移量需要保存到__consumer_offsets
主题的2号分区,但是我们把broker.id=2的服务器停掉了,并且分区只有一个Leader没有备份。
生产者数据正常写入test2主题,但是主题的生产进度偏移量需要更新到__consumer_offsets
,如果无法更新,消费者就不能消费这条数据。
为了实现集群的高可用,需要做下面这些事:
- 我们自己建的主题的分区要有多副本,创建主题时候指定参数–replication-factor n
__consumer_offsets
也要有多副本,可以通过在配置文件中指定:
# 创建topic时候,默认的分区数
num.partitions=3
# 允许默认创建Topic
auto.create.topics.enable=true
# __consumer_offsets的分区数设置(n>1,建议值3)
offsets.topic.replication.factor=n
上面提到的三个参数,是在__consumer_offsets主题初始化创建的时候生效(集群生产第一条消息的时候),因此即使重启了集群,__consumer_offsets主题的分区副本数刚开始看的时候还是1
假如__consumer_offsets
的分区包含n=3个副本,除非所有副本都坏掉,否则还是可以正常工作,因此副本数量越多,集群可用性越高,但是数据存储和副本之间数据同步消耗的资源也会越多。
动态修改分区数
如果我们想动态修改某个主题的分区数,这里以__consumer_offsets
主题为例,首先新建一个JSON文件,比如topic.json:
{
"version": 1,
"partitions": [
{
"topic": "__consumer_offsets",
"partition": 0,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 1,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 2,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 3,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 4,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 5,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 6,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 7,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 8,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 9,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 10,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 11,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 12,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 13,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 14,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 15,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 16,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 17,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 18,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 19,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 20,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 21,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 22,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 23,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 24,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 25,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 26,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 27,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 28,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 29,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 30,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 31,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 32,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 33,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 34,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 35,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 36,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 37,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 38,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 39,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 40,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 41,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 42,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 43,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 44,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 45,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 46,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 47,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 48,
"replicas": [1,2,3]
},
{
"topic": "__consumer_offsets",
"partition": 49,
"replicas": [1,2,3]
}
]
}
一共50个分区,每个分区三个副本分布在broker.id=1,2,3的三台服务器上,这就是上面的这个json文件的含义。把该文件放入kafka安装目录的explain目录下。
然后执行如下命令:
./kafka-reassign-partitions.sh \
--bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 \
--reassignment-json-file ./topic.json --execute
执行完后,我们可以执行下面这条命令进行验证:
./kafka-reassign-partitions.sh \
--bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 \
--reassignment-json-file ./topic.json --verify
如果验证的结果,每一个分区都是“is complete”,表示正确完成分区副本分配。
经过上面的对__consumer_offsets主题的配置后:
- 三个Broker节点,即使停掉1个节点或者停掉2个节点,都不影响集群正常的接收生产者的消息,也不响消费者进行消费。 即使停掉分区Leader副本,如果有Follower分区副本活着,就会重新选举Leader,除了会浪费一定的选举时间,集群整体上还是可用的。
Kafka中的基本概念
- Topic(主题) : 在Kafka中,发布订阅的对象是主题(Topic),你可以为每个业务,每个应用甚至每类数据都创建专属的主题。
生产者不断向主题发送消息,消费者不断从主题拉取消息进行消费,并且生产者和消费者都可以同时向一个或多个主题发送或拉取消息:
- Broker(消息代理): 一个Broker一个Kafka服务实例,Kafka集群由多个Broker组成,Broker负责接收和处理客户端请求,以及对消息进行持久化操作。
通常会将Kafka集群中不同的Broker分散运行在不同的机器上,防止一台机器宕机导致整个集群不可用。
- Relication(副本): Kafka定义了两类副本: 领导者副本和追随者副本, 前者对外提供读写服务,后者不对外提供读写服务,只负责同步领导者副本的数据,副本机制可以通过冗余存储来保证数据不丢失。
注意: 生产者总是向领导者副本写消息,而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事: 向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的数据同步。
追随者副本只负责同步领导者副本数据,通过追随者副本进行选举实现故障转移。
- Partitioning(分区): 在数据量很大的情况下,单一的领导者副本会积累太多数据, 以至于单台Broker机器都无法容纳了,那么此时就应该考虑将数据分成多份保持在不同的Broker上,这种机制被称为分区,可以类比: ES和Redis中的分片机制。
分区是一个实实在在的物理存在队列数据结构用于存储数据,占用系统内存以及磁盘数据存储等资源。
Kafka中的分区机制是将每个主题划分成多个分区,每个分区是一组有序的消息日志,一个Topic包含多少个分区取决于该主题下的商品处理的吞吐量能力需求。
生产者生产的每条消息会被发送到其中一个分区中,具体发送到哪个分区由具体的消息路由策略决定,默认为轮询策略。
Kafka的分区编号从0开始。
副本是在分区层级定义的,每个分区下可以配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移的数据来表示,分区位移总是从0开始。
- Record(消息) : Kakfa中消息格式如下
如果我们发送消息时,消息的key值为空,Kafka默认采用轮询的方式将消息写入当前主题的各个分区中。
如果我们指定了消息的Key,那么相同key的消息会被写入同一个分区中,这样我们就能保证具有相同key的消息按照一定的顺序进行写入:
- 主题,分区,副本,Broker 四者之间的关系是什么样子的呢?
- 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
- 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
- 属于同一个分区的不同副本会分散存储在不同的Broker上,以此达到高可用效果。
- 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
- 最后,客户端程序只能与分区的领导者副本进行交互。
Kafka如何持久化数据
Kafka使用消息日志来保持数据,一个日志就是磁盘上一个只能追加写消息的物理文件。追加写机制避免了缓慢的随机IO操作,改为性能较好的顺序IO操作,这也是Kafka实现高吞吐量特性的一个重要手段。
Kafka将消息日志切分为多个日志段,消息被追加写入到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出来一个新的日志段,并将老的日志段封存起来,通过后台定时任务定期检查老的日志段能否被删除,从而实现回收磁盘空间的目的。
消费者
Kafka支持两种消息模型,即点对点模型和发布订阅模型,Kfaka通过引入消费者组的概念来实现这两种消费模型。
- 消费者组(Consumer Group)
- 多个消费同一主题数据的消费者线程,可以组成一个消费者组
- 一个消费者组可以订阅多个主题,消费多个主题下的数据
- 多个消费者实例共同组成一个组来消费一组主题,这组主题中的每个分区都只会被组内的一个消费者实例消费,组内的其他消费者实例不能消费它
多个消费者实例可以同时消费,从而加速整个消费端的吞吐量(TPS), 这里的消费者实例可以是运行消费者应用的进程,也可以是一个线程,它们都称为一个消费者实例。
分区副本数据同步机制
生产者和消费者只和分区的领导者副本(主分区副本)进行数据通信,分区的追随者副本(分区副本)负责同步领导者副本的数据。
这里涉及到kafka中一些技术名词:
- AR: 代表一个分区的所有副本集合,包括leader和follower副本
- ISR: 与主分区副本处于同步状态的分区副本集合,当然这里也会存在少量的数据延迟,通常我们说的ISR还会包括Leader自己。
- OSR:数据同步状态已经跟不上主分区副本的集合,可能由于网络等问题导致
kafka为了保证高可用(当leader节点挂了之后,kafka依然能提供服务)kafka提供了分区副本复制机制。这个副本是针对分区partition而言的,所以也可以被称为分区副本(partition replica),可以通过 default.replication.factor
对replica的数目进行配置,默认值为1,表示不对topic的分区进行备份。如果配置为2,表示除了leader节点,对于topic里的每一个partition,都会有一个额外的副本。
假如分区副本的Leader挂掉了,kafka会从剩余的分区副本中再选出一个作为Leader,继续提供服务。这种分区副本复制机制保证了:其中一台或几台服务器挂掉,kafka集群依然有分区副本可用。当然,如果存在某个副本的Broker服务都挂掉了,该分区就彻底地挂掉了。
什么样的分区会被判断为OSR?
- 判断一个follower分区副本是否与主分区同步,是通过判断从分区副本数据落后主分区副本数据的时间跨度来实现的,如果这个时间跨度大于replica.lag.time.max.ms配置的参数值(默认10秒),则认为从分区副本与主分区副本不同步,不同步的分区副本将被踢出ISR集合,放入OSR集合,如果后续该Follower分区副本逐渐赶上Leader副本的数据进度,还会自动回到ISR集合中。
OSR集合中的未同步副本是否可以作为Leader
与主分区副本处于同步状态的分区副本被称为ISR(包含Leader自己),数据同步状态已经跟不上主分区副本的从分区副本被称为OSR。
正常情况下,如果Leader挂掉,kafka肯定从ISR中选举一个Leader。但是假如ISR列表为空,就只能从OSR中选举Leader。下面的这个参数的作用就是配置:是否允许从OSR中选举Leader。
unclean.leader.election.enable=false
默认值是true,我们给它设置为false。因为当允许从OSR中选举Leader,并且Leader负责和客户端的数据通信,OSR内的分区副本数据数据是严重滞后的,不同步的,所以会导致数据丢失。
配置这个参数代表我们接受一种情况:宁可接受kafka服务挂掉,不提供服务;也不能接受苟延残喘的提供服务,最终导致数据丢失。
最小同步副本数
kafka生产者在进行数据发送的时候,可以设置一个参数叫做acks。如果acks=all,代表消息需要在所有的ISR分区副本都被持久化数据保存成功之后,Broker才可以告诉生产者,这个消息已经发送成功了。
- 上面的做法会产生什么问题?
- 如果[1,2,3]一共3个分区副本,1是Leader分区副本,其他是Follower分区副本。
- 由于网络的问题,可能[2,3]分区副本的数据同步进度远远落后于1分区副本,那么现在ISR集合中就只剩下1号分区副本自己。
那么问题出现了:
- 当生产者acks=all的时候,这种情况下只有1号Leader分区副本持久化成功,这个消息就算发送成功了,因为ISR集合中只有1。这个时候如果Leader分区副本也挂了怎么办,是不是数据就丢失了?
- 数据确实发出来了,而且回复生产者数据发送成功了
- Leader自己挂了,而Follower没有同步的数据
如何解决这个问题呢?
- 当ISR中只剩下Leader的时候,broker就不应该接收生产者发送的消息。宁可接受kafka服务挂掉,不提供服务;也不能接受苟延残喘的提供服务,最终导致数据丢失。
- 设置min.insync.replicas=n: 当生产者设置acks为“all”(或“-1”)时,该配置
min.insync.replicas
指定了写操作的最小副本数量(即数据同步的最小副本数量) - 大白话就是: 如果生产者acks=all,在发送消息时,Broker的ISR数量没有达到n,Broker不能接收这条消息,需要直接给生产者报错:
NotEnoughReplicas或NotEnoughReplicasAfterAppend
- 一个包含 3 个分区副本的主题,如果配置
min.insync.replicas>=2
,那么当只剩下一个Leader分区副本时,Leader分区副本就变成只读了(只能提供消费,不能接收生产数据)。这样可以有效的避免在kafka主题分区更换选举过程中,数据的写入和读取出现非预期的行为。
重平衡
假设消费者组内某个实例挂掉了,Kafka能够自动监测到,然后把这个Failed实例之前负责的分区转移给其他活着的消费者,这个过程就是Kafka中臭名昭著的"重平衡"。
每个消费者在消费消息的过程中通过消费者位移字段记录它消费到了分区的哪个位置上。
这里的位移和分区在消息内的"位移"不是一个概念,消息在分区中的位移表示的是消息在分区内的位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。
而消费者位移不同,它随着消费者的消费进度而不断往前推移,另外每个消费者有着自己的消费者位移,因此一定要区分这两类位移的区别:
- 分区位移: 消息在分区中的位移
- 消费者位移: 消息者消费进度指示器
概念总结
- 消息: Record
- 主题: Topic ,承载消息的逻辑容器,主要用来区分具体的业务
- 分区: Partition ,一个有序不变的消息序列,每个主题下可以有多个分区
- 消息位移: Offset ,分区中每条消息的位置信息
- 副本: Replica ,用于实现数据的冗余存储,副本又分为领导者副本和追随者副本,副本是在分区层级之下的,即每个分区可配置多个副本以此来实现高可用
- 生产者: Produce ,向主题发布消息
- 消费者: Consumer ,从主题拉取消息
- 消费者位移: Consumer Offser ,表示消费者的消费进度,每个消费者都有自己的消费者位移
- 消费者组: Consumer Group ,多个消费者实例共同组成一个组,同时消费多个分区以此来实现高吞吐量
- 重平衡: Rebalance ,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分片订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
此图来源: 极客时间Kafka核心技术与实战第二讲
思考: 为什么Kafka中的追随者副本不能像主从模型中的从节点一样对外提供读服务呢?
- 读写分离适合什么场景?
- Redis和Mysql都支持主从读写分离,因为对于它们来说通常读操作会比写操作更多,因此采用读写分离架构,可以通过添加很多follower横向扩展,提升读操作性能。
- Kafka作为消息引擎而不是以数据存储的方式对外提供读服务,通常涉及频繁的生产消息和消费消息操作,此时读写操作其实差不多,读写分离架构在这种场景下无法起到太大性能提升作用。
- 写多读少的场景促使Kafka做出了何种选择?
- Kafka副本机制使用异步消息拉取,因此存在leader和follower之间的不一致性,如果要采用读写分离架构,必然要处理副本异步拉取数据引入的数据一致性问题,比如: read-your-writes,如何保证单调读以及处理消息因果顺序颠倒的问题。
- 相反地,如果不采用读写分离,所有客户端读写请求都只在leader上处理也就没有这些问题了。
- 但是,全局消息顺序颠倒的问题在Kafak中依然存在,最简单的解决办法就是采用单分区。
本部分结论引用来源:
消息分区机制
为什么分区
Kafka的消息组织方法为三级结构: 主题-分区-消息 , 主题下的每条消息只会保存在某一分区中,而不会在多个分区中被保存多份。
分区的主要作用就是提供负载均衡的能力,从而实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度进行的,这样每个节点的机器都能够独立执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
分区也可以用来实现一些业务级别的需求,比如实现业务级别的消息顺序的问题。
消息的路由策略
这里的消息路由是生产者决定将消息发送到哪个分区的算法,Kafka为我们提供了默认的分区策略,为轮询策略,同时也支持我们自定义分区策略。
Kafka消息模型
- 分区是最小的并行单位
- 一个消费者可以消费多个分区
- 一个分区可以被多个消费者组里的消费者消息
- But: 一个分区不能同时被同一个消费者组里的多个消费者消费
点对点模型
系统A发送的消息只能被系统B接收,其他任何系统都不能读取A发送的消息。
Kafka实现点对点方式,可以把所有消费者归于一个消费者组,这样生产者向主题发送的消息只能被订阅该主题的消费者组中一个消费者进行消费:
发布订阅模式
可能存在多个消费者向相同的主题发送消息,订阅者也可能存在多个,他们都能接收到相同主题的消息。
Kafka实现发布订阅方式,可以把每个消费者归于不同的消费者组,这样生产者向主题发送的消息可以被所有订阅该主题的消费者进行消费:
消息顺序
- 生产顺序
- 同一个生产者发送到同一个分区的消息,先发送的Offset比后发送的Offset的小。
- 同一个生产者发送到不同分区的消息,消息顺序无法保证。
- 消费顺序
- 消费者按照消息在分区里的存放顺序进行消费
- Kafka只保证分区内的消息顺序,不能保证分区间的消息顺序
如何保证消息顺序
- 设置一个分区,这样就可以保证所有消息的顺序,但是失去了扩展性和性能
- 支持通过设置消息的key,相同的key的消息会发送到同一个分区
消息传递语义
- 最多一次 — 消息可能会丢失,永远不重复发送
- 最少一次 — 消息不会丢失,但是可能会重复
- 精确一次 — 保证消息被传递到服务器端且在服务器不重复
以上三种消息传递语义需要生产者和消费者合力完成。
- 生产者最多一次: 生产者发送完消息后,就认为消息发送成功
- 生产者最少一次: 生产者发送完消息后,如果没接收到Broker的确认,那么便会重试
- 消费者至少一次
- 消费者最多一次
- 精确一次: 在Kafka 0.11.0版本之后实现
生产者
Kafka生产者客户端发送数据流程如下:
- 主线程调用KafkaProducer发送数据,数据不是之间发送给kafka broker服务端,而是先缓冲起来
- 异步线程sender负责将缓冲数据发往kafka broker服务端
- 使用缓冲可以避免高并发请求造成服务端压力,并且还可以利用缓冲实现数据批量发送。
- 异步sender线程负责数据发送,避免了主线程发送数据阻塞,造成核心业务响应延迟。
API使用
- 引入Kafka客户端依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
- 设置生产者客户端参数
ProducerConfig:设置生产者客户端的一系列配置参数。
private Properties props;
@BeforeEach
public void prepareTest(){
props = new Properties();
//kafka broker列表
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_ADDRESS);
//可靠性确认应答参数
props.put(ProducerConfig.ACKS_CONFIG,"1");
//发送失败,重新尝试的次数
props.put(ProducerConfig.RETRIES_CONFIG,"3");
//生产者数据key序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//生产者数据value序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}
消息对象–ProducerRecord
ProducerRecord是Kafka用来封装消息的实体对象,里面包含了很多信息,主要有以下几个构造函数:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {}
public ProducerRecord(String topic, Integer partition, K key, V value) {}
public ProducerRecord(String topic, K key, V value) {}
public ProducerRecord(String topic, V value) {}
- topic:主题
- partition: 主题的分区编号,编号从0开始,表示消息发送到哪个分区
- timestamp: 时间戳,默认为当前时间戳
- key: 消息的键,可以是不同的数据类型,但是通常为字符串类型,具有相同key的消息会被路由到同一个分区,从而可以确保消息的有序性
- data: 消息数据,可以是不同的数据类型
- headers: 消息自定义头信息
三种数据发送方式
Kafka生产者客户端由三种发送消息的方式:
- 只发不管结果(fire and forget) : 生产者将消息放入缓冲区中后,就认为消息发送成功,直接返回,由于Kafka是高可用的,因此大部分情况下消息都会成功写入,但在异常情况下会丢失消息
- 同步发送(sync send): 调用send方法返回一个Future对象,利用Future对象的get方法阻塞等待消息发送完毕。
- 异步发送(async send): 调用send方法时提供一个回调方法,不会阻塞当前线程,当接收到Broker结果后回调此方法
下面分别演示三种消息发送方式。
可以设置打开DEBUG或者TRACE日志查看数据发送详细日志过程
- 生产者异步发送消息: 生产者将消息放入缓冲区中后,就认为消息发送成功,直接返回
- KafkaProducer:生产者对象,用来发送数据
- ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
@Test
public void sendAsyncWithNoCallBack() throws ExecutionException, InterruptedException {
//ProducerRecord: 主题名,key,val
sendMsgTemplate(data->new ProducerRecord<>(TEST_TOPIC,data,"val: "+data),null);
}
private void sendMsgTemplate(Function<String, ProducerRecord<String,String>> recordCallBack, Callback callback) {
//try-with-resource -- 自动释放资源 -- 因为KafkaProducer实现了AutoCloseable接口
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 20; i++) {
//send方法参数: ProducerRecord<K, V> record, Callback callback
producer.send(recordCallBack.apply(Integer.toString(i)),callback);
}
}
}
- 异步发送消息,带回调方法
@Test
public void sendAsyncWithCallBack() throws ExecutionException, InterruptedException {
//ProducerRecord: 主题名,key,val
sendMsgTemplate(data->new ProducerRecord<>(TEST_TOPIC,data,"val: "+data),((recordMetadata, e) -> {
if(e==null){
System.out.println("消息发送成功,消息所在分区为:" + recordMetadata.partition()+" ,在分区中的位移为:"+recordMetadata.offset());
}else{
e.printStackTrace();
}
}));
}
回调函数会在生产者收到ack之前异步调用,该方法有两个参数: RecordMetadata和Exception :
- 如果消息发送成功,则异常对象为null,消息元数据对象不为null
- 如果消息发送失败,则消息元数据对象为null,异常对象不为null
消息发送失败会自动重试,不需要在回调函数中手动重试,重试次数由参数retries设置,重试次数达到上限后,仍然发送失败才会有exception存在。
- 同步发送消息
@Test
public void sendSync() throws ExecutionException, InterruptedException {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TEST_TOPIC, Integer.toString(520), "val=520"));
RecordMetadata recordMetadata = future.get();
if(recordMetadata!=null && recordMetadata.hasOffset()){
System.out.println("消息同步发送成功: "+recordMetadata.offset());
}
}
}
producer的send方法返回对象是Future类型,因此可以通过调用Future对象的get()方法阻塞,等待发送结果的响应,从而达到同步发送消息的效果。这里的同步是指,一条消息发送后会阻塞当前线程,直至返回ack消息。
- 批量发送消息
@Test
public void sendBatch() throws InterruptedException, ExecutionException {
//满足下面两个条件其中之一就会把消息缓冲区中的消息进行批量发送
//每一批消息的大小: 默认是16KB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//延迟时间
props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
sendMsgTemplate(data -> {
try {
//模拟业务请求耗时,使得能够满足上面延迟时间条件,触发批量发送
Thread.sleep(1000);
return new ProducerRecord<>(TEST_TOPIC, data, "val: " + data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, null);
}
生产者发布确认机制
- acks参数: 可选值有 all , 1 , 0 , 默认值为1
- 0 : fire and forget,也就是我发了就算完了,后续成不成功我都不管,这种设置下消息的高可靠性几乎没有保障,但是有极大的吞吐量。
- 1 : 写入主节点就算成功,这种设置,可以保障一定的高可靠性,也具有不错的吞吐量。
- all / -1 : 也就是写入 ISR 中所有的副本才算成功,这种设置下,就能提供较高的高可靠性,但是吞吐量就相对较低。
props.put("acks",1);
- retries参数: 生产者重试次数,默认为0
props.put("retries",0);
acks和retries配合使用,就可以产生不同的消息传递语义:
- 至多一次: acks=0 or acks=1
- 至少一次: acks=-1 or akcs=all 并且 retries>0
消费者
分区与消费者组回顾
消费者组是由一组具有共同消费特征的消费者组成的集合,这些消费者共同消费一个或多个主题。
由于单个消费者无法满足某个主题下的数据处理速度,所以需要多个消费者来负载,这是消费者组出现的一个重要原因。
每一个消费者组内的消费者都具备一个消费者组ID,在创建消费者的时候,我们可以指定消费者所属的group id,如果不指定,默认值在kafka安装目录/config/consumer.properties
文件中定义,为test-consumer-group
。
还有一点是反复强调的: 一个分区只能被消费者组里面的一个消费者消费。
- 当某个主题的分区数量,大于订阅它的消费者组内的消费者数量时,会出现以下情况:
分区会尽量均衡的分给消费者组内的多个消费者
- 当某个主题的分区数量,小于订阅它的消费者组内的消费者数量是,会出现以下情况:
四个分区六个消费者,会有两个消费者处于空闲状态,因此如果分区数没有匹配消费者数量,创 建再多的消费者也不会提高数据消费速率。
- 最佳的状态是分区数与消费者数量相等
如果我们发现某一个主题的消费数据积压的时候,首先想到的应该是优化消费者数据消费的程序,提高数据处理效率,如果仍然无法满足需求,则同步加大主题的分区数量以及消费者组内的消费者数量,让二者保持一致。
- 多主题,多消费者组
- 一个分区只能被消费者组里面一个消费者消费
- 一个消费者可能会消费某个主题内的多个分区
- 最好的状态是主题的分区数等于消费者组内消费者数量
- 如果某个消费者组内消费者数量大于其订阅的主题的分区数,会发生多出来的消费者空转的现象
消费者组数据积压查看与处理
所以查看数据消费进度,或者消息数据是否积压,是以“消费者”组为单位进行查看的。
可以以下通过命令行查看某个消费者组的消费进度情况
./kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --describe --group dhy-group
响应结果格式如下:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
topic2 0 241019 395308 154289
topic2 1 241019 398817 157799
topic1 0 854144 855809 1669
- 一个主题的一个分区信息占一行
- 该消费者组订阅了三个主题:topic1(一个分区0)、topic2(两个分区0、1)
- CURRENT-OFFSET是当前数据消费进度的偏移
- LOG-END-OFFSET是当前分区已经接收到的数据信息偏移
- LAG = (LOG-END-OFFSET - CURRENT-OFFSET)。LAG的值越大说明数据积压越严重。
如何看待数据积压?
- 如果某个分区一天平均2000完数据,积压量稳定在1万,并在对用户正常使用也没有影响,那么就不算数据积压,如果异常状态下,LAG飙到了几十万,那么这就是数据积压了。
如何解决数据积压?
- 优化消费端消费业务逻辑,提供数据处理速度
- 增大发生积压的主题的分区的数量,同时增大消费该主题分区的消费者的数量,让其等于该主题的分区数
- 借助sentinel等限流工具,进行流量控制
增大主题分区数量的命令如下:
./kafka-topics.sh --alter \
--bootstrap-server kafka-0:9092,kafka-1:9092,kafka-2:9092 \
--topic test2
--partitions 4
注意:主题的分区数只能增大,不能减小。
API使用
复习:
- Kafka中有一个主题_consumer_offsets , 用来保持消费者消费到哪个主题,哪个分区的哪个消费位置,这样一旦某个消费者进行了重启,可以快速恢复到上一次的消费位置。
- 消费者在拿到消息后,会将消息位置存储到_consumer_offsets这个主题的分区下面,下次读取时,就会返回下一个消费位置。
- 引入Kafka客户端依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
- 设置消费者客户端参数
public class KafkaConsumerTest {
private static final String TEST_TOPIC = "test1";
private Properties props;
@BeforeEach
public void prepareTest() {
props = new Properties();
//kafka集群信息
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_ADDRESS);
//消费者组名称
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dhy_group");
//key的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//value的反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
}
}
- 设置消费者消费消息的模板方法
/**
* recordConsumer针对单条数据进行处理,此方法中应该做好异常处理,避免外围的while循环因为异常中断。
*/
public void consumeTemplate(Consumer<ConsumerRecord<String,String>> recordConsumer,Consumer<KafkaConsumer<String,String>> afterCurrentBatchHandle){
//1.创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2.订阅Topic--支持正则表达式: consumer.subscribe("test.*");
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
try {
while (true) {
//循环拉取数据,
// Duration超时时间,如果有数据可消费,立即返回数据
// 如果没有数据可消费,超过Duration超时时间也会返回,但是返回结果数据量为0
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
recordConsumer.accept(record);
}
afterCurrentBatchHandle.accept(consumer);
}
} finally {
//退出应用程序前使用close方法关闭消费者,
// 网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}
private static void printRecord(ConsumerRecord<String, String> record) {
System.out.println("topic:" + record.topic()
+ ",partition:" + record.partition()
+ ",offset:" + record.offset()
+ ",key:" + record.key()
+ ",value" + record.value());
record.headers().forEach(System.out::println);
}
消费偏移量
Kafka每个消费者客户端消费一个分区的数据,同时会使用消息位移来标识当前的消费进度,该位移也被称为消费者偏移量(Consumer Offset):
- 对于一个消费者组而言,记录的是该消费者组在多个分区的消费进度,也就是一组<K,V>对,Key 是分区对象,V 对应 Consumer 消费该分区的最新偏移量
- 上图中黄色代表分区内未消费的消息数据,绿色代表已经被消费的消息数据。消费者偏移量是即将消费的下一条消息的偏移量,而不是目前已经消费完成的消息的偏移量。
- 消费者消费完成的消息数据会进行偏移量提交,这样在 Consumer 发生故障重启之后,就能够从 Kafka 中读取该消费者组之前提交的偏移量,然后从相应的偏移处继续消费。
HighWatermark
HighWatermark简称HW,代表高水位,高水位及高水位之后的消息已经在分区内实际物理存在,但是不能被消费。
- HighWatermark的偏移量大于Consumer Offset的偏移量
- Log End Offset(LEO)是指即将追加到当前副本的最后一个消息的偏移量
HW的作用是什么呢? HW至LEO之间的消息为什么不能被消费呢?
- 首先Follower副本的数据是从Leader副本同步过来的,kafka的生产者消费者都只和Leader副本进行通信
- 其次由于Follower副本与Leader副本之间数据存在同步延迟问题,所以一定存在某个时刻Follower副本数据写入进度落后Leader副本
- 所以HW代表的就是该分区所有ISR副本能够同步达到的日志偏移量,如下图所示:
不允许HW及其之后的偏移量的消息被消费,是为了避免发生分区Leader重新选举时,切换到Follower2,无法实现消费数据进度的同步。
自动提交和手动提交(同步提交和异步提交)
Consumer需要向Broker提交自己消费某个分区的偏移量,偏移量的提交方式又分为自动提交和手动提交,从是否阻塞的角度看,又可以分为同步提交和异步提交。
以下消费者演示用例会使用到上面给出的消费者消费模板方法
- 自动提交—至多消费一次: 当我们将客户端的
ENABLE_AUTO_COMMIT
参数设置为true时,消费者会为我们定期自动提交偏移量,提交的时间间隔由参数AUTO_COMMIT_INTERVAL_MS
控制。
@Test
public void consumeWithAutoCommit(){
//下面两个属性用于设置自动提交---只要消息被拉取到,并且到了指定的间隔时间,消息便会自动提交
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumeTemplate(KafkaConsumerTest::printRecord,null);
}
注意:
- 由于消费者是单线程的,所以实际情况下,可能并不是每隔
AUTO_COMMIT_INTERVAL_MS
就提交一次偏移量,具体执行流程如下:- 通过poll获取一批消息然后进行消费
- 在poll下一批数据时,判断上一次poll数据的时间间隔是否大于
AUTO_COMMIT_INTERVAL_MS
,如果大于,就自动提交偏移量 - 这里自动提交的偏移量,是上一批次完成消费的数据的偏移量offset
- 手动同步提交(批量提交)—至少消费一次: 自动提交方式,只能在poll方法被调用的时候才能提交偏移量,如果我们希望在程序处理的任意位置提交偏移量,就需要考虑采用手动提交的方式了。
- 设置客户端的
ENABLE_AUTO_COMMIT
参数为false - 使用commitSync完成偏移量的手动同步提交
- 设置客户端的
@Test
public void consumeWithNoAutoCommitWithSyncBatch(){
//设置为手动提交--默认值
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//缓冲记录,用于批量手动提交
List<ConsumerRecord<String,String>> buffer=new ArrayList<>();
final int minBatchSize=3;
consumeTemplate(buffer::add, consumer->{
if(buffer.size()>=minBatchSize){
//数据操作
buffer.forEach(KafkaConsumerTest::printRecord);
//批量提交
consumer.commitSync();
//清空缓存
buffer.clear();
}
});
}
注意:
- commitSync是一个同步方法,直到偏移量被成功提交之前都处于阻塞状态
- commitSync同步提交会在失败之后进行重试,重试仍然失败会抛出CommitFailedException异常
- commitSync同步方法会阻塞消费线程,因此针对消息消费速度要求较高的业务场景要尽量避免使用。
- 手动异步提交(批量提交)—至少消费一次: 使用异步提交通常使用带回调函数的commitAsync,如果偏移量提交失败,进行日志记录或者异常处理
@Test
public void consumeWithNoAutoCommitWithAsyncBatch(){
//设置为手动提交--默认值
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//缓冲记录,用于批量手动提交
List<ConsumerRecord<String,String>> buffer=new ArrayList<>();
final int minBatchSize=3;
consumeTemplate(buffer::add, consumer->{
if(buffer.size()>=minBatchSize){
//数据操作
buffer.forEach(KafkaConsumerTest::printRecord);
//批量提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception!=null){
//异常处理
}
}
});
//清空缓存
buffer.clear();
}
});
}
注意:
- commitAsync 的问题在于提交偏移量出现异常时它不会自动重试
- 同步提交结合异步提交: 阶段性手动提交,为了避免阻塞,调用commitAsync异步提交方法,一旦消费者线程出现异常,调用commitSync方法执行同步阻塞提交,以确保Consumer关闭前能够成功提交偏移量
@Test
public void consumeWithNoAutoCommitCombineSyncBatchAndAsyncBatch() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
records.forEach(KafkaConsumerTest::printRecord);
consumer.commitAsync();
}
} catch (CommitFailedException e) {
//记录错误日日志,进行告警处理
e.printStackTrace();
} finally {
consumer.commitSync();
consumer.close();
}
}
重复消费问题
- 什么情况下会产生重复消费呢?
- 假设我们一次性拉取了500条消息,但是Consumer消费到第200条消息时,奔溃重启
- 由于之前199条消息已经消费完成,但是消费位移还未提交,所以再次消费时仍然拉取上一次那500条数据,这会导致0~199区间范围的消息被重复消费
注意:
- 只要数据时批量消费,并且偏移量采用批量提交,就无法避免重复消费的问题,无法是手动提交还是自动提交,无论是同步提交还是异步提交
避免重复消费的最简单方法就是每消费一条消息,就将这条消息的偏移量进行提交。
- 手动提交—单个提交
@Test
public void consumeWithNoAutoCommitWithSingle() {
//设置为手动提交--默认值
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
while (true) {
//一次性请求尽可能多的数据
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
printRecord(record);
//记录下offsets信息
offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1));
//当前消息处理完,就提交当前消息的偏移量
consumer.commitAsync(offsets,null);
}
try {
//处理完当前批次的消息,在拉取下一批消息前,调用commitSync方法提交当前批次最新消息
consumer.commitSync(offsets);
}catch (CommitFailedException e){
e.printStackTrace();
}
}
}
注意:
- 虽然单个提交的方式能够避免消息被重复消费,但是效率却很低,所以更建议采用批量消费
- 避免消息重复消费的最好方法还是保证消费者端程序的健壮性,充分测试,避免因为数据格式等问题出现异常,一旦出现异常做好告警和日志记录
数据丢失问题
通常情况下,我们都是在处理完当前批次消息后,才会去提交这个批次数据的偏移量,所以只要异常处理得当,是不存在数据丢失问题的。
但是在某些场景下,还是存在数据丢失风险的,如下图所示:
Consumer一次性去了300条数据,然后将消息转交给一个单独的线程池处理,然后主线程就继续往下执行,提交这300条消费的偏移量。
假设线程中有三个线程,每个线程负责处理一百条消息,但是线程2和线程3处理消息时发生异常,由于主线程已经将偏移量进行了提交,那么子线程执行失败的那些数据,就永远不会被消费了,这就产生了数据丢失问题。
如果想避免这些问题就不要用线程去处理消息数据,因为消费者组包含的多个消费者本身就是多线程,就不要在消费者的代码里面再去开启多个线程了。
指定分区偏移量进行消费
有时候由于某些故障原因,可能需要执行数据补救措施,这时候就需要针对某个主题或者分区从指定offset开始消费。
- seek指定消费某个主题分区,指定offset(long)开始进行消费
- seekToBeginning 从某个主题分区的队列头部(offset=0)的位置开始消费
- seekToEnd 从某个主题分区的队列的尾部(offset=最新一条数据的offset)的位置开始消费。
例如:
//指定消费者消费主题及分区
String topic="foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0,partition1));
//指定从foo主题的partition0分区的offset=1的位置开始消费
consumer.seek(partition0,1);
auto.offset.reset
参数含义: 当服务端各分区下,没有消费者提交的offset或者通过程序指定消费的offset不存在时,该如何开始一次新的消费。
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
该参数也是指定从指定位置进行消费的重要配置参数,该参数有三种取值:
earliest、latest、none
注意: 没有消费者提交的Offset存在两种情况
- 有可能这个主题的分区是新创建的,之前没有消费者消费过
- 有可能消费者组是新创建的,这个消费者组之前没有消费过这个分区
参数值解析:
- earliest(默认): 当各分区下有已提交的offset时,从提交的offset位置开始消费;无提交的offset时,从头开始消费。
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费该分区下最新产生的的数据。
- none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
小结
本文作为Kafak入门篇学习笔记整理,重点整理了Kafka的安装过程(待补充完善),Kafka核心概念和Kafka生产者和消费者简单的API使用。
在后续的Kafak基础篇中,将会针对Kakfa部分进阶知识进行整理,还有SpringBoot整合Kafka的使用。
相关文章
- Kafka + Zookeeper集群搭建
- kafka安装完整步骤_kafka集群搭建详细步骤
- 【云原生】一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置
- Flume+Kafka整合案例实现
- Kafka入门实战教程:学习总结目录索引
- Kafka如何删除topic中的部分数据_kafka修改topic副本数
- 说一说你对 Kafka 中 ISR 的理解
- 慕了,我要是早点看到这篇写 Kafka 的分区管理的文章就好了
- KafKa主题、分区、副本、消息代理
- Kafka消费者
- kafka源码解析之十七消费者流程(客户端如何获取topic的数据)详解编程语言
- kafka源码解析之二kafka内部的专业术语详解编程语言
- 深入探究Kafka与Redis的对比(kafka与redis)
- 的数据同步从MySQL到Kafka:实现实时数据同步(mysql到kafka)
- Linux环境下部署Kafka服务器实践(linux kafka)
- Oracle与Kafka的联合应用突破传统数据处理极限(oracle与kafka)
- 比较Redis vs Kafka(redis还是kafka)