【HBase】HBase Series (Advanced): Developing the Observer and Endpoint Coprocessors

Observer

Observer coprocessors are similar to triggers in a traditional database: they are invoked on the server side when certain events occur. An Observer coprocessor is a set of hooks scattered through the HBase server-side code that are called when specific events happen. For example, before a put operation the prePut hook is invoked by the RegionServer, and after the put completes the postPut hook is invoked.
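
A minimal sketch of those two hooks, using the same HBase 1.x BaseRegionObserver API as the example below, is shown here; the class name PutLoggingObserver and its log messages are illustrative only, and imports are omitted as in the other listings:

// Minimal sketch of the prePut/postPut hooks (HBase 1.x BaseRegionObserver API).
// The class name and log messages are illustrative only.
public class PutLoggingObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(PutLoggingObserver.class);

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        // Called by the RegionServer before the put is applied
        LOG.info("prePut on row " + Bytes.toString(put.getRow()));
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                        WALEdit edit, Durability durability) throws IOException {
        // Called by the RegionServer after the put has been applied
        LOG.info("postPut on row " + Bytes.toString(put.getRow()));
    }
}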

  1. Write the observer
public class UserAppendObServer extends BaseRegionObserver {
    private final static Log LOG= LogFactory.getLog(UserAppendObServer.class);
    // Shared connection used to write the reverse relation into zpark:t_follower
    static Connection conn = null;
    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "CentOS");
        try {
            conn = ConnectionFactory.createConnection(conf);
            LOG.info("create connection successfully");
        } catch (IOException e) {
            LOG.error("failed to create connection", e);
        }
    }

    @Override
    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append) throws IOException {
        LOG.info("User Append SomeThing ~~~~~~~~~~~~~~");

        CellScanner cellScanner = append.cellScanner();
        while (cellScanner.advance()){
            Cell cell = cellScanner.current();
            if(Bytes.toString(CellUtil.cloneQualifier(cell)).equals("subscribe")){
                // Row key = the follower; cell value = the user being followed, with a trailing "|"
                String followerID = Bytes.toString(CellUtil.cloneRow(cell));
                String userID = Bytes.toString(CellUtil.cloneValue(cell));
                userID = userID.substring(0, userID.length() - 1);

                // Write the reverse relation: append the follower to the followee's follower list
                Append newAppend = new Append(userID.getBytes());
                newAppend.add("cf1".getBytes(), "followers".getBytes(), (followerID + "|").getBytes());
                Table table = conn.getTable(TableName.valueOf("zpark:t_follower"));
                table.append(newAppend);
                table.close();
                LOG.info(userID + " added a new follower " + followerID);
            }
        }

        return null;
    }
}

  2. Package the code and upload it to HDFS
[root@CentOS ~]# hdfs dfs -mkdir /libs
[root@CentOS ~]# hdfs dfs -put HBase-1.0-SNAPSHOT.jar /libs/
  3. Start HBase and tail the RegionServer startup log
[root@CentOS ~]# rm -rf /usr/hbase-1.2.4/logs/*
[root@CentOS ~]# start-hbase.sh
[root@CentOS ~]# tail -f /usr/hbase-1.2.4/logs/hbase-root-regionserver-CentOS.log

  4. Add the coprocessor to zpark:t_user

[root@CentOS ~]# hbase shell
hbase(main):001:0> disable 'zpark:t_user'

hbase(main):003:0> alter 'zpark:t_user' , METHOD =>'table_att','coprocessor'=>'hdfs:///libs/HBase-1.0-SNAPSHOT.jar|com.baizhi.observer.UserAppendObServer|1001'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.0830 seconds
hbase(main):004:0> enable 'zpark:t_user'
0 row(s) in 1.2890 seconds

Parameter explanation: alter 'table name', METHOD => 'table_att', 'coprocessor' => 'jar path|fully qualified class name|priority|[optional arguments]'
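
The same coprocessor attribute can also be set through the Java admin API rather than the shell. The following is only a sketch of that alternative, reusing the jar path, class name, table and priority from the alter command above:

// Sketch: registering the observer through the Java admin API (HBase 1.x),
// equivalent to the disable/alter/enable sequence shown above.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "CentOS");
try (Connection conn = ConnectionFactory.createConnection(conf);
     Admin admin = conn.getAdmin()) {
    TableName tableName = TableName.valueOf("zpark:t_user");
    admin.disableTable(tableName);
    HTableDescriptor desc = admin.getTableDescriptor(tableName);
    // className | jar path on HDFS | priority | optional key/value arguments
    desc.addCoprocessor("com.baizhi.observer.UserAppendObServer",
            new Path("hdfs:///libs/HBase-1.0-SNAPSHOT.jar"), 1001, null);
    admin.modifyTable(tableName, desc);
    admin.enableTable(tableName);
}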

  5. Verify that the observer has taken effect
hbase(main):005:0> desc 'zpark:t_user'
Table zpark:t_user is ENABLED
zpark:t_user, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///libs/HBase-1.0-SNAPSHOT.jar|com.baizhi.observer.UserAppendObServer|1001'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLO
CKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLO
CKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0490 seconds
  6. Run an Append command and watch the log output
hbase(main):003:0> append  'zpark:t_user','001','cf1:subscribe','002|'
0 row(s) in 0.2140 seconds
2020-10-10 17:23:20,847 INFO  [B.defaultRpcServer.handler=3,queue=0,port=16020] observer.UserAppendObServer: User Append SomeThing ~~~~~~~~~~~~~~
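
For reference, the same append can be issued from the Java client API as well; the sketch below reuses the cluster address and table from the earlier listings and fires the observer in exactly the same way:

// Sketch: the same append issued through the Java client API; it triggers the
// preAppend hook of UserAppendObServer just like the shell command above.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "CentOS");
try (Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf("zpark:t_user"))) {
    Append append = new Append("001".getBytes());
    append.add("cf1".getBytes(), "subscribe".getBytes(), "002|".getBytes());
    table.append(append);
}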

Endpoint

An Endpoint coprocessor is similar to a stored procedure in a traditional database: the client invokes the Endpoint to run a piece of server-side code and gets the result back for further processing. The most common use is aggregation.

Without coprocessors, finding the largest value in a table (a max aggregation) requires a full table scan: the client iterates over the scan results and computes the maximum itself. This approach cannot exploit the parallelism of the underlying cluster, and concentrating all of the computation on the client is bound to be inefficient.
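
As an illustration, the client-only version of that max aggregation looks roughly like the sketch below; the table and column names (t_demo, cf1:value) are placeholders, and conn is assumed to be an open Connection as in the earlier listings:

// Sketch: max aggregation without a coprocessor - a full scan, computed on the client.
// Every row is shipped to the client just to produce one number.
Scan scan = new Scan();
scan.addColumn("cf1".getBytes(), "value".getBytes());
long max = Long.MIN_VALUE;
try (Table table = conn.getTable(TableName.valueOf("t_demo"));
     ResultScanner scanner = table.getScanner(scan)) {
    for (Result r : scanner) {
        byte[] value = r.getValue("cf1".getBytes(), "value".getBytes());
        if (value != null) {
            max = Math.max(max, Bytes.toLong(value));
        }
    }
}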

With a coprocessor, the max logic can be deployed on the HBase server side, and HBase runs it concurrently on the nodes of the underlying cluster. The maximum is computed within each Region on its RegionServer, and only that per-Region maximum is returned to the client. The client then takes the maximum of the per-Region maxima, which improves overall efficiency considerably.
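
The client-side merge step described here is trivial compared with the scan it replaces; assuming a hypothetical endpoint that returns one maximum per Region, the merge reduces to:

// Sketch of the client-side merge: the global max is just the max of the per-Region maxima.
// The perRegionMax map stands in for the result of a hypothetical max endpoint.
static long mergeRegionMaxima(Map<byte[], Long> perRegionMax) {
    long globalMax = Long.MIN_VALUE;
    for (long regionMax : perRegionMax.values()) {
        globalMax = Math.max(globalMax, regionMax);
    }
    return globalMax;
}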

Older versions of HBase (before 0.96) used Hadoop RPC for inter-process communication. Since HBase 0.96, a new IPC mechanism, protobuf RPC, based on Google's Protocol Buffers, has been used; HBase requires Protobuf 2.5.0. We use Protobuf to generate the code stubs needed for the protocol.

  1. Install protobuf-2.5.0.tar.gz so that protoc can be used to generate the code stubs
[root@CentOS ~]# yum install -y gcc-c++
[root@CentOS ~]# tar -zxf protobuf-2.5.0.tar.gz
[root@CentOS ~]# cd protobuf-2.5.0
[root@CentOS protobuf-2.5.0]# ./configure
[root@CentOS protobuf-2.5.0]# make
[root@CentOS protobuf-2.5.0]# make install
  2. Confirm the installation by running protoc --version
[root@CentOS ~]# protoc --version
libprotoc 2.5.0
  3. Write the service and message definitions required for the RPC: RegionAvgService.proto
option java_package = "com.baizhi.endpoint";

option java_outer_classname = "RegionAvgServiceInterface";
option java_multiple_files = true;
option java_generic_services = true;
option optimize_for = SPEED;

message Request{
 required string groupFamillyName = 1;
 required string groupColumnName = 2;
 required string avgFamillyName = 3;
 required string avgColumnName = 4;
 required string startRow = 5;
 required string stopRow = 6;
}
message KeyValue{
  required string groupKey=1;
  required int64 count = 2;
  required double sum = 3;
}

message Response{
 repeated KeyValue arrays = 1;
}


service RegionAvgService {
 rpc queryResult(Request)
 returns(Response);
}
  4. Generate the code stubs
[root@CentOS ~]# protoc --java_out=./ RegionAvgService.proto
[root@CentOS ~]# tree com
com
└── baizhi
    └── endpoint
        ├── KeyValue.java
        ├── KeyValueOrBuilder.java
        ├── RegionAvgServiceInterface.java
        ├── RegionAvgService.java
        ├── Request.java
        ├── RequestOrBuilder.java
        ├── Response.java
        └── ResponseOrBuilder.java

2 directories, 8 files

Note: for the .proto file syntax, see https://blog.csdn.net/u014308482/article/details/52958148

  5. Implement the remote service
public class UserRegionAvgEndpoint extends RegionAvgService implements Coprocessor, CoprocessorService {
    private RegionCoprocessorEnvironment env;
    private final static Log LOG= LogFactory.getLog(UserRegionAvgEndpoint.class);
    /**
     * Implementation of the RPC method; the per-Region (partial) computation happens here.
     * @param controller
     * @param request
     * @param done
     */
    public void queryResult(RpcController controller, Request request, RpcCallback<Response> done) {
        LOG.info("===========queryResult===========");
        try {
            // Get the Region this coprocessor instance is attached to
            Region region = env.getRegion();
            LOG.info("Get DataFrom Region :"+region.getRegionInfo().getRegionNameAsString());
            // Scan the data in this Region
            Scan scan = new Scan();
            // Fetch only the group-by and aggregation columns
            scan.setStartRow(toBytes(request.getStartRow()));
            scan.setStopRow(toBytes(request.getStopRow()));
            scan.addColumn(toBytes(request.getGroupFamillyName()),toBytes(request.getGroupColumnName()));
            scan.addColumn(toBytes(request.getAvgFamillyName()),toBytes(request.getAvgColumnName()));
            RegionScanner regionScanner = region.getScanner(scan);

            // Iterate over the results, accumulating a per-group (count, sum)
            Map<String,KeyValue> keyValueMap = new HashMap<String, KeyValue>();
            List<Cell> result = new ArrayList<Cell>();
            boolean hasMore;

            do {
                // nextRaw() returns false while still filling `result` with the last row,
                // so the row has to be processed before the flag is checked
                hasMore = regionScanner.nextRaw(result);
                if (!result.isEmpty()) {
                    // The two requested columns come back sorted by qualifier, so the
                    // group-by cell (cf1:dept) precedes the aggregation cell (cf1:salary)
                    Cell groupCell = result.get(0);
                    Cell avgCell = result.get(1);

                    String groupKey = Bytes.toString(cloneValue(groupCell));
                    Double avgValue = Bytes.toDouble(cloneValue(avgCell));

                    LOG.info(groupKey + "\t" + avgValue);

                    // Check whether keyValueMap already holds an entry for groupKey
                    if (!keyValueMap.containsKey(groupKey)) {
                        KeyValue.Builder keyValueBuilder = KeyValue.newBuilder();
                        keyValueBuilder.setCount(1);
                        keyValueBuilder.setSum(avgValue);
                        keyValueBuilder.setGroupKey(groupKey);

                        keyValueMap.put(groupKey, keyValueBuilder.build());
                    } else {
                        // Fetch the totals accumulated so far
                        KeyValue oldKeyValue = keyValueMap.get(groupKey);
                        KeyValue.Builder newKeyValueBuilder = KeyValue.newBuilder();
                        // Accumulate
                        newKeyValueBuilder.setSum(avgValue + oldKeyValue.getSum());
                        newKeyValueBuilder.setCount(oldKeyValue.getCount() + 1);
                        newKeyValueBuilder.setGroupKey(oldKeyValue.getGroupKey());

                        // Overwrite the previous totals
                        keyValueMap.put(groupKey, newKeyValueBuilder.build());
                    }

                    // Clear result before reading the next row
                    result.clear();
                }
            } while (hasMore);

            // Build the response from the per-group totals
            Response.Builder responseBuilder = Response.newBuilder();
            for (KeyValue value : keyValueMap.values()) {
                responseBuilder.addArrays(value);
            }
            Response response = responseBuilder.build();
            done.run(response); // hand the result back to the client
        } catch (IOException e) {
            e.printStackTrace();
            LOG.error(e.getMessage());
        }

    }

    /**
     * Lifecycle callback invoked when the coprocessor is loaded; one
     * UserRegionAvgEndpoint instance is created per Region.
     * @param env
     * @throws IOException
     */
    public void start(CoprocessorEnvironment env) throws IOException {
        LOG.info("===========start===========");
        if(env instanceof RegionCoprocessorEnvironment){
            this.env= (RegionCoprocessorEnvironment) env;
        }else{
            throw new CoprocessorException("Env Must be RegionCoprocessorEnvironment!");
        }
    }

    /**
     * Lifecycle callback invoked when the coprocessor is unloaded.
     * @param env
     * @throws IOException
     */
    public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("===========stop===========");
    }

    /**
     * Returns the RegionAvgService instance to the framework.
     * @return
     */
    public Service getService() {
        LOG.info("===========getService===========");
        return this;
    }
}
  6. Add the coprocessor to the target table
hbase(main):002:0> disable 'baizhi:t_user'
0 row(s) in 2.6280 seconds

hbase(main):003:0> alter 'baizhi:t_user' , METHOD =>'table_att','coprocessor'=>'hdfs:///libs/HBase-1.0-SNAPSHOT.jar|com.baizhi.endpoint.UserRegionAvgEndpoint|1001'
Updating all regions with the new schema...
1/1 regions updated.
Done.

hbase(main):005:0> enable 'baizhi:t_user'
0 row(s) in 1.3390 seconds

Parameter explanation: alter 'table name', METHOD => 'table_att', 'coprocessor' => 'jar path|fully qualified class name|priority|[optional arguments]'

  7. Write the client code that makes the remote call
Configuration conf= HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM,"CentOS");
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("baizhi:t_user"));

//Invoke the coprocessor: the RegionAvgService exposed by UserRegionAvgEndpoint
//These two row keys locate the target Regions; passing null for both invokes the RegionAvgService on every Region
byte[] startKey = "0000".getBytes();
byte[] endKey = "0010".getBytes();

Batch.Call<RegionAvgService, Response> batchCall = new Batch.Call<RegionAvgService, Response>() {
    //Inside this method, build the Request and use the proxy to obtain the remote result
    public Response call(RegionAvgService proxy) throws IOException {
        //Create the controller and callback per call, since call() runs once per Region
        RpcController rpcController = new ServerRpcController();
        BlockingRpcCallback<Response> rpcCallback = new BlockingRpcCallback<Response>();
        System.out.println(proxy.getClass());
        Request.Builder requestBuilder = Request.newBuilder();
        requestBuilder.setStartRow("0000");
        requestBuilder.setStopRow("0010");
        requestBuilder.setGroupFamillyName("cf1");
        requestBuilder.setGroupColumnName("dept");
        requestBuilder.setAvgFamillyName("cf1");
        requestBuilder.setAvgColumnName("salary");

        Request request = requestBuilder.build();

        proxy.queryResult(rpcController,request,rpcCallback);
        Response response = rpcCallback.get();
        return response;
    }
};

//Invoke the coprocessor on the Regions covered by [startKey, endKey]
Map<byte[], Response> responseMaps = table.coprocessorService(RegionAvgService.class, startKey, endKey, batchCall);
Map<String,KeyValue> totalAvgMap = new HashMap<String, KeyValue>();
//Iterate over every Region's response and merge the partial results
for (Response value : responseMaps.values()) {
    //Partial result returned by one Region
    List<KeyValue> keyValues = value.getArraysList();
    for (KeyValue keyValue : keyValues) {
        if(!totalAvgMap.containsKey(keyValue.getGroupKey())){
            totalAvgMap.put(keyValue.getGroupKey(),keyValue);
        }else{
            KeyValue historyKeyValue = totalAvgMap.get(keyValue.getGroupKey());
            KeyValue.Builder newKeyValue = KeyValue.newBuilder();
            newKeyValue.setGroupKey(keyValue.getGroupKey());
            newKeyValue.setCount(historyKeyValue.getCount()+keyValue.getCount());
            newKeyValue.setSum(historyKeyValue.getSum()+keyValue.getSum());
            //Overwrite the previous entry with the merged totals
            totalAvgMap.put(keyValue.getGroupKey(), newKeyValue.build());
        }
    }
}
    }
}
//Final result
Collection<KeyValue> values = totalAvgMap.values();
System.out.println("Dept\tAverage Salary");
for (KeyValue value : values) {
    System.out.println(value.getGroupKey()+"\t"+value.getSum()/value.getCount());
}
}

table.close();
conn.close();
