SparkMLlib學習之線性回歸


SparkMLlib學習之線性回歸

(一)回歸的概念

  1,回歸與分類的區別

   分類模型處理表示類別的離散變量,而回歸模型則處理可以取任意實數的目標變量。但是二者基本的原則類似,都是通過確定一個模型,將輸入特征映射到預測的輸出。回歸模型和分類模型都是監督學習的一種形式。

  2.回歸分類

   線性回歸模型:本質上和對應的線性分類模型一樣,唯一的區別是線性回歸使用的損失函數、相關連接函數和決策函數不同。MLlib提供了標准的最小二乘回歸模型在MLlib中,標准的最小二乘回歸不使用正則化。但是應用到錯誤預測值的損失函數會將錯誤做平方,從而放大損失。這也意味着最小平方回歸對數據中的異常點和過擬合非常敏感。因此對於分類器,我們通常在實際中必須應用一定程度的正則化。正則化分為:應用L2正則化時通常稱為嶺回歸(ridge regression),應用L1正則化是稱為LASSO(Least Absolute Shrinkage and Selection Operator)。

   決策樹模型:決策樹同樣可以通過改變不純度的度量方法用於回歸分析

(二)SparkMLlib線性回歸的應用

  1,數據集的選擇

    http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset。

  2.數據集的描述

    此數據是根據一系列的特征預測每小時自行車租車次數,特征類型如下:

  3,數據處理及構建模型

    數據集中共有17 379個小時的記錄。接下來的實驗,我們會忽略記錄中的 instant和 dteday 。忽略兩個記錄次數的變量 casual 和 registered ,只保留 cnt ( casual 和registered 的和)。最后就剩下12個變量,其中前8個是類型變量,后4個是歸一化后的實數變量。對其中8個類型變量,我們使用之前提到的二元編碼,剩下4個實數變量不做處理。另外一種二元變量化方法:http://blog.csdn.net/u010824591/article/details/50374904

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Damon on 17-5-22.
  */
object Regression {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf =new SparkConf().setAppName("regression").setMaster("local[4]")
    val sc =new SparkContext(conf)
    //文件名
    val file_bike="hour_nohead.csv"
    //調用二元向量化方法
    val labeled_file=labeledFile(file_bike,sc)
    /*/*對目標值取對數*/
    val labeled_file1=labeled_file.map(point => LabeledPoint(math.log(point.label),point.features))
    */
    //構建線性回歸模型,注該方法在:spark2.1.0已經拋棄了。。。。
    val model_liner=LinearRegressionWithSGD.train(labeled_file,10,0.1)
    //val categoricalFeaturesInfo = Map[Int,Int]()
    //val model_DT=DecisionTree.trainRegressor(labeled_file,categoricalFeaturesInfo,"variance",5,32)
    val predict_vs_train=labeled_file.map{
      point => (model_liner.predict(point.features),point.label)
//對目標取對數后的,預測方法
     /* point => (math.exp(model_liner.predict(point.features)),math.exp(point.label))*/
    }
    predict_vs_train.take(5).foreach(println(_))
   /*
(135.94648455498356,16.0)
(134.38058174607252,40.0)
(134.1840793861374,32.0)
(133.88699144084515,13.0)
(133.77899037657548,1.0)
   */
  def labeledFile(originFile:String,sc:SparkContext):RDD[LabeledPoint]={
    val file_load=sc.textFile(originFile)
    val file_split=file_load.map(_.split(","))
    /*構建映射類函數的方法:mapping*/
    def mapping(rdd:RDD[Array[String]],index:Int)=
      rdd.map(x => x(index)).distinct.zipWithIndex().collect.toMap
    /*存儲每列映射方法mapping的maps集合*/
    var maps:Map[Int,Map[String,Long]] = Map()
    /* 生成maps*/
    for(i <- 2 until 10)
      maps += (i -> mapping(file_split,i))
    /*max_size表示每列的特征之和*/
    val max_size=maps.map(x =>x._2.size).sum
    val file_label=file_split.map{
      x =>
        var num:Int=0
        var size:Int=0
        /*構建長度為max_size+4的特征數組,初始值全為0*/
        val arrayOfDim=Array.ofDim[Double](max_size+4)
        for(j<-2 until 10) {
          num = maps(j)(x(j)).toInt
          if(j==2) size=0 else size += maps(j-1).size
          /*為特征賦值*/
          arrayOfDim(size+num)=1.0
        }
        /*添加后面4列歸一化的特征*/
        for(j<-10 until 14)
        arrayOfDim(max_size+(j-10))=x(j).toDouble
        /*生成LabeledPoint類型*/
        LabeledPoint(x(14).toDouble+x(15).toDouble,Vectors.dense(arrayOfDim))
    }
    file_label
  }
}

  4,模型性能評價

    (1) MSE是均方誤差,是用作最小二乘回歸的損失函數,表示所有樣本預測值和實際值平方差的平均值。公式如下: 

                                 

    (2)RMSE是MSE的平方根    

    (3)平均絕對誤差(MAE):預測值與實際值的絕對值差的平均值 

            

    (4) 均方根對數誤差(RMSLE):預測值和目標值進行對數變換后的RMSE. 

