深入分析 Watcher 机制的实现原理(二)服务端接收请求处理流程
服务端接收请求处理流程
服务端有一个 NettyServerCnxn 类,用来处理客户端发送过来的请求
/**
 * Consumes an incoming Netty buffer, reassembling length-prefixed packets and
 * handing every completed packet to the ZooKeeper server.
 *
 * <p>Two buffers carry the state between calls: {@code bbLen} collects the
 * int length prefix, and {@code bb} (allocated once the length is known)
 * collects the payload. While {@code bb} is {@code null} the method is still
 * reading the length prefix.
 *
 * <p>An {@link IOException} or {@code ClientCnxnLimitException} raised while
 * processing is caught here and results in the connection being closed.
 *
 * @param message the bytes received from the client; read until it is
 *                exhausted or the {@code throttled} flag is set
 */
private void receiveMessage(ByteBuf message) {
// NOTE(review): presumably asserts that we run on the channel's event loop thread - confirm in checkIsInEventLoop.
checkIsInEventLoop("receiveMessage");
try {
// Keep consuming while bytes remain and the throttled flag is not set.
while (message.isReadable() && !throttled.get()) {
// bb is non-null: the length prefix is already known and bb is being filled with the payload
if (bb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
// bb has more free space than message has readable bytes: lower bb's limit to
// position + readable bytes (presumably so that readBytes(bb) does not request
// more than message can supply - confirm against Netty's ByteBuf.readBytes)
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
// Copy bytes from message into bb, then restore bb's limit to its full capacity
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes 0x{} bb {}",
Long.toHexString(sessionId),
ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
// bb is full: the whole payload of this packet has been received
if (bb.remaining() == 0) {
bb.flip();
// Update receive statistics (the extra 4 presumably accounts for the int length prefix)
packetReceived(4 + bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
// TODO: if zks.processPacket() is changed to take a ByteBuffer[],
// we could implement zero-copy queueing.
// Process the data packet sent by the client
zks.processPacket(this, bb);
} else {
// The first packet on a connection is handled as the connect request
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
zks.processConnectRequest(this, bb);
initialized = true;
}
// Packet consumed: go back to reading a length prefix
bb = null;
}
} else {
// bb is null: still collecting the length prefix into bbLen
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
// Same limit trick as above, applied to the length buffer
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() + message.readableBytes());
}
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
// The length prefix is complete: decode it and allocate the payload buffer
if (bbLen.remaining() == 0) {
bbLen.flip();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
}
// Reset bbLen so it can receive the next packet's length prefix
bbLen.clear();
// Before the connection is initialized, the value just read may be a
// four-letter-word command; if checkFourLetterWord reports it handled it, stop here
if (!initialized) {
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
bb = ByteBuffer.allocate(len);
}
}
}
} catch (IOException e) {
LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.IO_EXCEPTION);
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}
zks.processPacket(this, bb) 方法:
/**
 * Decodes one client packet and dispatches it according to the operation
 * type found in its request header.
 *
 * <p>Auth packets are answered directly on the connection, SASL packets are
 * delegated to {@code processSasl}, and every other packet is wrapped in a
 * {@code Request} and submitted through {@code submitRequest}.
 *
 * @param cnxn           the connection the packet arrived on
 * @param incomingBuffer the packet payload, positioned at the request header
 * @throws IOException if reading the packet or sending a response fails, or
 *                     if the request is rejected by the size check
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
// Deserialize the request header sent by the client
h.deserialize(bia, "header");
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
// Dispatch on the operation type carried in the header
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
// Default to AUTHFAILED; only a provider can change the outcome
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
authReturn = ap.handleAuthentication(
new ServerAuthenticationProvider.ServerObjs(this, cnxn),
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.debug("Authentication succeeded for scheme: {}", scheme);
LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(
"No authentication provider for scheme: {} has {}",
scheme,
ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
// Not an auth packet: check whether it is a SASL operation
processSasl(incomingBuffer, cnxn, h);
} else {
// When SASL authentication is required but this connection has not completed it,
// reply with SESSIONCLOSEDREQUIRESASLAUTH, close the session and stop receiving
if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
cnxn.sendResponse(replyHeader, null, "response");
cnxn.sendCloseSession();
cnxn.disableRecv();
} else {
// Ordinary requests end up in this branch
// Wrap the packet in a Request object
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
// Submit the request
submitRequest(si);
}
}
}
submitRequest 方法负责在服务端提交当前请求;从下面贴出的代码可以看到,真正执行提交逻辑的是 submitRequestNow 方法
/**
 * Hands a request to the first processor of the request-processor chain.
 *
 * <p>If the chain has not been set up yet, the caller blocks (re-checking
 * once per second) until the server leaves the {@code INITIAL} state. A
 * request whose type is not valid is routed to an
 * {@code UnimplementedRequestProcessor} instead of the chain.
 *
 * @param si the request to submit
 * @throws RuntimeException if, after waiting, the chain is still missing or
 *                          the server is not in the {@code RUNNING} state
 */
