一.算法簡介
線性回歸(Linear Regression)是利用稱為線性回歸方程的最小平方函數對一個或多個自變量和因變量之間關系進行建模的一種回歸分析。這種函數是一個或多個稱為回歸系數的模型參數的線性組合。只有一個自變量的情況稱為一元線性回歸,大於一個自變量情況的叫做多元線性回歸。對於一元線性回歸,其線性方程為:h(x) = a1x1 + b。對於多元線性方程,在上述方程中增加一個自變量,得到其線性方程為:h(x) = a1x1 + a2x2 + b。因此,不論是一元線性方程還是多元線性方程,可統一寫成如下格式:h(x) = aTx。若上式中x0=1,而求線性方程則演變為求方程的參數aT。對於參數a的求解,需要檢測評估a是否為最優,所以說需要對h函數進行評估。一般這個函數被稱為損失函數(loss function),用來描述h函數好壞的程度。
損失函數為:
損失函數是x(i)的估計值與真實值之差的平方和,其中1/2系數是為了在求導的時候使得系數為1。如何調整a以使得取得最小值有很多方法,其中有最小二乘法(min square)和梯度下降法等。
二.最小二乘法
將訓練特征表示為X矩陣,結果表示為y向量,仍然是線性回歸方程,誤差函數不變。那么a可以直接由如下公式得出:a = (XTX)-1XT-y,但此方法要求X是列滿秩的,而且求矩陣的逆比較慢。
備注:
1.線性獨立【線性無關】:指一組向量中任意一個向量都不能有其它一個或幾個向量線性表示。
2.上述公式中紅色部分表示列秩。
3.一個矩陣的列秩是矩陣的線性無關的列向量的極大數目,行秩與其類似。矩陣的列秩和行秩總是相等的,可以稱為矩陣的秩。
三.梯度下降算法
1.批量梯度下降
初始時aT可設置為0,然后迭代使用公式計算aT中的每個參數,直至收斂為止。由於每次迭代計算aT時,都使用了整個樣本集,因此我們稱該梯度下降算法為批量梯度下降算法【batch gradient descent】。
2.隨機梯度下降
當樣本集數據量很大時,批量梯度下降算法沒迭代一次的復雜度為O(mn),復雜度很高。因此,為了減小復雜度,當m很大時,一般會使用隨機梯度下降算法【stochastic gradient descent】,算法如下:
即每讀取一條樣本,就迭代對aT進行更新。然后判斷是否收斂,若沒有收斂則繼續讀取樣本進行計算,若樣本讀取完還未收斂,則重新開始讀取樣本。這樣迭代一次的復雜度為O(n)。對於大數據集,很可能只需要讀取一小部分數據函數就收斂了。所以當數據量很大時,更傾向於選擇隨機梯度下降算法。
不過,相比較批量梯度下降算法,隨機梯度下降算法使得函數趨近於最小值的速度更快,但是有可能會在最小值周圍震盪,造成永遠無法收斂。但是在實踐中,大部分值都能夠接近最小值,效果也都還不錯。為了減小震盪,可以設置當變化小於某個閾值時認定為收斂。
四.源碼分析
MLlib的線性回歸模型采用隨機梯度下降算法來優化目標函數。MLlib實現了分布式的隨機梯度下降算法。其分布方法為:在每次迭代中,隨機抽取一定比例的樣本作為當前迭代的計算樣本;對計算樣本中的每一個樣本分布計算梯度【分布式計算】;然后再通過聚合函數對樣本的梯度進行累計,得到該樣本的平均梯度損失;最后根據最新的梯度及上次迭代的權重進行權重的更新。線性回歸模型沒有使用正則化方法。
MLlib線性回歸源碼執行流程及相關概念:
線性回歸伴生對象四LinearRegressionWithSGD,該對象是建立線性回歸模型的入口,該對象主要定義訓練線性回歸模型的train方法。train方法可通過設置訓練參數進行模型訓練,其參數主要包括:
1.input:訓練樣本,格式為RDD[LabeledPoint],其中LabelPoint的格式為(label,features)
2.numIterations:迭代次數,默認為100
3.stepSize:每次迭代步長,默認為1
4.miniBatchFraction:每次迭代參與計算的樣本比例,默認為1,表示全部樣本參與計算
5.initialWeights:初始化權重
線性回歸類是LinearRegressionWithSGD,該類是基於隨機梯度下降法的線性回歸模型,該類繼承了GeneralizedLinearAlgorithm廣義回歸類。該類主要初始化梯度下降的方法、梯度更新方法、優化方式等,其中線性回歸的梯度計算的損失函數不采用正則化;然后根據初始化的方法調用繼承GeneralizedLinearAlgorithm的run方法開始訓練模型。
五.代碼實現
1 import org.apache.spark.sql.SparkSession 2 import org.apache.spark.sql.DataFrame 3 import org.apache.spark.ml.feature.VectorAssembler 4 import org.apache.spark.ml.regression.LinearRegression 5 /** 6 * Created by zhen on 2018/3/10. 7 */ 8 object LinearRegression { 9 def main(args: Array[String]) { 10 //設置環境 11 val spark = SparkSession.builder ().appName ("LinearRegressionTest").master ("local[2]").getOrCreate() 12 val sc = spark.sparkContext 13 val sqlContext = spark.sqlContext 14 //准備訓練集合 15 val raw_data = sc.textFile("src/sparkMLlib/man.txt") 16 val map_data = raw_data.map{x=> 17 val mid = x.replaceAll(","," ,") 18 val split_list = mid.substring(0,mid.length-1).split(",") 19 for(x <- 0 until split_list.length){ 20 if(split_list(x).trim.equals("")) split_list(x) = "0.0" else split_list(x) = split_list(x).trim 21 } 22 ( split_list(1).toDouble,split_list(2).toDouble,split_list(3).toDouble,split_list(4).toDouble, 23 split_list(5).toDouble,split_list(6).toDouble,split_list(7).toDouble,split_list(8).toDouble, 24 split_list(9).toDouble,split_list(10).toDouble,split_list(11).toDouble) 25 } 26 val mid = map_data.sample(false,0.6,0)//隨機取樣,訓練模型 27 val df = sqlContext.createDataFrame(mid) 28 val colArray = Array("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11") 29 val data = df.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11") 30 val assembler = new VectorAssembler().setInputCols(colArray).setOutputCol("features") 31 val vecDF = assembler.transform(data) 32 //准備預測集合 33 val map_data_for_predict = map_data 34 val df_for_predict = sqlContext.createDataFrame(map_data_for_predict) 35 val data_for_predict = df_for_predict.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11") 36 val colArray_for_predict = Array("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11") 37 val assembler_for_predict = new VectorAssembler().setInputCols(colArray_for_predict).setOutputCol("features") 38 val vecDF_for_predict: DataFrame = assembler_for_predict.transform(data_for_predict) 39 // 建立模型,進行預測 40 // 設置線性回歸參數 41 val lr1 = new LinearRegression() 42 val lr2 = lr1.setFeaturesCol("features").setLabelCol("c5").setFitIntercept(true) 43 // RegParam:正則化 44 val lr3 = lr2.setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) 45 // 將訓練集合代入模型進行訓練 46 val lrModel = lr3.fit(vecDF) 47 // 輸出模型全部參數 48 lrModel.extractParamMap() 49 //coefficients 系數 intercept 截距 50 println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}") 51 // 模型進行評價 52 val trainingSummary = lrModel.summary 53 trainingSummary.residuals.show() 54 println(s"均方根差: ${trainingSummary.rootMeanSquaredError}")//RMSE:均方根差 55 println(s"判定系數: ${trainingSummary.r2}")//r2:判定系數,也稱為擬合優度,越接近1越好 56 val predictions = lrModel.transform(vecDF_for_predict) 57 val predict_result = predictions.selectExpr("features","c5", "round(prediction,1) as prediction") 58 predict_result.rdd.saveAsTextFile("src/sparkMLlib/manResult") 59 sc.stop() 60 } 61 }
六.執行結果
性能評估:
結果: