Apache Kafka源码分析 - kafka controller
前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis
剩下的,其实关键就是controller,以及partition和replica的状态机
这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作
最关键的一句,
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId)
kafka.server.ZookeeperLeaderElector
参数的含义比较明显,注意两个callback,是关键
class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, onResigningAsLeader: () => Unit, brokerId: Int)
做两件事,
1. 建立watcher,去listen “election path” in ZK, "/controller";
当controller发生变化是,可以做相应的处理
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
LeaderChangeListener
监听这个path的两个行为,
handleDataChange,即controller发生了变化,这个比较简单,只需要把local的leaderid更新一下就好
leaderId = KafkaController.parseControllerId(data.toString)
handleDataDeleted,这种情况即,controller被删除了,一般是挂了
挂了就重新做elect,但是多个判断,如果本来我就是leader,那先调用onResigningAsLeader,即onControllerResignation,做些清理的工作,比如deregister watcher,关闭各种状态机
if(amILeader) onResigningAsLeader() elect
2. elect, 试图去创建EphemeralPath,从而成为Controller
用createEphemeralPathExpectConflictHandleZKBug,试图去创建EphemeralNode,
如果不成功说明已经被别人抢占
成功就说明我成为了controller,调用onBecomingLeader(),即onControllerFailover
KafkaController.onControllerFailover
这个callback就是controller初始化的关键,
/** * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. * It does the following things on the become-controller state change - * 1. Register controller epoch changed listener * 2. Increments the controller epoch * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and * leaders for all existing partitions. * 4. Starts the controller's channel manager * 5. Starts the replica state machine * 6. Starts the partition state machine * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller. * This ensures another controller election will be triggered and there will always be an actively serving controller */ def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) //开始日志 //read controller epoch from zk readControllerEpochFromZookeeper() // increment the controller epoch incrementControllerEpoch(zkClient) //增加ControllerEpoch,以区分过期 // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() //从zk读出broker,topic的信息以初始化controllerContext replicaStateMachine.startup() //启动状态机 partitionStateMachine.startup() // register the partition change listeners for all existing topics on failover controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) //为每个topic增加partition变化的watcher info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) //完成日志 brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) //如注释说,用于成为controller,需要把partition leadership信息发到所有brokers,大家要以我为准 if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
这里最后再讨论一下createEphemeralPathExpectConflictHandleZKBug
这个函数看着比较诡异,handleZKBug,到底是zk的什么bug?
注释给出了,https://issues.apache.org/jira/browse/ZOOKEEPER-1740
这个问题在于“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.”
即zk的session过期和ephemeral node删除并不是一个原子操作,所以带来的问题的过程如下,
1. client session超时,它会假设在zk,这个ephemeral node已经被删除,但是zk当前由于某种原因hang住了,比如very long fsync operations,所以其实这个ephemeral node并没有被删除
2. 但client是认为ephemeral node已经被删除,所以它会尝试重新创建这个ephemeral node,但得到的结果是NodeExists,因为这个node并没有被删除,那么client想既然有就算了
3. 这个时候,zk从very long fsync operations的hang中恢复,它会继续之前没有完成的操作,把ephemeral node删掉
4. 这样client就会发现ephemeral node不存在了,虽然session并没有超时
所以这个函数就是为了规避这个问题,
方法就是,当我发现NodeExists时,说明zk当前hang住了,这个时候我需要等待,并反复尝试,直到zk把这个node删除后,我再重新创建
def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = { while (true) { try { createEphemeralPathExpectConflict(zkClient, path, data) return } catch { case e: ZkNodeExistsException => { // An ephemeral node may still exist even after its corresponding session has expired // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted // and hence the write succeeds without ZkNodeExistsException ZkUtils.readDataMaybeNull(zkClient, path)._1 match { case Some(writtenData) => { if (checker(writtenData, expectedCallerData)) { info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path) + "hence I will backoff for this node to be deleted by Zookeeper and retry") Thread.sleep(backoffTime) } else { throw e } } case None => // the node disappeared; retry creating the ephemeral node immediately } } case e2: Throwable => throw e2 } } }
这个方式有个问题就是,如果zk hang住,这里的逻辑是while true,这个函数也会一直hang住
本文章摘自博客园,原文发布日期:2015-11-05
相关文章
- 在 Go 里用 CGO?这 7 个问题你要关注!
- 9款优秀的去中心化通讯软件 Matrix 的客户端
- 求职数据分析,项目经验该怎么写
- 在OKR中,我看到了数据驱动业务的未来
- 火山引擎云原生大数据在金融行业的实践
- OpenHarmony富设备移植指南(二)—从postmarketOS获取移植资源
- 《数据成熟度指数》报告:64%的企业领袖认为大多数员工“不懂数据”
- OpenHarmony 小型系统兼容性测试指南
- 肯睿中国(Cloudera):2023年企业数字战略三大趋势预测
- 适用于 Linux 的十大命令行游戏
- GNOME 截图工具的新旧截图方式
- System76 即将推出的 COSMIC 桌面正在酝酿大变化
- 2GB 内存 8GB 存储即可流畅运行,Windows 11 极致精简版系统 Tiny11 发布
- 迎接 ecode:一个即将推出的具有全新图形用户界面框架的现代、轻量级代码编辑器
- loongarch架构介绍(三)—地址翻译
- Go 语言怎么解决编译器错误“err is shadowed during return”?
- 敏捷:可能被开发人员遗忘的部分
- Denodo预测2023年数据管理和分析的未来
- 利用数据推动可持续发展
- 在 Vue3 中实现 React 原生 Hooks(useState、useEffect),深入理解 React Hooks 的