人工智能-推荐系统-模块02:离线推荐模块【基于LFM模型的推荐(ALS算法)、基于物品的协同过滤推荐(Item-CF)、基于用户的协同过滤推荐(User-CF)、基于内容的相似推荐(Tf-idf)】
2023-09-27 14:20:38 时间
一、基于LFM(隐语义模型)的离线推荐(ALS算法):基于用户行为数据
1、用ALS算法训练隐语义模型(LFM)
- 均方根误差(RMSE):均方误差的算术平方根,预测值与真实值之间的误差
- 参数调整 可以通过均方根误差,来多次调整参数值,选择RMSE最小的一组参数值:rank,iterations,lambda
2、计算用户推荐矩阵
3、计算商品相似度矩阵
4、案例代码
ALS算法需要构建机器学习模型
4.1 通过网格搜索选取ALS模型最优超参数
ALSHyperParametersTrainer.scala
import breeze.numerics.sqrt
import com.atguigu.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
* Project: ECommerceRecommendSystem
* Created on 2019/4/27 11:24
*/
object ALSTrainer {
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
// 加载数据
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(
rating => Rating(rating.userId, rating.productId, rating.score)
).cache()
// 数据集切分成训练集和测试集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
// 核心实现:输出最优参数
adjustALSParams( trainingRDD, testingRDD )
spark.stop()
}
def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit ={
// 遍历数组中定义的参数取值
val result = for( rank <- Array(5, 10, 20, 50); lambda <- Array(1, 0.1, 0.01) )
yield {
val model = ALS.train(trainData, rank, 10, lambda)
val rmse = getRMSE( model, testData )
( rank, lambda, rmse )
}
// 按照rmse排序并输出最优参数
println(result.minBy(_._3))
}
def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
// 构建userProducts,得到预测评分矩阵
val userProducts = data.map( item=> (item.user, item.product) )
val predictRating = model.predict(userProducts)
// 按照公式计算rmse,首先把预测评分和实际评分表按照(userId, productId)做一个连接
val observed = data.map( item=> ( (item.user, item.product), item.rating ) )
val predict = predictRating.map( item=> ( (item.user, item.product), item.rating ) )
sqrt(
observed.join(predict).map{
case ( (userId, productId), (actual, pre) ) =>
val err = actual - pre
err * err
}.mean()
)
}
}
4.2 利用最优超参数训练ALS模型,然后进行基于LFM的推荐
OfflineRecommenderALS.scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
/**
* Project: ECommerceRecommendSystem
* Created on 2019/4/27 10:19
*/
case class ProductRating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )
// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义用户的推荐列表
case class UserRecs( userId: Int, recs: Seq[Recommendation] )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )
object OfflineRecommender {
// 定义mongodb中存储的表名
val MONGODB_RATING_COLLECTION = "Rating"
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
// 加载数据
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(
rating => (rating.userId, rating.productId, rating.score)
).cache()
// 提取出所有用户和商品的数据集
val userRDD = ratingRDD.map(_._1).distinct()
val productRDD = ratingRDD.map(_._2).distinct()
// 核心计算过程
// 1. 训练隐语义模型
val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))
// 定义模型训练的参数,rank隐特征个数,iterations迭代词数,lambda正则化系数
val ( rank, iterations, lambda ) = ( 5, 10, 0.01 ) // 人工随机初始化的超参数
val model = ALS.train( trainData, rank, iterations, lambda )
// 2. 获得预测评分矩阵,得到用户的推荐列表
// 用userRDD和productRDD做一个笛卡尔积,得到空的userProductsRDD表示的评分矩阵
val userProducts = userRDD.cartesian(productRDD)
val preRating = model.predict(userProducts)
// 从预测评分矩阵中提取得到用户推荐列表
val userRecs = preRating.filter(_.rating>0)
.map(
rating => ( rating.user, ( rating.product, rating.rating ) )
)
.groupByKey()
.map{
case (userId, recs) =>
UserRecs( userId, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1,x._2)) )
}
.toDF()
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 3. 利用商品的特征向量,计算商品的相似度列表
val productFeatures = model.productFeatures.map{
case (productId, features) => ( productId, new DoubleMatrix(features) )
}
// 两两配对商品,计算余弦相似度
val productRecs = productFeatures.cartesian(productFeatures)
.filter{
case (a, b) => a._1 != b._1
}
// 计算余弦相似度
.map{
case (a, b) =>
val simScore = consinSim( a._2, b._2 )
( a._1, ( b._1, simScore ) )
}
.filter(_._2._2 > 0.4)
.groupByKey()
.map{
case (productId, recs) =>
ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )
}
.toDF()
productRecs.write
.option("uri", mongoConfig.uri)
.option("collection", PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double ={
product1.dot(product2)/ ( product1.norm2() * product2.norm2() )
}
}
二、基于物品的协同过滤离线推荐(Item-CF):基于用户行为数据
- 怎样找到商品 A 的相似商品?—— 与A有相同标签的商品,喜欢A的人同样喜欢的商品
- 根据行为数据的相似度计算—— Item-CF:根据行为数据,找到喜欢了商品A的用户,同时喜欢了哪些商品,喜欢的人重合度越高相似度就越大
- 基于物品的协同过滤(Item-CF),只需通过埋点收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广
- 比如:A商品详情页中的推荐:购买了A商品的用户也购买了以下商品…
- “同现相似度”—— 利用行为数据计算不同商品间的相似度
其中:- N i N_i Ni 是购买商品 i i i (或对商品 i i i 评分)的用户列表, N j N_j Nj 是购买商品 j j j 的用户列表
- N i ∩ N j N_i ∩ N_j Ni∩Nj 表示同时购买了商品 i i i 与商品 j j j 的用户总数、 ∣ N i ∣ ∣ N j ∣ \sqrt{|N_i||N_j|} ∣Ni∣∣Nj∣ 是对热门商品的惩罚项
ItemCFRecommender.scala(Item-CF 不需要构建模型)
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* Project: ECommerceRecommendSystem
* Created on 2019/4/29 10:48
*/
case class ProductRating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )
// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )
object ItemCFRecommender {
// 定义常量和表名
val MONGODB_RATING_COLLECTION = "Rating"
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
val MAX_RECOMMENDATION = 10
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
// 加载数据,转换成DF进行处理
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(
x => ( x.userId, x.productId, x.score )
)
.toDF("userId", "productId", "score")
.cache()
// TODO: 核心算法,计算同现相似度,得到商品的相似列表
// 统计每个商品的评分个数,按照productId来做group by
val productRatingCountDF = ratingDF.groupBy("productId").count()
// 在原有的评分表上rating添加count
val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")
// 将评分按照用户id两两配对,统计两个商品被同一个用户评分过的次数
val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
.toDF("userId","product1","score1","count1","product2","score2","count2")
.select("userId","product1","count1","product2","count2")
// 创建一张临时表,用于写sql查询
joinedDF.createOrReplaceTempView("joined")
// 按照product1,product2 做group by,统计userId的数量,就是对两个商品同时评分的人数
val cooccurrenceDF = spark.sql(
"""
|select product1
|, product2
|, count(userId) as cocount
|, first(count1) as count1
|, first(count2) as count2
|from joined
|group by product1, product2
""".stripMargin
).cache()
// 提取需要的数据,包装成( productId1, (productId2, score) )
val simDF = cooccurrenceDF.map{
row =>
val coocSim = cooccurrenceSim( row.getAs[Long]("cocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )
( row.getInt(0), ( row.getInt(1), coocSim ) )
}
.rdd
.groupByKey()
.map{
case (productId, recs) =>
ProductRecs( productId, recs.toList
.filter(x=>x._1 != productId)
.sortWith(_._2>_._2)
.take(MAX_RECOMMENDATION)
.map(x=>Recommendation(x._1,x._2)) )
}
.toDF()
// 保存到mongodb
simDF.write
.option("uri", mongoConfig.uri)
.option("collection", ITEM_CF_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// 按照公式计算同现相似度
def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double ={
coCount / math.sqrt( count1 * count2 )
}
}
三、基于用户的协同过滤离线推荐(User-CF):基于用户行为数据
User-CF 不需要构建模型
四、基于内容的相似离线推荐(Tf-idf):完全基于内容,与用户无关
- 怎样找到商品 A 的相似商品?—— 与A有相同标签的商品,喜欢A的人同样喜欢的商品
- 根据 UGC (用户给商品打的标签)的特征提取 —— 利用TF-IDF算法从商品内容标签中提取特征
- 基于商品的用户标签信息,用TF-IDF算法提取特征向量
- 计算特征向量的余弦相似度,从而得到商品的相似列表
- 在实际应用中,一般会在商品详情页、或商品购买页将相似商品推荐出来
ContentRecommender.scala
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
/**
* Project: ECommerceRecommendSystem
* Created on 2019/4/29 9:08
*/
case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )
case class MongoConfig( uri: String, db: String )
// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )
object ContentRecommender {
// 定义mongodb中存储的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// 创建一个spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// 创建spark session
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )
// 载入数据,做预处理
val productTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.map(
x => ( x.productId, x.name, x.tags.map(c=> if(c=='|') ' ' else c) )
)
.toDF("productId", "name", "tags")
.cache()
// TODO: 用TF-IDF提取商品特征向量
// 1. 实例化一个分词器,用来做分词,默认按照空格分
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// 用分词器做转换,得到增加一个新列words的DF
val wordsDataDF = tokenizer.transform(productTagsDF)
// 2. 定义一个HashingTF工具,计算频次
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800)
val featurizedDataDF = hashingTF.transform(wordsDataDF)
// 3. 定义一个IDF工具,计算TF-IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 训练一个idf模型
val idfModel = idf.fit(featurizedDataDF)
// 得到增加新列features的DF
val rescaledDataDF = idfModel.transform(featurizedDataDF)
// 对数据进行转换,得到RDD形式的features
val productFeatures = rescaledDataDF.map{
row => ( row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray )
}
.rdd
.map{
case (productId, features) => ( productId, new DoubleMatrix(features) )
}
// 两两配对商品,计算余弦相似度
val productRecs = productFeatures.cartesian(productFeatures)
.filter{
case (a, b) => a._1 != b._1
}
// 计算余弦相似度
.map{
case (a, b) =>
val simScore = consinSim( a._2, b._2 )
( a._1, ( b._1, simScore ) )
}
.filter(_._2._2 > 0.4)
.groupByKey()
.map{
case (productId, recs) =>
ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )
}
.toDF()
productRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double ={
product1.dot(product2)/ ( product1.norm2() * product2.norm2() )
}
}
相关文章
- 【算法】【字符串模块】判断括号字符串是否有效,如果无效则求最长的有效长度
- 【算法】【字符串模块】字符串数组中两个字符之间的最短距离以及对hashcode以及equals的理解
- 【算法】【字符串模块】字符串中的调整和替换
- 【算法】【递归与动态规划模块】矩阵的最小路径和
- 【算法】【二叉树模块】通过先序遍历和中序遍历获取后序遍历数组(不重构二叉树)
- 【算法】【二叉树模块】根据二叉树中序遍历序列计算所有可能的二叉树的总个数并生成所有二叉树
- 【算法】【栈和队列模块】只用一个栈来排序另一个栈
- 【算法】【栈和队列模块】猫狗队列:使用队列收纳两种不同的元素并且能够实时获取
- 【算法】【二叉树模块】判断数组是否能够重建搜索二叉树并实现重建搜索二叉树的过程
- 【算法】【二叉树模块】树的基本先序、中序、后序遍历算法(7种)
- 【算法】【栈和队列模块】求一个[0,1]矩阵中最大的1矩阵面积
- 数据挖掘分类算法之决策树(zz)
- 【MATLAB教程案例31】基于matlab的人脸检测相关算法的仿真与分析——肤色模型与形态学图像处理方法
- 【MATLAB教程案例20】关于优化类算法的改进方向探索及matlab仿真对比分析
- C#,栅栏油漆算法(Painting Fence Algorithm)的源代码
- C#,阿格里数(Ugly Number)的多种算法与源代码
- 0.算法刷题总览
- 《数据结构与算法 C语言版》—— 1.7上机实验
- baselines算法库common/retro_wrappers.py模块分析
- baselines算法库common/vec_env/vec_env.py模块分析
- baselines算法库common/tile_images.py模块分析
- 「Java数据结构和算法」手撕快速、归并、基数排序,图解解析 + 代码实现。
- 华为OD机试 - 不含101的数(JavaScript) | 机试题+算法思路+考点+代码解析 【2023】
- 华为OD机试 - 九宫格按键输入(Python) | 机试题+算法思路+考点+代码解析 【2023】
- 11.1 Ford-Fulkerson算法及Edmonds-Karp算法
- Mahout贝叶斯算法拓展篇3---分类无标签数据
- 矿Java开发学习之旅------>Java排序算法经典的二分法插入排序