zl程序教程

您现在的位置是:首页 >  后端

当前栏目

人工智能-推荐系统-模块02:离线推荐模块【基于LFM模型的推荐(ALS算法)、基于物品的协同过滤推荐(Item-CF)、基于用户的协同过滤推荐(User-CF)、基于内容的相似推荐(Tf-idf)】

算法模块系统人工智能 基于 用户 模型 内容
2023-09-27 14:20:38 时间

一、基于LFM(隐语义模型)的离线推荐(ALS算法):基于用户行为数据

1、用ALS算法训练隐语义模型(LFM)

在这里插入图片描述

  • 均方根误差(RMSE):均方误差的算术平方根,预测值与真实值之间的误差
    在这里插入图片描述
  • 参数调整 可以通过均方根误差,来多次调整参数值,选择RMSE最小的一组参数值:rank,iterations,lambda

2、计算用户推荐矩阵

在这里插入图片描述

3、计算商品相似度矩阵

在这里插入图片描述

4、案例代码

ALS算法需要构建机器学习模型

4.1 通过网格搜索选取ALS模型最优超参数

ALSHyperParametersTrainer.scala

import breeze.numerics.sqrt
import com.atguigu.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Project: ECommerceRecommendSystem
  * Created on 2019/4/27 11:24
  */
object ALSTrainer {
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

    // 加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[ProductRating]
      .rdd
      .map(
      rating => Rating(rating.userId, rating.productId, rating.score)
    ).cache()

    // 数据集切分成训练集和测试集
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainingRDD = splits(0)
    val testingRDD = splits(1)

    // 核心实现:输出最优参数
    adjustALSParams( trainingRDD, testingRDD )

    spark.stop()
  }

  def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit ={
    // 遍历数组中定义的参数取值
    val result = for( rank <- Array(5, 10, 20, 50); lambda <- Array(1, 0.1, 0.01) )
      yield {
        val model = ALS.train(trainData, rank, 10, lambda)
        val rmse = getRMSE( model, testData )
        ( rank, lambda, rmse )
      }
    // 按照rmse排序并输出最优参数
    println(result.minBy(_._3))
  }

  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    // 构建userProducts,得到预测评分矩阵
    val userProducts = data.map( item=> (item.user, item.product) )
    val predictRating = model.predict(userProducts)

    // 按照公式计算rmse,首先把预测评分和实际评分表按照(userId, productId)做一个连接
    val observed = data.map( item=> ( (item.user, item.product),  item.rating ) )
    val predict = predictRating.map( item=> ( (item.user, item.product),  item.rating ) )

    sqrt(
      observed.join(predict).map{
      case ( (userId, productId), (actual, pre) ) =>
        val err = actual - pre
        err * err
    }.mean()
    )
  }
}

4.2 利用最优超参数训练ALS模型,然后进行基于LFM的推荐

OfflineRecommenderALS.scala

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
  * Project: ECommerceRecommendSystem
  * Created on 2019/4/27 10:19
  */

case class ProductRating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )

// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义用户的推荐列表
case class UserRecs( userId: Int, recs: Seq[Recommendation] )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )

object OfflineRecommender {
  // 定义mongodb中存储的表名
  val MONGODB_RATING_COLLECTION = "Rating"

  val USER_RECS = "UserRecs"
  val PRODUCT_RECS = "ProductRecs"
  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

    // 加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[ProductRating]
      .rdd
      .map(
        rating => (rating.userId, rating.productId, rating.score)
      ).cache()

    // 提取出所有用户和商品的数据集
    val userRDD = ratingRDD.map(_._1).distinct()
    val productRDD = ratingRDD.map(_._2).distinct()

