zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

ZooKeeper源码研究系列(5)集群版建立连接过程

zookeeper源码集群连接 系列 过程 研究 建立
2023-09-14 08:59:45 时间
2 各服务器角色的请求处理器链

先介绍下Leader、Follower、Observer服务器的请求处理器链

2.1 Leader服务器

PrepRequestProcessor-》ProposalRequestProcessor-》CommitProcessor-》ToBeAppliedRequestProcessor-》FinalRequestProcessor

ProposalRequestProcessor-》SyncRequestProcessor-》AckRequestProcessor

下面分别一一介绍


2 ProposalRequestProcessor:主要对事务请求,向所有的Follower服务器发起一个议案,同时触发SyncRequestProcessor对事务请求的记录


3 SyncRequestProcessor:对事务请求记录到事务日志文件中,记录完成后触发AckRequestProcessor


4 AckRequestProcessor:对于上述议案,Leader也是投票的一份子,所以也要进行投票响应,只需执行下Leader的响应方法即可。而其他Follower服务器的投票响应则是需要向Leader发送一个Leader.ACK响应,Leader接收到后,同样去执行Leader的响应方法


在Leader的响应方法每执行一次,就会判断是否已经过半机器响应了,如果过半,则Leader向所有的Follower和Observer发送Leader.COMMIT请求,同时Leader也向自己的CommitProcessor中提交该事务请求,即该事务请求是通过过半机器认同的,需要被提交的事务请求。


5 CommitProcessor:对于已经被过半机器认同的请求交给下一个处理器处理。而那些还没有被过半机器认同的,则处于阻塞状态。


6 ToBeAppliedRequestProcessor:负责将请求交给下一个处理器FinalRequestProcessor,处理完毕后表示已经完成该项议案,然后就删除了该项议案


7 FinalRequestProcessor:最后一个请求处理器。对于事务请求,执行事务的具体操作,如增删改node、createSession、closeSession等。对于非事务操作如获取数据等,从DataTree中获取相应的数据。最终返回数据给客户端。


2.2 Follower服务器

FollowerRequestProcessor-》CommitProcessor-》FinalRequestProcessor

SyncRequestProcessor-》SendAckRequestProcessor


1 FollowerRequestProcessor:首先将请求交给下一个处理器即CommitProcessor处理器,如果是该请求是事务请求或者前面有事务请求在等待处理,则该请求会被阻塞。如果是事务请求,交给CommitProcessor处理器之后,又立马将该请求转发给Leader,即事务请求必须要经过Leader,然后Leader又会把该事务请求封装成一个议案发给各个Follower服务器进行投票,各个Follower服务器接收到Leader发送过来议案后,首先要把这个议案请求记录到事务日志中,即调用SyncRequestProcessor来处理


2 CommitProcessor:一旦是事务请求,就需要等待该事务请求被过半数认可,接收到Leader的Leader.COMMIT请求,才会继续走下去


3 SyncRequestProcessor:把Leader发送过来的议案记录到事务日志中,然后交给下一个处理器SendAckRequestProcessor


4 SendAckRequestProcessor:当日志记录完成之后,需要给Leader发送一个Leader.ACK响应,表示已经成功记录在案。


Leader开始统计Follower发送过来的响应,一旦有过半机器发送过来响应,则认为该事务可以提交了。然后Leader就向所有的Follower发送Leader.COMMIT请求,带上之前的请求号,向所有的Observer发送Leader.INFORM请求,带上之前的整个请求内容,因为Follower在前面已经接收到了该请求,而Observer则没有接收到该请求,所以要对Observer带上整个请求内容。

Follower接收到Leader发送过来的Leader.COMMIT请求之后,根据带过来的请求号,找到真个请求对象,然后放到Follower的CommitProcessor中,使之继续走下去,交给FinalRequestProcessor

5 FinalRequestProcessor:最后一个请求处理器。同上面一样。对于事务请求,执行事务的具体操作,如增删改node、createSession、closeSession等。对于非事务操作如获取数据等,从DataTree中获取相应的数据。最终返回数据给客户端 2.3 Observer服务器

ObserverRequestProcessor-》CommitProcessor-》FinalRequestProcessor

SyncRequestProcessor

其中上述SyncRequestProcessor是通过配置zookeeper.observer.syncEnabled系统属性的true or false来决定是否需要这个处理器,默认true

ObserverRequestProcessor:和FollowerRequestProcessor功能完全一样,只是参数不一样,可以抽象出来的。

所以我们看到Observer服务器和Follower服务器的处理器链基本差不多。不同之处就是Follower服务器还有一个SendAckRequestProcessor,向Leader发送投票反馈。而Observer不参与投票,则不需要这个处理器。

以上就大致说完了各个服务器角色的请求处理器链,下面就结合具体的请求案例,再来捋一下整个过程

3 连接Leader建立session关联的过程和session不断激活的过程

这里连接的服务器以Leader为例,先说建立session关联的过程,之后再说session不断激活的过程

3.1 建立session关联的过程

这就需要从用户创建ZooKeeper对象开始说起。


1 客户端: 用户创建ZooKeeper对象,内部创建出ClientCnxn,可以简单想象成ZooKeeper对象的内部管家,ClientCnxn有两个主要的线程SendThread和EventThread

SendThread负责与服务器端的通信,EventThread负责事件的通知。


1.1 SendThread启动之后,就从创建ZooKeeper对象的地址列表(被随机打乱了),取出一个服务器地址进行tcp连接操作


1.2 当tcp连接连接成功之后,就需要和服务器端建立session关联。依托tcp连接,向服务器端发送ConnectRequest请求,会把创建ZooKeeper对象时指定的sessionTimeout时间带上


2 Leader服务器端: 一旦和服务器端建立tcp连接之后,服务器端会给客户端创建一个ServerCnxn,专门负责与该客户端的通信


2.1 当客户端第一次发送ConnectRequest请求到ServerCnxn中,ServerCnxn首先会对tcp连接传递过来的数据序列化成ConnectRequest,拿到客户端传递的sessionTimeout时间,由于服务器端在启动的时候指定了maxSessionTimeout、minSessionTimeout(即使没有指定,也会使用默认的),要求客户端传递过来的sessionTimeout时间必须在此两者之间,不符合要求的分别取对应的最大值或者最小值


2.2 然后就使用Leader服务器的SessionTracker(session管理器)根据上面协商后的sessionTimeout时间,分配出sessionId,创建出session


2.3 根据分配的sessionId和刚才的ServerCnxn创建出一个请求,类型为OpCode.createSession,将该请求提交到Leader的请求处理器链上


2.4 首先遇到的是PrepRequestProcessor处理器,认为OpCode.createSession请求是一个事务请求,就创建了一个事务请求体,再次执行了session的添加操作,主要是作用于从Follower等转发过来的创建session的请求。放心不会进行重复添加的,里面进行来判断的。


2.5 PrepRequestProcessor处理器执行完毕,交给下一个处理器ProposalRequestProcessor。ProposalRequestProcessor处理器将该创建session的请求立马交给了下一个处理器CommitProcessor的处理队列中(该请求被阻塞在那),然后又立马返回将该请求封装成一个议案,发送给所有的Follower服务器


2.6 Follower服务器接收到Leader发过来的议案后,使用SyncRequestProcessor将该请求即创建session的请求记录到事务日志中,然后交给Follower的下一个处理器SendAckRequestProcessor,使用该处理向Leader发送Leader.ACK反馈


2.7 每收到Follower的一次Leader.ACK反馈,就要统计下是否已经过半数了,如果过半数,则Leader向所有的Follower发送Leader.COMMIT命令,带上之前的请求号,向所有的Observer发送Leader.INFORM命令,需要带上之前的整个请求内容。同时Leader也向自己的CommitProcessor提交创建session的请求,CommitProcessor拿到该请求后,不再阻塞,继续走向下一个处理器ToBeAppliedRequestProcessor


2.8 ToBeAppliedRequestProcessor将创建session的请求先交给下一个处理器FinalRequestProcessor处理,当FinalRequestProcessor处理完成之后,删除之前提出的针对该请求的议案


2.9 FinalRequestProcessor针对创建session的请求,会使用SessionTracker再次执行创建(不会重复的,内部进行了判断,一旦已经有了sessionId对应的session则只需要查看是否过期,如果没有过期则重新激活session,即重新计算session的过期时间)。然后就Leader从request中取出ServerCnxn,开始准备向客户端发送响应了。响应内容是sessionId、根据sessionId计算出来的密码、协商后的sessionTimeout时间


2.10 虽然上述Leader已经对客户端进行了响应,但是其他Follower和Observer接收到Leader发送的Leader.COMMIT命令和Leader.INFORM命令,他们接收到上述命令之后,也都从CommitProcessor中走出来了,不再阻塞在那里,走向了FinalRequestProcessor。仍然使用SessionTracker再次执行创建session的操作,但是Follower和Observer中的SessionTracker实现是LearnerSessionTracker,而Leader中的SessionTracker是SessionTrackerImpl。

session已经在Leader内部创建出来了,其他的Follower、Observer仅仅是保存下sessionId和sessionTimeout时间,并没有完整的session对象。

所谓的session在所有机器上共享,其实就是Leader中保存着所有的session信息,并负责检查session的过期。其他机器只负责简单保存下sessionId以及对应的sessionTimeout时间,对于session问题下面再详细说明。

FinalRequestProcessor执行完session添加之后,也从request中取出对应的ServerCnxn,当然为null,然后就不执行任何操作。只有客户端所连接的那台服务器才会有ServerCnxn


3 客户端: 当客户端收到服务器端的创建session的响应之后,首先判断服务器端返回的sessionTimeout时间是否小于等于0

如果小于等于0则表示session创建失败。向EventThread中发送两个事件,第一个事件是KeeperState.Expired类型的事件,即session过期事件。第二个事件是eventOfDeath死亡事件,当EventThread收到eventOfDeath事件后,就会结束EventThread线程循环,EventThread线程走向死亡,即ZooKeeper对象不再可用。

如果大于0则表示创建session成功。向EventThread中发送一个KeeperState.SyncConnected事件


3.2 session不断激活的过程

上面说完了建立session关联的过程,下面就说说session是如何不断的激活的

因为服务器端,Leader服务器的SessionTrackerImpl,会每隔tickTime时间就会执行一次session过期检查,如果session没有及时激活的话,就会过期,就会被SessionTrackerImpl清理掉,对应的客户端ZooKeeper对象就不可用了。


1 客户端:客户端的SendThread线程在循环过程中,不断的向服务器端发送Ping请求。操作类型为OpCode.ping

对于发送频率,先说明下。

lastSend:客户端向服务器端最后一次发送请求的时间 idleSend:lastSend时间到当前时间的间隔 readTimeout:客户端的读超时时间,取值是2/3的sessionTimeout时间

SendThread在循环过程中发现idleSend时间已经超过了readTimeout的一半了,或者idleSend时间已经超过10s,就会执行一次发送Ping请求。


2 Leader服务器端:为该客户端分配的ServerCnxn接收到客户端的请求后,先将请求封装成一个Request对象,然后提交该请求到Leader的请求处理器链


2.1 在交给请求处理器之前,进行了session的激活操作。

SessionTrackerImpl对于session超时检查,是进行的分桶策略。以tickTime的整数倍的时间点就是一个桶,存放着在该时间点过期的session。

session被激活的过程就是从某个tickTime的整数倍的时间点对应的桶中移到后面时间点对应的桶中

SessionTrackerImpl会每隔tickTime时间就会执行一次session过期检查,有了分桶策略就比较方便了,不用遍历每个session执行检查,只需要查看当前时间点对应的桶中是否含有session,如果有则表示该session没有被及时激活,需要进行过期操作。

session的激活就是先检查当前session是否过期,如果没有过期,则重新计算session的过期时间,计算方式就是当前时间加上sessionTimeout时间然后取一个tickTime的整数值,即选择了后面的一个桶进行存放该session。


