zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Hbase(四) 过滤器查询详解大数据

HBase数据 详解 查询 过滤器
2023-06-13 09:20:22 时间

引言:过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;

Hbase(四) 过滤器查询详解大数据

一、hbase过滤器的分类

   1、比较过滤器

     行键过滤器 RowFilter

Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes( row-22 )));

scan.setFilter(filter1);

     列族过滤器 FamilyFilter

Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes( colfam3 )));

scan.setFilter(filter1);

     列过滤器 QualifierFilter

Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes( col-2 )));

scan.setFilter(filter1);

   值过滤器 ValueFilter

Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator( .4 ) );

scan.setFilter(filter1);

   2、专用过滤器

单列值过滤器 SingleColumnValueFilter -会返回满足条件的整行

SingleColumnValueFilter filter = new SingleColumnValueFilter(

Bytes.toBytes( colfam1 ),

Bytes.toBytes( col-5 ),

CompareFilter.CompareOp.NOT_EQUAL,

new SubstringComparator( val-5 ));

filter.setFilterIfMissing(true); //如果不设置为 true,则那些不包含指定 column 的行也会返回

scan.setFilter(filter1);

单列值排除器 SingleColumnValueExcludeFilter 返回排除了该列的结果 与上面的结果相反

前缀过滤器 PrefixFilter -针对行键

Filter filter = new PrefixFilter(Bytes.toBytes( row1 ));

scan.setFilter(filter1);

列前缀过滤器 ColumnPrefixFilter

Filter filter = new ColumnPrefixFilter(Bytes.toBytes( qual2 ));

scan.setFilter(filter1);

分页过滤器 PageFilter

代码实现:

package com.ghgj.hbase; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.BinaryComparator; 

import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; 

import org.apache.hadoop.hbase.filter.ByteArrayComparable; 

import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 

import org.apache.hadoop.hbase.filter.FamilyFilter; 

import org.apache.hadoop.hbase.filter.Filter; 

import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter; 

import org.apache.hadoop.hbase.filter.PageFilter; 

import org.apache.hadoop.hbase.filter.PrefixFilter; 

import org.apache.hadoop.hbase.filter.QualifierFilter; 

import org.apache.hadoop.hbase.filter.RegexStringComparator; 

import org.apache.hadoop.hbase.filter.RowFilter; 

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 

import org.apache.hadoop.hbase.filter.SubstringComparator; 

import org.apache.hadoop.hbase.util.Bytes; 

import org.junit.Test; 

public class HbasePageDemo { 

// 声明静态配置 

static Configuration conf = null; 

private static final String ZK_CONNECT_STR = 

 hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181  

static { 

conf = HBaseConfiguration.create(); 

conf.set( hbase.zookeeper.quorum , ZK_CONNECT_STR); 

} 

public static void main(String[] args) throws Exception { 

String tableName =  testfilter  

String cfName =  f1  

final byte[] POSTFIX = new byte[] { 0x00 }; 

HTable table = new HTable(conf, tableName); 

Filter filter = new PageFilter(3); 

byte[] lastRow = null; 

int totalRows = 0; 

while (true) { 

Scan scan = new Scan(); 

scan.setFilter(filter); 

if(lastRow != null){ 

//注意这里添加了 POSTFIX 操作,用来重置扫描边界 

byte[] startRow = Bytes.add(lastRow,POSTFIX); 

scan.setStartRow(startRow); 

} 

ResultScanner scanner = table.getScanner(scan); 

int localRows = 0; 

Result result; 

while((result = scanner.next()) != null){ 

System.out.println(localRows++ +  :  + result); 

totalRows ++; 

lastRow = result.getRow(); 

} 

scanner.close(); 

if(localRows == 0) break; 

} 

System.out.println( total rows:  + totalRows); 

} / 

** 

* 多种过滤条件的使用方法 

* @throws Exception 

*/ 

@Test 

public void testScan() throws Exception{ 

HTable table = new HTable(conf,  person .getBytes()); 

Scan scan = new Scan(Bytes.toBytes( person_zhang_000001 ), 

Bytes.toBytes( person_zhang_000002 )); 

//前缀过滤器----针对行键 

Filter filter = new PrefixFilter(Bytes.toBytes( person )); 

//行过滤器 ---针对行键 

ByteArrayComparable rowComparator = new 

BinaryComparator(Bytes.toBytes( person_zhang_000001 )); 

RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator); 

rf = new RowFilter(CompareOp.EQUAL , new 

SubstringComparator( _2016-12-31_ )); 

//单值过滤器 1 完整匹配字节数组 

new SingleColumnValueFilter( base_info .getBytes(),  name .getBytes(), 

CompareOp.EQUAL,  zhangsan .getBytes()); 

//单值过滤器 2 匹配正则表达式 

ByteArrayComparable comparator = new RegexStringComparator( zhang.  

new SingleColumnValueFilter( info .getBytes(),  NAME .getBytes(), 

CompareOp.EQUAL, comparator); 

//单值过滤器 3 匹配是否包含子串,大小写不敏感 

comparator = new SubstringComparator( wu  

new SingleColumnValueFilter( info .getBytes(),  NAME .getBytes(), 

CompareOp.EQUAL, comparator); 

//键值对元数据过滤-----family 过滤----字节数组完整匹配 

FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL , 

new BinaryComparator(Bytes.toBytes( base_info )) //表中不存 

在 inf 列族,过滤结果为空 

); 

//键值对元数据过滤-----family 过滤----字节数组前缀匹配 

ff = new FamilyFilter( 

CompareOp.EQUAL , 

new BinaryPrefixComparator(Bytes.toBytes( inf )) //表中存在以 

inf 打头的列族 info,过滤结果为该列族所有行 

); 

//键值对元数据过滤-----qualifier 过滤----字节数组完整匹配 

filter = new QualifierFilter( 

CompareOp.EQUAL , 

new BinaryComparator(Bytes.toBytes( na )) //表中不存在 na 

列,过滤结果为空 

); 

filter = new QualifierFilter( 

CompareOp.EQUAL , 

new BinaryPrefixComparator(Bytes.toBytes( na )) //表中存在以 

na 打头的列 name,过滤结果为所有行的该列数据 

); 

//基于列名(即 Qualifier)前缀过滤数据的 ColumnPrefixFilter 

filter = new ColumnPrefixFilter( na .getBytes()); 

//基于列名(即 Qualifier)多个前缀过滤数据的 MultipleColumnPrefixFilter 

byte[][] prefixes = new byte[][] {Bytes.toBytes( na ), Bytes.toBytes( me )}; 

filter = new MultipleColumnPrefixFilter(prefixes); 

//为查询设置过滤条件 

scan.setFilter(filter); 

scan.addFamily(Bytes.toBytes( base_info )); 

//一行 

// Result result = table.get(get); 

//多行的数据 

ResultScanner scanner = table.getScanner(scan); 

for(Result r : scanner){ 

/** 

for(KeyValue kv : r.list()){ 

String family = new String(kv.getFamily()); 

System.out.println(family); 

String qualifier = new String(kv.getQualifier()); 

System.out.println(qualifier); 

System.out.println(new String(kv.getValue())); 

} 

*/ 

//直接从 result 中取到某个特定的 value 

byte[] value = r.getValue(Bytes.toBytes( base_info ), 

Bytes.toBytes( name )); 

System.out.println(new String(value)); 

} 

table.close(); 

} 

}

分页过滤器  代码实现:

package com.ghgj.hbase.test1610; 

 

import org.apache.commons.lang.StringUtils; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.Filter; 

import org.apache.hadoop.hbase.filter.PageFilter; 

import org.apache.hadoop.hbase.util.Bytes; 

 

/** 

 * 501条 

 * 

 * 每页100条,求第四页 : 301 - 400 

 * 

 * pageIndex:第几页 

 * pageNumber:每页几条 

 * 

 * 

 * 在hbase当中取一部分数据的取法: 

 * scan  user_info ,{COLUMNS =   base_info:name , 

 * LIMIT =  4, STARTROW =   zhangsan_20150701_0001 } 

 * 

 * mysqL:从第几条开始,取多少条 

 * 

 * 从mysql的分页规则引申到hbase的分页:把startRow转换成mysql的第几条 

 */ 

public class HBasePageFilterDemo { 

 