    // 核心计算过程
    // 1. 训练隐语义模型
    val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))
    // 定义模型训练的参数,rank隐特征个数,iterations迭代词数,lambda正则化系数
    val ( rank, iterations, lambda ) = ( 5, 10, 0.01 )  // 人工随机初始化的超参数
    val model = ALS.train( trainData, rank, iterations, lambda )

    // 2. 获得预测评分矩阵,得到用户的推荐列表
    // 用userRDD和productRDD做一个笛卡尔积,得到空的userProductsRDD表示的评分矩阵
    val userProducts = userRDD.cartesian(productRDD)
    val preRating = model.predict(userProducts)

    // 从预测评分矩阵中提取得到用户推荐列表
    val userRecs = preRating.filter(_.rating>0)
      .map(
        rating => ( rating.user, ( rating.product, rating.rating ) )
      )
      .groupByKey()
      .map{
        case (userId, recs) =>
          UserRecs( userId, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1,x._2)) )
      }
      .toDF()
    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 3. 利用商品的特征向量,计算商品的相似度列表
    val productFeatures = model.productFeatures.map{
      case (productId, features) => ( productId, new DoubleMatrix(features) )
    }
    // 两两配对商品,计算余弦相似度
    val productRecs = productFeatures.cartesian(productFeatures)
      .filter{
        case (a, b) => a._1 != b._1
      }
      // 计算余弦相似度
      .map{
        case (a, b) =>
          val simScore = consinSim( a._2, b._2 )
          ( a._1, ( b._1, simScore ) )
      }
      .filter(_._2._2 > 0.4)
      .groupByKey()
      .map{
        case (productId, recs) =>
          ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )
      }
      .toDF()
    productRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", PRODUCT_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }
  def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double ={
    product1.dot(product2)/ ( product1.norm2() * product2.norm2() )
  }
}

二、基于物品的协同过滤离线推荐(Item-CF):基于用户行为数据

在这里插入图片描述

  • 怎样找到商品 A 的相似商品?—— 与A有相同标签的商品,喜欢A的人同样喜欢的商品
  • 根据行为数据的相似度计算—— Item-CF:根据行为数据,找到喜欢了商品A的用户,同时喜欢了哪些商品,喜欢的人重合度越高相似度就越大
  • 基于物品的协同过滤(Item-CF),只需通过埋点收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广
  • 比如:A商品详情页中的推荐:购买了A商品的用户也购买了以下商品…
  • “同现相似度”—— 利用行为数据计算不同商品间的相似度
    在这里插入图片描述
    其中:
    • N i N_i Ni 是购买商品 i i i (或对商品 i i i 评分)的用户列表, N j N_j Nj 是购买商品 j j j 的用户列表
    • N i ∩ N j N_i ∩ N_j NiNj 表示同时购买了商品 i i i 与商品 j j j 的用户总数、 ∣ N i ∣ ∣ N j ∣ \sqrt{|N_i||N_j|} Ni∣∣Nj 是对热门商品的惩罚项

在这里插入图片描述
ItemCFRecommender.scala(Item-CF 不需要构建模型)

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * Project: ECommerceRecommendSystem
  * Created on 2019/4/29 10:48
  */

case class ProductRating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )

// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )

object ItemCFRecommender {
  // 定义常量和表名
  val MONGODB_RATING_COLLECTION = "Rating"
  val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
  val MAX_RECOMMENDATION = 10

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

    // 加载数据,转换成DF进行处理
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[ProductRating]
      .map(
        x => ( x.userId, x.productId, x.score )
      )
      .toDF("userId", "productId", "score")
      .cache()

    // TODO: 核心算法,计算同现相似度,得到商品的相似列表
    // 统计每个商品的评分个数,按照productId来做group by
    val productRatingCountDF = ratingDF.groupBy("productId").count()
    // 在原有的评分表上rating添加count
    val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")

    // 将评分按照用户id两两配对,统计两个商品被同一个用户评分过的次数
    val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
      .toDF("userId","product1","score1","count1","product2","score2","count2")
      .select("userId","product1","count1","product2","count2")
    // 创建一张临时表,用于写sql查询
    joinedDF.createOrReplaceTempView("joined")

    // 按照product1,product2 做group by,统计userId的数量,就是对两个商品同时评分的人数
    val cooccurrenceDF = spark.sql(
      """
        |select product1
        |, product2
        |, count(userId) as cocount
        |, first(count1) as count1
        |, first(count2) as count2
        |from joined
        |group by product1, product2
      """.stripMargin
    ).cache()

