算法說明
線性回歸是利用稱為線性回歸方程的函數對一個或多個自變量和因變量之間關系進行建模的一種回歸分析方法,只有一個自變量的情況稱為簡單回歸,大於一個自變量情況的叫做多元回歸,在實際情況中大多數都是多元回歸。
線性回歸(Linear Regression)問題屬於監督學習(Supervised Learning)范疇,又稱分類(Classification)或歸納學習(Inductive Learning)。這類分析中訓練數據集中給出的數據類型是確定的。機器學習的目標是,對於給定的一個訓練數據集,通過不斷的分析和學習產生一個聯系屬性集合和類標集合的分類函數(Classification Function)或預測函數)Prediction Function),這個函數稱為分類模型(Classification Model——或預測模型(Prediction Model)。通過學習得到的模型可以是一個決策樹、規格集、貝葉斯模型或一個超平面。通過這個模型可以對輸入對象的特征向量預測或對對象的類標進行分類。
回歸問題中通常使用最小二乘(Least Squares)法來迭代最優的特征中每個屬性的比重,通過損失函數(Loss Function)或錯誤函數(Error Function)定義來設置收斂狀態,即作為梯度下降算法的逼近參數因子。
實例介紹
該例子給出了如何導入訓練集數據,將其解析為帶標簽點的RDD,然后使用了LinearRegressionWithSGD 算法來建立一個簡單的線性模型來預測標簽的值,最后計算了均方差來評估預測值與實際值的吻合度。
線性回歸分析的整個過程可以簡單描述為如下三個步驟:
(1)尋找合適的預測函數,即上文中的 h(x) ,用來預測輸入數據的判斷結果。這個過程是非常關鍵的,需要對數據有一定的了解或分析,知道或者猜測預測函數的“大概”形式,比如是線性函數還是非線性函數,若是非線性的則無法用線性回歸來得出高質量的結果。
(2)構造一個Loss函數(損失函數),該函數表示預測的輸出(h)與訓練數據標簽之間的偏差,可以是二者之間的差(h-y)或者是其他的形式(如平方差開方)。綜合考慮所有訓練數據的“損失”,將Loss求和或者求平均,記為 J(θ) 函數,表示所有訓練數據預測值與實際類別的偏差。
(3)顯然, J(θ) 函數的值越小表示預測函數越准確(即h函數越准確),所以這一步需要做的是找到 J(θ) 函數的最小值。找函數的最小值有不同的方法,Spark中采用的是梯度下降法(stochastic gradient descent,SGD)。
程序代碼
import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors object LinearRegression { def main(args:Array[String]): Unit ={ // 屏蔽不必要的日志顯示終端上 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) // 設置運行環境 val conf = new SparkConf().setAppName("Kmeans").setMaster("local[4]") val sc = new SparkContext(conf) // Load and parse the data val data = sc.textFile("/home/hadoop/upload/class8/lpsa.data") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } // Building the model val numIterations = 100 val model = LinearRegressionWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error val valuesAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce (_ + _) / valuesAndPreds.count println("training Mean Squared Error = " + MSE) sc.stop() } }
執行情況
第一步 啟動Spark集群
$cd /app/hadoop/spark-1.1.0 $sbin/start-all.sh
第二步 在IDEA中設置運行環境
在IDEA運行配置中設置LinearRegression運行配置,由於讀入的數據已經在程序中指定,故在該設置界面中不需要設置輸入參數
第三步 執行並觀察輸出