代碼如下:

/*MSE是均方誤差*/
    val mse=predict_vs_train.map(x => math.pow(x._1-x._2,2)).mean()
   /* 平均絕對誤差(MAE)*/
    val mae=predict_vs_train.map(x => math.abs(x._1-x._2)).mean()
    /*均方根對數誤差(RMSLE)*/
    val rmsle=math.sqrt(predict_vs_train.map(x => math.pow(math.log(x._1+1)-math.log(x._2+1),2)).mean())
    println(s"mse is $mse and mae is $mae and rmsle is $rmsle")
/*
mse is 29897.34020145107 and mae is 130.53255991178477 and rmsle is 1.4803867063174845
*/

(三) 改進模型性能和參數調優

  1,變換目標變量  

   許多機器學習模型都會假設輸入數據和目標變量的分布,比如線性模型的假設為正態分布,這里就將目標值取對數(還可以去sqrt處理)(將上文注釋去掉)實現正態分布,結果如為:mse is 47024.572159822106 and mae is 149.28861881845546 and rmsle is 1.4525632598540426

  將上述結果和原始數據訓練的模型性能比較,可以看到我們提升了RMSLE的性能,但是卻沒有提升MSE和MAE的性能。

  2.交叉驗證

   1,創建訓練集和測試集來評估參數

   2,調節參數來判斷對線性模型的影響

迭代次數及步長的影響:

//划分訓練集和測試集
    val labeled_orign = labeled_file.randomSplit(Array(0.8, 0.2), 11L)
    val train_file = labeled_orign(0)
    val test_file = labeled_orign(1)
    /*調節迭代次數*/
    val Iter_Results = Seq(1, 5, 10, 20, 50, 100).map { param =>
      val model = LinearRegressionWithSGD.train(test_file, param, 0.01)
      val scoreAndLabels = test_file.map { point =>
        (model.predict(point.features), point.label)
      }
      val rmsle = math.sqrt(scoreAndLabels.map(x => math.pow(math.log(x._1) - math.log(x._2), 2)).mean)
      (s"$param lambda", rmsle)
    }
    /*迭代次數的結果輸出*/
    Iter_Results.foreach { case (param, rmsl) => println(f"$param, rmsle = ${rmsl}")}
    /*調節步長數的大小*/
    val Step_Results = Seq(0.01, 0.025, 0.05, 0.1, 1.0).map { param =>
        val model = LinearRegressionWithSGD.train(test_file, 20, param)
        val scoreAndLabels = test_file.map { point =>
          (model.predict(point.features), point.label)
        }
        val rmsle = math.sqrt(scoreAndLabels.map(x => math.pow(math.log(x._1) - math.log(x._2), 2)).mean)
        (s"$param lambda", rmsle)
      }
    /*步長的結果輸出*/
    Step_Results.foreach { case (param, rmsl) => println(f"$param, rmsle = ${rmsl}")}
/*results
1 lambda, rmsle = 2.9033629718241167
5 lambda, rmsle = 2.0102924520366092
10 lambda, rmsle = 1.7548482896314488
20 lambda, rmsle = 1.5785106813100764
50 lambda, rmsle = 1.461748782192306
100 lambda, rmsle = 1.4462810196387068
步長
0.01 lambda, rmsle = 1.5785106813100764
0.025 lambda, rmsle = 1.4478358250917658
0.05 lambda, rmsle = 1.5152549319928832
0.1 lambda, rmsle = 1.5687431700715837
1.0 lambda, rmsle = NaN
*/

  結果表明,隨着迭代次數的增加,誤差確實有所下降(即性能提高),並且下降速率和預期一樣越來越小。可以看出為什么不使用默認步長來訓練線性模型。其中默認步長為1.0,得到的RMSLE結果為 nan 。這說明SGD模型收斂到了最差的局部最優解。這種情況在步長較大的時候容易出現,原因是算法收斂太快而不能得到最優解。另外,小步長與相對較小的迭代次數(比如上面的10次)對應的訓練模型性能一般較差。而較小的步長與較大的迭代次數下通常可以收斂得到較好的解。通常來講,步長和迭代次數的設定需要權衡。較小的步長意味着收斂速度慢,需要較大的迭代次數。但是較大的迭代次數更加耗時,特別是在大數據集上。

  還可以調節L1正則化和L2正則化參數。 MLlib目前支持兩種正則化方法L1和L2。 L2正則化假設模型參數服從高斯分布,L2正則化函數比L1更光滑,所以更容易計算;L1假設模型參數服從拉普拉斯分布,L1正則化具備產生稀疏解的功能,從而具備Feature Selection的能力。(由於spark 2.1.0中的線性回歸方法已經忽略了,就沒去驗證L1和L2對模型的影響)

 

 

 

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM