sparkml_151_classification_tree


spark-ml-1.5.1 classification models

spark-ml-1.5.1: a modular reorganization of legacy Spark ML code

// Import the required packages
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.feature.{StandardScalerModel, StandardScaler}
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data (assumption: column 0 holds the label, columns 1-18 the features)
val raw_data1 = sc.textFile("/DOMAIN_B/XINYUN/BI/BILLING_CDR/hive_warehouse/yx_*.db/*")
val data = raw_data1.map { s =>
  val fields = s.trim.split(',')
  LabeledPoint(fields(0).toDouble, Vectors.dense(fields.slice(1, 19).map(_.toDouble)))
}.toDF()

// Index the label column (convert labels to numeric indices)
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)

// Index categorical features, i.e. add category indices
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4) // features with > 4 distinct values are treated as continuous
.fit(data)

// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Set up the DecisionTree model
val dt = new DecisionTreeClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")

// Map indexed predictions back to the original labels
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)

// Build the pipeline
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

// Train the model
val model = pipeline.fit(trainingData)

// Make predictions
val predictions = model.transform(testData)

// Show 5 sample predictions
predictions.select("predictedLabel", "label", "features").show(5)

// Evaluate the model on the test set
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

// Print the learned tree
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

spark-ml 2.* xgboost example

package ML

import ClassificationMetrics.ClassificationMetrics

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.log4j.Logger
import org.apache.log4j.Level
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier


object ML {

// Suppress verbose Spark/Akka logging
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

def main(args: Array[String]) = {

println("Start")

// Set up Spark
val sparkConf = new SparkConf()
val spark = SparkSession.builder.master("local").appName("ML on Scala / Spark").config(sparkConf).getOrCreate()

import spark.implicits._


/******* DATA PROCESSING AND FEATURE ENGINEERING *******/

// Loading data
val census_data : DataFrame = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("/Users/az02234/Documents/Personnal_Git/Scala-Spark-ML-Example/src/main/resources/census_data.csv")

// Some description
census_data.show(5)
println(census_data.count())
census_data.printSchema()


// Set list of column
val target = "income"
val categorical_variable_string = Array("workclass", "education", "occupation", "relationship", "sex", "marital_status", "native_country")
val continuous_variable = Array("year_education", "capital_gain", "capital_loss", "hours_per_week")

// Drop useless column and rename incorrect names
val census_data_corrected: DataFrame = census_data.withColumnRenamed("education.num", "year_education").drop("fnlwgt")

// Set up binary target variable
val census_data_with_target: DataFrame = census_data_corrected.withColumn("income", when($"income"==="<=50K", 0).otherwise(1))

// Set value to class "unknown" if not known for categorical variable
val census_data_without_null: DataFrame = categorical_variable_string.foldLeft(census_data_with_target)((df: DataFrame, colname: String) => df.withColumn(colname, when(col(colname).isNull, "unknown").otherwise(col(colname))))

// Set int column to float
val census_data_recast: DataFrame = continuous_variable.foldLeft(census_data_without_null)((df, colname) => df.withColumn(colname, col(colname).cast(DoubleType)))

// Set value to median if not known for numerical variable
val imputer = new Imputer()
.setInputCols(continuous_variable)
.setOutputCols(continuous_variable.map(x => x + "_imputed"))

// String vectorization
val string_indexer_list = for (column_to_index <- categorical_variable_string) yield {
new StringIndexer().setInputCol(column_to_index).setOutputCol(column_to_index+"_indexed")
}

// One-hot encoding
val one_hot_encoder_model= new OneHotEncoderEstimator()
.setInputCols(categorical_variable_string.map(x => x +"_indexed"))
.setOutputCols(categorical_variable_string.map(x => x + "_encoded"))

// Vector assembling
val assembler = new VectorAssembler()
.setInputCols(categorical_variable_string.map(x => x+"_encoded") ++ continuous_variable.map(x => x+"_imputed"))
.setOutputCol("features")

//val transformation_array = string_indexer_list ++ Array(imputer) ++ Array(one_hot_encoder_model) ++ Array(assembler)
val transformation_array = string_indexer_list ++ Array(one_hot_encoder_model, imputer, assembler)
println(transformation_array.mkString(", "))

val transformation_pipeline: Pipeline = new Pipeline().setStages(transformation_array)
val transformation_pipeline_model: PipelineModel = transformation_pipeline.fit(census_data_recast)

val transformed_data: DataFrame = transformation_pipeline_model.transform(census_data_recast).select("income", "features")

// Test to do here
// Some description of the transformed data
transformed_data.printSchema()
transformed_data.show(5)


/******* ML ALGORITHMS *******/

// Split in train and test sample
val Array(trainingData, testData): Array[DataFrame] = transformed_data.randomSplit(Array(0.7, 0.3))

// Set up metric class
val metricsComputer: ClassificationMetrics = new ClassificationMetrics().setColumnLabelName("income")


// Random forest
val rf = new RandomForestClassifier()
.setLabelCol("income")
.setFeaturesCol("features")
.setNumTrees(200)

// Train model. This also runs the indexers.
val rf_model = rf.fit(trainingData)

// Make predictions.
val predictions_rf = rf_model.transform(testData)

// Print metrics for random forest
val metrics_rf = metricsComputer.fit(predictions_rf)
println(metrics_rf.classificationReport())


// Gradient boosting

// Define parameters of the gradient boosting
val xgbParam = Map("booster" -> "gbtree",
"verbosity" -> 3,
"eta" -> 0.3,
"gamma" -> 0.5,
"max_depth" -> 10,
"subsample" -> 0.4,
"colsample_bytree" -> 0.5,
"colsample_bylevel" -> 1,
"colsample_bynode" -> 1,
"objective" -> "binary:logistic",
"num_round" -> 100,
"train_test_ratio " -> 0.9)

// Create gradient boosting classifier object
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol("features")
.setLabelCol("income")
.setMissing(0)

val XGBmodel = xgbClassifier.fit(trainingData)

val predictionsXGB = XGBmodel.transform(testData)

// Print metrics for XGB
val metricsXGB = metricsComputer.fit(predictionsXGB)
println(metricsXGB.classificationReport())

spark.stop()
println("Done")
}

}
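
The ClassificationMetrics class imported at the top of this example is a project-local helper whose source is not included here. The following is a minimal sketch of what such a helper might look like, inferred only from how it is called above (setColumnLabelName, fit, classificationReport); the class and method bodies are assumptions:

package ClassificationMetrics

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.DataFrame

// Thin wrapper around MulticlassClassificationEvaluator that prints a small report
class ClassificationMetrics extends Serializable {
  private var labelCol = "label"

  def setColumnLabelName(name: String): this.type = { labelCol = name; this }

  // Computes a few standard metrics on a DataFrame that already contains a "prediction" column
  def fit(predictions: DataFrame): FittedClassificationMetrics = {
    def metric(name: String): Double =
      new MulticlassClassificationEvaluator()
        .setLabelCol(labelCol)
        .setPredictionCol("prediction")
        .setMetricName(name)
        .evaluate(predictions)

    FittedClassificationMetrics(
      accuracy = metric("accuracy"),
      f1 = metric("f1"),
      weightedPrecision = metric("weightedPrecision"),
      weightedRecall = metric("weightedRecall"))
  }
}

case class FittedClassificationMetrics(accuracy: Double,
                                       f1: Double,
                                       weightedPrecision: Double,
                                       weightedRecall: Double) {
  def classificationReport(): String =
    f"accuracy=$accuracy%.4f  f1=$f1%.4f  weightedPrecision=$weightedPrecision%.4f  weightedRecall=$weightedRecall%.4f"
}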

Spark logistic regression

package Spark_MLlib

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions

case class data_schema(features:Vector,label:String)
object 二项逻辑回归__二分类 {
val spark=SparkSession.builder().master("local").getOrCreate()
import spark.implicits._ // enables implicit conversion of an RDD to a DataFrame
def main(args: Array[String]): Unit = {
val df =spark.sparkContext.textFile("file:///home/soyo/桌面/spark编程测试数据/soyo.txt")
.map(_.split(",")).map(x=>data_schema(Vectors.dense(x(0).toDouble,x(1).toDouble,x(2).toDouble,x(3).toDouble),x(4))).toDF()
df.show(130)
df.createOrReplaceTempView("data_schema")
val df_data=spark.sql("select * from data_schema where label !='soyo2'") // soyo2 must be wrapped in single quotes here, otherwise the query fails
// df_data.map(x=>x(1)+":"+x(0)).collect().foreach(println)
df_data.show()
val labelIndexer=new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df_data)
val featureIndexer=new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df_data) // builds category indices within the feature vector
val Array(trainData,testData)=df_data.randomSplit(Array(0.7,0.3))
val lr=new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.5).setElasticNetParam(0.8)// setRegParam: regularization parameter; the elastic-net mixing parameter is set to 0.8; setFamily("multinomial") would select multinomial logistic regression, without setFamily binomial logistic regression is used
val labelConverter=new IndexToString().setInputCol("prediction").setOutputCol("predictionLabel").setLabels(labelIndexer.labels)

val lrPipeline=new Pipeline().setStages(Array(labelIndexer,featureIndexer,lr,labelConverter))
val lrPipeline_Model=lrPipeline.fit(trainData)
val lrPrediction=lrPipeline_Model.transform(testData)
lrPrediction.show(false)
// lrPrediction.take(100).foreach(println)
// Model evaluation
val evaluator=new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
val lrAccuracy=evaluator.evaluate(lrPrediction)
println("Accuracy: "+lrAccuracy)
val lrError=1-lrAccuracy
println("Error rate: "+lrError)
val LRmodel=lrPipeline_Model.stages(2).asInstanceOf[LogisticRegressionModel]
println("Coefficient vector of the binomial logistic regression model: "+LRmodel.coefficients)
println("Intercept of the binomial logistic regression model: "+LRmodel.intercept)
println("Number of classes (possible label values): "+LRmodel.numClasses)
println("Number of features accepted by the model: "+LRmodel.numFeatures)
// The model summary currently supports only binomial logistic regression; multinomial logistic regression is not supported (Spark 2.2.0)
println(LRmodel.hasSummary)
val trainingSummary = LRmodel.summary
// Loss per iteration: the objective decreases across iterations; the smaller the loss, the better the model
val objectiveHistory =trainingSummary.objectiveHistory
objectiveHistory.foreach(println)
// Cast to BinaryLogisticRegressionSummary
val binarySummary= trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
// Area under the ROC curve; the closer to 1, the better the model
val area_ROC=binarySummary.areaUnderROC
println("Area under the ROC curve: "+area_ROC)
// fMeasureByThreshold: returns a DataFrame with two columns (threshold, F-Measure) for beta = 1.0
val fMeasure=binarySummary.fMeasureByThreshold
println("Number of rows in fMeasure: "+fMeasure.collect().length)
fMeasure.show(100)
val maxFMeasure=fMeasure.select(functions.max("F-Measure")).head().getDouble(0)
println("Maximum F-Measure: "+maxFMeasure)
// Optimal threshold
val bestThreshold=fMeasure.where($"F-Measure"===maxFMeasure).select("threshold").head().getDouble(0)
println("Optimal threshold: "+bestThreshold)
/* This does NOT give the optimal threshold:
val s=fMeasure.select(functions.max("threshold")).head().getDouble(0)
println(s)
*/
LRmodel.setThreshold(bestThreshold)

}
}

Parameter reference for the logistic regression algorithm

LogisticRegression is a linear classification algorithm; its main parameters are described below (a short usage sketch follows the list):

<1> setMaxIter(): maximum number of iterations

<2> setRegParam(): regularization parameter, controlling the weight of the penalty term relative to the loss so that training does not overfit; default 0

<3> setElasticNetParam(): chooses between the L1 and L2 norms.
setElasticNetParam=0.0 gives L2 regularization;
setElasticNetParam=1.0 gives L1 regularization;
setElasticNetParam in (0.0, 1.0) gives a mix of L1 and L2

<4> setFeaturesCol(): name of the features column, passed as a String; default "features"

<5> setLabelCol(): name of the label column, passed as a String; default "label"

<6> setPredictionCol(): name of the prediction column; default "prediction"

<7> setFitIntercept(value: Boolean): whether to fit an intercept, default true (i.e. the b in y = wx + b)

<8> setStandardization(value: Boolean): whether to standardize the features before training; default true

<9> fit: trains a model on the training set

<10> transform: uses the trained model to make predictions on the test set

<11> setTol(value: Double): convergence tolerance of the iterations; smaller values give higher accuracy at the cost of more iterations. Default 1E-6.

<12> setWeightCol(value: String): name of an instance-weight column; if unset or empty, every instance gets weight 1.

The parameters above are shared with linear regression; the following are specific to logistic regression:
<1> setFamily: "auto" picks the family from the number of classes (binomial if numClasses is 1 or 2, otherwise multinomial);
"binomial" forces binary logistic regression;
"multinomial" forces multinomial logistic regression

<2> setProbabilityCol: name of the predicted-probability column (the per-class probabilities); default "probability"

<3> setRawPredictionCol: name of the raw-prediction column; default "rawPrediction"

<4> setThreshold(value: Double): binary classification threshold in [0, 1], default 0.5; if the predicted probability of the positive class exceeds the threshold the prediction is 1, otherwise 0

<5> setThresholds(value: Array[Double]): per-class thresholds in [0, 1] for multiclass classification; default 0.5
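
A quick sketch of how these setters fit together (all values below are arbitrary illustrations, not recommendations):

import org.apache.spark.ml.classification.LogisticRegression

val lrExample = new LogisticRegression()
  .setMaxIter(100)             // maximum number of iterations
  .setRegParam(0.1)            // regularization strength
  .setElasticNetParam(0.5)     // 0.0 = L2, 1.0 = L1, values in between mix both
  .setFitIntercept(true)       // fit the intercept b in y = wx + b
  .setStandardization(true)    // standardize features before training
  .setTol(1e-6)                // convergence tolerance
  .setFamily("auto")           // "auto", "binomial" or "multinomial"
  .setThreshold(0.5)           // binary decision threshold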

import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionSummary}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{SparkSession, functions}

val resource = ClassLoader.getSystemResource("classification/data_banknote_authentication.txt")
val resourceLocation = resource.toURI.toString

val sparkConf = new SparkConf()
.setAppName("BinLogisticRegression")
.setMaster("local")

val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

// Load data, rdd
import spark.implicits._
val parsedRDD = spark.read
.textFile(resourceLocation)
.map(_.split(","))
.map(eachRow => {
val a = eachRow.map(x => x.toDouble)
// return a 5-tuple: four feature values plus the label
(a(0), a(1), a(2), a(3), a(4))
})
val df = parsedRDD.toDF(
"f0", "f1", "f2", "f3", "label").cache()

/**
* Define a VectorAssembler transformer to combine the source feature columns into a single vector.
* This is helpful when the raw input data contains non-feature columns; it is common for
* such an input data file to contain columns such as "ID", "Date", etc.
*/
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("f0", "f1", "f2", "f3"))
.setOutputCol("features")

val dataset = vectorAssembler.transform(df)

val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
.setRegParam(0.2)
.setElasticNetParam(0.8)
.setMaxIter(10)

// Randomly split the data into training and test sets
val Array(trainingData, testData) = dataset.randomSplit(Array(0.8, 0.2))

val lrModel = lr.fit(trainingData)

println("*******************模型训练的报告*******************")

println("模型当前使用的分类阈值:" + lrModel.getThreshold)
// println("模型当前使用的多层分类阈值:" + lrModel.getThresholds)
println("模型特征列:" + lrModel.getFeaturesCol)
println("模型标签列:" + lrModel.getLabelCol)

println("逻辑回归模型系数的向量: " + lrModel.coefficients)

println("逻辑回归模型的截距: " + lrModel.intercept)

println("类的数量(标签可以使用的值): " + lrModel.numClasses)

println("模型所接受的特征的数量: " + lrModel.numFeatures)

val trainingSummary = lrModel.binarySummary
//损失函数,可以看到损失函数随着循环是逐渐变小的,损失函数越小,模型就越好
println(s"总的迭代次数:${trainingSummary.totalIterations}")
println("===============损失函数每轮迭代的值================")
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))

//roc的值
val trainingRocSummary = trainingSummary.roc
println("roc曲线描点值行数:" + trainingRocSummary.count())
println("=====================roc曲线的值=================")
trainingRocSummary.show(false)

//ROC曲线下方的面积:auc, 越接近1说明模型越好
val trainingAUC = trainingSummary.areaUnderROC
println(s"AUC(areaUnderRoc): ${trainingAUC}")

// F1 is the harmonic mean of precision and recall; higher is better
val trainingFMeasure = trainingSummary.fMeasureByThreshold
println("Number of rows in fMeasure: " + trainingFMeasure.collect().length)

println("threshold --- F-Measure relationship:")
trainingFMeasure.show(10)

val trainingMaxFMeasure = trainingFMeasure.select(functions.max("F-Measure"))
.head()
.getDouble(0)
println("Maximum F-Measure: " + trainingMaxFMeasure)

// Optimal threshold
val trainingBestThreshold = trainingFMeasure.where($"F-Measure" === trainingMaxFMeasure)
.select("threshold")
.head()
.getDouble(0)
println("Optimal threshold: " + trainingBestThreshold)

// Set the model to the optimal threshold
lrModel.setThreshold(trainingBestThreshold)

println("Classification threshold after tuning: " + lrModel.getThreshold)

println("************************************************************")

// Evaluate on the test set
println("********************** Model evaluation report on the test set *******************")
val testSummary: LogisticRegressionSummary = lrModel.evaluate(testData)
val testBinarySummary: BinaryLogisticRegressionSummary = testSummary.asBinary

// Inspect the predictions
val predictionAndLabels = testSummary.predictions.select($"prediction", $"label")
.as[(Double, Double)]
.cache()

// Show counts grouped by label and prediction
println("Number of test rows: " + testSummary.predictions.count())
println("Counts grouped by label and prediction:")
predictionAndLabels.groupBy("label", "prediction").count().show()
predictionAndLabels.show(false)

// precision-recall relationship
println("============ precision-recall ================")
val pr = testBinarySummary.pr
pr.show(false)

// ROC curve points
val rocSummary = testBinarySummary.roc
println("Number of points on the ROC curve: " + rocSummary.count())
println("===================== ROC curve values =================")
rocSummary.show(false)

// Area under the ROC curve (AUC); the closer to 1, the better the model
val auc = testBinarySummary.areaUnderROC
println(s"Area under the ROC curve (AUC): ${auc}")

// F1 is the harmonic mean of precision and recall; higher is better
val fMeasure = testBinarySummary.fMeasureByThreshold
println("Number of rows in fMeasure: " + fMeasure.collect().length)

println("threshold --- F-Measure relationship:")
fMeasure.show(10)

val maxFMeasure = fMeasure.select(functions.max("F-Measure"))
.head()
.getDouble(0)
println("Maximum F-Measure: " + maxFMeasure)

// Optimal threshold
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
.select("threshold")
.head()
.getDouble(0)
println("Optimal threshold: " + bestThreshold)

// // Set the model to the optimal threshold
// lrModel.setThreshold(bestThreshold)
//
// println("Classification threshold after tuning: " + lrModel.getThreshold)

// Multiclass metrics
val multiclassMetrics = new MulticlassMetrics(predictionAndLabels.rdd)
println("Confusion matrix:")
val confusionMatrix = multiclassMetrics.confusionMatrix
println(confusionMatrix)
println(s"TN (true negative: predicted negative, actually negative): ${confusionMatrix.apply(0, 0)}")
println(s"FP (false positive: predicted positive, actually negative): ${confusionMatrix.apply(0, 1)}")
println(s"FN (false negative: predicted negative, actually positive): ${confusionMatrix.apply(1, 0)}")
println(s"TP (true positive: predicted positive, actually positive): ${confusionMatrix.apply(1, 1)}")

println("Accuracy: " + multiclassMetrics.accuracy)
spark.close()

logisticRegression example

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}

def getMinuteOfDay(depTime: String) : Int = (depTime.toInt / 100).toInt * 60 + (depTime.toInt % 100)
case class Flight(Month: String, DayofMonth: String, DayOfWeek: String, DepTime: Int, CRSDepTime: Int, ArrTime: Int, CRSArrTime: Int, UniqueCarrier: String, ActualElapsedTime: Int, CRSElapsedTime: Int, AirTime: Int, ArrDelay: Double, DepDelay: Int, Origin: String, Distance: Int)
val flight2007 = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = flight2007.first
val trainingData = flight2007.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
trainingData.cache
val flight2008 = sc.textFile("/tmp/airflightsdelays/flights_2008.csv.bz2")
val testingData = flight2008.filter(x => x != header).map(x => x.split(",")).filter(x => x(21) == "0").filter(x => x(17) == "ORD").filter(x => x(14) != "NA").map(p => Flight(p(1), p(2), p(3), getMinuteOfDay(p(4)), getMinuteOfDay(p(5)), getMinuteOfDay(p(6)), getMinuteOfDay(p(7)), p(8), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toDouble, p(15).toInt, p(16), p(18).toInt)).toDF
testingData.cache

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler

val monthIndexer = new StringIndexer().setInputCol("Month").setOutputCol("MonthCat")
val dayofMonthIndexer = new StringIndexer().setInputCol("DayofMonth").setOutputCol("DayofMonthCat")
val dayOfWeekIndexer = new StringIndexer().setInputCol("DayOfWeek").setOutputCol("DayOfWeekCat")
val uniqueCarrierIndexer = new StringIndexer().setInputCol("UniqueCarrier").setOutputCol("UniqueCarrierCat")
val originIndexer = new StringIndexer().setInputCol("Origin").setOutputCol("OriginCat")

val assembler = new VectorAssembler().setInputCols(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "OriginCat", "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "ActualElapsedTime", "CRSElapsedTime", "AirTime","DepDelay", "Distance")).setOutputCol("rawFeatures")

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StandardScaler

val slicer = new VectorSlicer().setInputCol("rawFeatures").setOutputCol("slicedfeatures").setNames(Array("MonthCat", "DayofMonthCat", "DayOfWeekCat", "UniqueCarrierCat", "DepTime", "ArrTime", "ActualElapsedTime", "AirTime", "DepDelay", "Distance"))
val scaler = new StandardScaler().setInputCol("slicedfeatures").setOutputCol("features").setWithStd(true).setWithMean(true)
val binarizerClassifier = new Binarizer().setInputCol("ArrDelay").setOutputCol("binaryLabel").setThreshold(15.0)
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("binaryLabel").setFeaturesCol("features")
val lrPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, slicer, scaler, binarizerClassifier, lr))
val lrModel = lrPipeline.fit(trainingData)
val lrPredictions = lrModel.transform(testingData)
lrPredictions.select("prediction", "binaryLabel", "features").show(20)
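
// (Added illustration, not part of the original listing) AUC for the logistic regression
// predictions above, computed from the rawPrediction column produced by the model.
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val lrEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("binaryLabel")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
println("LR areaUnderROC = " + lrEvaluator.evaluate(lrPredictions))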

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.feature.PCA

val indexer = new VectorIndexer().setInputCol("rawFeatures").setOutputCol("rawFeaturesIndexed").setMaxCategories(10)
val pca = new PCA().setInputCol("rawFeaturesIndexed").setOutputCol("features").setK(10)
val bucketizer = new Bucketizer().setInputCol("ArrDelay").setOutputCol("multiClassLabel").setSplits(Array(Double.NegativeInfinity, 0.0, 15.0, Double.PositiveInfinity))
val dt = new DecisionTreeClassifier().setLabelCol("multiClassLabel").setFeaturesCol("features")
val dtPipeline = new Pipeline().setStages(Array(monthIndexer, dayofMonthIndexer, dayOfWeekIndexer, uniqueCarrierIndexer, originIndexer, assembler, indexer, pca, bucketizer, dt))
val dtModel = dtPipeline.fit(trainingData)
val dtPredictions = dtModel.transform(testingData)
dtPredictions.select("prediction", "multiClassLabel", "features").show(20)
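
For completeness, the decision-tree predictions can be scored with the evaluator already imported above; a minimal sketch (the F1 metric is just one reasonable choice, not taken from the original listing):

val dtEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("multiClassLabel")
  .setPredictionCol("prediction")
  .setMetricName("f1")
println("Decision tree F1 = " + dtEvaluator.evaluate(dtPredictions))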