    // 提取需要的数据,包装成( productId1, (productId2, score) )
    val simDF = cooccurrenceDF.map{
      row =>
        val coocSim = cooccurrenceSim( row.getAs[Long]("cocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )
        ( row.getInt(0), ( row.getInt(1), coocSim ) )
    }
      .rdd
      .groupByKey()
      .map{
        case (productId, recs) =>
          ProductRecs( productId, recs.toList
                                        .filter(x=>x._1 != productId)
                                        .sortWith(_._2>_._2)
                                        .take(MAX_RECOMMENDATION)
                                        .map(x=>Recommendation(x._1,x._2)) )
      }
      .toDF()

    // 保存到mongodb
    simDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", ITEM_CF_PRODUCT_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // 按照公式计算同现相似度
  def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double ={
    coCount / math.sqrt( count1 * count2 )
  }
}

三、基于用户的协同过滤离线推荐(User-CF):基于用户行为数据

User-CF 不需要构建模型

四、基于内容的相似离线推荐(Tf-idf):完全基于内容,与用户无关

在这里插入图片描述

  • 怎样找到商品 A 的相似商品?—— 与A有相同标签的商品,喜欢A的人同样喜欢的商品
  • 根据 UGC (用户给商品打的标签)的特征提取 —— 利用TF-IDF算法从商品内容标签中提取特征
  • 基于商品的用户标签信息,用TF-IDF算法提取特征向量
    在这里插入图片描述
  • 计算特征向量的余弦相似度,从而得到商品的相似列表
    在这里插入图片描述
  • 在实际应用中,一般会在商品详情页、或商品购买页将相似商品推荐出来

ContentRecommender.scala

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
  * Project: ECommerceRecommendSystem
  * Created on 2019/4/29 9:08
  */

case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )
case class MongoConfig( uri: String, db: String )

// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )

// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )

object ContentRecommender {
  // 定义mongodb中存储的表名
  val MONGODB_PRODUCT_COLLECTION = "Product"
  val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )
    // 创建一个spark config
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
    // 创建spark session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

    // 载入数据,做预处理
    val productTagsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_PRODUCT_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Product]
      .map(
        x => ( x.productId, x.name, x.tags.map(c=> if(c=='|') ' ' else c) )
      )
      .toDF("productId", "name", "tags")
      .cache()

    // TODO: 用TF-IDF提取商品特征向量
    // 1. 实例化一个分词器,用来做分词,默认按照空格分
    val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
    // 用分词器做转换,得到增加一个新列words的DF
    val wordsDataDF = tokenizer.transform(productTagsDF)

    // 2. 定义一个HashingTF工具,计算频次
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800)
    val featurizedDataDF = hashingTF.transform(wordsDataDF)

    // 3. 定义一个IDF工具,计算TF-IDF
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // 训练一个idf模型
    val idfModel = idf.fit(featurizedDataDF)
    // 得到增加新列features的DF
    val rescaledDataDF = idfModel.transform(featurizedDataDF)

    // 对数据进行转换,得到RDD形式的features
    val productFeatures = rescaledDataDF.map{
      row => ( row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray )
    }
      .rdd
      .map{
        case (productId, features) => ( productId, new DoubleMatrix(features) )
      }

    // 两两配对商品,计算余弦相似度
    val productRecs = productFeatures.cartesian(productFeatures)
      .filter{
        case (a, b) => a._1 != b._1
      }
      // 计算余弦相似度
      .map{
      case (a, b) =>
        val simScore = consinSim( a._2, b._2 )
        ( a._1, ( b._1, simScore ) )
    }
      .filter(_._2._2 > 0.4)
      .groupByKey()
      .map{
        case (productId, recs) =>
          ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )
      }
      .toDF()
    productRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", CONTENT_PRODUCT_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }
  def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double ={
    product1.dot(product2)/ ( product1.norm2() * product2.norm2() )
  }
}