Spark機器學習(3):保序回歸算法


保序回歸即給定了一個無序的數字序列,通過修改其中元素的值,得到一個非遞減的數字序列,要求是使得誤差(預測值和實際值差的平方)最小。比如在動物身上實驗某種葯物,使用了不同的劑量,按理說劑量越大,有效的比例就應該越高,但是如果發現了劑量大反而有效率降低了,這個時候就只有把無序的兩個元素合並了,重新計算有效率,直到計算出來的有效率不大於比下一個元素的有效率。

MLlib使用的是PAVA(Pool Adjacent Violators Algorithm)算法,並且是分布式的PAVA算法。首先在每個分區的樣本集序列運行PAVA算法,保證局部有序,然后再對整個樣本集運行PAVA算法,保證全局有序。

代碼:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel, LabeledPoint}

object IsotonicRegression {
  def main(args: Array[String]) {
    // 設置運行環境
    val conf = new SparkConf().setAppName("Istonic Regression Test")
      .setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\MachineLearning\\MachineLearning.jar"))
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    // 讀取樣本數據並解析
    val dataRDD = sc.textFile("hdfs://master:9000/ml/data/sample_isotonic_regression_data.txt")
    val parsedDataRDD = dataRDD.map { line =>
      val parts = line.split(',').map(_.toDouble)
      (parts(0), parts(1), 1.0)
    }

    // 樣本數據划分,訓練樣本占0.7,測試樣本占0.3
    val dataParts = parsedDataRDD.randomSplit(Array(0.7, 0.3), seed = 25L)
    val trainRDD = dataParts(0)
    val testRDD = dataParts(1)

    // 建立保序回歸模型並訓練
    val model = new IsotonicRegression().setIsotonic(true).run(trainRDD)
// 計算誤差
    val prediction = testRDD.map { line =>
      val predicted = model.predict(line._2)
      (predicted, line._2, line._1)
    }
    val showPrediction = prediction.collect
    println
    println("Prediction" + "\t" + "Feature")
    for (i <- 0 to showPrediction.length - 1) {
      println(showPrediction(i)._1 + "\t" + showPrediction(i)._2)
    }
    val MSE = prediction.map { case (p, _, l1) => math.pow((p - l1), 2) }.mean()
    println("MSE = " + MSE)
  }
}

運行結果:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM