zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Hbase三Java,python操作Hbase详解大数据

PythonJAVAHBase数据 详解 操作
2023-06-13 09:20:26 时间
python操作Hbase

由于Hbase是java开发的,所以如果需要用python对Hbase进行操作,就需要借助Thrift等工具让语言透明化

安装Thrift之前所需准备

yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel


进入源码目录并查找thrift对python的支持模块:find . -name Hbase.thrift,查找后地址为:./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift


进入查找后的目录:cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/


在该目录下执行 thrift --gen py Hbase.thrift 生成gen-py目录,然后将gen-py中的hbase目录拷贝到需要运行python脚本文件的同级目录中,命令:cp -raf gen-py/hbase/ /test/hbase_test


检查端口是否被监听
命令:netstat -antup | grep 9090

执行python文件,对hbase进行操作
# create_table.py: create the table "new_music_table" with the column
# families "meta-data" and "flags" through the HBase Thrift server.
# NOTE(review): "master" is presumably the host name of the Thrift server;
# 9090 is the port checked with netstat in the step above.
transport = TSocket.TSocket("master", 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)

transport.open()
try:
    # Each column family keeps a single version per cell.
    base_info_contents = ColumnDescriptor(name="meta-data:", maxVersions=1)
    other_info_contents = ColumnDescriptor(name="flags:", maxVersions=1)
    client.createTable("new_music_table", [base_info_contents, other_info_contents])
    print(client.getTableNames())
finally:
    # Release the socket even when the Thrift call fails.
    transport.close()

运行python文件,命令:python create_table.py

创建insert_data.py文件,进行插入数据操作
# insert_data.py: write three cells into row "1100" of "new_music_table".
transport = TSocket.TSocket("master", 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)

transport.open()
try:
    tableName = "new_music_table"
    rowKey = "1100"

    # One Mutation per cell; columns are addressed as "family:qualifier".
    mutations = [
        Mutation(column="meta-data:name", value="wangqingshui"),
        Mutation(column="meta-data:tag", value="pop"),
        Mutation(column="flags:is_valid", value="TRUE"),
    ]
    client.mutateRow(tableName, rowKey, mutations, None)
finally:
    # Release the socket even when the Thrift call fails.
    transport.close()
# Read row "1100" back from "new_music_table" and print two of its cells.
transport = TSocket.TSocket("master", 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)

transport.open()
try:
    tableName = "new_music_table"
    rowKey = "1100"

    result = client.getRow(tableName, rowKey, None)
    for r in result:
        # r.columns is looked up by "family:qualifier"; .value is the cell content.
        print("the row is %s" % r.row)
        print("the name is %s" % r.columns.get("meta-data:name").value)
        print("the flag is %s" % r.columns.get("flags:is_valid").value)
finally:
    # Release the socket even when the Thrift call fails.
    transport.close()
# Scan "new_music_table" and print up to 10 rows with all of their columns.
transport = TSocket.TSocket("master", 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)

transport.open()
try:
    tableName = "new_music_table"

    scan = TScan()
    scanner_id = client.scannerOpenWithScan(tableName, scan, None)
    try:
        # Fetch at most 10 rows from the scanner in one call.
        result = client.scannerGetList(scanner_id, 10)
        for r in result:
            print("======")
            print("the row is %s" % r.row)
            for k, v in r.columns.items():
                print("\t".join([k, v.value]))
    finally:
        # Close the scanner once the rows have been read.
        client.scannerClose(scanner_id)
finally:
    # Release the socket even when a Thrift call fails.
    transport.close()

模块存放位置

hbase python以及thrift python

Hbase三Java,python操作Hbase详解大数据

Java操作Hbase 向Hbase中写记录
package com.cxqy.baseoperation; 

import java.io.IOException; 

import java.util.ArrayList; 

import java.util.List; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.Cell; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.KeyValue; 

import org.apache.hadoop.hbase.client.Get; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 

import org.apache.hadoop.hbase.filter.FilterList; 

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 

import org.apache.hadoop.hbase.util.Bytes; 

public class HbasePutOneRecord { 

 public static final String TableName = "user_action_table"; 

 public static final String ColumnFamily = "action_log"; 