2.2 首先是Leader的PrepRequestProcessor处理器:发现该请求是Ping请求,不会创建事务请求体,只会检查下session是否过期。然后交给下一个处理器ProposalRequestProcessor


2.3 ProposalRequestProcessor对于非事务请求也仅仅是直接交给下一个处理器CommitProcessor


2.4 CommitProcessor中如果没有正在等待处理的事务请求,则会直接交给下一个处理器ToBeAppliedRequestProcessor。如果有正在被处理的事务请求,则也需要进行等待,感觉这里不是太合理,Ping类型的请求,应该直接通过,不经过任何等待的。


2.5 ToBeAppliedRequestProcessor也没有做什么处理,直接交给下一个处理器FinalRequestProcessor


可以看到当客户端连接的是Leader服务器时,session的不断激活就是通过客户端不断发送Ping请求给Leader服务器端,重新计算session过期时间达到激活session的目的。可以看到Follower、Observer都不参与此过程,然而当客户端连接的不是Leader服务器端,就不一样了,过程就没这么简单了。后面详细说明

3.3 session过期过程

一旦Leader发现某个session过期了,会先从Leader中删除该session,然后创建一个OpCode.closeSession请求,提交到Leader的请求处理器链


1.1 首先是Leader的PrepRequestProcessor处理器,发现session过期是一个事务请求,创建出事务请求头。然后设置该session的isClosing属性为true,然后交给下一个处理器ProposalRequestProcessor


1.2 ProposalRequestProcessor处理器先把该请求交给下一个处理器CommitProcessor,由于该请求是事务请求,则针对该请求提出一个议案,发给所有的Follower进行投票,其实所谓的投票就是Follower记录事务请求的过程,记录成功并发送响应给Leader,就算是一次成功投票。一旦过半数的Follower进行了反馈,Leader就认为此次事务请求可以被提交了。然后向所有的Follower发送Leader.COMMIT请求,向所有的Observer发送Leader.INFORM请求。向Leader的CommitProcessor处理器的提交队列中发送这个closeSession请求。使之继续往下一个处理器走,下一个处理器即ToBeAppliedRequestProcessor


1.3 ToBeAppliedRequestProcessor也没有做什么处理,直接交给下一个处理器FinalRequestProcessor


1.4 在FinalRequestProcessor中对closeSession请求会做以下操作:

首先删除这个session创建的所有临时节点,并触临时节点删除的事件,类型为EventType.NodeDeleted,同时触发父节点的children变化的事件,类型为EventType.NodeChildrenChanged。

其次再次从sessionTracker中将该session删除

最后关闭使用该session创建的ServerCnxn,即断开了与客户端的连接,至此,Leader就完成了closeSession的整个过程


1.5 closeSession请求在Follower和Observer作为:仍然是到FinalRequestProcessor执行上述同样的操作。至此,session在整个集群中就彻底被删除了。


当客户端连接的是Leader服务器,建立session关联和session激活、session过期过程比较简单。一旦是客户端连接的是Follower或者Observer的时候,过程就稍微多了一些。

4 连接Follower建立session关联的过程和session不断激活的过程

当客户端连接的是Follower的话,和上面的情况稍有差别。下面的部分内容和上面有很多重复的地方,为了方便观看,重复的部分直接复制过来了,同时要注意不同的地方

4.1 建立session关联的过程

这就需要从用户创建ZooKeeper对象开始说起。


1 客户端: 用户创建ZooKeeper对象,内部创建出ClientCnxn,可以简单想象成ZooKeeper对象的内部管家,ClientCnxn有两个主要的线程SendThread和EventThread

SendThread负责与服务器端的通信,EventThread负责事件的通知。


1.1 SendThread启动之后,就从创建ZooKeeper对象的地址列表(被随机打乱了),取出一个服务器地址进行tcp连接操作


1.2 当tcp连接连接成功之后,就需要和服务器端建立session关联。依托tcp连接,向服务器端发送ConnectRequest请求,会把创建ZooKeeper对象时指定的sessionTimeout时间带上


2 Follower服务器端: 一旦和服务器端建立tcp连接之后,服务器端会给客户端创建一个ServerCnxn,专门负责与该客户端的通信


