Spark_2.2.0_ALS_recommend_scala

Spark_2.2.0_ALS_recommend_scala

本文针对 Spark 2.2.0:从 Hive 表对应的 HDFS 目录读取文本数据并转换为 RDD(由于 Scala 版本不匹配,使用 spark.ml 会报错,只能改用基于 RDD 的 spark.mllib),利用 Scala 和 Spark MLlib 相关工具包进行数据处理与 ALS 建模,然后用 MSE、RMSE、MAE 对模型进行评价,代码如下:

1
2
3
4
5
6
7
8
import spark.implicits._

import org.apache.spark.rdd._
import org.apache.spark.sql._

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// /home/shmc/spark_bak/bin/spark-shell --keytab shmc.keytab --principal  shmc/bdoc@BCH  --master yarn --queue root.bdoc.shmc --executor-memory 8g --executor-cores 8 --num-executors 64 --driver-memory 64g --conf spark.ui.port=5051  < ls_recomm_model.scala >als_recomm_model.log

// Case class whose instances can be matched to DataFrame rows.
// It is not referenced anywhere else in the visible part of this script.
// NOTE(review): the five fields presumably mirror the five comma-separated
// input columns read by the parsers -- confirm against the table schema.
// Case classes are serializable by default, so no explicit Serializable parent is needed.
case class UserInfo(
  userId: Long,
  content_code: String,
  rating: Float,
  content_code_x: Long,
  lan_phone_no_x: Long
)

// Item record: a numeric id plus its title.
// parseMovieData fills movieId from input column 3 and title from input column 1.
case class Movie(
  movieId: Int,
  title: String
)

// User profile record. It is not referenced anywhere else in the visible part of this script.
case class User(
  userId: Int,
  gender: String,
  age: Int,
  occupation: Int,
  zipCode: String
)

// Parses one comma-separated input row into an MLlib Rating.
// Columns used: index 4 -> user id, index 3 -> product id, index 2 -> rating value.
// Throws IllegalArgumentException when the row does not have exactly five columns,
// and NumberFormatException when an id or the rating is not numeric.
def parseRatingData(str: String): Rating = str.split(",") match {
  case Array(_, _, rating, product, user) =>
    Rating(user.toInt, product.toInt, rating.toDouble)
  case other =>
    // Input validation rather than an internal invariant, so fail with a descriptive
    // message. Only the field count is reported so row contents never reach the logs.
    throw new IllegalArgumentException(
      s"expected 5 comma-separated fields but found ${other.length}")
}

// Parses one comma-separated input row into a Movie.
// Columns used: index 3 -> movie id, index 1 -> title.
// Throws IllegalArgumentException when the row does not have exactly five columns,
// and NumberFormatException when the id is not numeric.
def parseMovieData(str: String): Movie = str.split(",") match {
  case Array(_, title, _, movieId, _) =>
    Movie(movieId.toInt, title)
  case other =>
    // Input validation rather than an internal invariant, so fail with a descriptive
    // message. Only the field count is reported so row contents never reach the logs.
    throw new IllegalArgumentException(
      s"expected 5 comma-separated fields but found ${other.length}")
}

//val spark = SparkSession.builder.master("local").appName("ALSR").getOrCreate()

// HDFS directory holding the comma-separated input rows.
val myPath = "hdfs://shmbdcluster/user/bdoc/35/services/hive/shmc/st_ana_sp_recomm_feature/"

// The same text input feeds both parsers, so it is declared once.
// Nothing below reassigns these values, hence val rather than var.
val rawLines = spark.read.textFile(myPath)

val ratingsData = rawLines.map(parseRatingData _).cache()

val moviesData = rawLines.map(parseMovieData _).cache()

// Passing a spark.ml DataFrame to ALS fails here because of a Scala version mismatch,
// so the model is built with the RDD-based spark.mllib API, which needs an RDD.
// Note: despite its name, ratingsDF is an RDD[Rating]; moviesDF is a real DataFrame.
val ratingsDF = ratingsData.rdd
val moviesDF = moviesData.toDF

