zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

HBase 表数据读取解析之 一个需求学习 HBaseAPI

HBase学习数据 一个 解析 读取 需求
2023-09-11 14:22:28 时间

目录

一、HBaseAPI重要的概念

1.Scan

 2.Result 

3.Filter的Scan

二、案例分析---HBaseAPI学习

1.需求

2.思路

 3.代码


一、HBaseAPI重要的概念

1.Scan

HBase中的数据表通过划分成一个个的Region来实现数据的分片,每一个Region关联一个RowKey的范围区间,而每一个Region中的数据,按RowKey的字典顺序进行组织。

正是基于这种设计,使得HBase能够轻松应对这类查询:“指定一个RowKey的范围区间,获取该区间的所有记录”, 这类查询在HBase被称之为Scan,当然了如果不指定就是全表扫描,下面是一次查询就是一次RPC访问,返回结果集给客户端。

1 . 构建Scan,指定startRow与stopRow,如果未指定的话会进行全表扫描

2 . 获取ResultScanner

3 . 遍历查询结果

4 . 关闭ResultScanner

 2.Result 

将 Scan 的 封装为 Result 对象 返回给 客户端。

3.Filter的Scan

Filter可以在Scan的结果集基础之上,对返回的记录设置更多条件值,这些条件可以与RowKey有关,可以与列名有关,也可以与列值有关,还可以将多个Filter条件组合在一起,一般组合在一起会是 FilterList ,但是一般不建议,可能存在漏数据的风险。

  • Client每一次往RegionServer发送scan请求,都会批量拿回一批数据(由Caching决定过了每一次拿回的Results数量),然后放到本次的Result Cache中
  • 应用每一次读取数据时,都是从本地的Result Cache中获取的。如果Result Cache中的数据读完了,则Client会再次往RegionServer发送scan请求获取更多的数据

二、案例分析---HBaseAPI学习

1.需求

解析如下表中的数据,定时上报的数据是按照分钟数上报,分钟后的 value=2#1#0 是要解析汇总的数据,现在就是想把 20200706 这一天的数据汇总,前提是增量的读取数据解析,会在 LastJobTime 表维护时间戳,表中的 modifyTime  大于每次解析后记录的时间戳 就读取新的 数据。

2.思路

  • 遍历 Result 结果集,从 Result 中 根据 CF 和 column 直接拿出确定的字段值,比如上面的 c , ci,ct  等字段
  • 将不确定的字段也就是按照时间 动态写进表中的字段 d:1300, d:1305等,首先遍历 cell ,根据cell 取出 Qualify ,根据字段的长度 大于等于4  全部读取解析,至此我们就可以 拿到按照时间列动态写进的value数据 

   如果大家有幸读到这里,理解我的思路就行,上面业务场景不必深究。

 3.代码

​
​
package com.kangll.hbaseapi

import java.util
import com.winner.utils.KerberosUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, CompareOperator, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Result, Scan, Table}
import org.apache.hadoop.hbase.filter.{SingleColumnValueFilter, SubstringComparator}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

/** ******************************************
 *
 * @AUTHOR kangll
 * @DATE 2020/8/11 14:47
 * @DESC:
 * *******************************************
 */
// 将表中解析出的数据封装为样例类
case class InOutDataHBaseTest(rowkey: String, channel: String, counterid: String, countertype: String, devicesn: String,
                              datatype: String, hostname: String, modifytime: String, datatime: String, inNum: Int, outNum: Int)

object HBaseAPI_Test_One {

  // Kerberos认证
  KerberosUtil.kerberosAuth()

  private val spark: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .appName("spark-hbase-read")
    .getOrCreate()

  private val sc: SparkContext = spark.sparkContext

