保序回歸即給定了一個無序的數字序列,通過修改其中元素的值,得到一個非遞減的數字序列,要求是使得誤差(預測值和實際值差的平方)最小。比如在動物身上實驗某種葯物,使用了不同的劑量,按理說劑量越大,有效的比例就應該越高,但是如果發現了劑量大反而有效率降低了,這個時候就只有把無序的兩個元素合並了,重新計算有效率,直到計算出來的有效率不大於比下一個元素的有效率。
MLlib使用的是PAVA(Pool Adjacent Violators Algorithm)算法,並且是分布式的PAVA算法。首先在每個分區的樣本集序列運行PAVA算法,保證局部有序,然后再對整個樣本集運行PAVA算法,保證全局有序。
代碼:
import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel, LabeledPoint} object IsotonicRegression { def main(args: Array[String]) { // 設置運行環境 val conf = new SparkConf().setAppName("Istonic Regression Test") .setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\MachineLearning\\MachineLearning.jar")) val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) // 讀取樣本數據並解析 val dataRDD = sc.textFile("hdfs://master:9000/ml/data/sample_isotonic_regression_data.txt") val parsedDataRDD = dataRDD.map { line => val parts = line.split(',').map(_.toDouble) (parts(0), parts(1), 1.0) } // 樣本數據划分,訓練樣本占0.7,測試樣本占0.3 val dataParts = parsedDataRDD.randomSplit(Array(0.7, 0.3), seed = 25L) val trainRDD = dataParts(0) val testRDD = dataParts(1) // 建立保序回歸模型並訓練 val model = new IsotonicRegression().setIsotonic(true).run(trainRDD) // 計算誤差 val prediction = testRDD.map { line => val predicted = model.predict(line._2) (predicted, line._2, line._1) } val showPrediction = prediction.collect println println("Prediction" + "\t" + "Feature") for (i <- 0 to showPrediction.length - 1) { println(showPrediction(i)._1 + "\t" + showPrediction(i)._2) } val MSE = prediction.map { case (p, _, l1) => math.pow((p - l1), 2) }.mean() println("MSE = " + MSE) } }
運行結果: