zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

HMaster分析之Region的负载均衡实现(一)

负载 实现 分析 均衡 region
2023-09-27 14:29:33 时间

        HBase中Region是表按行方向切分的一个个数据区域,由RegionServer负责管理,并向外提供数据读写服务。如果一个RegionServer上的Region过多,那么该RegionServer对应的就会承担过多的读写等服务请求,也就有可能在高并发访问的情况下,造成服务器性能下降甚至宕机。如此,RegionServer间Region的动态负载均衡,也就成了HBase实现高性能读写请求访问的一个需要解决的问题。那么,Region是如何在RegionServer间动态负载均衡的呢?

        在HMaster中,有几个成员变量定义如下:

  LoadBalancer balancer;// 实现Region动态负载均衡的实体

  private BalancerChore balancerChore;// 完成动态负载均衡的工作线程
  // Tracker for load balancer state

  // 加载balancer状态的跟踪器

  LoadBalancerTracker loadBalancerTracker;

        那么,这三个变量在HMaster启动时,是如何初始化的呢?

        在HMaster的构造函数中,有如下调用:

 // 开启Master的各种管理者

 startActiveMasterManager(infoPort);
        startActiveMasterManager()方法中,又有如下调用:

 finishActiveMasterInitialization(status);
        接下来,finishActiveMasterInitialization()方法中,开始了动态负载均衡中用到的各实体的初始化。代码如下:

 initializeZKBasedSystemTrackers();

 //initialize load balancer

 // 初始化balancer

 this.balancer.setClusterStatus(getClusterStatus());

 this.balancer.setMasterServices(this);

 this.balancer.initialize();

 // 创建工作线程balancerChore,它会周期性的调用HMaster的balance()方法,调用周期为参数

 // hbase.balancer.period配置的值,未配置的话默认为5分钟

 this.balancerChore = new BalancerChore(this);

 Threads.setDaemonThreadRunning(balancerChore.getThread());
       而initializeZKBasedSystemTrackers()方法中,完成了三个相关组件的初始化,从而可以实现finishActiveMasterInitialization()方法中后面对balancer的设置。

 // 初始化LoadBalancer类型的成员变量balancer

 // 利用发射技术,优先加载hbase.master.loadbalancer.class参数配置的均衡器类,

 // 参数未配置再加载StochasticLoadBalancer

 this.balancer = LoadBalancerFactory.getLoadBalancer(conf);

 // 初始化LoadBalancerTracker类型的成员变量loadBalancerTracker

 this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);

 // 启动loadBalancerTracker

 this.loadBalancerTracker.start();
        首先,介绍下BalancerChore类型的成员变量balancerChore,它为一个后台线程,会周期性的调用HMaster的balance()方法,周期性的完成实际的Region的负载均衡。它的定义如下:

/**

 * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when

 * needed.

 * 在需要的时候钓调用HMaster的balance()方法的工作线程。

@InterfaceAudience.Private

public class BalancerChore extends Chore {

 private static final Log LOG = LogFactory.getLog(BalancerChore.class);

 private final HMaster master;

 // 构造方法,线程的名称为master的serverName + "-BalancerChore",

 // chore()方法循环调用周期为参数hbase.balancer.period,默认为5分钟

 public BalancerChore(HMaster master) {

 super(master.getServerName() + "-BalancerChore",

 master.getConfiguration().getInt("hbase.balancer.period", 300000),

 master);

 this.master = master;

 * 线程的run()方法会周期性的调用chore()方法

 @Override

 protected void chore() {

 try {

 // 调用HMaster的balance()方法,完成实际的Region动态负载均衡

 master.balance();

 } catch (IOException e) {

 LOG.error("Failed to balance.", e);

}
      它的父类Chore为HBase中的抽象线程类,HBase中很多工作线程都是继承自Chore,它的run()方法定义如下:

 /**

 * @see java.lang.Thread#run()

 @Override

 public void run() {

 try {

 boolean initialChoreComplete = false;

 while (!this.stopper.isStopped()) {

 // 开始时间

 long startTime = System.currentTimeMillis();

 try {

 // 如果是第一次循环,完成初始化工作

 if (!initialChoreComplete) {

 initialChoreComplete = initialChore();

 } else {

 // 第一次后的每次循环,则周期性的调用chore()方法

 chore();

 } catch (Exception e) {

 LOG.error("Caught exception", e);

 if (this.stopper.isStopped()) {

 continue;

 // 睡眠期睡眠一定的时间,然后再去调用chore()方法

 this.sleeper.sleep(startTime);

 } catch (Throwable t) {

 LOG.fatal(getName() + "error", t);

 } finally {

 LOG.info(getName() + " exiting");

 cleanup();

 }

    下面,我们再重点分析下HMaster的balance()方法,源码如下:

 public boolean balance() throws IOException {

 // if master not initialized, dont run balancer.

 // 如果master没有被初始化,不能运行balancer

 if (!this.initialized) {

 LOG.debug("Master has not been initialized, dont run balancer.");

 return false;

 // Do this call outside of synchronized block.

 int maximumBalanceTime = getBalancerCutoffTime();

 synchronized (this.balancer) {// 在this.balancer上同步

 // If balance not true, dont run balancer.

 // 从loadBalancerTracker处获取balancer是否已开启,如果没有,则返回false

 if (!this.loadBalancerTracker.isBalancerOn()) return false;

 // Only allow one balance run at at time.

 if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {

 Map String, RegionState regionsInTransition =

 this.assignmentManager.getRegionStates().getRegionsInTransition();

 LOG.debug("Not running balancer because " + regionsInTransition.size() +

 " region(s) in transition: " + org.apache.commons.lang.StringUtils.

 abbreviate(regionsInTransition.toString(), 256));

 return false;

 if (this.serverManager.areDeadServersInProgress()) {

 LOG.debug("Not running balancer because processing dead regionserver(s): " +

 this.serverManager.getDeadServers());

 return false;

 if (this.cpHost != null) {

 try {

 if (this.cpHost.preBalance()) {

 LOG.debug("Coprocessor bypassing balancer request");

 return false;

 } catch (IOException ioe) {

 LOG.error("Error invoking master coprocessor preBalance()", ioe);

 return false;

 // 获取表名- {ServerName- Region列表的映射集合}的映射集合assignmentsByTable

 Map TableName, Map ServerName, List HRegionInfo assignmentsByTable =

 this.assignmentManager.getRegionStates().getAssignmentsByTable();

 List RegionPlan plans = new ArrayList RegionPlan 

 //Give the balancer the current cluster state.

 // 设置balancer中集群最新的状态

 this.balancer.setClusterStatus(getClusterStatus());

 // 循环assignmentsByTable中的value:每个表的ServerName- Region列表的映射集合

 for (Map ServerName, List HRegionInfo assignments : assignmentsByTable.values()) {

 List RegionPlan partialPlans = this.balancer.balanceCluster(assignments);

 if (partialPlans != null) plans.addAll(partialPlans);

 long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;

 // 移动的Region总个数

 int rpCount = 0; // number of RegionPlans balanced so far

 // Region的移动计划总耗时

 long totalRegPlanExecTime = 0;

 // Region的移动计划不为空

 if (plans != null !plans.isEmpty()) {

 // 循环处理

 for (RegionPlan plan: plans) {

 LOG.info("balance " + plan);

 // 开始时间

 long balStartTime = System.currentTimeMillis();

 //TODO: bulk assign

 // 调用assignmentManager的balance()方法执行计划

 this.assignmentManager.balance(plan);

 // 累加Region的移动计划总耗时

 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;

 // 累加移动的Region总个数

 rpCount++;

 if (rpCount plans.size() 

 // if performing next balance exceeds cutoff time, exit the loop

 // 如果完成下一个balance的时间超过cutoffTime,退出循环

 // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数

 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) cutoffTime) {

 //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)

 LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +

 maximumBalanceTime);

 break;

 // 如果协处理器主机不为空,运行协处理器的钩子方法postBalance()

 if (this.cpHost != null) {

 try {

 this.cpHost.postBalance(rpCount plans.size() ? plans.subList(0, rpCount) : plans);

 } catch (IOException ioe) {

 // balancing already succeeded so dont change the result

 LOG.error("Error invoking master coprocessor postBalance()", ioe);

 // If LoadBalancer did not generate any plans, it means the cluster is already balanced.

 // Return true indicating a success.

 return true;

 }

        balance()方法中,首先会利用HMaster中的成员变量assignmentManager,获取表名- {ServerName- Region列表的映射集合}的映射集合assignmentsByTable,如下:

 // 获取表名- {ServerName- Region列表的映射集合}的映射集合assignmentsByTable

 Map TableName, Map ServerName, List HRegionInfo assignmentsByTable =

 this.assignmentManager.getRegionStates().getAssignmentsByTable();
        然后,通过上面提到的HMaster的成员变量balancer的balanceCluster()方法,获得Region的 移动计划列表,添加到数据结构List RegionPlan 类型的plans中。如下:

 // 循环assignmentsByTable中的value:每个表的ServerName- Region列表的映射集合

 for (Map ServerName, List HRegionInfo assignments : assignmentsByTable.values()) {

 List RegionPlan partialPlans = this.balancer.balanceCluster(assignments);

 if (partialPlans != null) plans.addAll(partialPlans);

 }
        紧接着,循环处理这个移动计划列表plans,开始移动Region,如下:

 // Region的移动计划不为空

 if (plans != null !plans.isEmpty()) {

 // 循环处理

 for (RegionPlan plan: plans) {

 LOG.info("balance " + plan);

 // 开始时间

 long balStartTime = System.currentTimeMillis();

 //TODO: bulk assign

 // 调用assignmentManager的balance()方法执行计划

 this.assignmentManager.balance(plan);

 // 累加Region的移动计划总耗时

 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;

 // 累加移动的Region总个数

 rpCount++;

 if (rpCount plans.size() 

 // if performing next balance exceeds cutoff time, exit the loop

 // 如果完成下一个balance的时间超过cutoffTime,退出循环

 // 这个完成时间是预估的,Region移动的平均耗时,用一个粗略的算法,已完成Region移动的总 耗时/已完成Region移动的总个数

 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) cutoffTime) {

 //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)

 LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +

 maximumBalanceTime);

 break;

 }
        移动计划的执行,实际上是由assignmentManager的balance()实现的,并且,在执行移动计划时,会根据以往执行过的计划的平均耗时是否超过一定阈值,来确定是继续此移动计划还是跳过转而执行下一个。

        最后,如果协处理器主机不为空,运行协处理器的钩子方法postBalance(),如下:

 // 如果协处理器主机不为空,运行协处理器的钩子方法postBalance()

 if (this.cpHost != null) {

 try {

 this.cpHost.postBalance(rpCount plans.size() ? plans.subList(0, rpCount) : plans);

 } catch (IOException ioe) {

 // balancing already succeeded so dont change the result

 LOG.error("Error invoking master coprocessor postBalance()", ioe);

 }
      那么,最关键的几个问题,就是:

     1、需要移动的Region是如何被选中,它又要被移动往哪里?

     2、Region移动的执行,具体的流程是怎么样的?

      且听下回分解~