public void submitRequestNow(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Every request goes through the processor chain, so wait for
                // the chain to be set up; the state is moved to RUNNING once
                // the setup has finished.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            boolean started = firstProcessor != null && state == State.RUNNING;
            if (!started) {
                throw new RuntimeException("Not started");
            }
        }
    }

    try {
        touch(si.cnxn);

        if (!Request.isValid(si.type)) {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
            return;
        }

        setLocalSessionFlag(si);
        // Enter the processor chain (chain-of-responsibility pattern)
        firstProcessor.processRequest(si);
        if (si.cnxn != null) {
            incInProcess();
        }
    } catch (MissingSessionException e) {
        LOG.debug("Dropping request.", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
firstProcessor.processRequest(si);
firstProcessor 的初始化是在 ZooKeeperServer 的 setupRequestProcessors 方法中完成的,代码如下
/**
 * Builds the request-processor chain and starts its threaded stages.
 *
 * <p>The chain is wired back to front: a {@code FinalRequestProcessor} is
 * wrapped by a {@code SyncRequestProcessor}, which in turn is wrapped by the
 * {@code PrepRequestProcessor} stored in {@code firstProcessor}.
 */
protected void setupRequestProcessors() {
    FinalRequestProcessor tail = new FinalRequestProcessor(this);

    SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
    sync.start();

    PrepRequestProcessor head = new PrepRequestProcessor(this, sync);
    // Publish the head of the chain before starting its thread.
    firstProcessor = head;
    head.start();
}
这里用的责任链模式
从上面我们可以看到 firstProcessor 的实例是一个 PrepRequestProcessor,而这个构造方法中又传递了一个 Processor,构成了一个调用链:RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); 而 syncProcessor 的构造方法传递的又是一个 Processor,对应的是 FinalRequestProcessor
所以整个调用链是 PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
PrepRequestProcessor 的processRequest方法
/** Requests waiting to be picked up by this processor's {@code run()} loop. */
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();

/**
 * Queues a request for asynchronous pre-processing.
 *
 * <p>The request is only time-stamped and enqueued here; the actual work
 * happens when the {@code run()} loop takes it off the queue.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {
    final long enqueuedAt = Time.currentElapsedTime();
    request.prepQueueStartTime = enqueuedAt;
    submittedRequests.add(request);
    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
PrepRequestProcessor 这个类又继承了线程类,请求是基于异步化的方式处理的,所以接下来看它的 run() 方法
/**
 * Main loop of the pre-processing thread.
 *
 * <p>Repeatedly takes a request from {@code submittedRequests}, records
 * queue metrics, and passes it to {@code pRequest}. The loop ends when the
 * {@code Request.requestOfDeath} marker is dequeued; any exception is
 * forwarded to {@code handleException}.
 */
public void run() {
    try {
        for (;;) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());

            // Blocks until a request is available on the queue.
            final Request next = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - next.prepQueueStartTime);

            if (LOG.isTraceEnabled()) {
                // Pings are traced under their own mask.
                final long traceMask = next.type == OpCode.ping
                    ? ZooTrace.CLIENT_PING_TRACE_MASK
                    : ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                ZooTrace.logRequest(LOG, traceMask, 'P', next, "");
            }

            if (next == Request.requestOfDeath) {
                break;
            }

            next.prepStartTime = Time.currentElapsedTime();
            // Pre-process the request and pass it down the chain.
            pRequest(next);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}
