zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

HBase源码分析之事件处理模型

HBase源码 分析 模型 事件处理
2023-09-27 14:29:33 时间

        HBase是一个复杂的分布式非结构化数据库,它将表中的数据按照行的方向切分成一个个的Region,并在若干RegionServer上上线,依靠所在RegionServer对外提供数据读写IO服务。一开始,表中数据由于很少,只有一个Region。随着数据越来越多,一个Region已难以满足频繁的数据读写请求,所以,Region开始分裂。分裂后的两个Region又会按照一定策略选择RegionServer上线,继续对外提供数据读写服务。并且,HBase作为一个分布式数据库,肯定需要考虑负载均衡,它会按照某些策略选择若干Region,在比较繁忙的RegionServer上下线,转移到较为空闲的RegionSever上线继续提供高质量的数据读写服务。所有涉及到的这些Region的上线、下线、分裂,以及我们还没提到的合并等等流程,在HBase内部都是通过不同组件之间发送事件,然后按照一定策略调度执行的。这就是HBase的事件处理模型。

        那么,HBase的事件处理模型是如何实现的呢?本文,我们就将研究下HBase内部事件处理模型的实现。

        在HBase中有一个抽象类EventHandler,定义如下:

@InterfaceAudience.Private

public abstract class EventHandler implements Runnable, Comparable Runnable {
        它实现了Runnable接口,说明其子类是一个线程,而且,在它内部定义了以下成员变量:

 // type of event this object represents

 // 该对象代表的事件类型

 protected EventType eventType;

 // 服务器

 protected Server server;

 // sequence id generator for default FIFO ordering of events

 // 默认的FIFO调度的事件的序列化ID生成器

 protected static final AtomicLong seqids = new AtomicLong(0);

 // sequence id for this event

 // 该事件的序列化ID

 private final long seqid;

 // Listener to call pre- and post- processing. May be null.

 // 监听器,可能为空

 private EventHandlerListener listener;

 // Time to wait for events to happen, should be kept short

 // 等待事件发生的时间

 protected int waitingTimeForEvents;

 // 祖先

 private final Span parent;
        监听器listener是实现了EventHandlerListener接口的实例,接口定义如下:

 /**

 * This interface provides pre- and post-process hooks for events.

 * 为事件提供事前和事后处理钩子的接口

 public interface EventHandlerListener {

 * Called before any event is processed

 * 任何事件执行前被调用

 * @param event The event handler whose process method is about to be called.

 void beforeProcess(EventHandler event);

 * Called after any event is processed

 * 任何事件执行后被调用

 * @param event The event handler whose process method is about to be called.

 void afterProcess(EventHandler event);

 }
        接口就定义了两个方法,一个是任何事件执行前被调用的beforeProcess()方法和任何事件执行后被调用的afterProcess()方法。

        抽象类EventHandler既然实现了Runnable接口,那么其子类肯定是一个线程,而且其功能的实现,必然在核心方法run()方法内。下面,我们就看下这个run()方法,代码如下:

  /**

   * 线程实现功能的主方法,run()方法,是不是很像一个模板方法啊

   */

  public void run() {

 span /span  

 span /span // 开启一个TraceScope

    TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);

    try {

      

      // 先执行监听器的beforeProcess()方法

      if (getListener() != null) getListener().beforeProcess(this);

      

      // 接着执行process()方法

      process();

      

      // 最后执行监听器的afterProcess()方法

      if (getListener() != null) getListener().afterProcess(this);

      

    } catch(Throwable t) {

    span /span 

      // 处理事件异常t

      handleException(t);

      

    } finally {

      

      // 关闭TraceScope

      chunk.close();

    }

  }
        第一感觉是不是个模板方法呢?它的主要流程就是:

        1、开启一个TraceScope;

        2、先执行监听器的beforeProcess()方法;

        3、接着执行process()方法;

        4、最后执行监听器的afterProcess()方法;

        5、关闭TraceScope。

        如果中间出现Throwable异常,则调用handleException()方法处理事件异常。

        关于监听器的beforeProcess()方法和afterProcess()方法我们在上面已经提到过了,这里不再赘述。关键是看一下process()方法,其定义如下:

 /**

 * This method is the main processing loop to be implemented by the various

 * subclasses.

 * 核心方法process()方法是一个抽象方法,子类必须实现

 * @throws IOException

 public abstract void process() throws IOException;

        它是一个抽象方法,也就意味着子类必须要实现它。并且,它是子类完成事件处理核心逻辑所必须执行的方法。

        另外,还实现了诸如获取监听器、设置监听器、获取优先级、获取事件序列ID等工具方法,十分简单,不再一一介绍,代码粘贴如下,读者可自行查看:

 /**

 * This method is the main processing loop to be implemented by the various

 * subclasses.

 * 核心方法process()方法是一个抽象方法,子类必须实现

 * @throws IOException

 public abstract void process() throws IOException;

 * Return the event type

 * 获取时事件类型

 * @return The event type.

 public EventType getEventType() {

 return this.eventType;

 * Get the priority level for this handler instance. This uses natural

 * ordering so lower numbers are higher priority.

 * 获取handler实例的优先级,数字越低级别越高

 * p 

 * Lowest priority is Integer.MAX_VALUE. Highest priority is 0.

 * p 

 * Subclasses should override this method to allow prioritizing handlers.

 * p 

 * Handlers with the same priority are handled in FIFO order.

 * p 

 * @return Integer.MAX_VALUE by default, override to set higher priorities

 public int getPriority() {

 return Integer.MAX_VALUE;

 * 获取事件的序列号ID

 * @return This events sequence id.

 public long getSeqid() {

 return this.seqid;

 * Default prioritized runnable comparator which implements a FIFO ordering.

 * p 

 * Subclasses should not override this. Instead, if they want to implement

 * priority beyond FIFO, they should override {@link #getPriority()}.

 * 实现可比较接口的compareTo()方法,先比较优先级Priority,谁的优先级越小谁就越小。

 * 优先级相同的话,再比较事件序列号ID,谁的事件序列号ID越小谁就越小

 @Override

 public int compareTo(Runnable o) {

 EventHandler eh = (EventHandler)o;

 if(getPriority() != eh.getPriority()) {

 return (getPriority() eh.getPriority()) ? -1 : 1;

 return (this.seqid eh.seqid) ? -1 : 1;

 * 获取事件的监听器

 * @return Current listener or null if none set.

 public synchronized EventHandlerListener getListener() {

 return listener;

 * 设置事件的监听器

 * @param listener Listener to call pre- and post- {@link #process()}.

 public synchronized void setListener(EventHandlerListener listener) {

 this.listener = listener;

 @Override

 public String toString() {

 return "Event #" + getSeqid() +

 " of type " + eventType +

 " (" + getInformativeName() + ")";

 * Event implementations should override thie class to provide an

 * informative name about what event they are handling. For example,

 * event-specific information such as which region or server is

 * being processed should be included if possible.

 public String getInformativeName() {

 return this.getClass().toString();

 * 处理事件异常,可能被覆写

 * Event exception handler, may be overridden

 * @param t Throwable object

 protected void handleException(Throwable t) {

 LOG.error("Caught throwable while processing event " + eventType, t);

 }

        接下来就有一个问题,继承了抽象类EventHandler的各种事件是如何被提交的?它们被提交到哪里,又是如何被调度执行的呢?别慌,下面我一一为大家解答。

        首先在HRegionServer上有一个叫做service的成员变量,定义如下:

 // Instance of the hbase executor service.

 // HBase执行服务的实例

 protected ExecutorService service;
        它是HRegionServer上执行各种事件的ExecutorService实例,而ExecutorService提供了通用的事件执行机制,它抽象了线程池、队列,EventType可以被提交,使用线程处理被添加到队列中的对象。如果要创建一个的服务, 创建该类的一个实例,并调用实例的startExecutorService()方法。当服务完成后,调用shutdown()方法。

        那么事件是如何被提交的呢?我们以Region上线为例,在HRegionServer对外提供RPC服务的RSRpcServices类的openRegion()方法中,Region上线事件OpenRegionHandler是通过以下方式被提交的,代码如下:

 // If there is no action in progress, we can submit a specific handler.

 // Need to pass the expected version in the constructor.

 // 如果对应Region上没有相关的操作在进行,我们可以提交一个特定的处理者

 if (region.isMetaRegion()) {

 regionServer.service.submit(new OpenMetaHandler(

 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));

 } else {

 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),

 regionOpenInfo.getFavoredNodesList());

 regionServer.service.submit(new OpenRegionHandler(

 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));

 }
        它就是通过HRegionServer中成员变量service的submit()方法,来提交OpenRegionHandler事件的。




        




1.HbaseAdmin发起split:### 2.RSRpcServices实现类执行split(Implements the regionserver RPC services.)### 3.CompactSplitThread类与SplitRequest类用来执行region切割:### 4.splitRequest执行doSplitting操作### 4.1初始化两个子region### 4.2执行切割#### 4.2.1:(创建子region。
        HBase内部,单元格Cell的实现为KeyValue,它是HBase某行数据的某个单元格在内存中的组织形式,由Key Length、Value Length、Key、Value四大部分组成。
第十二届 BigData NoSQL Meetup — 基于hbase的New sql落地实践 立即下载