Apache Spark技术实战(三)利用Spark将json文件导入Cassandra &SparkR的安装及使用
将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandra本身了解不多,这个我还没有尝试成功。
但想到spark sql中可以读取json文件,而spark-cassadra-connector又提供了将RDD存入到数据库的功能,我想是否可以将两者结合一下。
创建KeySpace和Table为了减少复杂性,继续使用实战3中的keyspace和table,
CREATE KEYSPACE test WITH replication = {class: SimpleStrategy, replication_factor: 1 }; CREATE TABLE test.kv(key text PRIMARY KEY, value int);启动spark-shell
与实战3中描述一致。
bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar准备json文件
以spark自带的person.json文件为例,内容如下所示
{"name":"Andy", "age":30} {"name":"Justin", "age":19}
假设person.json文件存储在$SPARK_HOME目录,在启动spark-shell之后,执行如下语句
sc.stop import com.datastax.spark.connector._ import org.apache.spark._ val conf = new SparkConf() conf.set("spark.cassandra.connection.host", "127.0.0.1") val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val path = "./people.json" val people = sqlContext.jsonFile(path) people.map(p= (p.getString(10),p.getInt(0))) .saveToCassandra("test","kv",SomeColumns("key","value"))
注意:
jsonFile返回的是jsonRDD,其中每一个成员是Row类型,并不行直接将saveToCassandra作用于jsonRDD,需要先作一步转换即map过程 map中使用到的getXXX函数是在事先已知数据类型的情况下取出其值 最后saveToCassandra触发数据的存储过程另外一个地方值得记录一下,如果在cassandra中创建的表使用了uuid作为primary key,在scala中使用如下函数来生成uuid
import java.util.UUID UUID.randomUUID
使用cqlsh来查看数据是否已经真正的写入到test.kv表中。
本次实验结合了以下知识:
本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例。
假设已经阅读技术实战之3,并安装了如下软件
scala spark sql spark RDD的转换函数 spark-cassandra-connector 二 SparkR的安装及使用根据论坛上的信息,在Sparkrelease计划中,在Spark 1.3中有将SparkR纳入到发行版的可能。本文就提前展示一下如何安装及使用SparkR.
SparkR的出现解决了R语言中无法级联扩展的难题,同时也极大的丰富了Spark在机器学习方面能够使用的Lib库。SparkR和Spark MLLIB将共同构建出Spark在机器学习方面的优势地位。
使用SparkR能让用户同时使用Spark RDD提供的丰富Api,也可以调用R语言中丰富的Lib库。
安装SparkR先决条件:
已经安装好openjdk 7 安装好了R安装步骤:
步骤1: 运行R Shell
bash# R
步骤2:在R shell中安装rJava
install.packages("rJava")
步骤3: 在R shell中安装devtools
install.packages("devtools")
步骤4: 安装好rJava及devtools,接下来安装SparkR
library(devtools) install_github("amplab-extras/SparkR-pkg", subdir="pkg")使用SparkR来运行wordcount
安装完SparkR之后,可以用wordcount来检验安装正确与否。
步骤1:在R shell中加载SparkR
library(SparkR)
步骤2:初始化SparkContext及执行wordcount
sc - sparkR.init(master="local", "RwordCount") lines - textFile(sc, "README.md") words - flatMap(lines, function(line) { strsplit(line, " ")[[1]] wordCount - lapply(words, function(word) { list(word, 1L) }) counts - reduceByKey(wordCount, "+", 2L) output - collect(counts) for (wordcount in output) { cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
如果想将SparkR运行于集群环境中,只需要将master=local,换成spark集群的监听地址即可
时间匆忙,还有两件事情没有来得及细细分析。
SparkR的代码实现 如果很好的将R中支持的数据挖掘算法与Spark并行化处理能力很好的结合 https://github.com/amplab-extras/SparkR-pkg《Apache Spark 中文实战攻略下册》电子版地址 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
《Apache Spark 中文实战攻略上册》电子版地址 《Apache Spark 中文实战攻略(上册)》全新收录了Spark+AI Summit 2020 中文精华版峰会,Apache Spark 3.0性能优化与基础实战一书看遍!
《Apache Spark 中文实战攻略下册》电子版 《Apache Spark 中文实战攻略(下册)》让企业大数据平台性能更优。阿里、Databricks、领英、Intel都在用!Spark 企业级最佳实践中文解读全收纳!
相关文章
- Eureka&CAP原理
- [- Flutter 数据&状态篇 -] InheritedWidget
- Apache配置虚拟主机_apache中配置虚拟主机的作用
- Apache配置虚拟主机_apache启动但是访问不到
- 零零信安-D&D数据泄露报警日报【第22期】
- 谷粒学院day0&day1——项目介绍与mybatis plus入门
- Javaweb03-servlet&filter
- MCE | 单胺能 & 非单胺能对抑郁症的作用
- R&D奇谭 第6期:找程序员女婿
- 草料二维码&腾讯云HiFlow联合直播精彩回顾
- 性能测试(第3集)第18讲:JMeter HTTP Request&参数化&CVS Data Set Config&函数助手
- Topaz Photo AI mac&win(人工智能降噪软件)
- 解密Prompt系列2. 冻结Prompt微调LM: T5 & PET & LM-BFF
- 7 Papers & Radios | 21℃室温超导引爆物理圈;微软发布视觉ChatGPT
- 跨越适配&性能那道坎,企鹅电竞Android weex优化
- Oracle故障处理Rman-06207&Rman-06214的方法
- heartbeat amp DDOS攻击资源扫描.c
- 解决多种web问题Linux下Apache解决多种Web问题:让你的网络更安全可靠(linux下apache)
- 以Apache、MySQL和PHP组成的最强技术栈(apache mysql php)
- Apache与MySQL联动改变Web服务(apache跟mysql)
- Apache与MySQL的完美融合开启精彩程序之旅(apache加mysql)
- AMP MySQL升级提升数据库性能的必要之举(amp mysql升级)
- 如何在Apache中配置MySQL数据库连接(mysql、apache)
- Oracle替换技术一种提高工作效率的新方法(oracle &替换)
- AT&T 在 10 城提供 sub-6GHz 5G 应急安全接入
- 使用Apache&花生壳架设Web服务器
- 修复IE9&safari的sort方法
- 磁盘缓存专题之一缓存命中和缓存未命中&缓存与缓冲间的差异
- Linux下APACHE&PHP&MYSQL&CGI修改版
- win8下XAMPP中Apache模块无效(apache无法打开)的解决方法
- js调用图片隐藏&显示实现代码