保序回歸
1 保序回歸
保序回歸解決了下面的問題:給定包含n個數據點的序列 y_1,y_2,...,y_n , 怎樣通過一個單調的序列 beta_1,beta_2,...,beta_n 來歸納這個問題。形式上,這個問題就是為了找到
大部分時候,我們會在括號前加上權重w_i。解決這個問題的一個方法就是 pool adjacent violators algorithm(PAVA) 算法。粗略的講,PAVA算法的過程描述如下:
我們從左邊的y_1開始,右移y_1直到我們遇到第一個違例(violation)即y_i < y_i+1,然后,我們用違例之前的所有y的平方替換這些y,以滿足單調性。我們繼續這個過程,直到我們最后到達y_n。
2 近似保序
給定一個序列y_1,y_2,...,y_n,我們尋找一個近似單調估計,考慮下面的問題
在上式中,X_+表示正數部分,即X_+ = X.1 (x>0)。這是一個凸優化問題,處罰項處罰違反單調性(即beta_i > beta_i+1)的鄰近對。
在公式(2)中,隱含着一個假設,即使用等距的網格測量數據點。如果情況不是這樣,那么可以修改懲罰項為下面的形式
x_i表示y_i測量得到的值。
3 近似保序算法流程
這個算法是標准PAVA算法的修改版本,它並不從數據的左端開始,而是在需要時連接相鄰的點,以產生近似保序最優的順序。相比一下,PAVA對中間的序列並不特別感興趣,只關心最后的序列。
有下面一個引理成立。
這個引理證明的事實極大地簡化了近似保序解路徑(solution path)的構造。假設在參數值為lambda的情況下,有K_lambda個連接塊,我們用A_1,A_2,..,A_K_lambda表示。這樣我們可以重寫(2)為如下(3)的形式。
上面的公式,對beta求偏導,可以得到下面的次梯度公式。通過這個公式即可以求得beta。
為了符合方便,令s_0 = s_K_lambda = 0。並且,
現在假設,當lambda在一個區間內增長時,組A_1,A_2,...,A_K_lambda不會改變。我們可以通過相應的lambda區分(4)。
這個公式的值本身是一個常量,它意味着上式的beta是lambda的線性函數。
隨着lambda的增長,方程(5)將連續的給出解決方案的斜率直到組A_1,A_2,...,A_K_lambda改變。更加引理1,只有兩個組合並時,這才會發生。m_i表示斜率,那么對於每一個i=1,...,K_lambda - 1,A_i和A_i+1合並之后得到的公式如下
因此我們可以一直移動,直到lambda “下一個”值的到來
並且合並A_i^star和A_i^star+1,其中
注意,可能有超過一對組別到達了這個最小值,在這種情況下,會組合所有滿足條件的組別。公式(7)和(8)成立的條件是t_i,i+1大於lambda,如果沒有t_i,i+1大於lambda,說明沒有組別可以合並,算法將會終止。
算法的流程如下:
-
初始時,
lambda=0,K_lambda=n,A_i={i},i=1,2,...,n。對於每個i,解是beta_lambda,i = y_i -
重復下面過程 -
**1、**通過公式(5)計算每個組的斜率
m_i -
**2、**通過公式(6)計算沒對相鄰組的碰撞次數
t_i,i+1**3、**如果
t_i,i+1 < lambda,終止**4、**計算公式(7)中的臨界點
lambda^star,並根據斜率更新解
對於每個
i,根據公式(8)合並合適的組別(所以K_lambda^star = K_lambda - 1),並設置lambda = lambda^star。4 源碼分析
在
1.6.x版本中,並沒有實現近似保序回歸,后續會實現。現在我們只介紹一般的保序回歸算法實現。4.1 實例
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel} val data = sc.textFile("data/mllib/sample_isotonic_regression_data.txt") // 創建(label, feature, weight) tuples ,權重默認設置為1.0 val parsedData = data.map { line => val parts = line.split(',').map(_.toDouble) (parts(0), parts(1), 1.0) } // Split data into training (60%) and test (40%) sets. val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0) val test = splits(1) // Create isotonic regression model from training data. // Isotonic parameter defaults to true so it is only shown for demonstration val model = new IsotonicRegression().setIsotonic(true).run(training) // Create tuples of predicted and real labels. val predictionAndLabel = test.map { point => val predictedLabel = model.predict(point._2) (predictedLabel, point._1) } // Calculate mean squared error between predicted and real labels. val meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean() println("Mean Squared Error = " + meanSquaredError)4.2 訓練過程分析
parallelPoolAdjacentViolators方法用於實現保序回歸的訓練。parallelPoolAdjacentViolators方法的代碼如下:private def parallelPoolAdjacentViolators( input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { val parallelStepResult = input //以(feature,label)為key進行排序 .sortBy(x => (x._2, x._1)) .glom()//合並不同分區的數據為一個數組 .flatMap(poolAdjacentViolators) .collect() .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering. poolAdjacentViolators(parallelStepResult) }parallelPoolAdjacentViolators方法的主要實現是poolAdjacentViolators方法,該方法主要的實現過程如下:var i = 0 val len = input.length while (i < len) { var j = i //找到破壞單調性的元祖的index while (j < len - 1 && input(j)._1 > input(j + 1)._1) { j = j + 1 } // 如果沒有找到違規點,移動到下一個數據點 if (i == j) { i = i + 1 } else { // 否則用pool方法處理違規的節點 // 並且檢查pool之后,之前處理過的節點是否違反了單調性約束 while (i >= 0 && input(i)._1 > input(i + 1)._1) { pool(input, i, j) i = i - 1 } i = j } }pool方法的實現如下所示。def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = { //取得i到j之間的元組組成的子序列 val poolSubArray = input.slice(start, end + 1) //求子序列sum(label * w)之和 val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum //求權重之和 val weight = poolSubArray.map(_._3).sum var i = start //子區間的所有元組標簽相同,即擁有相同的預測 while (i <= end) { //修改標簽值為兩者之商 input(i) = (weightedSum / weight, input(i)._2, input(i)._3) i = i + 1 } }經過上文的處理之后,
input根據中的label和feature均是按升序排列。對於擁有相同預測的點,我們只保留兩個特征邊界點。val compressed = ArrayBuffer.empty[(Double, Double, Double)] var (curLabel, curFeature, curWeight) = input.head var rightBound = curFeature def merge(): Unit = { compressed += ((curLabel, curFeature, curWeight)) if (rightBound > curFeature) { compressed += ((curLabel, rightBound, 0.0)) } } i = 1 while (i < input.length) { val (label, feature, weight) = input(i) if (label == curLabel) { //權重疊加 curWeight += weight rightBound = feature } else {//如果標簽不同,合並 merge() curLabel = label curFeature = feature curWeight = weight rightBound = curFeature } i += 1 } merge()最后將訓練的結果保存為模型。
//標簽集 val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1) //特征集 val boundaries = pooled.map(_._2) new IsotonicRegressionModel(boundaries, predictions, isotonic)4.3 預測過程分析
def predict(testData: Double): Double = { def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = { y1 + (y2 - y1) * (x - x1) / (x2 - x1) } //二分查找index val foundIndex = binarySearch(boundaries, testData) val insertIndex = -foundIndex - 1 // Find if the index was lower than all values, // higher than all values, in between two values or exact match. if (insertIndex == 0) { predictions.head } else if (insertIndex == boundaries.length){ predictions.last } else if (foundIndex < 0) { linearInterpolation( boundaries(insertIndex - 1), predictions(insertIndex - 1), boundaries(insertIndex), predictions(insertIndex), testData) } else { predictions(foundIndex) } }當測試數據精確匹配一個邊界,那么返回相應的特征。如果測試數據比所有邊界都大或者小,那么分別返回第一個和最后一個特征。當測試數據位於兩個邊界之間,使用
linearInterpolation方法計算特征。 這個方法是線性內插法。
-
原文鏈接:https://github.com/endymecy/spark-ml-source-analysis/blob/master/%E5%88%86%E7%B1%BB%E5%92%8C%E5%9B%9E%E5%BD%92/%E4%BF%9D%E5%BA%8F%E5%9B%9E%E5%BD%92/isotonic-regression.md
- http://fa.bianp.net/blog/2013/isotonic-regression/










