MongoDB Spark Connector 实战指南
Why Spark with MongoDB?
高性能,官方号称 100x faster,因为可以全内存运行,性能提升肯定是很明显的
简单易用,支持 Java、Python、Scala、SQL 等多种语言,使得构建分析应用非常简单
统一构建 ,支持多种数据源,通过 Spark RDD 屏蔽底层数据差异,同一个分析应用可运行于不同的数据源;
应用场景广泛,能同时支持批处理以及流式处理
MongoDB Spark Connector 为官方推出,用于适配 Spark 操作 MongoDB 数据;本文以 Python 为例,介绍 MongoDB Spark Connector 的使用,帮助你基于 MongoDB 构建第一个分析应用。
准备 MongoDB 环境
安装 MongoDB 参考 Install MongoDB Community Edition on Linux
(福利推荐:阿里云、腾讯云、华为云服务器最新限时优惠活动,云服务器1核2G仅88元/年、2核4G仅698元/3年,点击这里立即抢购>>>)
mkdir mongodata
mongod –dbpath mongodata –port 9555
准备 Spark python 环境
参考 PySpark – Quick Guide
下载 Spark
cd /home/mongo-spark
wget
tar zxvf spark-2.4.4-bin-hadoop2.7.tgz
设置 Spark 环境变量
export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH
运行 Spark RDD 示例
count.py
from pyspark import SparkContext
sc = SparkContext(“local”, “count app”)
words = sc.parallelize (
[“scala”,
“java”,
“hadoop”,
“spark”,
“akka”,
“spark vs hadoop”,
“pyspark”,
“pyspark and spark”]
)
counts = words.count()
$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD → 8
如果上述程序运行成功,说明 Spark python 环境准备成功,还可以测试 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多买QQ示例参考 PySpark – Quick Guide
Spark 操作 MongoDB 数据
参考 Spark Connector Python Guide
准备测试数据 test.coll01 插入3条测试数据,test.coll02 未空
mongo –port 9555
db.coll01.find()
{ “_id” : 1, “type” : “apple”, “qty” : 5 }
{ “_id” : 2, “type” : “orange”, “qty” : 10 }
{ “_id” : 3, “type” : “banana”, “qty” : 15 }
db.coll02.find()
准备操作脚本,将输入集合的数据按条件进行过滤,写到输出集合
mongo-spark-test.py
from pyspark.sql import SparkSession
Create Spark Session
spark = SparkSession
.builder .appName("myApp") .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") .getOrCreate()
Read from MongoDB
df = spark.read.format(“mongo”).load()
df.show()
Filter and Write
df.filter(df[‘qty’] >= 10).write.format(“mongo”).mode(“append”).save()
Use SQL
df.createOrReplaceTempView(“temp”)
some_fruit = spark.sql(“SELECT type, qty FROM temp WHERE type LIKE ‘%e%'”)
some_fruit.show()
运行脚本
$SPARK_HOME/bin/spark-submit –packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py
mongo –port 9555
db.coll02.find()
{ “_id” : 2, “qty” : 10, “type” : “orange” }
{ “_id” : 3, “qty” : 15, “type” : “banana” }
你还在原价购买阿里云、腾讯云、华为云、天翼云产品?那就亏大啦!现在申请成为四大品牌云厂商VIP用户,可以3折优惠价购买云服务器等云产品,并且可享四大云服务商产品终身VIP优惠价,还等什么?赶紧点击下面对应链接免费申请VIP客户吧:
相关文章
- ServerManager.exe 0xc0000135 应用程序错误
- IIS7/8下提示 HTTP 错误 404.13 - Not Found 请求筛选模块被配置为拒绝超过请求内容长度的请求
- Eclipse oxygen 版本汉化教程
- 微软汉字转拼音
- Ueditor 前后端分离实现文件上传到独立服务器
- 汉字转拼音类(多音字)
- 兼容IE,chrome 等所有浏览器 回到顶部代码
- unicode 汉字编码表
- 检测URL地址是否有响应
- Cesium for Unreal加载倾斜摄影
- 智能制造车间生产线可视化
- Blender修改视野范围
- UE4 蓝图查找Actor和Actor标签
- UE 实现镜头平移,旋转和缩放
- UE 实现鼠标点选模型
- UE导入FBX、GLTF模型
- 三维引擎导入obj模型不可见总结
- 三维引擎导入obj模型全黑总结
- 使用SVG做模型贴图的思路
- 光伏逆变器建筑设计工具