 public static Configuration conf = HBaseConfiguration.create(); 

 private static HTable table; 

 public static void addOneRecord(String tableName, String rowKey, String family, String qualifier, String value) 

 throws IOException { 

 table = new HTable(conf, tableName); 

 Put put = new Put(Bytes.toBytes(rowKey)); 

 put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); 

 table.put(put); 

 System.out.println("insert record " + rowKey + " to table " + tableName + " success"); 

 public static void main(String[] args) throws IOException { 

 conf.set("hbase.master", "192.168.87.200:60000"); 

 conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 

 // TODO Auto-generated method stub 

 try { 

 addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "ip", "192.168.87.101"); 

 addOneRecord(TableName, "ip=192.168.87.200-001", ColumnFamily, "userid", "1100"); 

 addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "ip", "192.168.1.201"); 

 addOneRecord(TableName, "ip=192.168.87.200-002", ColumnFamily, "userid", "1200"); 

 addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "ip", "192.168.3.201"); 

 addOneRecord(TableName, "ip=192.168.87.200-003", ColumnFamily, "userid", "1300"); 

 } catch (Exception e) { 

 // TODO Auto-generated catch block 

 e.printStackTrace(); 

}
从Hbase中读记录
package com.cxqy.baseoperation; 

import java.io.IOException; 

import java.util.ArrayList; 

import java.util.List; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.Cell; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.KeyValue; 

import org.apache.hadoop.hbase.client.Get; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 

import org.apache.hadoop.hbase.filter.FilterList; 

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 

import org.apache.hadoop.hbase.util.Bytes; 

public class HbaseGetOneRecord { 

 public static final String TableName = "user_action_table"; 

 public static final String ColumnFamily = "action_log"; 

 public static Configuration conf = HBaseConfiguration.create(); 

 private static HTable table; 