pRequest方法
/**
 * Pre-processes a single request and forwards it to the next processor.
 *
 * <p>The header and txn are cleared first; {@code pRequestHelper} is then
 * invoked unless the request is throttled. Afterwards the request is stamped
 * with the server's current zxid, the processing time is recorded, and the
 * request is handed to {@code nextProcessor}.
 *
 * @param request the request to pre-process
 * @throws RequestProcessorException if pre-processing or the next processor fails
 */
protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);

    final boolean throttled = request.isThrottled();
    if (!throttled) {
        pRequestHelper(request);
    }

    request.zxid = zks.getZxid();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME
        .add(Time.currentElapsedTime() - request.prepStartTime);
    nextProcessor.processRequest(request);
}
nextProcessor 对应的应该是 SyncRequestProcessor
SyncRequestProcessor.processRequest 方法
/** Requests waiting to be picked up by this processor's {@code run()} loop. */
private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();

/**
 * Queues a request for this processor's thread.
 *
 * @param request the request to enqueue; must not be {@code null}
 * @throws NullPointerException if {@code request} is {@code null}
 */
public void processRequest(final Request request) {
    Objects.requireNonNull(request, "Request cannot be null");

    final long enqueuedAt = Time.currentElapsedTime();
    request.syncQueueStartTime = enqueuedAt;
    queuedRequests.add(request);
    ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
也看run方法
@Override
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
//从阻塞队列中获取请求
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
//下面这块代码,粗略看来是触发快照操作,启动一个处理快照的线程
if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
继续调用下一个处理器来处理请求
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor.processRequest 方法会根据 Request 对象中的操作更新内存中的 Session 信息或者 znode 数据。
关键代码:
// Excerpt of the exists-request handling; the enclosing method is not shown
// (the trailing break suggests this is the body of a switch case).
ExistsRequest existsRequest = new ExistsRequest();
// Deserialize the request ByteBuffer into an ExistsRequest, i.e. the request object sent by the client
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
// Extract the requested path
path = existsRequest.getPath();
// Reject paths that contain a NUL character
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
// Key step: when the request's watch flag is set, cnxn (the ServerCnxn) is passed as the watcher;
// otherwise null is passed and no watcher is supplied to statNode
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
// Wrap the Stat obtained from the server's in-memory database in an ExistsResponse
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
statNode方法:
/**
 * Returns a copy of the stat of the node at {@code path}, optionally
 * registering a data watch on that path.
 *
 * <p>The watch is registered before the existence check, so it is recorded
 * even when the lookup ends in {@code NoNodeException}.
 *
 * @param path    the node path to look up
 * @param watcher the watcher to bind to {@code path}, or {@code null} for none
 * @return a freshly populated {@code Stat} for the node
 * @throws KeeperException.NoNodeException if no node exists at {@code path}
 */
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
    final Stat result = new Stat();
    final DataNode node = nodes.get(path);

    // Bind the watcher to the path when one was supplied.
    if (watcher != null) {
        dataWatches.addWatch(path, watcher);
    }

    if (node == null) {
        throw new KeeperException.NoNodeException();
    }

    synchronized (node) {
        node.copyStat(result);
    }
    updateReadStat(path, 0L);
    return result;
}
WatchManager的addWatch方法:
/**
 * Registers {@code watcher} on {@code path}, maintaining both the
 * path-to-watchers table and the watcher-to-paths table.
 *
 * @param path        the node path being watched
 * @param watcher     the watcher to register
 * @param watcherMode the mode recorded for this watcher/path pair
 * @return {@code false} if the watcher is dead or was already registered
 *         for {@code path}; {@code true} if the path was newly added to the
 *         watcher's path set
 */
