Spark_2.2.0_LogisticRegression_scala


For Spark 2.2.0: read data from Hive, convert it to a DataFrame, process and model it with Scala and the Spark ML toolkits, and then evaluate the model in detail. The code is as follows:
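Note that the listing below reads the table's raw files from HDFS; the same data can also be pulled straight through the Hive metastore. A minimal sketch (the table name shmc.tmp_ana_hm_train_feature and the partition filter are assumptions inferred from the paths used below):

// Hypothetical alternative: query the table via the Hive metastore
val hiveData = spark.sql("SELECT * FROM shmc.tmp_ana_hm_train_feature WHERE month_id = '202009'")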

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext

import scala.collection.mutable.ListBuffer

import util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions.col

import org.apache.spark.ml.stat.ChiSquareTest
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.{Pipeline,PipelineStage}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.{IndexToString, OneHotEncoder,StringIndexer, VectorIndexer,VectorAssembler}

import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegressionModel, LogisticRegressionSummary}
// Set up the environment; in spark-shell, `sc`, `spark` and spark.implicits._
// are already available (HiveContext is deprecated in Spark 2.x but still works)
val hiveCtx = new HiveContext(sc)
val myPath = "hdfs://shmbdcluster/user/bdoc/35/services/hive/shmc/tmp_ana_hm_train_feature/"

// Alternative: read as strings with the CSV reader
//val data= spark.read.format("com.databricks.spark.csv").option("delimiter","\t").load("hdfs://shmbdcluster/user/bdoc/35/services/hive/shmc/tmp_ana_hm_train_feature/month_id=202009/*").cache()

// Read the raw text, filtering out rows that contain the Hive NULL marker (\N)
val rawData = sc.textFile(myPath).filter(row => !(row.contains("\\N")))

// At this point the label and features are all strings and must be split
val colsLength = rawData.first.split(",").slice(0, 46).length
val rdd = rawData.map(_.split(',').slice(0, 46)).map(p => Row(p: _*))

// Alternatively, name the columns by their index:
//val colNames =(0 until colsLength).map(i => "col_" + (i+1))
val colNames = Array("user_id","phone_no","offer_id","label","is_zn","FML_FLAG","LCL_FLAG","GENDER_ID","device_flag","LVL1_PLAN_ID","CREDIT_LEVEL_ID","CROWD_TOGETHER_FLAG","lan_offer_lvl2_type_snt","CROWD_MAIN_PHONE_NO_FLAG","age","dou","mou","jw_num","bill_fee","use_kday","band_rate","eff_kdate","group_num","exp_kdate","lacci_num","ONLINE_id","have_child","have_older","days_home1","days_workl","start_zdate","man_call_dur","sum_up_kflow","man_call_num","group_num_wsyd","sum_down_kflow","group_num_yiwang","area_id","term_model_snt","lacci_home1_snt","lacci_work1_snt","community_jid_snt","community_kid_snt","create_area_id_snt","term_brand_name_snt","multi_term_cata3_name_snt")
// Build the schema; every field starts as StringType (they are cast to
// DoubleType later)
val schema = StructType(colNames.map(fieldName => StructField(fieldName, StringType)))
// Create the DataFrame
val data = spark.createDataFrame(rdd, schema)

// Shuffle approach 1: add a column of random numbers and sort the
// DataFrame by it
val randr = data.select("user_id").map(row => (row.getAs[String](0), Random.nextDouble())).toDF("user_id", "rand")
randr.show(5)
val df1 = data.join(randr, Seq("user_id"), "left").orderBy("rand")
df1.show(4)
val df = df1.drop("rand")
// Shuffle approach 2: order directly by the built-in rand() function
val HouseWifiDF_newsam = df.orderBy(rand())

// List the model columns (label plus features)
//val colNames_1 =(1 until colsLength).map(i => "col_" + (i+1)).toArray
val wcol = {Array("label","is_zn","FML_FLAG","LCL_FLAG","GENDER_ID","device_flag","LVL1_PLAN_ID","CREDIT_LEVEL_ID","CROWD_TOGETHER_FLAG","lan_offer_lvl2_type_snt","CROWD_MAIN_PHONE_NO_FLAG","age","dou","mou","jw_num","bill_fee","use_kday","band_rate","eff_kdate","group_num","exp_kdate","lacci_num","ONLINE_id","have_child","have_older","days_home1","days_workl","start_zdate","man_call_dur","sum_up_kflow","man_call_num","group_num_wsyd","sum_down_kflow","group_num_yiwang","area_id","term_model_snt","lacci_home1_snt","lacci_work1_snt","community_jid_snt","community_kid_snt","create_area_id_snt","term_brand_name_snt","multi_term_cata3_name_snt")}

val colNames_1 = {Array("is_zn","FML_FLAG","LCL_FLAG","GENDER_ID","device_flag","LVL1_PLAN_ID","CREDIT_LEVEL_ID","CROWD_TOGETHER_FLAG","lan_offer_lvl2_type_snt","CROWD_MAIN_PHONE_NO_FLAG","age","dou","mou","jw_num","bill_fee","use_kday","band_rate","eff_kdate","group_num","exp_kdate","lacci_num","ONLINE_id","have_child","have_older","days_home1","days_workl","start_zdate","man_call_dur","sum_up_kflow","man_call_num","group_num_wsyd","sum_down_kflow","group_num_yiwang","area_id","term_model_snt","lacci_home1_snt","lacci_work1_snt","community_jid_snt","community_kid_snt","create_area_id_snt","term_brand_name_snt","multi_term_cata3_name_snt")}
val continuousVariable = Array("dou","mou","jw_num","bill_fee","use_kday","eff_kdate","group_num","exp_kdate","start_zdate","man_call_dur","sum_up_kflow","man_call_num","group_num_wsyd","sum_down_kflow","group_num_yiwang","have_child","have_older")
val categoricalVariable = colNames_1.filter(v => !continuousVariable.contains(v))

// Cast all the model columns (label included) to DoubleType
val wdata = df.select(wcol.map(name => col(name).cast(DoubleType)): _*)

// Split the data into training, testing and validation sets
val Array(trainingData, tData) = wdata.randomSplit(Array(0.75, 0.25), seed = 38)
val Array(testingData, validationData) = tData.randomSplit(Array(0.6, 0.4), seed = 38)
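// A quick sanity check of the split sizes and the label balance
// (an added sketch, not part of the original listing):
println(s"train=${trainingData.count()}, test=${testingData.count()}, validation=${validationData.count()}")
trainingData.groupBy("label").count().show()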

/*
// Alternative: process the categorical features through Pipeline stages,
// one-hot encoding each of them
val stagesArray = new ListBuffer[PipelineStage]()
for (cate <- categoricalVariable) {
  // StringIndexer builds an index over the categories
  val indexer = new StringIndexer().setInputCol(cate).setOutputCol(s"${cate}_Index")
  // OneHotEncoder turns the indexed category into a sparse binary vector
  val encoder = new OneHotEncoder().setInputCol(indexer.getOutputCol).setOutputCol(s"${cate}_onehot")
  stagesArray.append(indexer, encoder)
}

val assemblerInputs = categoricalVariable.map(_ + "_onehot")
// VectorAssembler would then combine all the encoded features into one vector
*/

// Assemble the categorical variables into a single vector
val categoricalAssembler = new VectorAssembler().setInputCols(categoricalVariable).setOutputCol("categoricalFeature")
// Assemble the continuous variables into a single vector
val continuousAssembler = new VectorAssembler().setInputCols(continuousVariable).setOutputCol("continuousFeature")
// Standardize the continuous variables
val scaler = new StandardScaler().setInputCol("continuousFeature").setOutputCol("continuousFeature1").setWithStd(true).setWithMean(true)
// Combine all the variables into the final "features" vector
val Assembler = new VectorAssembler().setInputCols(Array("categoricalFeature", "continuousFeature1")).setOutputCol("features")

// Build the logistic regression model (a regularized variant is kept below
// as an alternative; .setMaxIter(68) can also be added)
//val lr = new LogisticRegression().setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")

// Build the pipeline; the stages run in array order
//val lrPipeline = new Pipeline().setStages(stagesArray.toArray ++ Array(categoricalAssembler,continuousAssembler,scaler,Assembler, lr))
val lrPipeline = new Pipeline().setStages(Array(categoricalAssembler, continuousAssembler, scaler, Assembler, lr))
val lrModel = lrPipeline.fit(trainingData)
// Save the fitted model
val model_path = "hdfs://shmbdcluster/user/bdoc/35/services/hive/shmc/hm_lr_model_v2"
lrModel.write.overwrite().save(model_path)
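// The saved pipeline can be reloaded in a later session for scoring.
// A minimal sketch, assuming model_path is unchanged:
import org.apache.spark.ml.PipelineModel
val reloadedModel = PipelineModel.load(model_path)
reloadedModel.transform(validationData).select("prediction", "probability").show(5)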

// The full data was split into development, testing and validation sets;
// evaluate on each of them separately
//val lrPredictions = lrModel.transform(testingData)
val lrPredictions = lrModel.transform(validationData)
lrPredictions.select("prediction", "label", "features").show(20)
// Extract the predicted probability of the positive class
val predictionProb = lrPredictions.select("probability").map(row => row.getAs[DenseVector]("probability").toArray).map(x => x(1)).rdd
predictionProb.take(4)
// Model accuracy; MulticlassClassificationEvaluator defaults to F1, so set
// the metric explicitly
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val lrAccuracy = evaluator.evaluate(lrPredictions)
println("Accuracy: " + lrAccuracy)
val lrError = 1 - lrAccuracy
println("Error rate: " + lrError)


// Inspect the fitted model
//val pipelineModel = lrPipeline.bestModel.asInstanceOf[PipelineModel]
val LRmodel = lrModel.stages(4).asInstanceOf[LogisticRegressionModel]
println("Coefficient vector of the binomial logistic regression model: " + LRmodel.coefficients)
println("Intercept of the binomial logistic regression model: " + LRmodel.intercept)
println("Number of classes (values the label can take): " + LRmodel.numClasses)
println("Number of features the model accepts: " + LRmodel.numFeatures)
// The model summary currently only supports binomial logistic regression;
// multinomial is not supported (this is Spark 2.2.0)
println(LRmodel.hasSummary)
val trainingSummary = LRmodel.summary
// The loss should shrink with each iteration; the smaller the loss, the
// better the model fits the training data
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(println)
// Cast to BinaryLogisticRegressionSummary for binary-classification metrics
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
// Area under the ROC curve; the closer to 1, the better the model
val area_ROC = binarySummary.areaUnderROC
println("Area under the ROC curve: " + area_ROC)
// fMeasureByThreshold returns a DataFrame with two fields (threshold,
// F-Measure) along the curve, computed with beta = 1.0
val fMeasure = binarySummary.fMeasureByThreshold
println("Number of rows in fMeasure: " + fMeasure.collect().length)
fMeasure.show(100)
val maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0)
println("Maximum F-Measure: " + maxFMeasure)
// The threshold that maximizes the F-Measure
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
println("Best threshold: " + bestThreshold)

LRmodel.setThreshold(bestThreshold)
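// stages(4) returns the stage object by reference, so the setThreshold call
// above also updates the pipeline model itself. A sketch of re-scoring with
// the tuned threshold (an addition, not in the original):
val tunedPredictions = lrModel.transform(testingData)
tunedPredictions.groupBy("label", "prediction").count().show()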


// Precise evaluation of the model on each data split
val predictionAndLabels = lrPredictions.select($"prediction", $"label").as[(Double, Double)].cache()

// Show counts grouped by (label, prediction)
predictionAndLabels.groupBy("label", "prediction").count().show()
predictionAndLabels.show(false)

// Multiclass metrics
val multiclassMetrics = new MulticlassMetrics(predictionAndLabels.rdd)
println("Confusion matrix:")
val confusionMatrix = multiclassMetrics.confusionMatrix
println(confusionMatrix)
val TN = confusionMatrix.apply(0, 0)
println(s"TN (true negative: predicted negative, actually negative): ${TN}")
val FP = confusionMatrix.apply(0, 1)
println(s"FP (false positive: predicted positive, actually negative): ${FP}")
val FN = confusionMatrix.apply(1, 0)
println(s"FN (false negative: predicted negative, actually positive): ${FN}")
val TP = confusionMatrix.apply(1, 1)
println(s"TP (true positive: predicted positive, actually positive): ${TP}")

val Accuracy = (TP + TN) / (TP + FP + TN + FN)
println("Accuracy: " + Accuracy)
val precision = TP / (TP + FP)
println("precision: " + precision)
val recall = TP / (TP + FN)
println("recall: " + recall)
val F1_score = 2 * precision * recall / (precision + recall)
println("F1_score: " + F1_score)