zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

HBase流程源码 处理 分析 Flush
2023-09-27 14:29:33 时间

        在《HBase源码分析之HRegion上MemStore的flsuh流程(一)》《HBase源码分析之HRegion上MemStore的flsuh流程(二)》等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节。但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用。而HRegion上MemStore的flush还是要通过HRegionServer来对外提供服务的。下面,我们就详细探究下HRegionServer上是如何实现这点的。

        在HRegionServer中,有一个叫做cacheFlusher的东东,它是什么呢?我们先看一下它是如何被定义的:

 // Cache flushing

 // memstore内存刷新管理对象

 protected MemStoreFlusher cacheFlusher;
        可以发现,cacheFlusher是MemStoreFlusher类型的一个对象,我们来看下类的注释及定义:
/**

 * Thread that flushes cache on request

 * 处理刷新缓存请求的线程

 * NOTE: This class extends Thread rather than Chore because the sleep time

 * can be interrupted when there is something to do, rather than the Chore

 * sleep time which is invariant.

 * @see FlushRequester

@InterfaceAudience.Private

class MemStoreFlusher implements FlushRequester {
        cacheFlusher实际上就是HRegionServer上处理刷新缓存请求的线程。那么接下来的问题就是,cacheFlusher是如何被初始化的?它又是如何处理flush请求的?带着这两个问题,我们继续本文。

        

        一、如何初始化cacheFlusher

        首先,我们发现HRegionServer继承自HasThread,而HasThread实现了Runnable接口,那么在其内部肯定会执行run()方法,而run()方法的开始,有如下代码:

try {

 // Do pre-registration initializations; zookeeper, lease threads, etc.

 preRegistrationInitialization();

 } catch (Throwable e) {

 abort("Fatal exception during initialization", e);

 }
        继续追踪preRegistrationInitialization()方法,在其内部,调用了initializeThreads()方法,如下:
if (!isStopped() !isAborted()) {

 initializeThreads();

 }
        而这个initializeThreads()方法,做的主要工作就是初始化HRegionServer内部的各种工作线程,其中就包括cacheFlusher,代码如下:

// Cache flushing thread.

 // 缓存刷新线程

 this.cacheFlusher = new MemStoreFlusher(conf, this);
        接下来,我们在看看这个MemStoreFlusher类是如何定义及工作的。首先看下它最主要的几个成员变量:

        首当其冲的便是flushQueue,其定义如下:

private final BlockingQueue FlushQueueEntry flushQueue =

 new DelayQueue FlushQueueEntry 
        flushQueue是MemStoreFlusher中非常重要的一个变量,它是一个存储了Region刷新缓存请求的队列。而与flushQueue同时被更新的是regionsInQueue,它存储的是HRegion到FlushRegionEntry映射关系的集合,FlushRegionEntry是对发起memstore刷新请求的HRegion的一个封装,不仅包含了HRegion实例,还包括HRegion刷新memstore请求的产生时间,到期时间,以及一种类似续约的处理方式,即延长该请求的到期时间等。regionsInQueue的定义如下:

private final Map HRegion, FlushRegionEntry regionsInQueue =

 new HashMap HRegion, FlushRegionEntry 
        flushQueue和regionsInQueue的更新是同步的,即如果在flushQueue中加入或删除一条记录,那么在regionsInQueue中也会同步加入或删除一条记录。

        接下来比较重要的便是flushHandlers,它是FlushHandler类型的一个数组,定义如下:

private final FlushHandler[] flushHandlers;

        FlushHandler是什么呢?它是处理缓存刷新的线程类,线程一旦启动后,在其run()方法内,就会不停的从flushQueue队列中拉取flush请求进行处理,其类的定义如下:

 /**

 * 处理缓存刷新的线程类

 private class FlushHandler extends HasThread {

        以上就是MemStoreFlusher内执行flush流程最重要的几个成员变量,其他的变量都是一些辅助性的,这里不再做详细介绍。

        下面,我们来看下MemStoreFlusher的构造及成员变量的初始化,构造函数如下:

 /**

 * @param conf

 * @param server

 public MemStoreFlusher(final Configuration conf,

 final HRegionServer server) {

 super();

 // 赋值RegionServer实例server

 this.server = server;

 // 线程唤醒频率threadWakeFrequency,取参数hbase.server.thread.wakefrequency配置的值,默认为10s,即线程的工作频率

 this.threadWakeFrequency =

 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);

 // 获取最大可用堆内存max

 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();

 // 获取全局memstore所占堆内存的百分比globalMemStorePercent

 float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);

 // 获取全局memstore大小限制值globalMemStoreLimit

 this.globalMemStoreLimit = (long) (max * globalMemStorePercent);

 // 获取全局memstore大小限制值的低水平线百分比globalMemStoreLimitLowMarkPercent

 this.globalMemStoreLimitLowMarkPercent = 

 HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);

 // 获取全局memstore大小限制值的低水平线globalMemStoreLimitLowMark

 this.globalMemStoreLimitLowMark = 

 (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);

 // 获取阻塞等待时间blockingWaitTime,取参数hbase.hstore.blockingWaitTime,默认为90000

 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",

 90000);

 // 获取flush处理线程数目handlerCount,取参数hbase.hstore.flusher.count,默认为2

 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);

 // 构造handlerCount个flush处理线程数组,默认为2个,可通过hbase.hstore.flusher.count设置

 this.flushHandlers = new FlushHandler[handlerCount];

 // 记录日志信息

 LOG.info("globalMemStoreLimit=" +

 StringUtils.humanReadableInt(this.globalMemStoreLimit) +

 ", globalMemStoreLimitLowMark=" +

 StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +

 ", maxHeap=" + StringUtils.humanReadableInt(max));

        MemStoreFlusher的构造函数比较简单,我们重点分析下获取全局memstore所占堆内存的百分比globalMemStorePercent的HeapMemorySizeUtil类的getGlobalMemStorePercent()方法,和获取全局memstore大小限制值的低水平线百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil类的getGlobalMemStoreLowerMark()方法。

        首先,看下获取全局memstore所占堆内存的百分比globalMemStorePercent的HeapMemorySizeUtil类的getGlobalMemStorePercent()方法,代码如下:

/**

 * Retrieve global memstore configured size as percentage of total heap.

 * 获取配置的全局memstore占整个heap内存的百分比

 * @param c

 * @param logInvalid

 public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) {

 // 获取全局memstore的大小,优先取参数hbase.regionserver.global.memstore.size,

 // 未配置的话再取参数hbase.regionserver.global.memstore.upperLimit,

 // 如果还未配置的话,默认为0.4

 float limit = c.getFloat(MEMSTORE_SIZE_KEY,

 c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));

 // 如果limit的值在区间(0,0.8]之外的话

 if (limit 0.8f || limit = 0.0f) {

 if (logInvalid) {// 根据参数logInvalid确定是否记录警告日志

 LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE

 + " because supplied value outside allowed range of (0 - 0.8]");

 // 将limit设置为0.4

 limit = DEFAULT_MEMSTORE_SIZE;

 // 返回limit

 return limit;

 }
        这个方法的主要作用就是获取配置的全局memstore占整个heap内存的百分比。获取的逻辑如下:

        1、获取配置的全局memstore占整个heap内存的百分比limit:优先取参数hbase.regionserver.global.memstore.size,未配置的话再取参数hbase.regionserver.global.memstore.upperLimit,如果还未配置的话,默认为0.4;

        2、判断limit是否在区间(0,0.8]之外,根据参数logInvalid确定是否记录警告日志,并将limit设置为默认值0.4;

        3、返回limit。

        下面,我们再看下获取全局memstore大小限制值的低水平线百分比globalMemStoreLimitLowMarkPercent的HeapMemorySizeUtil类的getGlobalMemStoreLowerMark()方法,代码如下:

 /**

 * Retrieve configured size for global memstore lower water mark as percentage of total heap.

 * 获取配置的全局memstore内存占全部heap内存的低水平线百分比

 * @param c

 * @param globalMemStorePercent

 public static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {

 // 取新参数hbase.regionserver.global.memstore.size.lower.limit 

 String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);

 // 如果新参数配置了的话,直接转化为double并返回

 if (lowMarkPercentStr != null) {

 return Float.parseFloat(lowMarkPercentStr);

 // 取旧参数hbase.regionserver.global.memstore.lowerLimit"

 String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);

 // 如果旧参数配置的话,记录警告日志信息

 if (lowerWaterMarkOldValStr != null) {

 LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "

 + MEMSTORE_SIZE_LOWER_LIMIT_KEY);

 // 转化为double类型lowerWaterMarkOldVal

 float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);

 // 如果参数值大于计算得到的全局memstore所占堆内存的百分比,赋值为globalMemStorePercent,并记录日志信息

 if (lowerWaterMarkOldVal globalMemStorePercent) {

 lowerWaterMarkOldVal = globalMemStorePercent;

 LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "

 + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was " + MEMSTORE_SIZE_OLD_KEY);

 // 返回lowerWaterMarkOldVal / globalMemStorePercent

 return lowerWaterMarkOldVal / globalMemStorePercent;

 // 如果新旧参数均未配置的话,默认为0.95

 return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;

 }
        这个方法的主要作用就是获取配置的全局memstore内存占全部heap内存的低水平线百分比。获取的逻辑如下:

        1、取新参数hbase.regionserver.global.memstore.size.lower.limit配置的值,如果新参数配置了的话,直接转化为double并返回;

        2、如果新参数未配置的话,取旧参数hbase.regionserver.global.memstore.lowerLimit配置的值,如果旧参数配置的话,记录警告日志信息,并:

              2.1、将旧参数配置的值转化为double类型lowerWaterMarkOldVal;

              2.2、如果旧参数值大于计算得到的全局memstore所占堆内存的百分比,赋值为globalMemStorePercent,并记录日志信息;

              2.3、返回lowerWaterMarkOldVal / globalMemStorePercent;

        3、如果新旧参数均未配置的话,默认为0.95。

       

        二、cacheFlusher如何处理flush请求

        通过如何初始化cacheFlusher部分的介绍,我们已经知道,在MemStoreFlusher内部,存在两个存储flush请求及其HRegion封装类的队列和集合,即flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法,我们大体看下这个方法:

/**

 * 请求刷新,

 * 即将需要刷新MemStore的HRegion放置到regionsInQueue中,

 * 同时根据HRegion构造FlushRegionEntry实例,添加到flushQueue中

 public void requestFlush(HRegion r) {

 synchronized (regionsInQueue) {// 使用synchronized关键字对regionsInQueue进行线程同步

 if (!regionsInQueue.containsKey(r)) {// 如果regionsInQueue中不存在对应HRegion

 // This entry has no delay so it will be added at the top of the flush

 // queue. Itll come out near immediately.

 // 将HRegion类型的r封装成FlushRegionEntry类型的fqe

 // 这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部。不久它将出列被处理。

 FlushRegionEntry fqe = new FlushRegionEntry(r);

 // 将HRegion- FlushRegionEntry的对应关系添加到regionsInQueue集合

 // 将flush请求FlushRegionEntry添加到flushQueue队列

 // 从这里可以看出regionsInQueue、flushQueue这两个成员变量go together

 this.regionsInQueue.put(r, fqe);

 this.flushQueue.add(fqe);

 }
        requestFlush()方法的主要作用,就是添加一个flush region的请求至MemStoreFlusher内部队列。其主要逻辑如下:

        1、首先需要使用synchronized关键字对regionsInQueue进行线程同步,这么做是为了防止多线程的并发;

        2、然后判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中不存在对应HRegion的话继续,否则直接返回;

        3、既然regionsInQueue集合中不存在对应HRegion,将HRegion类型的r封装成FlushRegionEntry类型的fqe;

        4、将HRegion- FlushRegionEntry的对应关系添加到regionsInQueue集合;

        5、将flush请求FlushRegionEntry添加到flushQueue队列。

        从上述4、5步就可以看出regionsInQueue、flushQueue这两个成员变量go together,并且这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部,不久它将出列被处理。这个该怎么理解呢?我们还是回到flushQueue的定义,flushQueue是一个存储了Region刷新缓存请求的队列,里面存储的是实现了FlushQueueEntry接口的对象,FlushQueueEntry没有定义任何行为,但是继承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,队列里存储的对象有一个过期时间的概念。

        既然flush的请求已经被添加至flushQueue队列,相当于生产者已经把产品生产出来了,那么谁来消费呢?这个消费者的角色就是由FlushHandler线程来担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,但是在研究其run()方法前,我们先看下flushQueue中存储的都是什么东西?

        我们再回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。我们先看下FlushQueueEntry的定义:

interface FlushQueueEntry extends Delayed {

 }
        一个集成了java的Delayed接口的无任何方法的空接口而已,那么它都有哪些实现类呢?答案就是WakeupFlushThread和FlushRegionEntry。在介绍这二者之前,我们首先介绍下flushQueue对应的队列类型---Java中的DelayQueue。

        众所周知,DelayQueue是一个无界的BlockingQueue,其内部存储的必然是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。而这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且该队列内的成员都是有序的,从头至尾按照延迟到期时间的长短来排序。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期,可以被取走。

        既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类,必须提供compareTo()方法,用以排序,并且需要实现上述getDelay()方法,判断队内成员是否到期可以被取走。

        接下来,我们分别来研究下WakeupFlushThread和FlushRegionEntry。

        首先,WakeupFlushThread非常简单,没有任何实质内容,代码如下:

/**

 * Token to insert into the flush queue that ensures that the flusher does not sleep

 * 加入到刷新队列的确保刷新器不睡眠的令牌

 static class WakeupFlushThread implements FlushQueueEntry {

 @Override

 public long getDelay(TimeUnit unit) {

 return 0;

 @Override

 public int compareTo(Delayed o) {

 return -1;

 @Override

 public boolean equals(Object obj) {

 return (this == obj);

 }
        它的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的,无前后之分,实际上WakeupFlushThread区分前后也没有意义,它本身也没有实质性的内容。

        接下来,我们再看下FlushRegionEntry类,其定义如下:

/**

 * Datastructure used in the flush queue. Holds region and retry count.

 * Keeps tabs on how old this object is. Implements {@link Delayed}. On

 * construction, the delay is zero. When added to a delay queue, well come

 * out near immediately. Call {@link #requeue(long)} passing delay in

 * milliseconds before readding to delay queue if you want it to stay there

 * a while.

 * 用在刷新队列里的数据结构。保存region和重试次数。

 * 跟踪对象多大(ps.即时间)

 * 实现了java的Delayed接口。

 * 在构造方法里,delay为0。

 * 如果你想要它在队列中保持在在被重新加入delay队列之前

 static class FlushRegionEntry implements FlushQueueEntry {

 // 待flush的HRegion

 private final HRegion region;

 // 创建时间 

 private final long createTime;

 // 何时到期

 private long whenToExpire;

 // 重入队列次数

 private int requeueCount = 0;

 FlushRegionEntry(final HRegion r) {

 // 待flush的HRegion

 this.region = r;

 // 创建时间为当前时间

 this.createTime = EnvironmentEdgeManager.currentTime();

 // 何时到期也为当前时间,意味着首次入队列时是没有延迟时间的,入列即可出列

 this.whenToExpire = this.createTime;

 * @param maximumWait

 * @return True if we have been delayed code maximumWait /code milliseconds.

 public boolean isMaximumWait(final long maximumWait) {

 return (EnvironmentEdgeManager.currentTime() - this.createTime) maximumWait;

 * @return Count of times {@link #requeue(long)} was called; i.e this is

 * number of times weve been requeued.

 public int getRequeueCount() {

 return this.requeueCount;

 * 类似重新入列的处理方法,重新入列次数requeueCount加1,何时到期未当前时间加参数when

 * @param when When to expire, when to come up out of the queue.

 * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()

 * to whatever you pass.

 * @return This.

 public FlushRegionEntry requeue(final long when) {

 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;

 this.requeueCount++;

 return this;

 * 判断何时到期的方法

 @Override

 public long getDelay(TimeUnit unit) {

 // 何时到期减去当前时间

 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),

 TimeUnit.MILLISECONDS);

 * 排序比较方法,根据判断何时到期的getDelay()方法来决定顺序

 @Override

 public int compareTo(Delayed other) {

 // Delay is compared first. If there is a tie, compare regions hash code

 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -

 other.getDelay(TimeUnit.MILLISECONDS)).intValue();

 if (ret != 0) {

 return ret;

 // 何时到期时间一直的话,根据hashCode()来排序,其实也就是根据HRegion的hashCode()方法返回值来排序

 FlushQueueEntry otherEntry = (FlushQueueEntry) other;

 return hashCode() - otherEntry.hashCode();

 @Override

 public String toString() {

 return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";

 @Override

 public int hashCode() {

 int hash = (int) getDelay(TimeUnit.MILLISECONDS);

 return hash ^ region.hashCode();

 @Override

 public boolean equals(Object obj) {

 if (this == obj) {

 return true;

 if (obj == null || getClass() != obj.getClass()) {

 return false;

 Delayed other = (Delayed) obj;

 return compareTo(other) == 0;

}
        FlushRegionEntry类有几个很重要的对像:

        1、HRegion region:待flush的HRegion;

        2、long createTime:创建时间;

        3、long whenToExpire:何时到期;

        4、int requeueCount = 0:重入队列次数。

        而它的对象在初始化时,创建时间createTime设置为当前时间,何时到期whenToExpire也为当前时间,它判断是否到期的getDelay()方法为何时到期减去当前时间,也就意味着首次入队列时是没有延迟时间的,入列即可出列。另外,它在队列内部用于排序的compareTo()方法,也是首先根据判断何时到期的getDelay()方法来决定顺序,何时到期时间一致的话,根据hashCode()来排序,其实也就是根据HRegion的hashCode()方法返回值来排序。比较特别的是,这个类还提供了类似重新入列的处理方法,重新入列次数requeueCount加1,何时到期未当前时间加参数when,那么就相当于延期的了when时间变量。

        说了那么多,接下来我们看下flush请求的实际处理流程,即FlushHandler的run()方法,其代码为:

@Override

 public void run() {

 while (!server.isStopped()) {// HRegionServer未停止的话,run()方法一直运行

 FlushQueueEntry fqe = null;

 try {

 // 标志位AtomicBoolean类型的wakeupPending设置为false

 wakeupPending.set(false); // allow someone to wake us up again

 // 从flushQueue队列中拉取一个FlushQueueEntry,即fqe

 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);

 if (fqe == null || fqe instanceof WakeupFlushThread) {// 如果fqe为空,或者为WakeupFlushThread

 if (isAboveLowWaterMark()) {

 // 由于内存高于低阈值,flush线程唤醒

 LOG.debug("Flush thread woke up because memory above low water="

 + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));

 // 调用flushOneForGlobalPressure()方法,flush一个HRegion的MemStore,

 // 降低MemStore的大小,预防OOM等异常情况的发生

 if (!flushOneForGlobalPressure()) {

 // Wasnt able to flush any region, but were above low water mark

 // This is unlikely to happen, but might happen when closing the

 // 这是不可能发生的,但是当关闭全部服务器时可能发生,另外一个线程正在flush region;

 // entire server - another thread is flushing regions. Well just

 // sleep a little bit to avoid spinning, and then pretend that

 // we flushed one, so anyone blocked will check again

 // 我们将会休眠一段时间,以避免旋转,然后假装我们flush了一个region,以使得被阻塞线程再次检查

 Thread.sleep(1000);

 wakeUpIfBlocking();// 唤醒其他阻塞线程

 // Enqueue another one of these tokens so well wake up again

 // 入列另一个令牌,以使我们之后再次被唤醒

 wakeupFlushThread();

 continue;

 // fre不为空,且不为WakeupFlushThread的话,转化为FlushRegionEntry类型的fre

 FlushRegionEntry fre = (FlushRegionEntry) fqe;

 // 调用flushRegion()方法,并且如果结果为false的话,跳出循环

 if (!flushRegion(fre)) {

 break;

 } catch (InterruptedException ex) {

 continue;

 } catch (ConcurrentModificationException ex) {

 continue;

 } catch (Exception ex) {

 LOG.error("Cache flusher failed for entry " + fqe, ex);

 if (!server.checkFileSystem()) {

 break;

 // 同时清空regionsInQueue和flushQueue

 // 又是在一起啊

 synchronized (regionsInQueue) {

 regionsInQueue.clear();

 flushQueue.clear();

 // Signal anyone waiting, so they see the close flag

 // 唤醒所有的等待着,使得它们能够看到close标志

 wakeUpIfBlocking();

 // 记录日志信息

 LOG.info(getName() + " exiting");

 }
        它的主要处理逻辑为:

        1、首先HRegionServer未停止的话,run()方法一直运行;

        2、将标志位AtomicBoolean类型的wakeupPending设置为false;

        3、从flushQueue队列中拉取一个FlushQueueEntry,即fqe:

             3.1、如果fqe为空,或者为WakeupFlushThread:

                      3.1.1、如果通过isAboveLowWaterMark()方法判断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore的大小,预防OOM等异常情况的发生,并入列另一个令牌,以使该线程之后再次被唤醒;

             3.2、fre不为空,且不为WakeupFlushThread的话,转化为FlushRegionEntry类型的fre:调用flushRegion()方法,并且如果结果为false的话,跳出循环;

        4、如果循环结束,同时清空regionsInQueue和flushQueue(ps:又是在一起啊O(∩_∩)O~)

        5、唤醒所有的等待着,使得它们能够看到close标志;

        6、记录日志。

        我们注意到,WakeupFlushThread的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread起到的作用不仅仅是这个,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread不仅做为占位符,保证刷新线程不休眠,还按照一定策略选择该RegionServer上的一个Region刷新memstore,以缓解RegionServer内存压力。

        至于,如果全局MemStore的大小高于限制值得低水平线时,如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的,我们下节再讲,敬请期待。

      






        











从数据结构比较HBase的3种memstore实现方案 HBase的memstore目前存在3种实现:DefaultMemstore、CompactingMemstore、CCSMapMemStore,本文尝试从数据结构的角度对其进行比较。
hbase源码系列(十三)缓存机制MemStore与Block Cache 这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存。
* Chore to clean periodically the moved region list * 被移动Region列表的定期清理工作线程 private MovedRegionsCleaner movedRegionsCleaner;        原来它是HRegionServer上一个被移动Region列表的定期清理工作线程。
        在《HBase源码分析之MemStore的flush发起时机、判断条件等详情》一文中,我们详细介绍了MemStore flush的发起时机、判断条件等详情,主要是两类操作,一是会引起MemStore数据大小变化的Put、Delete、Append、Increment等操作,二是会引起HRegion变化的诸如Regin的分裂、合并以及做快照时的复制拷贝等,同样会触发MemStore的flush流程。
        前面三篇文章中,我们详细叙述了compact流程是如何在HRegion上进行的,了解了它的很多细节方面的问题。但是,这个compact在HRegionServer上是如何进行的?合并时文件是如何选择的呢?在这篇文章中,你将找到答案!         首先,在HRegionServer内部,我们发现,它定义了一个CompactSplitThread类型的成员变量compactSplitThread,单看字面意思,这就是一个合并分裂线程,那么它会不会就是HRegionServer上具体执行合并的工作线程呢?我们一步一步来看。
        前面的几篇文章,我们详细介绍了HBase中HRegion上MemStore的flsuh流程,以及HRegionServer上MemStore的flush处理流程。那么,flush到底是在什么情况下触发的呢?本文我们将详细探究下HBase中MemStore的flush流程的发起时机,看看到底都有哪些操作,或者哪些后台服务进程会触发MemStore的flush。
        继上篇文章《HBase源码分析之HRegionServer上MemStore的flush处理流程(一)》遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的。
第十二届 BigData NoSQL Meetup — 基于hbase的New sql落地实践 立即下载