@Override
public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    if (isDeadWatcher(watcher)) {
        LOG.debug("Ignoring addWatch with closed cnxn");
        return false;
    }

    // path -> watchers: create the set on first use.
    Set<Watcher> watchersForPath = watchTable.get(path);
    if (watchersForPath == null) {
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        watchersForPath = new HashSet<>(4);
        watchTable.put(path, watchersForPath);
    }
    watchersForPath.add(watcher);

    // watcher -> paths: create the set on first use.
    Set<String> pathsForWatcher = watch2Paths.get(watcher);
    if (pathsForWatcher == null) {
        // cnxns typically have many watches, so use default cap here
        pathsForWatcher = new HashSet<>();
        watch2Paths.put(watcher, pathsForWatcher);
    }

    watcherModeManager.setWatcherMode(watcher, path, watcherMode);

    final boolean newlyAdded = pathsForWatcher.add(path);
    return newlyAdded;
}
其大致流程如下 ① 通过传入的 path(节点路径)从 watchTable 获取相应的 watcher 集合,进入② ② 判断①中得到的 watcher 集合是否为空(null),若为空,则进入③,否则,进入④ ③ 新生成 watcher 集合,并将路径 path 和此集合添加至 watchTable 中,进入④ ④ 将传入的 watcher 添加至 watcher 集合,即完成了 path 和 watcher 添加至 watchTable 的步骤,进入⑤ ⑤ 通过传入的 watcher 从 watch2Paths 中获取相应的 path 集合,进入⑥ ⑥ 判断 path 集合是否为空(null),若为空,则进入⑦,否则,进入⑧ ⑦ 新生成 path 集合,并将 watcher 和 paths 添加至 watch2Paths 中,进入⑧ ⑧ 将传入的 path(节点路径)添加至 path 集合,即完成了 path 和 watcher 添加至 watch2Paths 的步骤
NettyServerCnxn的sendResponse()方法
/**
 * Serializes a response and writes it to the client, then updates the
 * outstanding-request accounting.
 *
 * <p>Nothing is sent when the channel is closing or no longer open.
 * {@code cacheKey} and {@code stat} are used in caching, which is not
 * implemented here; an implementation example can be found in
 * NIOServerCnxn.
 *
 * @throws IOException if serializing or sending the response fails
 */
@Override
public void sendResponse(ReplyHeader h, Record r, String tag,
                         String cacheKey, Stat stat, int opCode) throws IOException {
    final boolean writable = !closingChannel && channel.isOpen();
    if (writable) {
        sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
        decrOutstandingAndCheckThrottle(h);
    }
}
服务端接收请求处理流程图:
相关文章
- 2022超详细流程ios APP最新打包上线教程,保证一看就会!
- SpringMVC工作原理及其流程
- Java如何卸载?怎么删掉Windows计算机上的Java?Java卸载流程详解!
- 蓝鲸bk-sops源码学习二:流程组件注册实现原理「建议收藏」
- Activiti流程引擎_activiti工作流原理
- GB28181中SSRC的使用和语音广播流程浅析
- OAuth2.0 原理流程及其单点登录和权限控制
- SpringCloudRPC远程调用核心原理:FeignRPC动态代理实例创建流程
- 麦克风声源定位原理_一种利用麦克风阵列进行声源定位的方法与流程
- 7个步骤详解AdaBoost 算法原理和构建流程
- MapReduce编程初级实践_mapreduce的执行流程
- Hbuilder用自有证书打包 ios App上架AppStore流程
- web3.0区块链NFT链游系统开发流程源码部署方案
- 网络割接是什么样的,有哪些流程,一张流程图搞定了!
- 2022史上最全App Store上架流程分享
- 【嵌入式开发】LED 驱动 ( LED发光二极管原理 | 底板原理图分析 | 核心板原理图分析 | GPIO | 裸板程序烧写流程 )
- 【数据挖掘】基于密度的聚类方法 - DBSCAN 方法 ( DBSCAN 原理 | DBSCAN 流程 | 可变密度问题 | 链条现象 | OPTICS 算法引入 | 聚类层次 | 族序概念 )
- iOS—-CocoaPods的安装、使用和,原理+参考流程+常见问题详解手机开发
- Glide原理解析(一):加载流程分析详解手机开发
- 大数据平台Hive数据迁移至阿里云ODPS平台流程与问题记录详解大数据
- Spring IOC/BeanFactory/ApplicationContext的工作流程/实现原理/初始化/依赖注入源码详解编程语言
- 强大的Linux: 中断处理流程深度剖析(linux中断处理程序)
- Linux下安装JIRA流程指南(linux下jira安装)
- 深入理解MySQL复制的原理和流程(mysql复制原理及流程)
- Linux系统下使用Bjam流程优化(linuxbjam)
- 控制Redis 实现流程控制(redis流程)
- Linux系统启动过程与步骤详解(linux启动流程)
- Oracle任务锁链的威力强大的流程控制力(Oracle任务链锁)
- Oracle AR 帐龄管理优化财务流程(oracle ar 帐龄)
- 与实现Redis访问流程原理与实现(redis访问流程 原理)
- Python流程控制实例代码