zl程序教程

您现在的位置是:首页 >  工具

当前栏目

深入理解 ZK集群的Leader选举(二)

集群 深入 理解 选举 zk leader
2023-09-27 14:25:56 时间
真正开始选举#

下面就去看一下quorumPeer.java的这个线程类的启动,部分run()方法的截取,我们关心它的lookForLeader()方法


while (running) {

switch (getPeerState()) {

 * todo 四种可能的状态, 经过了leader选举之后, 不同的服务器就有不同的角色

 * todo 也就是说,不同的服务器会会走动下面不同的分支中

 * LOOKING 正在进行领导者选举

 * Observing

 * Following

 * Leading

case LOOKING:

 // todo 当为Looking状态时,会进入领导者选举的阶段

 LOG.info( LOOKING 

 if (Boolean.getBoolean( readonlymode.enabled )) {

 LOG.info( Attempting to start ReadOnlyZooKeeperServer 

 // Create read-only server but don t start it immediately

 // todo 创建了一个 只读的server但是不着急立即启动它

 final ReadOnlyZooKeeperServer roZk new ReadOnlyZooKeeperServer(

 logFactory, this,

 new ZooKeeperServer.BasicDataTreeBuilder(),

 this.zkDb);

 // Instead of starting roZk immediately, wait some grace 优雅 period(期间) before we decide we re partitioned.

 // todo 为了立即启动roZK 在我们决定分区之前先等一会

 // Thread is used here because otherwise it would require changes in each of election strategy classes which is

 // unnecessary code coupling.

 //todo 这里新开启一条线程,避免每一个选举策略类上有不同的改变 而造成的代码的耦合

 Thread roZkMgr new Thread() {

 public void run() {

 try {

 // lower-bound grace period to 2 secs

 sleep(Math.max(2000, tickTime));

 if (ServerState.LOOKING.equals(getPeerState())) {

 // todo 启动上面那个只读的Server

 roZk.startup();

 } catch (InterruptedException e) {

 LOG.info( Interrupted while attempting to start ReadOnlyZooKeeperServer, not started 

 } catch (Exception e) {

 LOG.error( FAILED to start ReadOnlyZooKeeperServer , e);

 try {

 roZkMgr.start();

 setBCVote(null);

 // todo 上面的代码都不关系,直接看它的 lookForLeader()方法

 // todo 直接点进去,进入的是接口,我们看它的实现类

 setCurrentVote(makeLEStrategy().lookForLeader());

 } catch (Exception e) {

 LOG.warn( Unexpected exception ,e);

 setPeerState(ServerState.LOOKING);

 } finally {

 // If the thread is in the the grace period, interrupt

 // to come out of waiting.

 roZkMgr.interrupt();

 roZk.shutdown();

 }


下面是lookForLeader()的源码解读

说实话这个方法还真的是挺长的,但是吧这个方法真的很重要,因为我们可以从这个方法中找到网络上大家针对Leader的选举总结的点点滴滴


第一点: 每次的投票都会先投自己一票,说白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());将自己的myid,最大的zxid,以及第几届封装起来,但是还有一个细节,就是在投自己的同时,还是会将存有自己信息的这一票通过socket发送给其他的节点

接受别人的投票是通过QuorumManager的recvWorker线程类将投票添加进recvQueue队列中,投票给自己时,就不走这条路线了,而是选择直接将票添加进recvQueue队列中

在下面代码中存在一行HashMap Long, Vote recvset new HashMap Long, Vote 这个map可以理解成一个小信箱,每一个节点都会维护一个信箱,这里面可能存放着自己投给自己的票,或者别人投给自己的票,或者别人投给别人的票,或者自己投给别人的票,通过统计这个信箱中的票数可以决定某一个节点是否可以成为leader,源码如下, 使用信箱中的信息,


// todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader

 if (termPredicate(recvset,

 new Vote(proposedLeader, proposedZxid,

 logicalclock.get(), proposedEpoch))) {

 // todo 到这里说明接收到投票的机器已经是准leader了

 // Verify if there is any change in the proposed leader

 // todo 校验一下, leader有没有变动

 while ((n recvqueue.poll(finalizeWait,

 TimeUnit.MILLISECONDS)) ! null) {

 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

 proposedLeader, proposedZxid, proposedEpoch)) {

 recvqueue.put(n);

 break;

 if (n null) {

 // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower

 // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支

 self.setPeerState((proposedLeader self.getId()) ?

 ServerState.LEADING : learningState());

 Vote endVote new Vote(proposedLeader,

 proposedZxid,

 logicalclock.get(),

 proposedEpoch);

 leaveInstance(endVote);

 return endVote;

 }


在termPredicate()函数中有如下的逻辑,self.getQuorumVerifier().containsQuorum(set);它的实现如下,实际上就是在进行过半机制的检验,结论就是当某个节点拥有了集群中一半以上的节点的投票时,它就会把自己的状态修改成leading, 其他的节点根据自己的需求将状态该变成following或者observing


public boolean containsQuorum(Set Long set){

 return (set.size() half);

 }


维护着一个时钟,标记这是第几次投票了logicalclock他是AutomicLong类型的变量,他有什么用呢? 通过下面的代码可以看到如下的逻辑,就是当自己的时钟比当前接收到投票的时钟小时,说明自己可能因为其他原因错过了某次投票,所以更新自己的时钟,重新判断投自己还是投别人, 同理,如果接收到的投票的时钟小于自己当前的时钟,说明这个票是没有意义的,直接丢弃不理会


if (n.electionEpoch logicalclock.get()) {

 // todo 将自己的时钟调整为更新的时间

 logicalclock.set(n.electionEpoch);

 // todo 清空自己的投票箱

 recvset.clear();


那么根据什么判断是投给自己还是投给别人呢? 通过解析出票的封装类中封装的节点的信息,什么信息呢?zxid,myid,epoch 通常情况是epoch大的优先成为leader,一般来说epoch都会相同,所以zxid大的优先成为leader,如果zxid再相同,则myid大的优先成为leader


检查到别的节点比自己更适合当leader,会重新投票,选举更适合的节点


完整的源码


// todo 当前进入的是FastLeaderElection.java的实现类

public Vote lookForLeader() throws InterruptedException {

try {

 // todo 创建用来选举Leader的Bean

 self.jmxLeaderElectionBean new LeaderElectionBean();

 MBeanRegistry.getInstance().register(

 self.jmxLeaderElectionBean, self.jmxLocalPeerBean);

} catch (Exception e) {

 LOG.warn( Failed to register with JMX , e);

 self.jmxLeaderElectionBean null;

if (self.start_fle 0) {

 self.start_fle Time.currentElapsedTime();

try {

 // todo 每台服务器独有的投票箱 , 存放其他服务器投过来的票的map

 // todo long类型的key (sid)标记谁给当前的server投的票 Vote类型的value 投的票

 HashMap Long, Vote recvset new HashMap Long, Vote 

 HashMap Long, Vote outofelection new HashMap Long, Vote 

 int notTimeout finalizeWait;

 synchronized (this) {

 //todo Automic 类型的时钟

 logicalclock.incrementAndGet();

 //todo 一开始启动时,入参位置的值都取自己的,相当于投票给自己

 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());

 LOG.info( New election. My id self.getId() 

 , proposed zxid 0x Long.toHexString(proposedZxid));

 // todo 发送出去,投票自己

 sendNotifications();

 * Loop in which we exchange notifications until we find a leader

 // todo 如果自己一直处于LOOKING的状态,一直循环

 while ((self.getPeerState() ServerState.LOOKING) (!stop)) {

 * Remove next notification from queue, times out after 2 times

 * the termination time

 //todo 尝试获取其他服务器的投票的信息

 // todo 从接受消息的队列中取出一个msg(这个队列中的数据就是它投票给自己的票)

 // todo 在QuorumCxnManager.java中 发送的投票的逻辑中,如果是发送给自己的,就直接加到recvQueue,而不经过socket

 // todo 所以它在这里是取出了自己的投票

 Notification n recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

 * Sends more notifications if haven t received enough.

 * Otherwise processes new notification.

 // todo 第一轮投票这里不为空

 if (n null) {

 // todo 第二轮就没有投票了,为null, 进入这个分支

 // todo 进行判断 ,如果集群中有三台服务器,现在仅仅启动一台服务器,还剩下两台服务器没启动

 // todo 那就会有3票, 其中1票直接放到 recvQueue 另外两票需要发送给其他两台机器的逻辑就在这里判断

 // todo 验证是通不过的,因为queueSendMap中的两条队列都不为空

 if (manager.haveDelivered()) {

 sendNotifications();

 } else {

 // todo 进入这个逻辑

 manager.connectAll();

 * Exponential backoff

 int tmpTimeOut notTimeout * 2;

 notTimeout (tmpTimeOut maxNotificationInterval ?

 tmpTimeOut : maxNotificationInterval);

 LOG.info( Notification time out: notTimeout);

 } else if (validVoter(n.sid) validVoter(n.leader)) {

 // todo 收到了其他服务器的投票信息后,来到下面的分支中处理

 * Only proceed if the vote comes from a replica in the

 * voting view for a replica in the voting view.

 * todo 仅当投票来自投票视图中的副本时 才能继续进行投票。

 switch (n.state) {

 case LOOKING:

 // todo 表示获取到投票的服务器的状态也是looking

 // If notification current, replace and send messages out

 // todo 对比接收到的头片的 epoch和当前时钟先后

 // todo 接收到的投票 当前服务器的时钟

 // todo 表示当前server在投票过程中可能以为故障比其他机器少投了几次,需要重新投票

 if (n.electionEpoch logicalclock.get()) {

 // todo 将自己的时钟调整为更新的时间

 logicalclock.set(n.electionEpoch);

 // todo 清空自己的投票箱

 recvset.clear();

 // todo 用别人的信息和自己的信息对比,选出一个更适合当leader的,如果还是自己适合,不作为, 对方适合,修改投票,投 对方

 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

 getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {

 updateProposal(n.leader, n.zxid, n.peerEpoch);

 } else {

 updateProposal(getInitId(),

 getInitLastLoggedZxid(),

 getPeerEpoch());

 sendNotifications();

 // todo 接收到的投票 当前服务器的时钟

 // todo 说明这个投票已经不能再用了

 } else if (n.electionEpoch logicalclock.get()) {

 if (LOG.isDebugEnabled()) {

 LOG.debug( Notification election epoch is smaller than logicalclock. n.electionEpoch 0x 

 Long.toHexString(n.electionEpoch)

 , logicalclock 0x Long.toHexString(logicalclock.get()));

 break;

 // todo 别人的投票时钟和我的时钟是相同的

 // todo 满足 totalOrderPredicate 后,会更改当前的投票,重新投票

 * 在 totalOrderPredicate 比较两者之间谁更满足条件

 * ((newEpoch curEpoch) ||

 * ((newEpoch curEpoch) 

 * ((newZxid curZxid) ||

 * ((newZxid curZxid) 

 * (newId curId)))));

 // todo 返回true说明 对方更适合当leader

 } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

 proposedLeader, proposedZxid, proposedEpoch)) {

 updateProposal(n.leader, n.zxid, n.peerEpoch);

 sendNotifications();

 if (LOG.isDebugEnabled()) {

 LOG.debug( Adding vote: from n.sid 

 , proposed leader n.leader 

 , proposed zxid 0x Long.toHexString(n.zxid) 

 , proposed election epoch 0x Long.toHexString(n.electionEpoch));

 // todo 将自己的投票存放到投票箱子中

 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

 // todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader

 if (termPredicate(recvset,

 new Vote(proposedLeader, proposedZxid,

 logicalclock.get(), proposedEpoch))) {

 // todo 到这里说明接收到投票的机器已经是准leader了

 // Verify if there is any change in the proposed leader

 // todo 校验一下, leader有没有变动

 while ((n recvqueue.poll(finalizeWait,

 TimeUnit.MILLISECONDS)) ! null) {

 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,

 proposedLeader, proposedZxid, proposedEpoch)) {

 recvqueue.put(n);

 break;

 * This predicate is true once we don t read any new

 * relevant message from the reception queue

 if (n null) {

 // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower

 // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支

 self.setPeerState((proposedLeader self.getId()) ?

 ServerState.LEADING : learningState());

 Vote endVote new Vote(proposedLeader,

 proposedZxid,

 logicalclock.get(),

 proposedEpoch);

 leaveInstance(endVote);

 return endVote;

 break;

 case OBSERVING:

 // todo 禁止Observer参加投票

 LOG.debug( Notification from observer: n.sid);

 break;

 case FOLLOWING:

 case LEADING:

 * Consider all notifications from the same epoch

 * together.

 if (n.electionEpoch logicalclock.get()) {

 recvset.put(n.sid, new Vote(n.leader,

 n.zxid,

 n.electionEpoch,

 n.peerEpoch));

 if (ooePredicate(recvset, outofelection, n)) {

 self.setPeerState((n.leader self.getId()) ?

 ServerState.LEADING : learningState());

 Vote endVote new Vote(n.leader,

 n.zxid,

 n.electionEpoch,

 n.peerEpoch);

 leaveInstance(endVote);

 return endVote;

 * Before joining an established ensemble, verify

 * a majority is following the same leader.

 outofelection.put(n.sid, new Vote(n.version,

 n.leader,

 n.zxid,

 n.electionEpoch,

 n.peerEpoch,

 n.state));

 if (ooePredicate(outofelection, outofelection, n)) {

 synchronized (this) {

 logicalclock.set(n.electionEpoch);

 self.setPeerState((n.leader self.getId()) ?

 ServerState.LEADING : learningState());

 Vote endVote new Vote(n.leader,

 n.zxid,

 n.electionEpoch,

 n.peerEpoch);

 leaveInstance(endVote);

 return endVote;

 break;

 default:

 LOG.warn( Notification state unrecognized: {} (n.state), {} (n.sid) ,

 n.state, n.sid);

 break;

 } else {

 if (!validVoter(n.leader)) {

 LOG.warn( Ignoring notification for non-cluster member sid {} from sid {} , n.leader, n.sid);

 if (!validVoter(n.sid)) {

 LOG.warn( Ignoring notification for sid {} from non-quorum member sid {} , n.leader, n.sid);

 return null;


经过如上的判断各个节点的就可以选举出不同的角色,再次回到QuorumPeer.java的run()中进行循环时,不再会进入case LOOKING:代码块了,而是按照自己不同的角色各司其职,完成不同的初始化启动


Zookeeper的Leader选举 Leader选举是保证分布式数据一致性的关键所在。Leader选举分为Zookeeper集群初始化启动时选举和Zookeeper集群运行期间Leader重新选举两种情况。在讲解Leader选举前先了解一下Zookeeper节点4种可能状态和事务ID概念。
【分布式】Zookeeper的Leader选举 前面学习了Zookeeper服务端的相关细节,其中对于集群启动而言,很重要的一部分就是Leader选举,接着就开始深入学习Leader选举。