Spark Machine Learning


 

Both Mahout on Spark and MLlib provide a broad set of machine learning algorithms.

This post focuses on summarizing MLlib.

Classification and Regression

Classification and regression are forms of supervised learning.

Supervised learning means training on labeled data (LabeledPoint) to build a model, then predicting results for test data. Labeled data is feature data whose outcome is already known (see the short sketch after the next two points).

The difference between classification and regression is the type of the predicted variable:

  Classification predicts a discrete variable (for example, classifying email as spam or not spam). For binary classification the labels are 0 and 1; for multiclass classification the labels range from 0 to C-1, where C is the number of classes;

  Regression predicts a continuous variable (for example, predicting height from age and weight).
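
A minimal sketch of what labeled data (LabeledPoint) looks like; the feature values below are made up purely for illustration:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Classification-style point: label 1.0 (e.g. "spam") with three made-up features
val spamExample = LabeledPoint(1.0, Vectors.dense(2.0, 15.0, 1.0))
// Regression-style point: label 175.0 (e.g. height in cm) with features age and weight
val heightExample = LabeledPoint(175.0, Vectors.dense(30.0, 70.0))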

 

Linear Regression

  Linear regression is one of the most commonly used regression methods; it predicts the output value as a linear combination of the features.

  The classes that can be used for linear regression are:

    LinearRegressionWithSGD
    RidgeRegressionWithSGD
    LassoWithSGD

    ridge regression uses L2 regularization;
    Lasso uses L1 regularization;

  Parameters:

    stepSize: the step size (learning rate) used by gradient descent

    numIterations: the number of iterations

    setIntercept: whether to add an intercept (bias) feature to the data, i.e. a feature whose value is always 1; the default is false (no intercept)

  Default values: {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0} (a sketch of setting these explicitly follows the evaluation notes below)

  Model usage:

    1. Predict on data with model.predict()

    2. Get the feature weights with model.weights

  Model evaluation:

    Mean squared error (MSE): the average of the squared differences between the labels and the predictions
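
A minimal sketch of how the classes and parameters above map onto the API, assuming an RDD[LabeledPoint] named parsedData built as in the full example below; the parameter values are illustrative:

    import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, RidgeRegressionWithSGD, LassoWithSGD}

    // Configure step size, iterations, mini-batch fraction and intercept explicitly
    val lr = new LinearRegressionWithSGD().setIntercept(true)
    lr.optimizer
      .setStepSize(1.0)
      .setNumIterations(100)
      .setMiniBatchFraction(1.0)
    val lrModel = lr.run(parsedData)

    // The regularized variants take an extra regParam (numIterations, stepSize, regParam)
    val ridgeModel = RidgeRegressionWithSGD.train(parsedData, 100, 1.0, 0.01)  // L2 penalty
    val lassoModel = LassoWithSGD.train(parsedData, 100, 1.0, 0.01)            // L1 penalty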

Example:

 

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors

/**
  * Created by Edward on 2016/9/21.
  */
object LinearRegression {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("LinearRegression").setMaster("local")
    val sc = new SparkContext(conf)

    // Load and parse the data
    val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }.cache()

    // Building the model
    val numIterations = 100
    val model = LinearRegressionWithSGD.train(parsedData, numIterations)
//    var lr = new LinearRegressionWithSGD().setIntercept(true)
//    val model = lr.run(parsedData)

    // Print the feature weights and the intercept
    println("weights:%s, intercept:%s".format(model.weights,model.intercept))

    // Evaluate model on training examples and compute training error
    val valuesAndPreds = parsedData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }

    // Compute the mean squared error on the training data
    val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
    println("training Mean Squared Error = " + MSE)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LinearRegressionModel.load(sc, "myModelPath")


  }
}

 

Data:

-0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
-0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
-0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
...

The first column is the label (the known result); the remaining columns are the feature values;

Prediction means supplying a new set of feature values and computing the predicted result, for example:
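
A small sketch of predicting for one new feature vector with the model trained above; the values here are made up:

    val newFeatures = Vectors.dense(-0.5, -0.4, -0.25, -1.0, -0.5, -0.86, -1.0, -0.86)
    val predictedLabel = model.predict(newFeatures)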

Result:

weights:[0.5808575763272221,0.18930001482946976,0.2803086929991066,0.1110834181777876,0.4010473965597895,-0.5603061626684255,-0.5804740464000981,0.8742741176970946], intercept:0.0
training Mean Squared Error = 6.207597210613579

 

Logistic Regression

Logistic regression is a binary classification method; it can also be used for multiclass classification;

  Classes that can be used for logistic regression:

  LogisticRegressionWithLBFGS (recommended)

  LogisticRegressionWithSGD (see the sketch after this list)

  Parameters:

  Similar to linear regression

  Model usage:

  1. Predict on data with model.predict()
  2. Get the feature weights with model.weights

  Model evaluation:

  Binary classification: AUC (Area Under the ROC Curve)
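
A minimal sketch of the SGD-based variant, assuming an RDD[LabeledPoint] named training as in the full example below:

  import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

  val sgdModel = LogisticRegressionWithSGD.train(training, 100)  // 100 iterations
  // The threshold separating positive from negative predictions can be adjusted,
  // or cleared so that predict() returns raw scores instead of 0/1 labels
  sgdModel.setThreshold(0.5)
  sgdModel.clearThreshold()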

  

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

/**
  * Created by Edward on 2016/9/21.
  */
object LogisticRegression {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("LogisticRegression").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // Load training data in LIBSVM format.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10)
      .run(training)

    // Threshold separating positive from negative predictions (only meaningful for binary classification)
    model.setThreshold(0.8)

    // Compute raw scores on the test set.
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
      val prediction = model.predict(features)
      (prediction, label)
    }

    // Multiclass metrics
    // Get evaluation metrics.
    //val metrics = new MulticlassMetrics(predictionAndLabels)
    //val precision = metrics.precision
    //println("Precision = " + precision)


    // Binary classification metrics
    val metrics = new BinaryClassificationMetrics(predictionAndLabels)
    // Evaluate the model with the area under the ROC (receiver operating characteristic) curve; closer to 1 is better
    val auROC: Double = metrics.areaUnderROC()
    println("Area under ROC = " + auROC)
    // Evaluate the model with the area under the precision-recall (PR) curve; closer to 1 is better
    val underPR: Double = metrics.areaUnderPR()
    println("Area under PR = " + underPR)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LogisticRegressionModel.load(sc, "myModelPath")

  }

}

 

 

Support Vector Machines (SVMs)

  A classification algorithm, specifically a binary classifier

  Its use is similar to binary classification with logistic regression; a short parameter sketch and a full example follow
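
A minimal sketch of the SVM-specific knobs, assuming an RDD[LabeledPoint] named training as in the full example below; the parameter values are illustrative:

  import org.apache.spark.mllib.classification.SVMWithSGD

  // train(input, numIterations, stepSize, regParam)
  val svmModel = SVMWithSGD.train(training, 100, 1.0, 0.01)
  // clearThreshold() makes predict() return raw margins (useful with BinaryClassificationMetrics);
  // setThreshold() restores hard 0/1 label predictions
  svmModel.clearThreshold()
  svmModel.setThreshold(0.0)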

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

/**
  * Created by Edward on 2016/9/21.
  */
object SVMs {
  def main(args: Array[String]) {

    val conf: SparkConf = new SparkConf().setAppName("SVM").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    // Load training data in LIBSVM format.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")


    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val numIterations = 100
    val model = SVMWithSGD.train(training, numIterations)

    // Clear the default threshold.
    model.clearThreshold()

    // Compute raw scores on the test set.
    val scoreAndLabels = test.map { point =>
      println("feature="+point.features)
      val score = model.predict(point.features)
      (score, point.label)
    }

    scoreAndLabels.foreach(println(_))

    // Get evaluation metrics.
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)

    println("metrics="+metrics)

    val auROC = metrics.areaUnderROC()

    println("Area under ROC = " + auROC)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = SVMModel.load(sc, "myModelPath")

    sc.stop()

  }

}

 

Data (LIBSVM format: each line is "label index1:value1 index2:value2 ...", with one-based feature indexes):

0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62 
...

 

Collaborative Filtering

  In Spark, collaborative filtering is implemented mainly with alternating least squares (ALS)

  Parameters:

  numBlocks: the number of blocks, used to control parallelism

  rank: the number of latent factors (the size of the feature vectors)

  iterations: the number of iterations

  lambda: the regularization parameter

  alpha: a constant used in implicit ALS to compute confidence (see the sketch after this list)

  Methods:

  ALS.train

  Model evaluation:

  Mean squared error
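
A brief sketch of spelling these parameters out, assuming an RDD[Rating] named ratings as in the full example below; the values are illustrative:

  import org.apache.spark.mllib.recommendation.ALS

  // Explicit-feedback ALS: train(ratings, rank, iterations, lambda, blocks); -1 lets Spark pick the block count
  val explicitModel = ALS.train(ratings, 10, 10, 0.01, -1)
  // Implicit-feedback ALS additionally takes alpha, the confidence constant
  val implicitModel = ALS.trainImplicit(ratings, 10, 10, 0.01, 0.01)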

Example:

  

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

/**
  * Created by Edward on 2016/9/22.
  */
object CollaborativeALS {
  def main(args: Array[String]) {


    val conf: SparkConf = new SparkConf().setAppName("CollaborativeALS").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    // Load and parse the data
    val data = sc.textFile("data/mllib/als/test.data")
    val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
      Rating(user.toInt, item.toInt, rate.toDouble)
    })

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    // Evaluate the model on rating data
    val usersProducts = ratings.map { case Rating(user, product, rate) =>
      (user, product)
    }
    val predictions =
      model.predict(usersProducts).map { case Rating(user, product, rate) =>
        ((user, product), rate)
      }
    val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }.join(predictions)
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()

    // Mean squared error
    println("Mean Squared Error = " + MSE)

    // Save and load model
    model.save(sc, "target/tmp/myCollaborativeFilter")
    val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")


  }

}
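
As a usage note (a sketch; user id 1 is assumed to exist in test.data), the trained model can also produce top-N recommendations directly:

    // Top 5 recommended products for user 1
    val topForUser = sameModel.recommendProducts(1, 5)
    topForUser.foreach(println)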

 

 

 

 

To be continued...

 

