Deep Dive into the Watcher Mechanism Implementation (Part 2): Server-Side Request Handling Flow


Server-Side Request Handling Flow

On the server side, the NettyServerCnxn class handles requests sent by clients. Incoming bytes arrive in its receiveMessage method:

private void receiveMessage(ByteBuf message) {
        checkIsInEventLoop("receiveMessage");
        try {
            while (message.isReadable() && !throttled.get()) {

                // bb is non-null: we are in the middle of reading a request body
                if (bb != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    // if bb has more room than message has readable bytes,
                    // shrink bb's limit so readBytes() below consumes exactly what is available
                    if (bb.remaining() > message.readableBytes()) {
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }

                    // copy bytes from message into bb
                    message.readBytes(bb);
                    bb.limit(bb.capacity());

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes 0x{} bb {}",
                                  Long.toHexString(sessionId),
                                  ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }
                    // bb is full: the complete request body has been received
                    if (bb.remaining() == 0) {
                        bb.flip();

                        // update receive statistics
                        packetReceived(4 + bb.remaining());

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null || !zks.isRunning()) {
                            throw new IOException("ZK down");
                        }
                        if (initialized) {
                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                            // we could implement zero-copy queueing.
                            // process the packet sent by the client
                            zks.processPacket(this, bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
                        ByteBuffer dat = bbLen.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    if (message.readableBytes() < bbLen.remaining()) {
                        bbLen.limit(bbLen.position() + message.readableBytes());
                    }
                    message.readBytes(bbLen);
                    bbLen.limit(bbLen.capacity());
                    if (bbLen.remaining() == 0) {
                        bbLen.flip();

                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
                        }
                        int len = bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
                        }

                        bbLen.clear();
                        if (!initialized) {
                            if (checkFourLetterWord(channel, message, len)) {
                                return;
                            }
                        }
                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + len);
                        }
                        // checkRequestSize will throw IOException if request is rejected
                        zkServer.checkRequestSizeWhenReceivingMessage(len);
                        bb = ByteBuffer.allocate(len);
                    }
                }
            }
        } catch (IOException e) {
            LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.IO_EXCEPTION);
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);

            LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.CLIENT_RATE_LIMIT);
        }
    }
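
The loop above implements ZooKeeper's length-prefixed framing: bbLen accumulates a 4-byte length, then bb accumulates exactly that many payload bytes, possibly across several Netty reads. Below is a minimal sketch of the same framing using blocking I/O (a hypothetical helper, not ZooKeeper code):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper illustrating the same length-prefixed framing with blocking I/O.
public final class FrameReader {

    private final DataInputStream in;

    public FrameReader(InputStream in) {
        this.in = new DataInputStream(in);
    }

    // Reads one frame: a 4-byte big-endian length, then that many payload bytes.
    public byte[] readFrame(int maxLen) throws IOException {
        int len = in.readInt();        // plays the role of bbLen
        if (len < 0 || len > maxLen) { // mirrors the "Len error" check above
            throw new IOException("Len error " + len);
        }
        byte[] body = new byte[len];   // plays the role of bb
        in.readFully(body);
        return body;
    }
}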

The zks.processPacket(this, bb) method:

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        // deserialize the request header sent by the client
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        // dispatch on the operation type
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    authReturn = ap.handleAuthentication(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        authPacket.getAuth());
                } catch (RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                LOG.debug("Authentication succeeded for scheme: {}", scheme);
                LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn(
                        "No authentication provider for scheme: {} has {}",
                        scheme,
                        ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        } else if (h.getType() == OpCode.sasl) {
            // not an auth packet; next check whether it is a SASL operation
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
                ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
                cnxn.sendResponse(replyHeader, null, "response");
                cnxn.sendCloseSession();
                cnxn.disableRecv();
            } else {
                // ordinary requests end up in this block
                // wrap the raw bytes in a Request object
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                // submit the request
                submitRequest(si);
            }
        }
    }
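
For orientation: a RequestHeader is an xid followed by an operation type, each serialized by jute as a 4-byte big-endian int, so deserializing the header amounts to something like the sketch below (a simplified, hypothetical stand-in for BinaryInputArchive):

import java.nio.ByteBuffer;

// Hypothetical simplification of RequestHeader deserialization.
public final class HeaderPeek {

    public static void main(String[] args) {
        // xid = 1, type = 3 (OpCode.exists)
        ByteBuffer incoming = ByteBuffer.allocate(8).putInt(1).putInt(3);
        incoming.flip();
        int xid = incoming.getInt();  // h.getXid()
        int type = incoming.getInt(); // h.getType()
        System.out.println("xid=" + xid + " type=" + type);
    }
}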

The submitRequest method is responsible for submitting the current request on the server side; after passing the request throttler, the work lands in submitRequestNow:

public void submitRequestNow(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                // process the request via the processor chain (chain of responsibility)
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            LOG.debug("Dropping request.", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request", e);
            // Update request accounting/throttling limits
            requestFinished(si);
        }
    }
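
The synchronized block at the top is a guarded wait: submitters block until the processor chain has been set up and the state flips to RUNNING. As a stand-alone sketch of that pattern (hypothetical class and names):

// Hypothetical gate mirroring the wait-until-RUNNING logic above.
public class StartupGate {

    enum State { INITIAL, RUNNING }

    private State state = State.INITIAL;

    public synchronized void markRunning() {
        state = State.RUNNING;
        notifyAll(); // wake any submitter waiting below
    }

    public synchronized void awaitRunning() throws InterruptedException {
        while (state == State.INITIAL) {
            wait(1000); // re-check the condition on every wakeup
        }
    }
}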

Next, follow firstProcessor.processRequest(si).

firstProcessor is initialized in ZooKeeperServer's setupRequestProcessors method, shown below:

protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }

This uses the chain-of-responsibility pattern.

As shown above, firstProcessor is a PrepRequestProcessor, and its constructor is passed another processor, forming a call chain. syncProcessor is built as new SyncRequestProcessor(this, finalProcessor), so the processor passed into its constructor is the FinalRequestProcessor.

The full chain is therefore PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
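
As a stripped-down sketch of this chain-of-responsibility wiring (hypothetical class names; the real processors additionally run on their own threads and pass Request objects):

interface Processor {
    void process(String request);
}

class PrintingProcessor implements Processor {

    private final String name;
    private final Processor next; // next stage, or null at the end of the chain

    PrintingProcessor(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void process(String request) {
        System.out.println(name + " handling " + request);
        if (next != null) {
            next.process(request); // hand off to the next stage
        }
    }
}

public class ChainDemo {

    public static void main(String[] args) {
        // built back-to-front, exactly like setupRequestProcessors()
        Processor finalP = new PrintingProcessor("FinalRequestProcessor", null);
        Processor syncP = new PrintingProcessor("SyncRequestProcessor", finalP);
        Processor firstP = new PrintingProcessor("PrepRequestProcessor", syncP);
        firstP.process("exists /foo");
    }
}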

PrepRequestProcessor's processRequest method:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

public void processRequest(Request request) {
    request.prepQueueStartTime = Time.currentElapsedTime();
    submittedRequests.add(request);
    ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}

PrepRequestProcessor also extends a thread class, so requests are processed asynchronously; see the run() method:

public void run() {
        try {
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());

                // take a request from the blocking queue
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                    .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }

                request.prepStartTime = Time.currentElapsedTime();

                // call pRequest to pre-process it
                pRequest(request);
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }
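
processRequest and run() together form a classic single-consumer producer-consumer pair over a LinkedBlockingQueue, with a sentinel request (requestOfDeath) used for shutdown. A minimal stand-alone sketch of the pattern (hypothetical names):

import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-consumer worker mirroring the enqueue/take/sentinel pattern.
public class QueueWorker extends Thread {

    private static final Object DEATH = new Object(); // same role as requestOfDeath
    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void submit(Object request) { // producer side, like processRequest
        queue.add(request);
    }

    public void shutdown() {
        queue.add(DEATH);
    }

    @Override
    public void run() { // consumer side, like the run() above
        try {
            while (true) {
                Object request = queue.take(); // blocks until work arrives
                if (request == DEATH) {        // identity check, as in the source
                    break;
                }
                System.out.println("processing " + request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}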

The pRequest method:

protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        if (!request.isThrottled()) {
          pRequestHelper(request);
        }

        request.zxid = zks.getZxid();
        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
        nextProcessor.processRequest(request);
    }

Here nextProcessor is the SyncRequestProcessor.

SyncRequestProcessor's processRequest method:

private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();

public void processRequest(final Request request) {
        Objects.requireNonNull(request, "Request cannot be null");

        request.syncQueueStartTime = Time.currentElapsedTime();
        queuedRequests.add(request);
        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
    }

Again, look at the run method:

@Override
    public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());

                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                if (si == null) {
                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                    flush();

                    // take a request from the blocking queue
                    si = queuedRequests.take();
                }

                if (si == REQUEST_OF_DEATH) {
                    break;
                }

                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                // track the number of records written to the log
                // the block below appends the request to the transaction log and,
                // when needed, triggers a snapshot on a dedicated thread
                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read or a throttled request(which doesn't need to be written to the disk),
                    // and there are no pending flushes (writes), then just pass this to the next processor
                    if (nextProcessor != null) {

                        // hand the request to the next processor
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (shouldFlush()) {
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
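
Structurally this loop is a group-commit optimization: it polls with a timeout so consecutive writes can be batched into toFlush, flushes when the queue goes idle or the batch grows large enough, and bypasses the batch entirely for reads when nothing is pending. A simplified sketch of just the batching part (hypothetical names and threshold):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical batch-and-flush worker mirroring the loop above.
public class BatchingLogger extends Thread {

    private static final int MAX_BATCH = 1000; // stand-in for shouldFlush()'s threshold
    private final LinkedBlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final Queue<String> toFlush = new ArrayDeque<>();

    public void append(String request) {
        incoming.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // wait briefly for more work so writes can be batched together
                String si = incoming.poll(100, TimeUnit.MILLISECONDS);
                if (si == null) {
                    flush();              // idle: flush whatever has accumulated
                    si = incoming.take(); // then block for the next request
                }
                toFlush.add(si);
                if (toFlush.size() >= MAX_BATCH) {
                    flush();              // batch full: flush now
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void flush() {
        if (!toFlush.isEmpty()) {
            System.out.println("flushing " + toFlush.size() + " requests");
            toFlush.clear();
        }
    }
}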

FinalRequestProcessor's processRequest method updates the in-memory session information or znode data according to the operation carried in the Request object.

Key code (the exists branch of the request switch):

ExistsRequest existsRequest = new ExistsRequest();

                // deserialize the ByteBuffer into an ExistsRequest;
                // this is the request object the client sent when it issued the call
                ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
                // get the requested path
                path = existsRequest.getPath();
                if (path.indexOf('\0') != -1) {
                    throw new KeeperException.BadArgumentsException();
                }

                // the key part: if the request set the watch flag, pass cnxn (the ServerCnxn, which implements Watcher)
                // for an exists request this registers a watcher for data-change events
                Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
                // look up the node in the server's in-memory database and package the result into an ExistsResponse
                rsp = new ExistsResponse(stat);
                requestPathMetricsCollector.registerRequest(request.type, path);
                break;
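
For reference, the client-side call that reaches this branch is an exists() with the watch flag set, roughly as follows (the connection string and path are examples):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ExistsWatchDemo {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event: " + event); // fires when /demo changes
            }
        });
        // watch=true registers the default watcher; on the server this makes
        // existsRequest.getWatch() return true, so cnxn is passed to statNode
        Stat stat = zk.exists("/demo", true);
        System.out.println("stat: " + stat);
        zk.close();
    }
}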

The statNode method:

public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
        Stat stat = new Stat();
        // get the node data
        DataNode n = nodes.get(path);
        // if watcher is non-null, bind the watcher to the path
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
        }
        updateReadStat(path, 0L);
        return stat;
    }

WatchManager's addWatch method:

@Override
    public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
        if (isDeadWatcher(watcher)) {
            LOG.debug("Ignoring addWatch with closed cnxn");
            return false;
        }

        // check whether watchTable already has a watcher set for this path
        Set<Watcher> list = watchTable.get(path);
        // if not, create one
        if (list == null) {
            // don't waste memory if there are few watches on a node
            // rehash when the 4th entry is added, doubling size thereafter
            // seems like a good compromise
            // create a new watcher set
            list = new HashSet<>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        Set<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<>();
            // map the watcher to the set of paths it watches
            watch2Paths.put(watcher, paths);
        }

        watcherModeManager.setWatcherMode(watcher, path, watcherMode);

        // add the path to the paths set
        return paths.add(path);
    }

The rough flow, with a sketch after this list:

1. Look up the watcher set for the given path (the node path) in watchTable.
2. If the set is null, go to step 3; otherwise go to step 4.
3. Create a new watcher set and put it into watchTable under the path.
4. Add the watcher to the set; the path-to-watcher mapping is now recorded in watchTable.
5. Look up the path set for the given watcher in watch2Paths.
6. If the set is null, go to step 7; otherwise go to step 8.
7. Create a new path set and put it into watch2Paths under the watcher.
8. Add the path (the node path) to the path set; the watcher-to-path mapping is now recorded in watch2Paths.
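
In other words, addWatch maintains a bidirectional mapping: watchTable answers "which watchers care about this path" when a node changes, while watch2Paths answers "which paths does this watcher hold" when a connection goes away and its watches must be cleaned up. A minimal stand-alone version (hypothetical class, without the WatcherMode or dead-watcher handling of the real WatchManager):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical two-way watch registry mirroring watchTable / watch2Paths.
public class SimpleWatchManager<W> {

    private final Map<String, Set<W>> watchTable = new HashMap<>();
    private final Map<W, Set<String>> watch2Paths = new HashMap<>();

    public synchronized boolean addWatch(String path, W watcher) {
        // path -> watchers (small initial capacity, as in the real code)
        watchTable.computeIfAbsent(path, p -> new HashSet<>(4)).add(watcher);
        // watcher -> paths
        return watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // when a node changes, its watchers are looked up by path...
    public synchronized Set<W> watchersFor(String path) {
        return watchTable.getOrDefault(path, Set.of());
    }

    // ...and when a connection dies, its paths are found via the reverse map
    public synchronized Set<String> pathsFor(W watcher) {
        return watch2Paths.getOrDefault(watcher, Set.of());
    }
}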

NettyServerCnxn's sendResponse() method:

@Override
    public void sendResponse(ReplyHeader h, Record r, String tag,
                             String cacheKey, Stat stat, int opCode) throws IOException {
        // cacheKey and stat are used in caching, which is not
        // implemented here. Implementation example can be found in NIOServerCnxn.
        if (closingChannel || !channel.isOpen()) {
            return;
        }
        sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
        decrOutstandingAndCheckThrottle(h);
    }

Putting it all together, the server-side flow for a request is: NettyServerCnxn.receiveMessage frames the incoming bytes, ZooKeeperServer.processPacket deserializes the header and builds a Request, submitRequest hands it to the processor chain PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor, and for an exists request with the watch flag set the connection is registered as a watcher before the response is returned via NettyServerCnxn.sendResponse.