spark与pandas 如何构建分类模型
本章使用sklearn自带的鸢尾花数据,通过pandas的DataFrame与spark的DataFrame之间的转化,构建spark多分类模型,并通过参数网格进行调参,最终得到最佳参数和评价分数。具体代码流程如下:
导入相关工具包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import numpy as np
import pandas as pd
import sklearn.datasets as sd
from pyspark.context import SparkContext
from pyspark.sql.context import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
导入数据以及转换数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Start a local Spark application named 'iris_logistic' and wrap its
# context in a session so DataFrames can be created.
sc = SparkContext('local', 'iris_logistic')
spark = SparkSession(sc)

# Load the iris dataset shipped with scikit-learn; every sample is used.
# (To work on the first 100 samples only, slice data.data[:100, :] and
# data.target[:100] instead.)
data = sd.load_iris()
data_y = data.target

# pandas DataFrame: four feature columns, then the class label appended
# as a fifth column named 'label'.
data_x = pd.DataFrame(data.data, columns=['sw', 'sh', 'dw', 'dh'])
print(data_x.dtypes)
data_x['label'] = data_y

# Convert the pandas DataFrame into a Spark DataFrame.
data_x = spark.createDataFrame(data_x)
print(data_x)

# NOTE: an mllib LabeledPoint-based RDD was tried at this point and left
# disabled; the original author's note says it only covered the binary case.
# parsedData = data_x.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[:-1])))
# print(parsedData.collect())
# training = sc.parallelize(training)

# Random train/test split with weights 0.75 / 0.25 and a fixed seed of 38.
train_x, test_x = data_x.randomSplit([.75, .25], 38)

# Row factory giving every record the two fields (label, features).
row = Row('label', 'features')


def _to_labeled_row(v):
    """Build a (label, features) Row: last field is the label, the rest the dense feature vector."""
    return row(v[-1], Vectors.dense(v[:-1]))


# Re-shape both splits into two-column (label, features) DataFrames.
train_x = train_x.rdd.map(_to_labeled_row).toDF()
test_x = test_x.rdd.map(_to_labeled_row).toDF()
print(train_x.show(2))
构建模型以及输出最佳参数-评价分数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Build the model: a logistic regression estimator with default settings;
# the hyper-parameters of interest are supplied through the grid below.
lr = LogisticRegression()

# Parameter grid for the search: 2 x 2 x 2 x 3 = 24 combinations.
grid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [8, 18])
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Multi-class evaluator (default metric) used both to score each grid
# point during cross-validation and to score the held-out predictions.
evaluator = MulticlassClassificationEvaluator()
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    parallelism=2,
    numFolds=2,
)

# Fit on the training split, then predict on the test split and print the score.
cvModel = cv.fit(train_x)
pre = cvModel.transform(test_x)
print(evaluator.evaluate(pre))

# Print every test-set prediction together with its inputs and probabilities.
selected = pre.select("features", "label", "probability", "prediction")
for record in selected.collect():
    print(record)
def getBestParam(cvModel, larger_is_better=True):
    """Return the parameter map with the best average metric.

    Pairs each entry of ``cvModel.getEstimatorParamMaps()`` with the entry
    of ``cvModel.avgMetrics`` at the same position, prints the full list of
    pairs, and returns the parameter map of the best pair.

    Args:
        cvModel: Object exposing ``getEstimatorParamMaps()`` and an
            ``avgMetrics`` sequence of the same length (presumably a fitted
            ``CrossValidatorModel`` -- confirm against the caller).
        larger_is_better: When True (the default, matching the previous
            behaviour) the highest metric wins; pass False for metrics
            where a lower value is better.

    Returns:
        The winning parameter map. On ties the earliest map wins.

    Raises:
        IndexError: If there are no (parameter map, metric) pairs.
    """
    params = cvModel.getEstimatorParamMaps()
    avgMetrics = cvModel.avgMetrics
    all_params = list(zip(params, avgMetrics))
    print(all_params)
    if not all_params:
        # Same exception type the former ``sorted(...)[0]`` raised, but
        # with a message that explains the cause.
        raise IndexError("no parameter maps / metrics to choose from")
    # max/min return the first extreme element, so tie-breaking is the
    # same as a stable sort followed by taking the first entry.
    pick = max if larger_is_better else min
    best_param, _ = pick(all_params, key=lambda pair: pair[1])
    return best_param
# Print the best parameters, one "name : value" line per entry.
best_param = getBestParam(cvModel)
for param, value in best_param.items():
    print("{} : {}".format(param.name, value))