zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

HBase的Nonce实现分析

HBase 实现 分析
2023-09-27 14:29:33 时间

        一、问题背景

        当客户端发送RPC请求给服务端时,基于各种原因,服务器的响应很可能会超时。如果客户端只是在那等待,针对数据的操作,很可能出现服务器端已处理完毕,但是无法通知客户端,此时,客户端只能重新发起请求,可是又可能造成服务端重复处理请求。该如何解决该问题呢?

        二、解决方案

        实际上,客户端发送RPC请求给服务器后,如果响应超时,那么客户端会重复发送请求,直到达到参数配置的重试次数上限。而且,客户端第一次发送和以后重发请求时,会附带相同的nonce,服务端只要根据nonce进行判断,就能得知是否为同一请求,并根据之前请求处理的结果,决定是等待、拒绝还是直接处理。

        三、HBase如何实现的

        在HRegionServer中,有一个ServerNonceManager类型的成员变量nonceManager,由它负责管理该RegionServer上的nonce。其定义如下:

        

final ServerNonceManager nonceManager;

        ServerNonceManager中有一个十分重要的方法,用于当一个操作在服务端执行后未及时反馈响应给客户端,客户端重新发起携带相同nonceGroup和nonce的同一操作的请求时,服务端根据nonceGroup和nonce做相应的判断。定义如下:

/**

 * Starts the operation if operation with such nonce has not already succeeded. If the

 * operation is in progress, waits for it to end and checks whether it has succeeded.

 * 如果操作未执行成功,重新开始一个操作。如果该操作在进行过程中,等待它完成并判断它是否成功。

 * @param group Nonce group.

 * @param nonce Nonce.

 * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.

 * @return true if the operation has not already succeeded and can proceed; false otherwise.

 public boolean startOperation(long group, long nonce, Stoppable stoppable)

 throws InterruptedException {

 // 如果传入的nonce为0,则返回true,表明操作可以进行

 if (nonce == HConstants.NO_NONCE) return true;

 // 构造NonceKey实例nk

 NonceKey nk = new NonceKey(group, nonce);

 // 构造OperationContext实例ctx,初始状态为WAIT

 OperationContext ctx = new OperationContext();

 while (true) {

 // 将NonceKey到OperationContext的映射,添加到ConcurrentHashMap类型的nonces中去

 OperationContext oldResult = nonces.putIfAbsent(nk, ctx);

 // 如果之前没有,则说明该操作可以直接执行

 if (oldResult == null) return true;

 // Collision with some operation - should be extremely rare.

 // 如果之前存在该操作,则取出该操作nonce对应的OperationContext

 synchronized (oldResult) {

 // 获得该nonce对应的OperationContext状态

 int oldState = oldResult.getState();

 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);

 // 如果之前的状态不是WAIT

 if (oldState != OperationContext.WAIT) {

 // 如果之前的状态是PROCEED,说明之前的操作执行完成且以失败告终,此处返回true,表示操作可以再次执行

 return oldState == OperationContext.PROCEED; // operation ended

 // 等待一段时间后继续循环

 oldResult.setHasWait();

 oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop

 // 判断RegionServer的状态

 if (stoppable.isStopped()) {

 throw new InterruptedException("Server stopped");

 }
        在RSRpcServices的append()方法中,有如下代码:

if (r == null) {

 long nonce = startNonceOperation(m, nonceGroup);

 boolean success = false;

 try {

 r = region.append(append, nonceGroup, nonce);

 success = true;

 } finally {

 endNonceOperation(m, nonceGroup, success);

 if (region.getCoprocessorHost() != null) {

 region.getCoprocessorHost().postAppend(append, r);

 }
        其中,startNonceOperation()方法源码如下:

/**

 * Starts the nonce operation for a mutation, if needed.

 * 如果需要的话,为mutation开启一个nonce操作

 * @param mutation Mutation.

 * @param nonceGroup Nonce group from the request.

 * @returns Nonce used (can be NO_NONCE).

 private long startNonceOperation(final MutationProto mutation, long nonceGroup)

 throws IOException, OperationConflictException {

 // 如果RegionServer上的nonceManager为null,或者该mutation不存在nonce,那么直接返回HConstants.NO_NONCE,即0

 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;

 // 标志位,是否可以运行

 boolean canProceed = false;

 try {

 // 调用RegionServer上nonceManager的startOperation()方法,确定是否可以执行该操作

 canProceed = regionServer.nonceManager.startOperation(

 nonceGroup, mutation.getNonce(), regionServer);

 } catch (InterruptedException ex) {

 throw new InterruptedIOException("Nonce start operation interrupted");

 if (!canProceed) {// 如果不能运行,抛出OperationConflictException异常,即操作冲突异常

 // TODO: instead, we could convert append/increment to get w/mvcc

 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()

 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())

 + "] may have already completed";

 throw new OperationConflictException(message);

 // 最后,返回mutation的nonce

 return mutation.getNonce();

 }
        它会调用RegionServer上nonceManager的startOperation()方法,确定是否可以执行该操作。



HBase读链路分析 HBase的存储引擎是基于LSM-Like树实现的,更新操作不会直接去更新数据,而是使用各种type字段(put,delete)来标记一个新的多版本数据,采用定期compaction的形式来归档合并数据。这种数据结构将写操作变得非常简单且高效,但是却给读造成了很大的困扰。读取过程需要根据列族读取不同HFile中的数据;还需要根据版本进行过滤,同时对已经标记删除的数据也要进行过滤;硬盘中的数据与MemStore中的数据重合时,还需要执行合并,最后在内存中拼接成一行完整的数据再向上返回。 本文粗粒度地展示了HBase的读取链路,欢迎一起探讨交流~
HBase可用性分析与高可用实践 云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! HBase作为一个分布式存储的数据库,它是如何保证可用性的呢?对于分布式系统的CAP问题,它是如何权衡的呢? 最重要的是,我们在生产实践中,又应该如何保证HBase服务的高可用呢? 下面我们来仔细分析一下。
BigData NoSQL:ApsaraDB HBase数据存储与分析平台概览 未来,我们将继续紧紧贴合云上用户需求打磨产品,打造核心竞争力,提升易用性,保障系统稳定性,以及引入Serverless特性以进一步降低成本。
第十二届 BigData NoSQL Meetup — 基于hbase的New sql落地实践 立即下载