SparkMLlib學習之線性回歸
(一)回歸的概念
1,回歸與分類的區別
分類模型處理表示類別的離散變量,而回歸模型則處理可以取任意實數的目標變量。但是二者基本的原則類似,都是通過確定一個模型,將輸入特征映射到預測的輸出。回歸模型和分類模型都是監督學習的一種形式。
2.回歸分類
線性回歸模型:本質上和對應的線性分類模型一樣,唯一的區別是線性回歸使用的損失函數、相關連接函數和決策函數不同。MLlib提供了標准的最小二乘回歸模型在MLlib中,標准的最小二乘回歸不使用正則化。但是應用到錯誤預測值的損失函數會將錯誤做平方,從而放大損失。這也意味着最小平方回歸對數據中的異常點和過擬合非常敏感。因此對於分類器,我們通常在實際中必須應用一定程度的正則化。正則化分為:應用L2正則化時通常稱為嶺回歸(ridge regression),應用L1正則化是稱為LASSO(Least Absolute Shrinkage and Selection Operator)。
決策樹模型:決策樹同樣可以通過改變不純度的度量方法用於回歸分析
(二)SparkMLlib線性回歸的應用
1,數據集的選擇
http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset。
2.數據集的描述
此數據是根據一系列的特征預測每小時自行車租車次數,特征類型如下:
3,數據處理及構建模型
數據集中共有17 379個小時的記錄。接下來的實驗,我們會忽略記錄中的 instant和 dteday 。忽略兩個記錄次數的變量 casual 和 registered ,只保留 cnt ( casual 和registered 的和)。最后就剩下12個變量,其中前8個是類型變量,后4個是歸一化后的實數變量。對其中8個類型變量,我們使用之前提到的二元編碼,剩下4個實數變量不做處理。另外一種二元變量化方法:http://blog.csdn.net/u010824591/article/details/50374904
import org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD} import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Created by Damon on 17-5-22. */ object Regression { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val conf =new SparkConf().setAppName("regression").setMaster("local[4]") val sc =new SparkContext(conf) //文件名 val file_bike="hour_nohead.csv" //調用二元向量化方法 val labeled_file=labeledFile(file_bike,sc) /*/*對目標值取對數*/ val labeled_file1=labeled_file.map(point => LabeledPoint(math.log(point.label),point.features)) */ //構建線性回歸模型,注該方法在:spark2.1.0已經拋棄了。。。。 val model_liner=LinearRegressionWithSGD.train(labeled_file,10,0.1) //val categoricalFeaturesInfo = Map[Int,Int]() //val model_DT=DecisionTree.trainRegressor(labeled_file,categoricalFeaturesInfo,"variance",5,32) val predict_vs_train=labeled_file.map{ point => (model_liner.predict(point.features),point.label) //對目標取對數后的,預測方法 /* point => (math.exp(model_liner.predict(point.features)),math.exp(point.label))*/ } predict_vs_train.take(5).foreach(println(_)) /* (135.94648455498356,16.0) (134.38058174607252,40.0) (134.1840793861374,32.0) (133.88699144084515,13.0) (133.77899037657548,1.0) */ def labeledFile(originFile:String,sc:SparkContext):RDD[LabeledPoint]={ val file_load=sc.textFile(originFile) val file_split=file_load.map(_.split(",")) /*構建映射類函數的方法:mapping*/ def mapping(rdd:RDD[Array[String]],index:Int)= rdd.map(x => x(index)).distinct.zipWithIndex().collect.toMap /*存儲每列映射方法mapping的maps集合*/ var maps:Map[Int,Map[String,Long]] = Map() /* 生成maps*/ for(i <- 2 until 10) maps += (i -> mapping(file_split,i)) /*max_size表示每列的特征之和*/ val max_size=maps.map(x =>x._2.size).sum val file_label=file_split.map{ x => var num:Int=0 var size:Int=0 /*構建長度為max_size+4的特征數組,初始值全為0*/ val arrayOfDim=Array.ofDim[Double](max_size+4) for(j<-2 until 10) { num = maps(j)(x(j)).toInt if(j==2) size=0 else size += maps(j-1).size /*為特征賦值*/ arrayOfDim(size+num)=1.0 } /*添加后面4列歸一化的特征*/ for(j<-10 until 14) arrayOfDim(max_size+(j-10))=x(j).toDouble /*生成LabeledPoint類型*/ LabeledPoint(x(14).toDouble+x(15).toDouble,Vectors.dense(arrayOfDim)) } file_label } }
4,模型性能評價
(1) MSE是均方誤差,是用作最小二乘回歸的損失函數,表示所有樣本預測值和實際值平方差的平均值。公式如下:
(2)RMSE是MSE的平方根
(3)平均絕對誤差(MAE):預測值與實際值的絕對值差的平均值
(4) 均方根對數誤差(RMSLE):預測值和目標值進行對數變換后的RMSE.
代碼如下:
/*MSE是均方誤差*/ val mse=predict_vs_train.map(x => math.pow(x._1-x._2,2)).mean() /* 平均絕對誤差(MAE)*/ val mae=predict_vs_train.map(x => math.abs(x._1-x._2)).mean() /*均方根對數誤差(RMSLE)*/ val rmsle=math.sqrt(predict_vs_train.map(x => math.pow(math.log(x._1+1)-math.log(x._2+1),2)).mean()) println(s"mse is $mse and mae is $mae and rmsle is $rmsle") /* mse is 29897.34020145107 and mae is 130.53255991178477 and rmsle is 1.4803867063174845 */
(三) 改進模型性能和參數調優
1,變換目標變量
許多機器學習模型都會假設輸入數據和目標變量的分布,比如線性模型的假設為正態分布,這里就將目標值取對數(還可以去sqrt處理)(將上文注釋去掉)實現正態分布,結果如為:mse is 47024.572159822106 and mae is 149.28861881845546 and rmsle is 1.4525632598540426
將上述結果和原始數據訓練的模型性能比較,可以看到我們提升了RMSLE的性能,但是卻沒有提升MSE和MAE的性能。
2.交叉驗證
1,創建訓練集和測試集來評估參數
2,調節參數來判斷對線性模型的影響
迭代次數及步長的影響:
//划分訓練集和測試集 val labeled_orign = labeled_file.randomSplit(Array(0.8, 0.2), 11L) val train_file = labeled_orign(0) val test_file = labeled_orign(1) /*調節迭代次數*/ val Iter_Results = Seq(1, 5, 10, 20, 50, 100).map { param => val model = LinearRegressionWithSGD.train(test_file, param, 0.01) val scoreAndLabels = test_file.map { point => (model.predict(point.features), point.label) } val rmsle = math.sqrt(scoreAndLabels.map(x => math.pow(math.log(x._1) - math.log(x._2), 2)).mean) (s"$param lambda", rmsle) } /*迭代次數的結果輸出*/ Iter_Results.foreach { case (param, rmsl) => println(f"$param, rmsle = ${rmsl}")} /*調節步長數的大小*/ val Step_Results = Seq(0.01, 0.025, 0.05, 0.1, 1.0).map { param => val model = LinearRegressionWithSGD.train(test_file, 20, param) val scoreAndLabels = test_file.map { point => (model.predict(point.features), point.label) } val rmsle = math.sqrt(scoreAndLabels.map(x => math.pow(math.log(x._1) - math.log(x._2), 2)).mean) (s"$param lambda", rmsle) } /*步長的結果輸出*/ Step_Results.foreach { case (param, rmsl) => println(f"$param, rmsle = ${rmsl}")} /*results 1 lambda, rmsle = 2.9033629718241167 5 lambda, rmsle = 2.0102924520366092 10 lambda, rmsle = 1.7548482896314488 20 lambda, rmsle = 1.5785106813100764 50 lambda, rmsle = 1.461748782192306 100 lambda, rmsle = 1.4462810196387068 步長 0.01 lambda, rmsle = 1.5785106813100764 0.025 lambda, rmsle = 1.4478358250917658 0.05 lambda, rmsle = 1.5152549319928832 0.1 lambda, rmsle = 1.5687431700715837 1.0 lambda, rmsle = NaN */
結果表明,隨着迭代次數的增加,誤差確實有所下降(即性能提高),並且下降速率和預期一樣越來越小。可以看出為什么不使用默認步長來訓練線性模型。其中默認步長為1.0,得到的RMSLE結果為 nan 。這說明SGD模型收斂到了最差的局部最優解。這種情況在步長較大的時候容易出現,原因是算法收斂太快而不能得到最優解。另外,小步長與相對較小的迭代次數(比如上面的10次)對應的訓練模型性能一般較差。而較小的步長與較大的迭代次數下通常可以收斂得到較好的解。通常來講,步長和迭代次數的設定需要權衡。較小的步長意味着收斂速度慢,需要較大的迭代次數。但是較大的迭代次數更加耗時,特別是在大數據集上。
還可以調節L1正則化和L2正則化參數。 MLlib目前支持兩種正則化方法L1和L2。 L2正則化假設模型參數服從高斯分布,L2正則化函數比L1更光滑,所以更容易計算;L1假設模型參數服從拉普拉斯分布,L1正則化具備產生稀疏解的功能,從而具備Feature Selection的能力。(由於spark 2.1.0中的線性回歸方法已經忽略了,就沒去驗證L1和L2對模型的影響)