 public static void selectRowKey(String tablename, String rowKey) throws IOException { 

 table = new HTable(conf, tablename); 

 Get g = new Get(rowKey.getBytes()); 

 Result rs = table.get(g); 

 System.out.println("== " + new String(rs.getRow())); 

 for (Cell kv : rs.rawCells()) { 

 System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------"); 

 System.out.println("Column Family: " + new String(kv.getFamily())); 

 System.out.println("Column :" + new String(kv.getQualifier())); 

 System.out.println("value : " + new String(kv.getValue())); 

 public static void main(String[] args) throws IOException { 

 conf.set("hbase.master", "192.168.87.200:60000"); 

 conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 

 // TODO Auto-generated method stub 

 try { 

 selectRowKey(TableName, "ip=192.168.87.200-003"); 

 } catch (Exception e) { 

 // TODO Auto-generated catch block 

 e.printStackTrace(); 

}
在Hbase中删除某个记录
package com.cxqy.baseoperation; 

import java.io.IOException; 

import java.util.ArrayList; 

import java.util.List; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.Cell; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.KeyValue; 

import org.apache.hadoop.hbase.client.Delete; 

import org.apache.hadoop.hbase.client.Get; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 

import org.apache.hadoop.hbase.filter.FilterList; 

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 

import org.apache.hadoop.hbase.util.Bytes; 

public class HbaseDelOneRecord { 

 public static final String TableName = "user_action_table"; 

 public static final String ColumnFamily = "action_log"; 

 public static Configuration conf = HBaseConfiguration.create(); 

 private static HTable table; 

 public static void delOneRecord(String tableName, String rowKey) throws IOException { 

 table = new HTable(conf, tableName); 

 List Delete list = new ArrayList Delete 

 Delete delete = new Delete(rowKey.getBytes()); 

 list.add(delete); 

 table.delete(list); 

 System.out.println("delete record " + rowKey + " success!"); 

 public static void main(String[] args) throws IOException { 

 conf.set("hbase.master", "192.168.87.200:60000"); 

 conf.set("hbase.zookeeper.quorum", "192.168.87.200,192.168.87.201,192.168.87.202"); 

 // TODO Auto-generated method stub 

 try { 

 delOneRecord(TableName, "ip=192.168.87.200-002"); 

 } catch (Exception e) { 

 // TODO Auto-generated catch block 

 e.printStackTrace(); 

}
从Hbase中批量读记录
package com.cxqy.baseoperation; 

import java.io.IOException; 

import java.util.ArrayList; 

import java.util.List; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.Cell; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.KeyValue; 

import org.apache.hadoop.hbase.client.Delete; 

import org.apache.hadoop.hbase.client.Get; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

import org.apache.hadoop.hbase.client.Result; 

import org.apache.hadoop.hbase.client.ResultScanner; 

import org.apache.hadoop.hbase.client.Scan; 

import org.apache.hadoop.hbase.filter.BinaryComparator; 

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 

import org.apache.hadoop.hbase.filter.Filter; 

import org.apache.hadoop.hbase.filter.FilterList; 

import org.apache.hadoop.hbase.filter.RowFilter; 

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 

import org.apache.hadoop.hbase.util.Bytes; 

public class HbaseScanManyRecords { 

 public static final String TableName = "user_action_table"; 

 public static final String ColumnFamily = "action_log"; 

 public static Configuration conf = HBaseConfiguration.create(); 

 private static HTable table; 

 public static void getManyRecords(String tableName) throws IOException { 

 table = new HTable(conf, tableName); 

 Scan scan = new Scan(); 

 ResultScanner scanner = table.getScanner(scan); 

 for (Result result : scanner) { 

 for (KeyValue kv : result.raw()) { 

 System.out.print(new String(kv.getRow()) + " "); 

 System.out.print(new String(kv.getFamily()) + ":"); 

 System.out.print(new String(kv.getQualifier()) + " "); 

 System.out.print(kv.getTimestamp() + " "); 

 System.out.println(new String(kv.getValue())); 

 public static void getManyRecordsWithFilter(String tableName, String rowKey) throws IOException { 

 table = new HTable(conf, tableName); 

 Scan scan = new Scan(); 

// scan.setStartRow(Bytes.toBytes("ip=10.11.1.2-996")); 

// scan.setStopRow(Bytes.toBytes("ip=10.11.1.2-997")); 

 Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKey))); 

 scan.setFilter(filter); 

 ResultScanner scanner = table.getScanner(scan); 

 for (Result result : scanner) { 

 for (KeyValue kv : result.raw()) { 

 System.out.print(new String(kv.getRow()) + " "); 

 System.out.print(new String(kv.getFamily()) + ":"); 

 System.out.print(new String(kv.getQualifier()) + " "); 

 System.out.print(kv.getTimestamp() + " "); 

 System.out.println(new String(kv.getValue())); 

 public static void getManyRecordsWithFilter(String tableName, ArrayList String rowKeyList) throws IOException { 

 table = new HTable(conf, tableName); 

 Scan scan = new Scan(); 

 List Filter filters = new ArrayList Filter 

 for(int i = 0; i rowKeyList.size(); i++) { 

 filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowKeyList.get(i))))); 

 FilterList filerList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters); 

 scan.setFilter(filerList); 

 ResultScanner scanner = table.getScanner(scan); 

 for (Result result : scanner) { 

 System.out.println("==============="); 

 for (KeyValue kv : result.raw()) { 

 System.out.print(new String(kv.getRow()) + " "); 

 System.out.print(new String(kv.getFamily()) + ":"); 

 System.out.print(new String(kv.getQualifier()) + " "); 

 System.out.print(kv.getTimestamp() + " "); 

 System.out.println(new String(kv.getValue())); 

 public static void main(String[] args) throws IOException { 

 conf.set("hbase.master", "192.168.159.30:60000"); 

 conf.set("hbase.zookeeper.quorum", "192.168.159.30,192.168.159.31,192.168.159.32"); 

 // TODO Auto-generated method stub 

 try { 

// getManyRecords(TableName); 

// getManyRecordsWithFilter(TableName, "ip=192.11.1.200-0"); 

 ArrayList String whiteRowKeyList =new ArrayList (); 

 whiteRowKeyList.add("ip=192.168.87.200-001"); 

 whiteRowKeyList.add("ip=192.168.87.200-003"); 

 getManyRecordsWithFilter(TableName, whiteRowKeyList); 

 //getManyRecords(TableName); 

 } catch (Exception e) { 

 // TODO Auto-generated catch block 

 e.printStackTrace(); 

}

9152.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集