 private static final String ZK_CONNECT_STR =  hadoop03:2181,hadoop04:2181,hadoop05:2181  

 

 private static final String TABLE_NAME =  user_info  

 private static final String FAMILY_BASIC =  base_info  

 private static final String FAMILY_EXTRA =  extra_info  

 private static final String COLUMN_NAME =  name  

 private static final String COLUMN_AGE =  age  

 private static final String ROW_KEY =  rk0001  

 

 private static Configuration config = null; 

 private static HTable table = null; 

 static { 

 config = HBaseConfiguration.create(); 

 config.set( hbase.zookeeper.quorum , ZK_CONNECT_STR); 

 try { 

 table = new HTable(config, TABLE_NAME); 

 } catch (Exception e) { 

 e.printStackTrace(); 

 } 

 } 

  

 public static void main(String[] args) throws Exception { 

  

// ResultScanner pageData = getPageData( zhangsan_20150701_0001 , 4); 

 ResultScanner pageData = getPageData(2, 4); 

 HBasePrintUtil.printResultScanner(pageData); 

  

// String lastRowkey = getLastRowkey(pageData); 

// System.out.println(lastRowkey); 

  

 } 

  

 public static ResultScanner getPageData(int pageIndex, int pageNumber) throws Exception{ 

 // 怎么把pageIndex 转换成 startRow 

 String startRow = null; 

  

 if(pageIndex == 1){ // 当客户方法只取第一页的分页数据时, 

 ResultScanner pageData = getPageData(startRow, pageNumber); 

 return pageData; 

  

 }else{ 

 ResultScanner newPageData = null; 

 for(int i=0; i pageIndex - 1; i++){ // 总共循环次数是比你取的页数少1 

 newPageData = getPageData(startRow, pageNumber); 

 startRow = getLastRowkey(newPageData); 

 byte[] add = Bytes.add(Bytes.toBytes(startRow), new byte[]{ 0X00 }); 

 startRow = Bytes.toString(add); 

 } 

 newPageData = getPageData(startRow, pageNumber); 

 return newPageData; 

 } 

 } 

  

 /** 

  * @param startRow 

  * @param pageNumber 

  * @return 

  * * scan  user_info ,{COLUMNS =   base_info:name , 

 * LIMIT =  4, STARTROW =   zhangsan_20150701_0001 } 

  * @throws Exception 

  */ 

 public static ResultScanner getPageData(String startRow, int pageNumber) throws Exception{ 

 Scan scan = new Scan(); 

 scan.addColumn(Bytes.toBytes( base_info ), Bytes.toBytes( name )); 

 // 設置當前查询的其实位置 

 if(!StringUtils.isBlank(startRow)){ 

 scan.setStartRow(Bytes.toBytes(startRow)); 

 } 

 // 第二个参数 

 Filter pageFilter = new PageFilter(pageNumber); 

 scan.setFilter(pageFilter); 

  

 ResultScanner rs = table.getScanner(scan); 

 return rs; 

 } 

  

 public static String getLastRowkey(ResultScanner rs){ 

 String lastRowkey = null; 

 for(Result result : rs){ 

// System.out.println(result.getRow()); 

 lastRowkey = Bytes.toString(result.getRow()); 

 } 

 return lastRowkey; 

// return null; 

 } 

}

多条件过滤时,可以使用FilterList

List Filter  filters = new ArrayList Filter  

 SingleColumnValueFilter filter =new SingleColumnValueFilter( 

 Bytes.toBytes( info ), 

 Bytes.toBytes( age ), 

 CompareOp.LESS_OR_EQUAL, 

 new BinaryComparator(Bytes.toBytes( 20 ))); 

 filters.add(filter); 

  

 SingleColumnValueFilter filter1 =new SingleColumnValueFilter( 

 Bytes.toBytes( info ), 

 Bytes.toBytes( age ), 

 CompareOp.GREATER, 

 new BinaryComparator(Bytes.toBytes( 18 ))); 

  

 filters.add(filter1); 

  

 Filter filter2 = new ValueFilter(CompareOp.EQUAL, new SubstringComparator( lisi ) ); 

 filters.add(filter2); 

  

  

 FilterList f=new FilterList(filters); 

 scan.setFilter(f);

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/7736.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集