Spark-ML-基于云平台和用户日志的推荐系统
数据收集:spark stareming从Azure Queue收集数据,通过自定义的spark stareming receiver,源源不断的消费流式数据。
数据处理: spark stareming分析用户行为日志数据,通过实时的聚集,统计报表现有的应用的运营信息,,也可以通过离线的训练模型,对实现数据进行预测和标注。
结果输出:hdfs
数据收集用到了这个东西,miner是个js可以收集用户的行为日志,前端收集和回传用户行为日志。
协同过滤(Collaborative Filtering, 简称CF),wiki上的定义是:简单来说是利用某兴趣相投、拥有共同经验之群体的喜好来推荐使用者感兴趣的资讯,个人透过合作的机制给予资讯相当程度的回应(如评分)并记录下来以达到过滤的目的进而帮助别人筛选资讯,回应不一定局限于特别感兴趣的,特别不感兴趣资讯的纪录也相当重要。
以上定义太拗口,举个简单的例子:我现在多年不看日本anime的新番了,最近突然又想看几部新番,但又不知道这么多新番中看哪些比较好,于是我就找几个同样喜欢日本动漫的朋友来咨询。我第一个想咨询的朋友是和我口味最像的,我们都特别喜欢看《虫师》、《黑之契约者》、《寒蝉》这样的小众动画;我问的第二个朋友和我口味差不多,他特别喜欢看《钢炼》《无头骑士异闻录》这样的动画,我虽然喜欢,但不像他那么喜欢;由于身边喜欢日本动画的朋友不多,剩下第三个可以咨询的是一个宅女,平常经常看腐、宅、基的动漫,完全跟我不是一路人,于是问了下她推荐的片子,并将这些片子打上的黑名单的标签。然后我就开始看第一个朋友推荐的片子了,要是时间特别多又很无聊我可能会看第二个朋友推荐的,但打死我也不会看第三个朋友推荐的。这就是协同过滤的一个简化、小众版。
如何进行相似度度量
接着上面的例子,我可以通过我和其它朋友共同喜欢某个或某类动漫来确定我们的口味是否一样,那么如何以数学或者机器的形式来表示这个“口味一样”呢?通常,是通过“距离”来表示,例如:欧几里得距离、皮尔逊相关度、曼哈顿距离、Jaccard系数等等。
欧几里得距离
欧几里德距离(Euclidean Distance),最初用于计算欧几里得空间中两个点的距离,在二维空间中,就是我们熟悉的两点间的距离,x、y表示两点,维度为n:
[Math Processing Error]d(x,y)=(∑in(xi−yi)2)
相似度:
[Math Processing Error]sim(x,y)=11+d(x,y)
皮尔逊相关度
皮尔逊相关度(Pearson Correlation Coefficient),用于判断两组数据与某一直线拟合程度的一种度量,取值在[-1,1]之间。当数据不是很规范的时候(如偏差较大),皮尔逊相关度会给出较好的结果。
[Math Processing Error]p(x,y)=∑xiyi−nxy¯(n−1)SxSy=n∑xiyi−∑xi∑yin∑xi2−(∑xi)2n∑yi2−(∑yi)2
曼哈顿距离
曼哈顿距离(Manhattan distance),就是在欧几里得空间的固定直角坐标系上两点所形成的线段对轴产生的投影的距离总和。
[Math Processing Error]d(x,y)=∑∥xi−yi∥
Jaccard系数
Jaccard系数,也称为Tanimoto系数,是Cosine相似度的扩展,也多用于计算文档数据的相似度。通常应用于x为布尔向量,即各分量只取0或1的时候。此时,表示的是x,y的公共特征的占x,y所占有的特征的比例:
[Math Processing Error]T(x,y)=x∙y∥x∥2+∥y∥2−x∙y=∑xiyi∑xi2+∑yi2−∑xiyi
计算推荐
根据上述“距离”的算法,我们可以找出与自己“口味一样”的人了,但这并不是目的,目的是找出推荐的物品。一种常用的做法是选出与你兴趣相同的N个人,然后根据这N个人的记录来进行加权推荐。具体如下,假设已经计算出欧几里得相似度:
Sim.Sum 1.75 2 1.05
总计 Sim.Sum 9.31 8.09 5.95*
其中,s.x开头的表示相似度与评分的乘积,Sim.Sum表示打过分的朋友的相似度之和。可以看出根据三位友人的推荐,我从这三个东西中A来看。
Item CF与User CF
基于用户的协同过滤(User CF),其基本思想相当简单,基于用户对物品的偏好找到相邻邻居用户,然后将邻居用户喜欢的推荐给当前用户。上述过程就属于User CF。
基于物品的CF(Item CF)的原理和基于用户的CF类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。两者的计算复杂度和适用场景皆不同。
public class UserSideCF implements Serializable { private static final Pattern TAB = Pattern.compile("\t"); public MatrixFactorizationModel buildModel(RDD Rating rdd) { //训练模型 int rank = 10; int numIterations = 20; MatrixFactorizationModel model = ALS.train(rdd, rank, numIterations, 0.01); return model; public RDD Rating [] splitData() { //分割数据,一部分用于训练,一部分用于测试 SparkConf sparkConf = new SparkConf().setAppName("JavaALS").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD String lines = sc.textFile("/home/nodin/ml-100k/u.data"); JavaRDD Rating ratings = lines.map(line - { String[] tok = TAB.split(line); int x = Integer.parseInt(tok[0]); int y = Integer.parseInt(tok[1]); double rating = Double.parseDouble(tok[2]); return new Rating(x, y, rating); RDD Rating [] splits = ratings.rdd().randomSplit(new double[]{0.6,0.4}, 11L); return splits; public static void main(String[] args) { UserSideCF cf = new UserSideCF(); RDD Rating [] splits = cf.splitData(); MatrixFactorizationModel model = cf.buildModel(splits[0]); Double MSE = cf.getMSE(splits[0].toJavaRDD(), model); System.out.println("Mean Squared Error = " + MSE); //训练数据的MSE Double MSE1 = cf.getMSE(splits[1].toJavaRDD(), model); System.out.println("Mean Squared Error1 = " + MSE1); //测试数据的MSE public Double getMSE(JavaRDD Rating ratings, MatrixFactorizationModel model) { //计算MSE JavaPairRDD usersProducts = ratings.mapToPair(rating - new Tuple2 (rating.user(), rating.product())); JavaPairRDD Tuple2 Integer, Integer , Double predictions = model.predict(usersProducts.rdd()) .toJavaRDD() .mapToPair(new PairFunction Rating, Tuple2 Integer, Integer , Double () { @Override public Tuple2 Tuple2 Integer, Integer , Double call(Rating rating) throws Exception { return new Tuple2 (new Tuple2 (rating.user(), rating.product()), rating.rating()); JavaPairRDD Tuple2 Integer, Integer , Double ratesAndPreds = ratings .mapToPair(new PairFunction Rating, Tuple2 Integer, Integer , Double () { @Override public Tuple2 Tuple2 Integer, Integer , Double call(Rating rating) throws Exception { return new Tuple2 (new Tuple2 (rating.user(), rating.product()), rating.rating()); JavaPairRDD joins = ratesAndPreds.join(predictions); return joins.mapToDouble(new DoubleFunction Tuple2 Tuple2 Integer, Integer , Tuple2 Double, Double () { @Override public double call(Tuple2 Tuple2 Integer, Integer , Tuple2 Double, Double o) throws Exception { double err = o._2()._1() - o._2()._2(); return err * err; }).mean(); }
Hadoop和Spark的异同 Hadoop实质上是解决大数据大到无法在一台计算机上进行存储、无法在要求的时间内进行处理的问题,是一个分布式数据基础设施。 HDFS,它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,通过将块保存到多个副本上,提供高可靠的文件存储。 MapReduce,通过简单的Mapper和Reducer的抽象提供一个编程模型,可以在一个由几十台上百台的机器上并发地分布式处理大量数据集,而把并发、分布式和故障恢复等细节隐藏。
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】 今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。 java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。
Storm与Spark、Hadoop三种框架对比 Storm与Spark、Hadoop这三种框架,各有各的优点,每个框架都有自己的最佳应用场景。所以,在不同的应用场景下,应该选择不同的框架。
大数据Spark企业级实战与Hadoop实战&PDF和PPT 今天给大家分享的是《大数据Spark企业级实战》与《Hadoop实战》《大数据处理系统·Hadoop源代码情景分析》《50个大厂大数据算法教程》等销量排行前10名的大数据技术书籍(文末领取PDF版)。这些书籍具有以下几个优点:易读、实践性强,对解决工作中遇到的业务问题具有一定启发性。
相关文章
- 日志平台之ELKStack
- 实习日志 08/14更新
- SpringBoot:Windows平台下JAR包的启动,停止和日志分割脚本
- 日志调试不理想?试试分布式追踪优势
- 分布式系列教程(39) -分布式日志采集系统ELK
- 《ELK Stack权威指南(第2版)》一3.6 Java日志
- SQL server查询每个库的数据文件大小和日志文件大小
- kafka日志保留时间设置无效问题
- ELK实时日志分析平台环境部署--完整记录(ElasticSearch+Logstash+Kibana )
- DataHub: 现代数据栈的元数据平台--如何从DataHub容器中提取日志?
- python解析基于xml格式的日志文件
- Kafka - 主题Topic与消费者消息Offset日志记录机制
- 基于Docker的ELK日志平台搭建
- 《嵌入式系统开发之道——菜鸟成长日志与项目经理的私房菜》——第1章 系统•嵌入•硬件 01-01 Welcome on board!
- Hadoop案例(一)之日志清洗
- Qt开源作品21-日志重定向输出类
- ELK_elk+redis 搭建日志分析平台
- 【Linux篇<Day13>】——nmcli网络参数配置、ssh远程管理、日志管理
- Loadrunner日志设置与查看
- 秋式网站日志分析器[IISLogViewer] V3版本发布
- 使用log4javascript记录日志
- (三)xxx项目系统之日志全面监控,实现主流开发语言和平台日志监控分析和告警,简单高效的非侵入式接入项目
- ELK实时日志分析平台环境部署,以及可视化展示
- 【Java】slf4j日志框架
- 数据库系统原理课程总结8——备份与日志初步、并发模拟实验