zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

kafka源码解析之十二KafkaController(中篇)详解编程语言

Kafka源码编程语言 详解 解析 十二 中篇
2023-06-13 09:20:35 时间

OfflinePartition状态出现在以下情况:
1.partition的replicas中的leader下线之后,没有重新选举新的leader之前
2.partition创建之后直接被下线


NewPartition -> OnlinePartition:1.分配第一个live replica作为leader,其它live replicas作为isr,并把信息写入到zk

因此重点关注PartitionStateMachine的handleStateChange函数
/**
 * Moves a single partition to `targetState`, after validating that the transition is
 * legal from its current state.
 *
 * @param topic          topic of the partition
 * @param partition      partition id
 * @param targetState    state to move the partition to
 * @param leaderSelector selector used when a leader must be (re-)elected
 *                       (OfflinePartition/OnlinePartition -> OnlinePartition)
 * @param callbacks      callbacks passed through by the caller (unused in this method body)
 * @throws StateChangeFailedException if the state machine has not been started yet;
 *         failures after that point are caught and logged, not rethrown
 */
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector,
                              callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                          "the partition state machine has not started")
                                            .format(controllerId, controller.epoch, topicAndPartition, targetState))
  // A partition the controller has never seen is treated as NonExistentPartition.
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    targetState match {
      case NewPartition =>
        // Only a non-existent partition may become NewPartition.
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        // Update partitionReplicaAssignment in controllerContext.
        assignReplicasToPartitions(topic, partition)
        partitionState.put(topicAndPartition, NewPartition)
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                          assignedReplicas))
      case OnlinePartition =>
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        partitionState(topicAndPartition) match {
          case NewPartition => // NewPartition -> OnlinePartition
            // 1. pick the first live replica from partitionReplicaAssignment as leader, the rest as ISR
            // 2. persist leader and ISR to ZooKeeper
            // 3. update partitionLeadershipInfo in controllerContext
            // 4. queue LeaderAndIsrRequests for the replicas' brokers via ControllerBrokerRequestBatch
            initializeLeaderAndIsrForPartition(topicAndPartition)
          case OfflinePartition => // OfflinePartition -> OnlinePartition
            // Elect a new leader with the supplied selector (typically OfflinePartitionLeaderSelector),
            // persist leader/ISR to ZooKeeper, update partitionLeadershipInfo and queue LeaderAndIsrRequests.
            electLeaderForPartition(topic, partition, leaderSelector)
          case OnlinePartition => // OnlinePartition -> OnlinePartition
            // Re-elect the leader with the supplied selector (e.g. ReassignedPartitionLeaderSelector),
            // with the same persistence and request-queuing steps as above.
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
      case OfflinePartition =>
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        partitionState.put(topicAndPartition, OfflinePartition)
      case NonExistentPartition =>
        // Only an offline partition may be removed.
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
        partitionState.put(topicAndPartition, NonExistentPartition)
        // post: partition state is deleted from all brokers and zookeeper
    }
  } catch {
    // NOTE(review): catches Throwable, so every failure (including fatal JVM errors) is only logged here.
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  }
}
12.4 KafkaController PartitionLeaderSelector
当partition的状态发生切换时,特别是发生OfflinePartition -> OnlinePartition和OnlinePartition -> OnlinePartition切换时,需要调用不同的PartitionLeaderSelector来确定leader和isr。当前一共支持5种PartitionLeaderSelector,分别为:NoOpLeaderSelector、OfflinePartitionLeaderSelector、ReassignedPartitionLeaderSelector、PreferredReplicaPartitionLeaderSelector、ControlledShutdownLeaderSelector。
12.4.1 NoOpLeaderSelector
/**
 * Essentially does nothing. Returns the current leader and ISR, and the current
 * set of replicas assigned to a given topic/partition.
 *
 * Every call logs a warning, because this selector is not expected to be asked
 * to perform a real election.
 */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  /**
   * @param topicAndPartition   partition whose assignment is looked up
   * @param currentLeaderAndIsr returned unchanged
   * @return (`currentLeaderAndIsr`, replicas currently assigned to `topicAndPartition`)
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}
基本上什么也没做,只是原样返回currentLeaderAndIsr以及分配给该topic/partition的replica集合(set of replicas assigned to a given topic/partition)。
12.4.2 OfflinePartitionLeaderSelector
/**
 * Selects a new leader for a partition whose leader has gone offline.
 *
 * Preference order:
 *  1. the first assigned replica that is alive and in the current ISR (clean election);
 *  2. if no ISR member is alive and the topic's config allows unclean leader election,
 *     the first live assigned replica, with the ISR reset to just that replica.
 */
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {

  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  /**
   * @return the new LeaderAndIsr (epoch and zkVersion bumped by one) and the live assigned replicas
   * @throws NoReplicaOnlineException if the partition has no assigned replicas, if no ISR member is
   *         alive and unclean election is disabled, or if no assigned replica is alive at all
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true =>
            // Every ISR broker is offline: the only option left is an unclean election from the
            // assigned replicas, which must be allowed by the topic's uncleanLeaderElectionEnable.
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true =>
                // No assigned replica is alive either: the partition cannot be brought online.
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false =>
                ControllerStats.uncleanLeaderElectionRate.mark()
                // Unclean election: take the first live assigned replica as the sole ISR member.
                val newLeader = liveAssignedReplicas.head
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                  .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false =>
            // Clean election: first live assigned replica that is also a live ISR member.
            // NOTE(review): `.head` assumes the live ISR members are among the assigned replicas.
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
12.4.3 ReassignedPartitionLeaderSelector
/**
 * Selects a leader for a partition that is being reassigned.
 *
 * The leader is the first replica from the reassignment's new replica list that is
 * both on a live broker and in the current ISR. The ISR itself is kept unchanged.
 */
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[ReassignedPartitionLeaderSelector]: "

  /**
   * @return the new LeaderAndIsr (epoch and zkVersion bumped by one, ISR unchanged) and the reassigned replicas
   * @throws NoReplicaOnlineException if the reassigned replica list is empty, or none of its
   *         replicas is both alive and in the ISR
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // Replicas the partition is being reassigned to.
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    // Candidates must be on a live broker AND already in the current ISR.
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
                                                                             currentLeaderAndIsr.isr.contains(r))
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match {
      case Some(newLeader) =>
        (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None =>
        // No eligible candidate: the reassignment cannot pick a leader.
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}
12.4.4 PreferredReplicaPartitionLeaderSelector
/**
 * Moves leadership back to the preferred replica, i.e. the first replica in the
 * partition's assignment, as long as it is alive and in the ISR.
 */
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  with Logging {

  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  /**
   * @return the new LeaderAndIsr (epoch and zkVersion bumped by one, ISR unchanged) and the assigned replicas
   * @throws LeaderElectionNotNeededException if the preferred replica already leads the partition
   * @throws StateChangeFailedException if the preferred replica is not alive or not in the ISR
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // The preferred replica is the first one in the assignment.
    val preferredReplica = assignedReplicas.head
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) {
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
        .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Triggering preferred replica leader election")
      // The preferred replica may only take over if its broker is alive AND it is in the ISR.
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}
12.4.5 ControlledShutdownLeaderSelector
/**
 * Selects a new leader when brokers are being shut down in a controlled manner.
 *
 * Shutting-down brokers are removed from the ISR, and the first remaining ISR member
 * becomes the leader.
 */
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  extends PartitionLeaderSelector
  with Logging {

  this.logIdent = "[ControlledShutdownLeaderSelector]: "

  /**
   * @return the new LeaderAndIsr (shrunk ISR, epoch and zkVersion bumped by one) and the
   *         assigned replicas whose brokers are live or shutting down
   * @throws StateChangeFailedException if every ISR member is on a shutting-down broker
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val currentLeader = currentLeaderAndIsr.leader

    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // Assigned replicas whose broker is either live or still shutting down.
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

    // Drop ISR members that sit on a shutting-down broker; the first survivor becomes leader.
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    val newLeaderOpt = newIsr.headOption
    newLeaderOpt match {
      case Some(newLeader) =>
        debug("Partition %s : current leader = %d, new leader = %d"
          .format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
      case None =>
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}
12.5 KafkaController ReplicaStateMachine

它实现了topic的partition的replica状态切换功能,replica存在的状态如下:


ReplicaDeletionSuccessful:1.replica成功响应删除该副本的请求时进入该状态,此时KafkaController内存中还保留此replica的信息


NonExistentReplica -> NewReplica:1.KafkaController发送LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica -> OnlineReplica:1.当KafkaController按需把new replica加入到asr(assigned replicas)中的时候。实际上NewReplica转化为OnlineReplica是一个很快的过程,中间存在的时间很短,其转化出现在onNewPartitionCreation中
OnlineReplica,OfflineReplica -> OnlineReplica:1.KafkaController发送LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica:1.KafkaController发送StopReplicaRequest to the replica (w/o deletion)
2.KafkaController从isr中移除该replica,并send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
OfflineReplica -> ReplicaDeletionStarted:1.KafkaController发送StopReplicaRequest to the replica (with deletion)
ReplicaDeletionStarted -> ReplicaDeletionSuccessful:1.KafkaController mark the state of the replica in the state machine
ReplicaDeletionStarted -> ReplicaDeletionIneligible:1.KafkaController mark the state of the replica in the state machine
ReplicaDeletionSuccessful -> NonExistentReplica:1.KafkaController remove the replica from the in memory partition replica assignment cache

因此重点关注ReplicaStateMachine的handleStateChange函数

/**
 * Moves a single replica to `targetState`, after validating that the transition is
 * legal from its current state. Broker requests are only queued on `brokerRequestBatch`;
 * this method does not send them itself.
 *
 * @param partitionAndReplica topic, partition and replica (broker) id
 * @param targetState         state to move the replica to
 * @param callbacks           supplies stopReplicaResponseCallback for ReplicaDeletionStarted
 * @throws StateChangeFailedException if the state machine has not been started yet;
 *         failures after that point are caught and logged, not rethrown
 */
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                          "to %s failed because replica state machine has not started")
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  // A replica the controller has never seen is treated as NonExistentReplica.
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      case NewReplica =>
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            // A replica entering NewReplica must not be the partition's current leader.
            if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica "
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
            // Queue a LeaderAndIsrRequest for this replica's broker via ControllerBrokerRequestBatch.
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                topic, partition, leaderIsrAndControllerEpoch,
                                                                replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                          targetState))
      case ReplicaDeletionStarted =>
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
        // Queue a StopReplicaRequest with deletion; callbacks.stopReplicaResponseCallback is
        // invoked with the response.
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionIneligible =>
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionSuccessful =>
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case NonExistentReplica =>
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        // Drop this replica from the in-memory assignment and forget its state.
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case OnlineReplica =>
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          case NewReplica =>
            // Add the replica to the partition's assignment if it is not already there.
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if (!currentAssignedReplicas.contains(replicaId))
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                              targetState))
          case _ =>
            // Previously online/offline/ineligible: if the partition has known leadership,
            // queue a LeaderAndIsrRequest for this replica.
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None => // no leadership info recorded for this partition; nothing to send
            }
        }
        // Record OnlineReplica for every branch above (the put inside the Some case is redundant).
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica =>
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        // Queue a StopReplicaRequest without deletion.
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        // true when no leader/ISR is known or the replica could not be removed from the ISR.
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(_) =>
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // The ISR shrank: tell the remaining assigned replicas, unless the topic is being deleted.
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty)
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
            .format(replicaId, topicAndPartition))
    }
  } catch {
    // NOTE(review): catches Throwable, so every failure (including fatal JVM errors) is only logged here.
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11815.html

cjavamac