// Split into a training set (weight 0.9) and a test set (weight 0.1) with a fixed seed.
val tempPartitions = ratingsDF.randomSplit(Array(0.9, 0.1), 38L)
val trainingSetOfRatingsData = tempPartitions(0)
val testSetOfRatingData = tempPartitions(1)

// Train the explicit-feedback ALS model on the training split.
// The implicit-feedback variant (.setAlpha(0.18) .setImplicitPrefs(true)) was tried
// and did not give good results.
val recomModel = {
  val trainer = new ALS()
  trainer.setRank(68)        // number of latent factors
  trainer.setBlocks(-1)      // -1 lets ALS pick the number of blocks itself
  trainer.setLambda(0.24)    // regularization parameter
  trainer.setIterations(15)
  trainer.setSeed(38L)       // fixed seed for reproducible factor initialisation
  trainer.run(trainingSetOfRatingsData)
}

// Prediction: score every (user, product) pair of the held-out test set.
val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map(r => (r.user, r.product)))

// Observed test ratings keyed by (user, product).
val formatResultOfTestSet = testSetOfRatingData.map(r => ((r.user, r.product), r.rating))

// Predicted ratings keyed the same way.
val formatResultOfPredictionResult = predictResultOfTestSet.map(r => ((r.user, r.product), r.rating))

// Inner join on (user, product); each value is (predicted rating, observed rating).
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

// Evaluation: MSE, RMSE and MAE over the joined test-set results.
// finalResultForComparison is predictions.join(observations), so each value is
// (predicted rating, observed rating) in that order.
// The per-pair error is computed once and cached so that the join lineage
// is not re-evaluated for every metric.
val predictionErrors = finalResultForComparison.map {
  case (_, (ratingOfPrediction, ratingOfTest)) => ratingOfTest - ratingOfPrediction
}.cache()

val mse = predictionErrors.map(error => error * error).mean()

println(s"mse error: $mse")
val rmse = math.sqrt(mse)
// This line was previously labelled "mse error", making the two outputs indistinguishable.
println(s"rmse error: $rmse")

val MAE = predictionErrors.map(error => math.abs(error)).mean()

println(s"mae error: $MAE")


//val recomResult = recomModel.recommendProducts(1000, 10)
//println(s"Recommend Movie to User ID 1000")
//println(recomResult.mkString("\n"))

// Save results: the top 10 recommendations for every user, written as text lines
// of the form "user,product,rating".
val recomR = recomModel.recommendProductsForUsers(10)

// One (user id, "product,rating") pair per recommendation.
// No trailing "\n" is appended: saveAsTextFile already terminates every record,
// so an embedded newline would put an empty line after each row in the output.
val rs = recomR.flatMap { case (user, recommendations) =>
  recommendations.map(r => (user.toString, s"${r.product},${r.rating}"))
}

val rss = rs.map { case (user, productAndRating) => s"$user,$productAndRating" }

rss.saveAsTextFile("hdfs://shmbdcluster/user/bdoc/35/services/hive/shmc/st_ana_sp_recomm_result3")

// Print the top 10 recommendations for one sample user together with item titles.

// Item id -> title look-up table held on the driver. The title rows are parsed from the
// same per-rating input as the ratings, so they are de-duplicated before collecting.
val movieTitles = moviesDF.as[(Int, String)].distinct().rdd.collectAsMap()

// User whose recommendations are printed.
val sampleUserId = 1000

// A user id the model has no factors for yields an empty result instead of an exception.
val recomResult = {
  if (recomModel.userFeatures.lookup(sampleUserId).isEmpty) Array.empty[Rating]
  else recomModel.recommendProducts(sampleUserId, 10)
}

// Fall back to the raw product id when no title is known for it.
val recommendMoviesWithTitle = recomResult.map { rating =>
  (movieTitles.getOrElse(rating.product, rating.product.toString), rating.rating)
}
println(recommendMoviesWithTitle.mkString("\n"))