How to Build a Classification Model with Spark and pandas


This chapter uses sklearn's iris dataset to build a Spark multiclass classification model, converting between a pandas DataFrame and a Spark DataFrame along the way, and grid-searches a parameter grid to obtain the best parameters and the evaluation score. The code walks through the steps below.
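As background, the pandas-to-Spark DataFrame conversion that the walkthrough relies on is just a pair of calls; here is a minimal, self-contained sketch (the toy column names are illustrative only):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local').appName('pandas_spark_demo').getOrCreate()

    pdf = pd.DataFrame({'x': [1.0, 2.0], 'label': [0, 1]})  # pandas DataFrame
    sdf = spark.createDataFrame(pdf)                        # pandas -> Spark
    sdf.printSchema()
    back = sdf.toPandas()                                   # Spark -> pandas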

  1. Import the required packages
    import numpy as np
    import pandas as pd

    import sklearn.datasets as sd


    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession

    from pyspark.sql import Row
    from pyspark.ml.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
  2. Load the data and convert the data structures

    sc = SparkContext('local','iris_logistic')
    spark = SparkSession(sc)

    # Load the iris dataset
    data = sd.load_iris()
    #data_x = data.data[:100,:]
    #data_y = data.target[:100]
    data_x = data.data
    data_y = data.target
    # Convert to a pandas DataFrame
    data_x = pd.DataFrame(data_x)
    data_x.columns = ['sw','sh','dw','dh']
    print(data_x.dtypes)
    data_x['label'] = data_y
    # Convert to a Spark DataFrame
    data_x = spark.createDataFrame(data_x)
    print(data_x)

    # The commented-out LabeledPoint (mllib) route below was only used for the binary-classification case
    #parsedData = data_x.rdd.map(lambda row:LabeledPoint(row[-1], Vectors.dense(row[:-1])))
    #print(parsedData .collect())
    #training = sc.parallelize(training)

    # 75/25 train/test split; 38 is the random seed
    train_x, test_x = data_x.randomSplit([.75, .25], seed=38)
    # Use Row to build the (label, features) structure needed for multiclass training
    row = Row( 'label','features')
    train_x = train_x.rdd.map(lambda v: (row(v[-1],Vectors.dense(v[:-1])))).toDF()
    test_x = test_x.rdd.map(lambda v: (row(v[-1],Vectors.dense(v[:-1])))).toDF()

    train_x.show(2)
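
    As an alternative to the Row/RDD mapping above, the same (label, features) layout can be produced with pyspark.ml.feature.VectorAssembler; a minimal sketch reusing the Spark DataFrame data_x and its column names (the train_x2/test_x2 names are illustrative only):

    from pyspark.ml.feature import VectorAssembler

    # Assemble the four numeric columns into a single 'features' vector column
    assembler = VectorAssembler(inputCols=['sw', 'sh', 'dw', 'dh'], outputCol='features')
    assembled = assembler.transform(data_x).select('label', 'features')
    train_x2, test_x2 = assembled.randomSplit([.75, .25], seed=38)
    train_x2.show(2)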
  3. Build the model and output the best parameters and evaluation score
    # Build the model
    lr = LogisticRegression()
    # Build the parameter grid and grid-search for the best parameters
    grid = ParamGridBuilder()\
        .addGrid(lr.maxIter, [8, 18])\
        .addGrid(lr.regParam, [0.1, 0.01])\
        .addGrid(lr.fitIntercept, [False, True])\
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
        .build()
    # Create the multiclass evaluator
    evaluator = MulticlassClassificationEvaluator()
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                        parallelism=2, numFolds=2)
    # Train and predict
    cvModel = cv.fit(train_x)

    pre = cvModel.transform(test_x)

    print(evaluator.evaluate(pre))
    # Print the predictions
    selected = pre.select("features", "label", "probability", "prediction")
    for s in selected.collect():
        print(s)

    def getBestParam(cvModel):
        params = cvModel.getEstimatorParamMaps()
        avgMetrics = cvModel.avgMetrics
        all_params = list(zip(params, avgMetrics))
        print(all_params)
        best_param = sorted(all_params, key=lambda x: x[1], reverse=True)[0][0]
        return best_param

    # Print the best parameters
    best_param = getBestParam(cvModel)
    for p, v in best_param.items():
        print("{} : {}".format(p.name, v))