zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Kafka:可靠性保证

2023-04-18 16:46:22 时间

摘要

系统设计之初,就该保证可靠性,这样才能保证数据不丢失。kafka也是如此,因此它支持集群部署,消息也有副本备份,有确认和重发机制,以及文件的保存和删除,它们具体怎样工作的,这里探讨下。

代理节点控制器

控制器controller是一个broker代理服务节点,除了具有一般broker的功能外,还负责分区leader的选举。集群里第一个启动的broker会在zookeeper里创建/controller节点,让自己成功控制器。而其它broker启动时发现/controller控制节点已存在,则会在控制节点上创建zookeeper watch对象,观察这个节点的变更,这样只有一个控制器存在,其它的节点只是观察。

若控制器被关闭或与zookeeper断开连接,/controller节点会消失,其它从节点观察到后都会申请/controller,第一个创建控制节点的broker会成为新控制器,接着其它节点会观察新的控制节点。

当控制器观察到普通broker离开集群时,会给那些失去leader(leader刚好在断连的broker上)的分区们重新选举出行leader,新leader分区选举出来后,则继续处理生产者和消费者的请求。而剩下的fllower会复制新leader的消息。

当控制器发现 普通broker 加入集群时,它会使用 broker.id来检查新加入的 broker 的分区的副本。把变更通知发送给新加入的 broker和其他 broker,然后新broker上的follower分区开始从leader分区那里复制消息。

分区副本

一个主题有多个分区,但每个分区都有多个副本,用副本复制系数用replication.factor指定,默认为3,这些副本只有一个是leader,其它的都是fllower。为了保证一致性,所有生产和消费请求都是leader副本处理,fllower只是复制leader的消息并保持状态一致。若leader崩溃则其它follower的其中一个会被提升为leader。

follower为了与leader保持一致,会发送获取数据偏移量请求,通过查看follower的最新偏移量,leader可以知道follower的同步进度,若follower在10s内没有发送请求或者没有请求最新的数据,则该follower会被认为不同步,当然,follower还与zookeeper有心跳检测(6s内发送过心跳),若follower与zookeeper断连了,则也是认为follower不同步,不同步的follower默认是不能选举成为leader。但设置了unclean.leader.election.enable=true,则不同步的follower也能成为leader(不完全选举),对可靠性要求高的不建议设置为true。

分区除了当前首领,还有首选首领,比如一开始主题创建时就确定了首选首领,这个首选首领一般是各分区首领在broker上的分布确定的,确保每个broker都均衡处理生产消费请求。而中间可能发生首选首领崩溃,跟随者变成了当前首领。若kafka的auto.leader.reblance.enable=true时,它会检查首选首领是否是当前首领,若不是,且该副本是同步的,则会触发首领选举,使之成为当前首领。

一般来说,各副本会分散存储在broker上,且leader是均衡分布的,即使其中一个broker挂掉,部分leader丢失,还是可以通过follower选出新leader进行消息处理。由broker集群和分区副本保存同步可以保证存储消息的可靠性。

客户端会向任意broker发送元数据请求,因为元数据存储在所有broker上,broker都存储着所有主题信息,有哪些分区副本,以及leader副本在broker的分布情况。客户端获取元数据后会缓存到本地,并在metadata.max.age.ms间隔时间发送元数据请求刷新缓存,客户端再向主题的leader分区副本所在的broker发送请求进行写入或消费消息。

最少同步副本

在主题和broker上,最少同步副本参数都是 min.insync.replicas,尽管集群和分区,都有副本同步机制,尽管配置了3个副本,但还是会出现只有一个同步副本的情况甚至没有同步副本的情况,若唯一的一个同步副本不可用或无同步副本,则数据有丢失风险。设置最少同步副本为2,则生产者发送消息,若其中一个副本不可用,则broker不会接收消息,因为此时同步副本只有1个,另一个不可用,生产者会收到broker返回的NotEnoughReplicasException异常,此时消费者是可以消费的,这样该分区只变成只读了。为了恢复不可用分区,一般要重启broker,让不可用分区变成可用,这样才能继续变成同步分区,使同步分区数量满足要求,生产者才能成功写入消息到broker中。

发送消息

生产者发送消息后,需要确认broker是否收到消息,总共有三种策略,用acks参数指定,acks=0,则生产者发出消息后不管,不会等待broker响应。acks=1,则生产者发出消息后,broker将消息写入leader分区就返回响应,可以确保消息到达leader副本。若acks=all,则生产者发出消息后,broker将消息写入leader并且所有同步follower都同步后,broker才返回响应。即可以确保消息到达所有leader和同步follower分区副本,同步follower数量可以用 min.insync.replicas参数指定。

当acks=0,是,生产者发送消息后返回的元数据信息里的offset是-1,不会去读取真正的offset。当acks=1或acks=all,返回的元数据的offset才是真正的偏移量。

发送消息失败,比如网络异常或此时leader在重新选举,生产者会重试发送消息,因为这些问题可能在几秒钟内解决,重发消息可以保证消息到达broker。但重试也有可能造成消息重复发送,比如,生产者发给broker消息,broker成功接收并写入到分区,返回ack给生产者,但ack返回时,网络异常,生产者没有收到返回的ack,若配置了重试的话,生产者会重新发送消息给broker,导致broker收到两条相同消息。所以建议发送的消息内容可以加入UUID,然后在消费端也用UUID确认,确保每个UUID对应的消息只消费一次。

获取消息

消费者获取从broker的leader分区获取的消息,都是已同步到follower副本的消息。这样可以保证数据一致性,各消费者消费到的消息是一致的,不至于leader崩溃后,同消费者返回一个不存在的消息ack给新leader或不同的消费者漏了消息。

消费者消费了消息后,若设置了自动提交enable.auto.commit=true,则会自动提交偏移offest给broker,偏移就是消费者消费消息的进度,一般每5秒提交一次,这个提交间隔可用auto.commit.interval.ms来设置。当然,消费者也可以设置手动提交,然后手动可以调用同步或异步提交offest。当然,这里提交offest和生产者确认ack一样,可能提交的offset由于网络原因broker没有收到,从而消息被消费者重复拉取。

还有一个参数,auto.offset.reset,指定了没有偏移量提交时(比如消费者第一次启动)消费者获取消息的策略,若为esrliest,则分区会从起始位置开始消费消费,可能会导致重复消费消息,若为laset,则消费者会从分区末尾开始读取消息,即只消费它连线后分区里接收的消息。

消费者重复消费消息可能没法避免,因此还是建议生产和消费端用UUID标识业务消息,确保每个UUID对应的消息只消费一次,或者消费代码保持幂等性,即使消费相同的消息也能保证结果相同准确。

文件管理

kafka虽然会保留消息,但不会一直保留消息,broker为每个主题设置了数据保留期限,会在规定时间后删除消息。具体消息是保存在分区上的,但实际物理存储,每个分区是由多个片段文件保存在硬盘上的,每个片段保存1G或一周的数据。活跃片段(正在写入但未达到1G或1周)是不会删除,即使消息设置了保留1天,但活跃片段即使在第六天还未达到1G,那么活跃片段仍有6天的消息,消息并不是真正的保留一天,消息的真正删除是以分区的片段文件为准来保留和删除的。