Spark On HBASE
MapReduce早已经对接了HBase,以HBase作为数据源,完成批量数据的读写。如今继MapReduce之后的Spark在大数据领域有着举足轻重的地位,无论跑批,流处理,甚至图计算等都有它的用武之地。Spark对接HBase成为不少用户的需求。
二、Spark On HBASE 1.可以解决的问题Spark和HBASE无缝对接意味着我们不再需要关心安全和RDD与HBase交互的细节。更方便应用Spark带来的批处理,流处理等能力。比如以下常见的应用场景:
以HBase作为存储,通过Spark对流式数据处理。 以HBase作为存储,完成大规模的图或者DAG的计算。 通过Spark对HBase做BulkLoad操作 同Spark SQL对HBase数据做交互式分析 2.社区相关的工作目前已经有多种Spark对接HBase的实现,这里我们选取三个有代表的工作进行分析:
2.1 华为: Spark-SQL-on-HBase特点
扩展了Spark SQL的parse功能来对接HBase。通过coprocessor和自定义filter来提升读写性能。
优点
扩展了对应的cli功能,支持scala shell和python shell 多种性能优化方式,甚至支持sub plan到coprocessor实现partial aggregation. 支持java和Python API 支持row key组合 支持常用DDL和DML(包括bulkload,但不支持update)缺点
不支持支持基于时间戳和版本的查询 不支持安全 Row key支持原始类型或者String,不支持复杂数据类型使用示例
在HBase中创建表,并写入数据$HBase_Home/bin/hbase shell create hbase_numbers, f for i in 1..100 do for j in 1..2 do put hbase_numbers, "row#{i}", "f:c#{j}", "#{i}#{j}" end end使用sparksql创建表并与HBase表建立映射
$SPARK_HBASE_Home/bin/hbase-sql CREATE TABLE numbers rowkey STRING, a STRING, b STRING, PRIMARY KEY (rowkey) MAPPED BY hbase_numbers COLS=[a=f.c1, b=f.c2];
select a, b from numbers where b "980"2.2 Hortonworks: Apache HBase Connector
特点
以简单的方式实现了标准的Spark Datasource API,使用Spark Catalyst引擎做查询优化。同时通过scratch来构建RDD,也实现了许多常见的查询优化。
优点
native avro支持 谓词下推和分区裁剪 支持row key组合缺点
SQL语法不够丰富,只支持spark sql原有的语法 只支持java原始类型 不支持多语言API使用示例
定义 HBase Catalogdef catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |}""".stripMargin使用SQL查询
// Load the dataframe val df = withCatalog(catalog) //SQL example df.createOrReplaceTempView("table") sqlContext.sql("select count(col1) from table").show2.3 Cloudrea: SparkOnHBase
特点
通过简单的接口实现链接Spark与HBASE, 支持常用的bulk读写。架构图如下:
优点
通过get或者scan直接生成rdd, 并可以使用API完成更高级的功能 支持组合rowkey 支持多种bulk操作 为spark和 spark streaming提供相似的API 支持谓词下推优化缺点
不支持复杂数据类型 SQL只支持spark sql原有的语法使用示例
直接使用scan创建一个RDDSparkConf sparkConf = new SparkConf().setAppName( "Scan_RDD").set("spark.executor.memory", "2000m").setMaster( "spark://xx.xx.xx.xx:7077") .setJars(new String[]{"/path/to/hbase.jar"}); val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val hbaseContext = new HBaseContext(sc, conf) var scan = new Scan() scan.setCaching(100) var getRdd = hbaseContext.hbaseRDD(tableName, scan)创建一个RDD并把RDD的内容写入HBase
val sc = new SparkContext(sparkConf) //This is making a RDD of //(RowKey, columnFamily, columnQualifier, value) val rdd = sc.parallelize(Array( (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))), (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))), (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))), (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))), (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))) //Create the HBase config like you normally would then //Pass the HBase configs and SparkContext to the HBaseContext val conf = HBaseConfiguration.create(); val hbaseContext = new HBaseContext(sc, conf); //Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally // A flag if you want the puts to be batched hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd, tableName, //This function is really important because it allows our source RDD to have data of any type // Also because puts are not serializable (putRecord) { val put = new Put(putRecord._1) putRecord._2.foreach((putValue) put.add(putValue._1, putValue._2, putValue._3)) true);2.4 综合对比
3. 最后
社区中有不少Spark on HBase的工作,出发点都是为了提供更易用,更高效的接口。其中Cloudrea的SparkOnHbase更加灵活简单,在2015年8月被提交到HBase的主干(trunk)上,模块名为HBase-Spark Module,目前准备在HBASE 2.0 正式Release, 相信这个特性一定是HBase新版本的一个亮点。 于此同时云HBase也会与社区同步发展,使用包括但不限于Spark On HBase的新特性,届时欢迎大家尝鲜。
如若文章中有不准确的描述,请多多指正,谢谢!
4. 参考https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
https://issues.apache.org/jira/browse/HBASE-13992
http://blog.madhukaraphatak.com/introduction-to-spark-two-part-6/
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-catalyst.htmlh
Spark查询Hbase小案例 写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
Spark on HBase Connector:如何在Spark侧设置HBase参数 X-Pack Spark可以使用Spark on HBase Connector直接对接HBase数据库,读取HBase数据表数据。有时在读取HBase时需要设置HBase的一些参数调整性能,例如通过设置hbase.client.scanner.caching的大小调整读取HBase数据的性能。
数据库云HBase 版本spark服务支持D1机型 信息摘要: 数据库云HBase 版本spark服务支持D1机型,适合起步超过20T数据库的大客户,每GB存储单价最低。适用客户: 大企业版本/规格功能: spark支持D1机型产品文档: 数据库云HBase 版本spark服务支持D1机型,具体spark服务参考https://help.
线下沙龙最全资料下载往期回顾 阿里云栖开发者沙龙 - BigData NoSQL Meetup(上海站)【精彩直播+最全资料下载】阿里云栖开发者沙龙 - BigData NoSQL Meetup(上海站)业内大咖齐聚,各大技术社区支持,与你畅聊 BigData NoSQL中国HBase技术社区第十届meetup——HBase生态实践(杭州站) 中国HBase技术社区第十届meetup——HBase生态实践(杭州站)回顾,4位数据库技术大咖共话HBase技术实践。
第十二届 BigData NoSQL Meetup — 基于hbase的New sql落地实践 立即下载
相关文章
- Hbase架构和读写流程
- java连接hbase报错
- Apache HBase 入门
- 亿级下ApsaraDB HBase Phoenix秒级内RT在大搜车实践
- HBase – Memstore Flush深度解析
- hbase操作
- hbase 程序优化 参数调整方法
- HBase集群部署脚本
- HBase提供的工具
- HBase性能调优
- Java 向Hbase表插入数据报(org.apache.hadoop.hbase.client.HTablePool$PooledHTable cannot be cast to org.apac)
- 利用CombineFileInputFormat把netflix data set 导入到Hbase里
- 利用Hbase做二度关系人脉存储
- HBase-1.2.4 Allow block cache to be external分析