2.1 当客户端第一次发送ConnectRequest请求到ServerCnxn中,ServerCnxn首先会对tcp连接传递过来的数据序列化成ConnectRequest,拿到客户端传递的sessionTimeout时间,由于服务器端在启动的时候指定了maxSessionTimeout、minSessionTimeout(即使没有指定,也会使用默认的),要求客户端传递过来的sessionTimeout时间必须在此两者之间,不符合要求的分别取对应的最大值或者最小值


2.2 然后就使用Follower服务器的SessionTracker(session管理器)根据上面协商后的sessionTimeout时间,分配出sessionId。

Leader服务器使用的SessionTracker是SessionTrackerImpl,而Follower使用的SessionTracker是LearnerSessionTracker。两者的区别如下:

SessionTrackerImpl:不仅分配sessionId,还负责创建session对象,维护session对象,开启线程不断检查session是否过期

LearnerSessionTracker:仅仅分配sessionId,只保存sessionId,没有session对象。session对象的创建都是在Leader的SessionTrackerImpl中创建的


2.3 根据分配的sessionId和刚才的ServerCnxn创建出一个请求,类型为OpCode.createSession,将该请求提交到Follower的请求处理器链上


2.4 首先遇到的是Follower服务器的FollowerRequestProcessor处理器,它先将该请求交给下一个处理器CommitProcessor,然后会阻塞。由于创建session请求是事务请求,则这个Follower会把该请求转发给Leader服务器


3 Leader服务器端:Leader服务器为上述Follower服务器分配的LearnerHandler会收到来自Follower服务器的上述创建session的请求,LearnerHandler把该请求交给了Leader请求处理器链来处理了


3.1 首先遇到的是PrepRequestProcessor处理器,认为OpCode.createSession请求是一个事务请求,就创建了一个事务请求体,根据请求传递过来的sessionId和sessionTimeout时间,使用Leader服务器的SessionTrackerImpl创建出了session,保存来起来。然后就交给了下一个处理器ProposalRequestProcessor来处理


3.2 ProposalRequestProcessor处理器将该创建session的请求立马交给了下一个处理器CommitProcessor的处理队列中(该请求被阻塞在那),然后又立马返回将该请求封装成一个议案,发送给所有的Follower服务器


3.3 Follower服务器接收到Leader发过来的议案后,使用SyncRequestProcessor将该请求即创建session的请求记录到事务日志中,然后交给Follower的下一个处理器SendAckRequestProcessor,使用该处理向Leader发送Leader.ACK反馈


3.4 每收到Follower的一次Leader.ACK反馈,就要统计下是否已经过半数了,如果过半数,则Leader向所有的Follower发送Leader.COMMIT命令,带上之前的请求号,向所有的Observer发送Leader.INFORM命令,需要带上之前的整个请求内容。同时Leader也向自己的CommitProcessor提交创建session的请求,CommitProcessor拿到该请求后,不再阻塞,继续走向下一个处理器ToBeAppliedRequestProcessor


3.5 ToBeAppliedRequestProcessor将创建session的请求先交给下一个处理器FinalRequestProcessor处理,当FinalRequestProcessor处理完成之后,删除之前提出的针对该请求的议案


3.6 FinalRequestProcessor针对创建session的请求,会使用SessionTracker再次执行创建(不会重复的,内部进行了判断,一旦已经有了sessionId对应的session则只需要查看是否过期,如果没有过期则重新激活session,即重新计算session的过期时间)。然后就Leader从request中取出ServerCnxn,发现为null(只有客户端连接的那台服务器的才会有对应的ServerCnxn),就什么也不执行。至此Leader的任务已经完成。


4 Follower和Observer服务器:他们分别接收到Leader的Leader.COMMIT、Leader.INFORM命令之后,就会在他们的CommitProcessor处理器的提交队列中接收到创建session的请求,然后交给下一个处理器即FinalRequestProcessor处理器来执行创建session的具体内容

Follower和Observer服务器都使用LearnerSessionTracker记录下这个sessionId和对应的sessionTimeout时间。至此,所有的服务器上都保存了这个创建的sessionId了。

