Apache Spark源碼走讀之22 -- 淺談mllib中線性回歸的算法實現


歡迎轉載,轉載請注明出處,徽滬一郎。

概要

本文簡要描述線性回歸算法在Spark MLLib中的具體實現,涉及線性回歸算法本身及線性回歸並行處理的理論基礎,然后對代碼實現部分進行走讀。

線性回歸模型

機器學習算法是的主要目的是找到最能夠對數據做出合理解釋的模型,這個模型是假設函數,一步步的推導基本遵循這樣的思路

  1. 假設函數
  2. 為了找到最好的假設函數,需要找到合理的評估標准,一般來說使用損失函數來做為評估標准
  3. 根據損失函數推出目標函數
  4. 現在問題轉換成為如何找到目標函數的最優解,也就是目標函數的最優化

具體到線性回歸來說,上述就轉換為

梯度下降法

那么如何求得損失函數的最優解,針對最小二乘法來說可以使用梯度下降法。

算法實現

隨機梯度下降

正則化

 如何解決這些問題呢?可以采用收縮方法(shrinkage method),收縮方法又稱為正則化(regularization)。 主要是嶺回歸(ridge regression)和lasso回歸。通過對最小二乘估計加 入罰約束,使某些系數的估計為0。

線性回歸的代碼實現

上面講述了一些數學基礎,在將這些數學理論用代碼來實現的時候,最主要的是把握住相應的假設函數和最優化算法是什么,有沒有相應的正則化規則。

對於線性回歸,這些都已經明確,分別為

  1. Y = A*X + B 假設函數
  2. 隨機梯度下降法
  3. 嶺回歸或Lasso法,或什么都沒有

那么Spark mllib針對線性回歸的代碼實現也是依據該步驟來組織的代碼,其類圖如下所示

函數調用路徑

train->run,run函數的處理邏輯

  1. 利用最優化算法來求得最優解,optimizer.optimize
  2. 根據最優解創建相應的回歸模型, createModel

runMiniBatchSGD是真正計算Gradient和Loss的地方

def runMiniBatchSGD(
      data: RDD[(Double, Vector)],
      gradient: Gradient,
      updater: Updater,
      stepSize: Double,
      numIterations: Int,
      regParam: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): (Vector, Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    val numExamples = data.count()
    val miniBatchSize = numExamples * miniBatchFraction

    // if no data, return initial weights to avoid NaNs
    if (numExamples == 0) {

      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
      return (initialWeights, stochasticLossHistory.toArray)

    }

    // Initialize weights as a column vector
    var weights = Vectors.dense(initialWeights.toArray)
    val n = weights.size

    /**
     * For the first iteration, the regVal will be initialized as sum of weight squares
     * if it's L2 updater; for L1 updater, the same logic is followed.
     */
    var regVal = updater.compute(
      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

    for (i  (c, v) match { case ((grad, loss), (label, features)) =>
            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
            (grad, loss + l)
          },
          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
            (grad1 += grad2, loss1 + loss2)
          })

      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    }

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights, stochasticLossHistory.toArray)

  }

 上述代碼中最需要引起重視的部分是aggregate函數的使用,先看下aggregate函數的定義

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

aggregate函數有三個入參,一是初始值ZeroValue,二是seqOp,三為combOp.

  1. seqOp seqOp會被並行執行,具體由各個executor上的task來完成計算
  2. combOp combOp則是串行執行, 其中combOp操作在JobWaiter的taskSucceeded函數中被調用

為了進一步加深對aggregate函數的理解,現舉一個小小例子。啟動spark-shell后,運行如下代碼

val z = sc. parallelize (List (1 ,2 ,3 ,4 ,5 ,6),2)
z.aggregate (0)(math.max(_, _), _ + _)
// 運 行 結 果 為 9
res0: Int = 9

仔細觀察一下運行時的日志輸出, aggregate提交的job由一個stage(stage0)組成,由於整個數據集被分成兩個partition,所以為stage0創建了兩個task並行處理。

LeastSquareGradient

講完了aggregate函數的執行過程, 回過頭來繼續講組成seqOp的gradient.compute函數。

LeastSquareGradient用來計算梯度和誤差,注意cmopute中cumGraident會返回改變后的結果。這里計算公式依據的就是cost-function中的▽Q(w)

class LeastSquaresGradient extends Gradient {
  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    val diff = brzWeights.dot(brzData) - label
    val loss = diff * diff
    val gradient = brzData * (2.0 * diff)

    (Vectors.fromBreeze(gradient), loss)
  }

  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    //dot表示點積,是接受在實數R上的兩個向量並返回一個實數標量的二元運算,它的結果是歐幾里得空間的標准內積。
    //兩個向量的點積寫作a·b。點乘的結果叫做點積,也稱作數量積
    val diff = brzWeights.dot(brzData) - label

    //下面這句話完成y += a*x
    brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)

    diff * diff
  }
}

在上述代碼中頻繁出現breeze相關的函數,你一定會很好奇,這是個什么新鮮玩藝。

說 開 了 其 實 一 點 也 不 稀 奇, 由 於 計 算 中 有 大 量 的 矩 陣(Matrix)及 向量(Vector)計算,為了更好支持和封裝這些計算引入了breeze庫。

Breeze, Epic及Puck是scalanlp中三大支柱性項目, 具體可參數www.scalanlp.org

正則化過程

根據本次迭代出來的梯度和誤差對權重系數進行更新,這個時候就需要用上正則化規則了。也就是下述語句會觸發權重系數的更新

  val update = updater.compute(
     weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)

以嶺回歸為例,看其更新過程的代碼實現。

class SquaredL2Updater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // add up both updates from the gradient of the loss (= step) as well as
    // the gradient of the regularizer (= regParam * weightsOld)
    // w' = w - thisIterStepSize * (gradient + regParam * w)
    // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzWeights :*= (1.0 - thisIterStepSize * regParam)
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    val norm = brzNorm(brzWeights, 2.0)

    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
  }
}

結果預測

計算出權重系數(weights)和截距intecept,就可以用來創建線性回歸模型LinearRegressionModel,利用模型的predict函數來對觀測值進行預測

class LinearRegressionModel (
    override val weights: Vector,
    override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double): Double = {
    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
  }
}

 注意LinearRegression的構造函數需要權重(weights)和截距(intercept)作為入參,對新的變量做出預測需要調用predictPoint

一個完整的示例程序

在spark-shell中執行如下語句來親自體驗一下吧。

import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("training Mean Squared Error = " + MSE)

小結

再次強調,找到對應的假設函數,用於評估的損失函數最優化求解方法正則化規則


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM