zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

HBase API 详细例子(封装的DAO类)

HBase封装API 详细 例子 DAO
2023-09-14 09:00:23 时间

HBase lib目录下所有JAR包复制到项目中,Hbase 版本0.98.5

package com.zxing.imgQRCode;

import java.io.IOException;

import java.util.LinkedList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.MasterNotRunningException;

import org.apache.hadoop.hbase.ZooKeeperConnectionException;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HConnection;

import org.apache.hadoop.hbase.client.HConnectionManager;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class HbaseConnection {

    private String rootDir;

    private String zkServer;

    private String port;

    private Configuration conf;

    private HConnection hConn=null;

    public HbaseConnection(String rootDir, String zkServer, String port) {

        super();

        this.rootDir = rootDir;

        this.zkServer = zkServer;

        this.port = port;

        conf=HBaseConfiguration.create();

        conf.set("hbase.rootdir", rootDir);

        conf.set("hbase.zookeeper.quorum ", zkServer);

        conf.set("hbase.zookeeper.property.clientPort", port);

        try {

            hConn=HConnectionManager.createConnection(conf);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    //创建表

    public void crateTable(String tableName,List String  cols){

        try {

            HBaseAdmin admin=new HBaseAdmin(conf);

            if(admin.tableExists(tableName))

                throw new IOException("table exists");

            else{

                

                HTableDescriptor tableDesc=new HTableDescriptor(tableName);

                for(String col:cols){

                    

                    HColumnDescriptor colDesc=new HColumnDescriptor(col);

                    colDesc.setCompressionType(Algorithm.GZ);

                    colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);

                    tableDesc.addFamily(colDesc);

                }

                admin.createTable(tableDesc);

            }

            

        } catch (MasterNotRunningException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        } catch (ZooKeeperConnectionException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        } catch (IOException e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

        }

        

        

    }

    //插入数据

    public void saveData(String tableName,List Put  puts){

        

        try {

            HTableInterface table =hConn.getTable(tableName);

            table.put(puts);

            table.setAutoFlush(false);

            table.flushCommits();

            

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

    //得到数据

    public Result getData(String tableName,String rowkey){

        try {

            HTableInterface table =hConn.getTable(tableName);

            Get get=new Get(rowkey.getBytes());

            return table.get(get);

        } catch (IOException e) {

            

            e.printStackTrace();

        }

        return null;

        

    }     

    //输出result结果

    public void format(Result result){

        

        String rowkey=Bytes.toString(result.getRow());

        KeyValue[] kvs=result.raw();

        for (KeyValue kv:kvs){

                String family= Bytes.toString(kv.getFamily());

            String qualifier= Bytes.toString(kv.getQualifier());

            System.out.println("rowkey- "+rowkey+"family- "+family+"qualifier- "+qualifier);

        }

    }

    //全表扫描

    public void hbaseScan(String tableName){

        

        Scan scan=new Scan();//扫描器

        scan.setCaching(1000);//缓存1000条数据,一次读取1000条

        try {

            HTableInterface table =hConn.getTable(tableName);

            ResultScanner scanner=table.getScanner(scan);//返回迭代器

            for(Result res:scanner){

                format(res);

            }

            

        } catch (IOException e) {

            

            e.printStackTrace();

        }

    }

    //比较过滤器

    public void filterTest(String tableName){

        Scan scan=new Scan();//扫描器

        scan.setCaching(1000);//缓存1000条数据,一次读取1000条

        RowFilter filter =new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator("Jack".getBytes()));

        RowFilter filter1 =new RowFilter(CompareFilter.CompareOp.EQUAL,new RegexStringComparator("J\\w+"));

        scan.setFilter(filter);

        try {

            HTableInterface table =hConn.getTable(tableName);

            ResultScanner scanner=table.getScanner(scan);//返回迭代器

            for(Result res:scanner){

                format(res);

            }

            

        } catch (IOException e) {

            

            e.printStackTrace();

        }

    }

    //PageFilter分页

    public void pageFilterTest(String tableName){

        PageFilter filter = new  PageFilter(4);

       byte[] lastRow=null;

       int pageCount=0; //记录第几页

       try {

        HTableInterface table =hConn.getTable(tableName);

        while(++pageCount 0){

            System.out.println("pageCount = "+ pageCount);

            Scan scan=new Scan();

            scan.setFilter(filter);

            if(lastRow!=null){

                scan.setStartRow(lastRow);

            }

            

            ResultScanner scanner=table.getScanner(scan);

            int count=0;//计数器

            for(Result res:scanner){

                

                lastRow=res.getRow();

                if(++count 3)

                    break;

                format(res);

                if(count 3){

                    break;

                }

                

            }

        }

        

    } catch (IOException e) {

        e.printStackTrace();

    }

    }

    public static void main(String[] args) {

            String rootDir="hdfs://ns1/hbase";

            String zkServer="10.128.129.230";//集群内网IP

            String port="2181";

            //

            HbaseConnection conn=new HbaseConnection(rootDir, zkServer, port);

            List String  cols=new LinkedList String 

            cols.add("basicInfo");

            cols.add("moreInfo");

             conn.crateTable("students", cols);

             //

             List Put  puts=new LinkedList Put 

             Put put1=new Put("Tom".getBytes());//rowkey

             put1.add("basicInfo".getBytes(), "age".getBytes(), "27".getBytes());

             put1.add("moreInfo".getBytes(), "tel".getBytes(), "110".getBytes());

             Put put2=new Put("Jim".getBytes());

             put2.add("basicInfo".getBytes(), "age".getBytes(), "28".getBytes());

             put2.add("moreInfo".getBytes(), "tel".getBytes(), "111".getBytes());

             puts.add(put1);

             puts.add(put2);

             conn.saveData("students", puts);

             

             //

             

             

           Result result=  conn.getData("students", "Tom");

             conn.format(result);

             //

             conn.hbaseScan("students");

               //

             conn.filterTest("students");

             

              //

             conn.pageFilterTest("students");

    }

}



常用接口

package test;

import hbase.HbaseUtils;

import java.io.IOException;

import java.util.Calendar;

import java.util.Date;

import java.util.Iterator;

import java.util.Map.Entry;

import java.util.concurrent.TimeUnit;

import net.sf.json.JSONObject;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.PageFilter;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

import com.xd.iis.se.common.Constants;

import com.xd.iis.se.hbase.CommHbaseUtils;

import com.xd.iis.se.hbutils.MeUtils;

import commn.CommonConstants;

public class SyncTestUtils {

    //hbase表名(hbaseapi包中的Constants类中定义了表名和数字的映射关系)

private  final static  String wz_content="wz_content";//1

private  final static  String lt_content="lt_content";//4

private  final static  String wb_content="wb_content";//2

private static  final   String wb_comment="wb_comment";//45

private static final String sinawb_user="sinawb_user";// 微博用户表

/*   TitleHs的定义在hbaseapi包中SwitchBeanAndJsonUtils类中jsonToDocument方法里

 *   从326行代码开始

 * hbase表字段定义hbaseapi包中HIContentCommon类

 * pfsearch包中IContentCommon类

 * */

    @Test

    public void hbaseTableNameToDigitalMapping() {

        

        for(Entry String, String      entry: Constants.rstypemp.entrySet()){

            

            System.out.println(entry.getKey()+":"+entry.getValue());

        }

    }

    @Test

    public void seconds(){

            System.out.println(new Date().getTime());

              System.out.println(System.nanoTime());

              System.out.println(System.currentTimeMillis());

        //时间转换

            System.out.println(TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS));

    }

    //毫秒转换成日期

    @Test

    public void millsToDate(){

            String mills="1460459403324";

            

            Date date=new Date(Long.parseLong(mills));

          System.out.println(date);

          System.out.println(date.getTime());

    }

    //手工干预生成19位的全网微博评论tokenKey(键,rowkey)=wbcomment_key

    //TokenTable=hotmanwb_token

    /*

     * hbase(main):003:0  scan hotmanwb_token,LIMIT= 2

     ROW                                          COLUMN+CELL                                                                                                                         

     hotmanwb_key                                 column=content:date, timestamp=1459310036375, value=1459310031972331086                                                             

     hotmanwbcomment_key                          column=content:date, timestamp=1460600117890, value=1460600079542140091                                                             

     ltcomment_key                                column=content:date, timestamp=1460600483717, value=1460600441668719114                                                             

     wzcomment_key                                column=content:date, timestamp=1460599817280, value=1460599777817713930  

     

     */

    @Test

    public void generateTokenKeyForWeiboComment(){

        Calendar calendar = Calendar.getInstance();

         calendar.setTime(new Date());

         //三十天前的时间

         calendar.add(calendar.DATE, -30);

         Date date = calendar.getTime();

         //第一位表示星期

         System.out.println(date);

        //13位加6位拼成19位

        String startTime=date.getTime()+"000000";

        

        System.out.println("startTime:"+startTime);

        //插入或者更新时间

    HbaseUtils.insertData("hotmanwb_token", "wbcomment_key", startTime);

        //1458109165143000000   19位

        

    }

    //hbase根据表名和rowkey查询一条数据(tokenkey)

    @Test

    public void findByRowKey(){

        

        String startTime=HbaseUtils.QueryByCondition1("hotmanwb_token", "wbcomment_key");

        System.out.println(startTime);

    }

    //hbase返回前几条数据

    /*key:014604646505869913352145

    key:014604646505954866550445

    key:014604988869079841915645

    key:014605014502935460283945

    key:014605014503712711041745*/

    @Test

    public  void  scanTopRowComment(){

        ResultScanner resultScanner = null;

        HTableInterface table = HbaseUtils.pool.getTable(wz_content);

        try {

            Scan scan = new Scan();

            //设置过滤器,只返回20条

            Filter filter = new PageFilter(5); 

            scan.setFilter(filter);

        //RegionServer是否应当缓存 当前客户端访问过的数据块    如果是随机的get 这个最好为false

            scan.setCacheBlocks(true);

            

          /*简而言之就是  batch 是qualifier column级别的       caching是row级别的

            batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

                                    太大 会对客户端造成比较大的压力,  具体根据需要使用 , 正常使用可以不必管

                                    它, 大批量读取可以考虑用它改善性能

                                    这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

            setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

                                  (默认一个row的所有qualifier会在一个Result中)*/

            /*scan.setBatch(100);*/    //setFilter与setBatch不能都打开,会冲突

            //setCaching发给scanners的缓存的Row的数量

            scan.setCaching(100);

            scan.setMaxVersions(1);

            resultScanner = table.getScanner(scan);

        /*    for (Result r : rs) {

                return new String(r.getRow());

            }*/

            Iterator Result  res = resultScanner.iterator();// 返回查询遍历器

            while (res.hasNext()) {

                Result result = res.next();

                System.out.println(result);

                System.out.println("key:" + new String(result.getRow()));

                //date列存的是json字符串

                String value = new String(result.getValue(

                        CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS

                                .getBytes(),

                        CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2

                                .getBytes()), "ISO8859-1");

                System.out.println("value:" + value);

                

                JSONObject js = JSONObject.fromObject(value);

                System.out.println(js);

            }

            

            

        } catch (Exception e) {

            e.printStackTrace();

        }finally{

            //这样一定要记住 用完close

            if(resultScanner!=null)  resultScanner.close();

        }

    }

    //根据rowkey范围扫描

    @Test

    public  void  scanByRowKeyRangeComment(){

        ResultScanner resultScanner = null;

        HTableInterface table = HbaseUtils.pool.getTable(wb_comment);

        String startRow="01420459403324297147";//

        String stopRow="014605014503712711";//20位

        try {

            Scan scan = new Scan();

            //设置过滤器,只返回20条

            Filter filter = new PageFilter(5); 

            scan.setFilter(filter);

            scan.setStartRow(startRow.getBytes());

            scan.setStopRow(stopRow.getBytes());

        //RegionServer是否应当缓存 当前客户端访问过的数据块    如果是随机的get 这个最好为false

            scan.setCacheBlocks(true);

            

          /*简而言之就是  batch 是qualifier column级别的       caching是row级别的

            batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

                                    太大 会对客户端造成比较大的压力,  具体根据需要使用 , 正常使用可以不必管

                                    它, 大批量读取可以考虑用它改善性能

                                    这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

            setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

                                  (默认一个row的所有qualifier会在一个Result中)*/

            /*scan.setBatch(100);*/    //setFilter与setBatch不能都打开,会冲突

            //setCaching发给scanners的缓存的Row的数量

            scan.setCaching(100);

            scan.setMaxVersions(1);

            resultScanner = table.getScanner(scan);

        /*    for (Result r : rs) {

                return new String(r.getRow());

            }*/

            Iterator Result  res = resultScanner.iterator();// 返回查询遍历器

            while (res.hasNext()) {

                Result result = res.next();

                System.out.println(result);

                System.out.println("key:" + new String(result.getRow()));

                //date列存的是json字符串

                String value = new String(result.getValue(

                        CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS

                                .getBytes(),

                        CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2

                                .getBytes()), "ISO8859-1");

                System.out.println("value:" + value);

                

                JSONObject js = JSONObject.fromObject(value);

                System.out.println(js);

            }

            

            

        } catch (Exception e) {

            e.printStackTrace();

        }finally{

            //这样一定要记住 用完close

            if(resultScanner!=null)  resultScanner.close();

        }

    }

    @Test

    //hbase生成行键(hbaseApi包) 第一个url参数无用

    public void createRowKey(){

        

    //typemp.put("wb_comment", "45");// 微博评论表对应编码最后两位

    String newRowKey=MeUtils.createKeyCode("", "wb_comment");

    System.out.println(newRowKey);

/*String oldRowKey=MeUtils.createKeyCode_oid("http://www.baidu.com", "wb_comment");

    System.out.println(oldRowKey);*/

    //rowkey=114606860784008157170445 24位

    //1+19位时间戳+2位随机数+2位表名

    }

    /*Timestamp

    HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,

    查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”

    (对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。

    Value

    每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp= value,

    例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。

    */

    /*大Solr(192.168.20.190对应三个域名)

         # 24 index

        solr_24h=http://solr-24h.wyq.cn/solr

        # month index

        solr_month=http://solr-month.wyq.cn/solr

        # week index

        solr_week=http://solr-week.wyq.cn/solr

    */    

    @Test

    public  void  scanTopRowContent(){

        ResultScanner resultScanner = null;

        

        HTableInterface table = HbaseUtils.pool.getTable(wz_content);

        try {

            Scan scan = new Scan();

            //设置过滤器,只返回20条

            Filter filter = new PageFilter(5); 

            scan.setFilter(filter);

        //RegionServer是否应当缓存 当前客户端访问过的数据块    如果是随机的get 这个最好为false

            scan.setCacheBlocks(true);

            

          /*简而言之就是  batch 是qualifier column级别的       caching是row级别的

            batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

                                    太大 会对客户端造成比较大的压力,  具体根据需要使用 , 正常使用可以不必管

                                    它, 大批量读取可以考虑用它改善性能

                                    这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

            setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

                                  (默认一个row的所有qualifier会在一个Result中)*/

            /*scan.setBatch(100);*/    //setFilter与setBatch不能都打开,会冲突

            //setCaching发给scanners的缓存的Row的数量

            scan.setCaching(100);

            scan.setMaxVersions(1);

            resultScanner = table.getScanner(scan);

         // 返回查询遍历器

            Iterator Result  res = resultScanner.iterator();

            while (res.hasNext()) {

                System.out.println("--------------行分割线-------------");

                Result result = res.next();

                System.out.println("\n"+"------单个result--------");

                System.out.println(result);

                System.out.println("\n"+"------result中Cells--------");

                //由{row key, Family:Qualifier, version} 唯一确定的单元。cell中的数据是没有类型的,全部是以字节的形式进行存储的

                 for  (Cell cell : result.rawCells()) {

                         //rowkey

                    System.out.println("Rowkey : " +Bytes.toString (CellUtil.cloneRow(cell)));

                              //列簇+列(Family是第一级列,Qualifier是第二级列)

                            System.out.println("Familiy:Quilifier : " +Bytes.toString (CellUtil.cloneFamily(cell))

                                     +":"+Bytes.toString (CellUtil.cloneQualifier (cell))); 

                              //值

                            System.out.println ("Value : " +Bytes.toString (CellUtil.cloneValue (cell)));

                            System.out.println("TimeStamp : "  +cell.getTimestamp());

                }

                 

                 

            /*     //老API

                 System.out.println("\n"+ "------result中KeyValues--------");   

                for( KeyValue kv:result.list()){  

                     

                      System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.",   

                              Bytes.toString(kv.getRow()),   

                              Bytes.toString(kv.getFamily()),   

                              Bytes.toString(kv.getQualifier()),   

                              Bytes.toString(kv.getValue()),  

                              kv.getTimestamp()));       

                  } */ 

            }

            

            

        } catch (Exception e) {

            e.printStackTrace();

        }finally{

            //这样一定要记住 用完close

            if(resultScanner!=null)  resultScanner.close();

        }

    }

    //SecureCRT上传下载文件

    //sz  下载命令

    //rz -be 上传文件 单独用rz会有两个问题:上传中断、上传文件变化(md5不同),

    /*解决办法是上传是用rz -be,并且去掉弹出的对话框中“Upload files as ASCII”前的勾选。

    -a, –ascii

    -b, –binary 用binary的方式上传下载,不解释字符为ascii

    -e, –escape 强制escape 所有控制字符,比如Ctrl+x,DEL等

    rar,gif等文件文件采用 -b 用binary的方式上传。

    文件比较大而上传出错的话,采用参数 -e*/

    //根据rowkey查找数据

    @Test

    public void select(){

        String ID="114615497672016941968326";

        try {

            String json=CommHbaseUtils.select(ID);

            System.out.println(json);

            JSONObject js = JSONObject.fromObject(json);

            System.out.println(js);

        } catch (IOException e) {

            

            e.printStackTrace();

        }

        

    }

    /*//血和泪的经验教训

    ArrayList非线程安全,即使使用Collections.synchronizedList(new ArrayList SolrInputDocument ())

    访问方法size()方法得出来的大小也是错的,还是推荐使用vector代替

    //因为solrserver服务的url配置文件pfs.properties未打包进去,找不到url发生空指针异常

    ScheduledExecutorService对于线程中发生的http服务方面的异常无法捕获,jstack -l命令打印信息

     java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)

    - parking to wait for   0x0000000712e8e7e8  (a 

    解决方案:

    异常可以替代使用Timer定时器来捕获

    */

    /*修改properties文件编码

     * 全局修改:

     *  window-  preference -  general -  content types 

        找到右边的 java properties file ,将其编码改为 utf-8  

          单个文件修改:

          右击该properties文件--properties--Resource--Text file encoding,

          选中other,选择其它编码方式,如UTF-8或GBK,这样就能在properties里面输入中文,而不会自动转成Unicode了。

     * */

}


本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1698822


前端pua: JSON API还有二次封装的必要吗? JSON 是 JavaScript Object Notation 的缩写,最初是被设计为 JavaScript 的一个子集,因其和编程语言无关,所以成为了一种开放标准的常见数据格式。虽然 JSON 是源自于JavaScript,但到目前很多编程语言都有了 JSON 解析的库,如 C、Java、Python 等。
干货 | 通用 api 封装实战,带你深入理解 PO 在普通的接口自动化测试中,如果接口的参数,比如 url,headers等传参改变,或者测试用例的逻辑、断言改变,那么整个测试代码都需要改变。apiobject设计模式借鉴了pageobject的设计模式,可以实现一个优雅、强大的接口测试框架。
apiobject设计模式可以简单分为6个模块,分别是API对象、接口测试框架、配置模块、数据封装、Utils、测试用例。 - 接口测试框架:ba
第十二届 BigData NoSQL Meetup — 基于hbase的New sql落地实践 立即下载