然后其他的一些Observer和Follower从request中取出ServerCnxn,发现都为null,什么也不操作。只有客户端连接的这台Follower取出的ServerCnxn是有值的,需要给客户端反馈创建session的响应了。

响应内容就是sessionTimeout、sessionId、根据sessionId计算出的密码。


5 客户端: 当客户端收到Follower服务器端的创建session的响应之后,首先判断服务器端返回的sessionTimeout时间是否小于等于0

如果小于等于0则表示session创建失败。向EventThread中发送两个事件,第一个事件是KeeperState.Expired类型的事件,即session过期事件。第二个事件是eventOfDeath死亡事件,当EventThread收到eventOfDeath事件后,就会结束EventThread线程循环,EventThread线程走向死亡,即ZooKeeper对象不再可用。

如果大于0则表示创建session成功。向EventThread中发送一个KeeperState.SyncConnected事件


4.2 session不断激活的过程

客户端还是一样,定期向连接的服务器这里是Follower服务器,发送Ping请求


1 客户端:客户端的SendThread线程在循环过程中,不断的向服务器端发送Ping请求。操作类型为OpCode.ping

对于发送频率,先说明下。

lastSend:客户端向服务器端最后一次发送请求的时间 idleSend:lastSend时间到当前时间的间隔 readTimeout:客户端的读超时时间,取值是2/3的sessionTimeout时间

SendThread在循环过程中发现idleSend时间已经超过了readTimeout的一半了,或者idleSend时间已经超过10s,就会执行一次发送Ping请求。


2 Follower服务器端:为该客户端分配的ServerCnxn接收到客户端的Ping请求后,先将请求封装成一个Request对象,然后提交该请求到Follower的请求处理器链


2.1 在交给请求处理器之前,进行了session的激活操作。

LearnerSessionTracker对session的激活,仅仅是把该sessionId和sessionTimeout时间放到另一个HashMap Long, Integer touchTable结构中,仅此而已。其实我们想要达到的目的是:能够在Leader服务器上进行真正的session激活,不然的话,Leader服务器在不断检查session过期,一旦没有及时激活就会删除该session的。先提出一个疑问:客户端的Ping请求如何来激活Leader服务器上保存的session呢?


2.2 首先是Follower的FollowerRequestProcessor处理器:直接把该请求交给下一个处理器CommitProcessor,由于Ping请求并不是事务请求,则不会把该请求转发给Leader服务器


2.3 CommitProcessor中如果没有正在等待处理的事务请求,则会直接交给下一个处理器FinalRequestProcessor。如果有正在被处理的事务请求,则也需要进行等待,感觉这里不是太合理,Ping类型的请求,应该直接通过,不经过任何等待的。


至此整个客户端的Ping请求就处理完成了。上面的疑问还没解决呢?这次Ping请求的目的还没达到呢?

还有一个过程如下:


1 Leader会在一个while循环中,通过循环遍历所有的LearnerHandler不断的向所有的其他服务器发送Ping请求,用于检测Leader和其他服务器之间的正常连接,一旦超时没有得到及时的反馈,则就会删除该LearnerHandler。一旦LearnerHandler的数量小于最初总数量的一半,则认为该Leader该下台了,需要重新选举Leader了。


2 Follower和Observer接收到Leader的Ping请求之后,就会把LearnerSessionTracker中的touchTable(里面的这些sessionId都是需要重新被激活的)全部传递给Leader


3 Leader接收到这些需要被激活的session后,一个一个挨个的重新计算了session的超时时间。至此,客户端的Ping请求终于达到效果了


5 结束语

本篇文章,详细描述了连接不同服务器创建session的过程,以及session的激活过程和session过期过程。下一篇就开始讲述Leader的选举过程


Java--安装和用原生API连接Zookeeper Zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
安装和用原生API连接Zookeeper Zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。
Java连接Zookeeper Java操作Zookeeper很简单,但是前提要把包导对。 关于Zookeeper的Linux环境搭建可以参考我的这篇博客:Linux环境下Zookeeper安装 下面进入正题: 一、导入依赖