HBase (4): Filter Queries in Detail
2023-06-13 09:20:22
Introduction: There are many kinds of filters, but they fall into two broad categories: comparison filters and dedicated (special-purpose) filters.
A filter evaluates data on the server side and returns only the data that satisfies the condition to the client, which keeps the amount of data sent over the network small.
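To make that pattern concrete before walking through the individual filter types, here is a minimal, self-contained sketch of a filtered scan using the same old-style HTable client API as the rest of this article; the table name, ZooKeeper quorum, and row-key prefix are placeholder values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilteredScanSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181"); // placeholder quorum
        HTable table = new HTable(conf, "user_info");        // placeholder table

        Scan scan = new Scan();
        scan.setFilter(new PrefixFilter(Bytes.toBytes("rk"))); // any Filter from the sections below

        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            // only rows that passed the server-side filter ever reach this loop
            System.out.println(Bytes.toString(r.getRow()));
        }
        scanner.close();
        table.close();
    }
}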
I. Categories of HBase filters
1. Comparison filters
Row key filter: RowFilter
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
Column family filter: FamilyFilter
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS,
        new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
Qualifier (column) filter: QualifierFilter
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
        new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter);
Value filter: ValueFilter
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
        new SubstringComparator(".4"));
scan.setFilter(filter);
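Every comparison filter above pairs a CompareOp (LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER) with a comparator. As a quick reference, a sketch of the commonly used comparators, with placeholder values:

new BinaryComparator(Bytes.toBytes("row-22"));     // byte-for-byte comparison, supports all CompareOps
new BinaryPrefixComparator(Bytes.toBytes("row"));  // compares only up to the length of the prefix
new SubstringComparator("4");                      // case-insensitive substring test
new RegexStringComparator("row-\\d+");             // regular-expression test

Note that SubstringComparator and RegexStringComparator only yield meaningful results with EQUAL and NOT_EQUAL.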
2. Dedicated filters
Single-column value filter: SingleColumnValueFilter (returns the entire row when the condition matches)
SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"),
        CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); // without this, rows that lack the specified column are returned as well
scan.setFilter(filter);
Single-column value exclude filter: SingleColumnValueExcludeFilter (returns matching rows with the tested column excluded from the result; the complement of the filter above)
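A usage sketch, mirroring the SingleColumnValueFilter example above (same placeholder family, column, and value):

// rows are still selected by the condition on colfam1:col-5,
// but that column itself is stripped from the returned results
SingleColumnValueExcludeFilter excludeFilter = new SingleColumnValueExcludeFilter(
        Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"),
        CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("val-5"));
excludeFilter.setFilterIfMissing(true);
scan.setFilter(excludeFilter);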
Prefix filter: PrefixFilter (applies to the row key)
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter);
Column prefix filter: ColumnPrefixFilter
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter);
Paging filter: PageFilter
Code example:
package com.ghgj.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class HbasePageDemo {

    // static configuration
    static Configuration conf = null;
    private static final String ZK_CONNECT_STR =
            "hadoop01:2181,hadoop02:2181,hadoop03:2181,hadoop04:2181,hadoop05:2181";

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
    }

    public static void main(String[] args) throws Exception {
        String tableName = "testfilter";
        String cfName = "f1";
        final byte[] POSTFIX = new byte[] { 0x00 };
        HTable table = new HTable(conf, tableName);

        Filter filter = new PageFilter(3);
        byte[] lastRow = null;
        int totalRows = 0;
        while (true) {
            Scan scan = new Scan();
            scan.setFilter(filter);
            if (lastRow != null) {
                // append POSTFIX to push the scan boundary past the last row already seen
                byte[] startRow = Bytes.add(lastRow, POSTFIX);
                scan.setStartRow(startRow);
            }
            ResultScanner scanner = table.getScanner(scan);
            int localRows = 0;
            Result result;
            while ((result = scanner.next()) != null) {
                System.out.println(localRows++ + " : " + result);
                totalRows++;
                lastRow = result.getRow();
            }
            scanner.close();
            if (localRows == 0) break;
        }
        System.out.println("total rows: " + totalRows);
    }

    /**
     * How to use several kinds of filter conditions
     * @throws Exception
     */
    @Test
    public void testScan() throws Exception {
        HTable table = new HTable(conf, "person".getBytes());
        Scan scan = new Scan(Bytes.toBytes("person_zhang_000001"),
                Bytes.toBytes("person_zhang_000002"));

        // prefix filter -- applies to the row key
        Filter filter = new PrefixFilter(Bytes.toBytes("person"));

        // row filter -- applies to the row key
        ByteArrayComparable rowComparator =
                new BinaryComparator(Bytes.toBytes("person_zhang_000001"));
        RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator);
        rf = new RowFilter(CompareOp.EQUAL, new SubstringComparator("_2016-12-31_"));

        // single-column value filter 1: exact byte-array match
        new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(),
                CompareOp.EQUAL, "zhangsan".getBytes());

        // single-column value filter 2: regular-expression match
        ByteArrayComparable comparator = new RegexStringComparator("zhang.");
        new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(),
                CompareOp.EQUAL, comparator);

        // single-column value filter 3: substring match, case-insensitive
        comparator = new SubstringComparator("wu");
        new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(),
                CompareOp.EQUAL, comparator);

        // KeyValue metadata filtering -- family filter -- exact byte-array match
        // (note: an exact match on a family that does not exist, e.g. "inf", returns nothing)
        FamilyFilter ff = new FamilyFilter(CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("base_info")));

        // KeyValue metadata filtering -- family filter -- byte-array prefix match
        // the table has a family "info" beginning with "inf"; the result is every row of that family
        ff = new FamilyFilter(CompareOp.EQUAL,
                new BinaryPrefixComparator(Bytes.toBytes("inf")));

        // KeyValue metadata filtering -- qualifier filter -- exact byte-array match
        // no column is named exactly "na", so the result is empty
        filter = new QualifierFilter(CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("na")));

        // the column "name" begins with "na"; the result is that column for every row
        filter = new QualifierFilter(CompareOp.EQUAL,
                new BinaryPrefixComparator(Bytes.toBytes("na")));

        // ColumnPrefixFilter: filters by a qualifier-name prefix
        filter = new ColumnPrefixFilter("na".getBytes());

        // MultipleColumnPrefixFilter: filters by several qualifier-name prefixes
        byte[][] prefixes = new byte[][] { Bytes.toBytes("na"), Bytes.toBytes("me") };
        filter = new MultipleColumnPrefixFilter(prefixes);

        // attach the filter to the scan
        scan.setFilter(filter);
        scan.addFamily(Bytes.toBytes("base_info"));

        // single row:
        // Result result = table.get(get);
        // multiple rows:
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            /*
            for (KeyValue kv : r.list()) {
                String family = new String(kv.getFamily());
                System.out.println(family);
                String qualifier = new String(kv.getQualifier());
                System.out.println(qualifier);
                System.out.println(new String(kv.getValue()));
            }
            */
            // fetch one specific value directly from the Result
            byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
            System.out.println(new String(value));
        }
        table.close();
    }
}
Paging filter (PageFilter), code example:
package com.ghgj.hbase.test1610;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * 501 rows in total; with 100 rows per page, page 4 covers rows 301-400.
 *
 * pageIndex: which page
 * pageNumber: rows per page
 *
 * Fetching a slice of data in the hbase shell:
 *   scan 'user_info', {COLUMNS => 'base_info:name',
 *     LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'}
 *
 * MySQL paging: start at row N, take M rows.
 * Carrying the MySQL paging model over to HBase: translate the page index into a startRow.
 */
public class HBasePageFilterDemo {

    private static final String ZK_CONNECT_STR = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    private static final String TABLE_NAME = "user_info";
    private static final String FAMILY_BASIC = "base_info";
    private static final String FAMILY_EXTRA = "extra_info";
    private static final String COLUMN_NAME = "name";
    private static final String COLUMN_AGE = "age";
    private static final String ROW_KEY = "rk0001";

    private static Configuration config = null;
    private static HTable table = null;

    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
        try {
            table = new HTable(config, TABLE_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // ResultScanner pageData = getPageData("zhangsan_20150701_0001", 4);
        ResultScanner pageData = getPageData(2, 4);
        HBasePrintUtil.printResultScanner(pageData);
        // String lastRowkey = getLastRowkey(pageData);
        // System.out.println(lastRowkey);
    }

    public static ResultScanner getPageData(int pageIndex, int pageNumber) throws Exception {
        // translate pageIndex into a startRow
        String startRow = null;
        if (pageIndex == 1) {
            // when the caller asks for the first page, no startRow is needed
            ResultScanner pageData = getPageData(startRow, pageNumber);
            return pageData;
        } else {
            ResultScanner newPageData = null;
            for (int i = 0; i < pageIndex - 1; i++) {
                // loop one time fewer than the requested page index
                newPageData = getPageData(startRow, pageNumber);
                startRow = getLastRowkey(newPageData);
                // append a zero byte so the next scan starts just past the last row seen
                byte[] add = Bytes.add(Bytes.toBytes(startRow), new byte[] { 0x00 });
                startRow = Bytes.toString(add);
            }
            newPageData = getPageData(startRow, pageNumber);
            return newPageData;
        }
    }

    /**
     * @param startRow
     * @param pageNumber
     * @return
     *
     * Shell equivalent:
     *   scan 'user_info', {COLUMNS => 'base_info:name',
     *     LIMIT => 4, STARTROW => 'zhangsan_20150701_0001'}
     * @throws Exception
     */
    public static ResultScanner getPageData(String startRow, int pageNumber) throws Exception {
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
        // set the start position of this scan
        if (!StringUtils.isBlank(startRow)) {
            scan.setStartRow(Bytes.toBytes(startRow));
        }
        // the page size (the second parameter)
        Filter pageFilter = new PageFilter(pageNumber);
        scan.setFilter(pageFilter);
        ResultScanner rs = table.getScanner(scan);
        return rs;
    }

    public static String getLastRowkey(ResultScanner rs) {
        String lastRowkey = null;
        for (Result result : rs) {
            lastRowkey = Bytes.toString(result.getRow());
        }
        return lastRowkey;
    }
}
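One caveat with PageFilter: it is applied independently on each region server, so when the table spans several regions the client can receive more than pageNumber rows from one scan. A defensive sketch is to enforce the limit again on the client, reusing the scanner returned by getPageData above (pageData in main):

ResultScanner pageData = getPageData(2, 4);
int pageNumber = 4; // same page size that was passed to getPageData
int collected = 0;
for (Result result : pageData) {
    if (collected++ >= pageNumber) {
        break; // PageFilter guarantees at most N rows per region, not per whole scan
    }
    System.out.println(Bytes.toString(result.getRow()));
}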
When filtering on multiple conditions, use a FilterList:
List<Filter> filters = new ArrayList<Filter>();

SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("info"), Bytes.toBytes("age"),
        CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("20")));
filters.add(filter);

SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
        Bytes.toBytes("info"), Bytes.toBytes("age"),
        CompareOp.GREATER, new BinaryComparator(Bytes.toBytes("18")));
filters.add(filter1);

Filter filter2 = new ValueFilter(CompareOp.EQUAL, new SubstringComparator("lisi"));
filters.add(filter2);

FilterList f = new FilterList(filters);
scan.setFilter(f);
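By default FilterList combines its member filters with MUST_PASS_ALL (logical AND), which is what the age-range example above relies on: every condition must hold. To OR the conditions instead, pass the operator explicitly; a sketch:

// AND semantics (the default): every filter must accept the cell
FilterList andList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
// OR semantics: one accepting filter is enough
FilterList orList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
scan.setFilter(orList);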
Original article by ItWorker. If reposting, please cite the source: https://blog.ytso.com/7736.html