  private val hbaseConf: Configuration = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "hdp301")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

  def getOriginalData() = {
    import spark.implicits._
    import collection.mutable._

    // HBase 源数据库表
    val HBASE_TAG_TABLE = "trafficData"
    // 维护的时间戳,增量读取解析数据
    val HBASE_LAST_JOBTIME = "LastJobTime"
    // 创建连接对象
    val conn: Connection = ConnectionFactory.createConnection()
    val tag_table: Table = conn.getTable(TableName.valueOf(HBASE_TAG_TABLE))
    val time_table: Table = conn.getTable(TableName.valueOf(HBASE_LAST_JOBTIME))

    // 通过 rowkey 查询 HBase 表的 lastjobtime
    val get = new Get("TrafficDateTime".getBytes())
    val mdResult: Result = time_table.get(get)
    // get 直接拿到 时间戳
    val modifyTime: String = Bytes.toString(mdResult.getValue(Bytes.toBytes("t"), Bytes.toBytes("m")))

    // 查询原始数据
    val scan = new Scan()
    // 单列值过滤器 当 表中的 modifyTime 大于时间戳时 增量读取解析
    val mdValueFilter = new SingleColumnValueFilter(
      "d".getBytes(),
      "t".getBytes(),
      CompareOperator.GREATER_OR_EQUAL,
      new SubstringComparator(modifyTime) // 大于等于增量的时间戳
    )

    // scan 的条数,默认为100 扫描100 返给 客户端 Result 缓存读取
    scan.setCaching(200)
    // 设置过滤,下推到服务器 ,减少返回给客户端的数据量和 rowkey 指定范围结合更佳
    scan.setFilter(mdValueFilter)
    import collection.JavaConversions._
    val iter: util.Iterator[Result] = tag_table.getScanner(scan).iterator()
    // 存放定义的样例类
    val basicListTmp = new ListBuffer[InOutDataHBaseTest]()

    while (iter.hasNext) {
      var rowkey = ""
      var datatime = ""
      var inNum = 0
      var outNum = 0
      val result: Result = iter.next()
      val channel = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("c")))
      val counterid = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ci")))
      val countertype = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ct")))
      val devicesn = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ds")))
      val datatype = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("dt")))
      val hostname = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("h")))
      val modifytime = Bytes.toString(result.getValue(Bytes.toBytes("t"), Bytes.toBytes("md")))

      rowkey = Bytes.toString(result.getRow)
      // 拿到 Result 的cell ,遍历 cell 拿到 columnName后判断列名取出 3#2#1 value 值 
      val cells = result.listCells()
      for (cell <- cells) {
        //  Cell工具类 获取到 列名
        var  cname = Bytes.toString(CellUtil.cloneQualifier(cell))
        if (cname.length >= 4) {
          datatime = rowkey.split("#")(1)+cname
          val cvalue = Bytes.toString(CellUtil.cloneValue(cell))
          val arr = cvalue.split("#")
          inNum = arr(0).toInt
          outNum = arr(1).toInt
          println(datatime + "--------" + inNum + "------" + outNum)
          // 将解析出的 cell 数据 放到 List 的样例类中
          basicListTmp += InOutDataHBaseTest(rowkey, channel, counterid, countertype,
            devicesn, datatype, hostname, modifytime, datatime, inNum, outNum)
        }
      }
    }
    // 获取 返回的 basicListTmp 并且返回
    val basicList: ListBuffer[InOutDataHBaseTest] = basicListTmp.map(x => InOutDataHBaseTest(x.rowkey, x.channel, x.counterid, x.countertype,
      x.devicesn, x.datatype, x.hostname, x.modifytime, x.datatime ,x.inNum, x.outNum))
    basicList.toSet
  }

  def main(args: Array[String]): Unit = {
    getOriginalData().foreach(println(_))
  }
}

 

以上就是 对公司表数据的读取解析示例,当然了 读取还可以根据 rowkey 优化,因为 rowkey 是自定义设计的 ,hostname+ channel md5 加密 散列而成,可以根据 指定 rowkey 的扫描范围---- withStartRow()和withStopRow(),再加上 增量的解析数据速度就比较完美了。

 参考:https://www.sohu.com/a/284932698_100109711