spark-ML基礎


一、ML組件

ML的標准API使用管道(pipeline)這樣的方式,可以將多個算法或者數據處理過程整合到一個管道或者一個流程里運行,其中包含下面幾個部分: 
1. dataFrame:用於ML的dataset,保存數據 
2. transformer:將一個dataFrame按照某種計算轉換成另外一個dataFrame,例如把一個包含特征的dataFrame通過模型預測,生成一個包含特征和預測的dataFrame 
3. estimator:根據訓練樣本進行模型訓練(fit),並且得到一個對應的transformer 
4. pipeline:將多個transformer和estimator串成一個ML的工作流 
5. parameter:transformer和estimator共用一套API來確定參數

transformer

包含特征轉換和已學習得到的數據模型,它實現了一個方法transform() 
1、一個特征transformer可能將一個dataFrame的某些列映射成新的列,然后輸出處理后的新的dataFrame;

2、一個學習得到的模型將讀取一個包含特征的dataFrame,對每個樣本進行預測,並且把預測結果附加到這個dataFrame,得到一個新的dataFrame

Estimators

主要用於訓練模型,實現了一個方法fit(),接受一個包含特征的dataFrame,然后訓練得到一個模型,那個模型就是一個transformer 
例如:一個LogisticRegression是一個estimator,然后通過調用fit(),得到一個LogisticRegressionModel,這是一個transformer。

每個transformer和estimator都有一個唯一ID,用於保存對應的參數

pipeline

例如一個文本挖掘包含以下三個步驟: 
1. 將文本切分成詞 
2. 將詞轉換成特征向量 
3. 訓練得到一個模型,然后用於預測

spark ML將這樣一個工作流定義為pipeline,一個pipeline包含多個PipelineStages (transformer和estimator),通過dataFrame在各個stage中進行傳遞。

image

這是一個訓練模型的例子,包含了三個步驟,藍色的是指transformer,紅色是estimator

image
這是一個使用已訓練模型預測樣本的例子,

Parameters

一個Paramap包含多個(parameter, value)的鍵值對 
有兩種方法將參數傳給算法: 
1. 將參數設置到算法的一個實例,例如lr是LogisticRegression的一個實例,則他可以調用lr.setMaxIter(10)來設置訓練循環次數 
2. 將paramap作為輸入參數,給fit()或者transform(),這些參數會都會覆蓋掉原來set的值  

    我們可以將paramap傳給不同實例,例如lr1和lr2是LogisticRegression的兩個實例,我們可以建立ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)的參數列表,即將兩個實例的參數都放在paramMap中

    可以使用import/export導出模型或者pipeline到磁盤上

二、基本數據結構

完整內容請參考官方文檔 https://spark.apache.org/docs/latest/mllib-data-types.html

(一)核心概念

1、本地向量 LocalVecotr

MLlib的本地向量主要分為兩種,DenseVector和SparseVector,顧名思義,前者是用來保存稠密向量,后者是用來保存稀疏向量,其創建方式主要有一下三種(三種方式均創建了向量(1.0, 0.0, 2.0):

注意,ml package中有同樣的類。

import org.apache.spark.ml.linalg.{Vector, Vectors}  

//創建一個稠密向量  
val dv : Vector = Vectors.dense(1.0,0.0,3.0);  
//創建一個稀疏向量(第一種方式)  
val sv1: Vector = Vectors.sparse(3, Array(0,2), Array(1.0,3.0));  
//創建一個稀疏向量(第二種方式)  
val sv2 : Vector = Vectors.sparse(3, Seq((0,1.0),(2,3.0)))  
  • 1
  • 2
  • 3

對於稠密向量:很直觀,你要創建什么,就加入什么,其函數聲明為Vectors.dense(values : Array[Double]) 
對於稀疏向量,當采用第一種方式時,3表示此向量的長度,第一個Array(0,2)表示的索引,第二個Array(1.0, 3.0)與前面的Array(0,2)是相互對應的,表示第0個位置的值為1.0,第2個位置的值為3 
對於稀疏向量,當采用第二種方式時,3表示此向量的長度,后面的比較直觀,Seq里面每一對都是(索引,值)的形式。

tips:由於scala中會默認包含scal.collection.immutalbe.Vector,所以當使用MLlib中的Vector時,需要顯式的指明import路徑

2、向量標簽 LabelVector

向量標簽和向量是一起的,簡單來說,可以理解為一個向量對應的一個特殊值,這個值的具體內容可以由用戶指定,比如你開發了一個算法A,這個算法對每個向量處理之后會得出一個特殊的標記值p,你就可以把p作為向量標簽。同樣的,更為直觀的話,你可以把向量標簽作為行索引,從而用多個本地向量構成一個矩陣(當然,MLlib中已經實現了多種矩陣) 
其使用代碼為:

import org.apache.spark.ml.linalg.Vectors  
import org.apache.spark.ml.feature.LabeledPoint  
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))  
  • 1
  • 2
  • 3

對於pos變量,第一個參數表示這個數據的分類,1.0的具體含義只有你自己知道咯,可以使行索引,可以使特殊值神馬的 
從文件中直接讀入一個LabeledPoint

MLlib提供了一種快捷的方法,可以讓用戶直接從文件中讀取LabeledPoint格式的數據。規定其輸入文件的格式為:

label index1:value1 index2:value2.....  
  • 1
  • 2

然后通過

val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()
val data = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")
  • 1
  • 2
  • 3

直接讀入即可。 
關於libsvm格式的詳細說明請見下面內容。

3、本地矩陣

既然是算數運算包,肯定少不了矩陣包,先上代碼:

import org.apache.spark.mllib.linalg.{Matrix, Matrices}  
val dm : Matrix = Matrices.dense(3,2, Array(1.0,3.0,5.0,2.0,4.0,6.0))  
  • 1
  • 2
  • 3

上面的代碼段創建了一個稠密矩陣:

1.0     2.0
3.0     4.0
5.0     6.0
  • 1
  • 2
  • 3
  • 4

很明顯,創建的時候是將原來的矩陣按照列變成一個一維矩陣之后再初始化的。

稀疏矩陣:

val eye = Matrices.sparse(3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))
  • 1
  • 2

4、分布式矩陣

(ml package未找到類似的類) 
MLlib提供了三種分布式矩陣的實現,依據你數據的不同的特點,你可以選擇不同類型的數據: 
(1)RowMatrix

RowMatrix矩陣只是將矩陣存儲起來,要注意的是,此種矩陣不能按照行號訪問。(我也不知道為什么這樣鳥。。)

import org.apache.spark.mllib.linalg.Vector  
import org.apache.spark.mllib.linalg.distributed.RowMatrix  
val rows: RDD[Vector] = ...//  
val mat: RowMatrix = new RowMatrix(rows)  

val m = mat.numRows()  
val n = mat.numCols()  
  • 1
  • 2
  • 3

RowMatrix要從RDD[Vector]構造,m是mat的行數,n是mat的列 
Multivariate summary statistics

顧名思義,這個類里面包含了矩陣中的很多常見信息,怎么使用呢?

import org.apache.spark.mllib.linalg.Matrix  
import org.apache.spark.mllib.linalg.distributed.RowMatrix  
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary  

val mat: RowMatrix = ..  

val summy : MultivariateStatisticalSummary = mat.computeColumnSummaryStatistics()  
println(summy.mean)//平均數  
  • 1
  • 2
  • 3
  • 4

通過這個類,可以得到平均數,矩陣中非0個數,具體的數據看看幫助文檔 
(2)IndexedRowMatrix

IndexedRowMatrix矩陣和RowMatrix矩陣的不同之處在於,你可以通過索引值來訪問每一行。其他的,沒啥區別。。 
(3)CoordinateMatrix

當你的數據特別稀疏的時候怎么辦?采用這種矩陣吧。先上代碼:

import org.apache.spark.mllib.linalg.distributed.{CoordinatedMatrix, MatrixEntry}  

val entries : RDD[MatrixEntry] = ..  
val mat: CoordinateMatrix = new CoordinateMatrix(entries)  
  • 1
  • 2
  • 3
  • 4
  • 5

CoordinateMatrix矩陣中的存儲形式是(row,col,value),就是原始的最稀疏的方式,所以如果矩陣比較稠密,別用這種數據格式

(二)libsvm數據格式

首先介紹一下 libSVM的數據格式

Label 1:value 2:value … 
Label:是類別的標識,比如上節train.model中提到的1 -1,你可以自己隨意定,比如-10,0,15。當然,如果是回歸,這是目標值,就要實事求是了。 
Value:就是要訓練的數據,從分類的角度來說就是特征值,數據之間用空格隔開 
比如: 
-15 1:0.708 2:1056 3:-0.3333 
需要注意的是,如果特征值為0,則這列數據可以不寫,因此特征冒號前面的(姑且稱做序號)可以不連續。如: 
-15 1:0.708 3:-0.3333 
表明第2個特征值為0,從編程的角度來說,這樣做可以減少內存的使用,並提高做矩陣內積時的運算速度。我們平時在matlab中產生的數據都是沒有序號的常規矩陣,所以為了方便最好編一個程序進行轉化。

spark提供了方便的工具類來加載這些數據

val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()
val data = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")
  • 1
  • 2
  • 3

(三)fit()/transform()方法的參數DF包含哪些列

  模型抽象為三個基本類,estimators(實現fit方法), transformers(實現transform方法), pipelines,一個正常的模型應該同時實現 fit 和 transform 兩個方法

  fit 的DataFrame需要包含兩列 featuresCol 和 labelCol 默認名字為 label
  transform 之前的DataFrame需要有一列名字為features,輸出三列(依賴於參數),三列有默認名字,都可以通過setter函數進行設置。
    predictedCol 預測的標簽,默認名字為 prediction
    rawPredictedCol 預測的向量,默認名字為 rawPrediction
    probabilityCol 預測的概率,默認名字為 probability

范例1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object logr {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.Row

    val ss=SparkSession.builder().master("local").appName("haha").getOrCreate()
    Logger.getRootLogger.setLevel(Level.WARN)
    val training = ss.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    //創建一個LogisticRegression實例,這是一個Estimator.
    val lr = new LogisticRegression()
    //打印參數
    println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    //調用實例的set方法設置參數
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // 學習LogisticRegression模型,model1是一個transformer
    val model1 = lr.fit(training)

    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

    // 通過paramap來設置參數
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30)
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)

    // 兩個ParamMap之間可以相加合並.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
    val paramMapCombined = paramMap ++ paramMap2

    val model2 = lr.fit(training, paramMapCombined)
    println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

    //測試數據
    val test = ss.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    //model2的transform()會只選擇features的數據,不會把label數據包含進去
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
  }
}

驗證

ML里面用CrossValidator類來做交叉驗證,這個類包含一個estimator、一堆paramMap、和一個evaluator。 
evaluator有三個子類,包括regressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object logr {
  def main(args: Array[String]): Unit = {

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
    import org.apache.spark.sql.Row

    val ss = SparkSession.builder().master("local").appName("haha").getOrCreate()
    Logger.getRootLogger.setLevel(Level.WARN)

    val training = ss.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0),
      (4L, "b spark who", 1.0),
      (5L, "g d a y", 0.0),
      (6L, "spark fly", 1.0),
      (7L, "was mapreduce", 0.0),
      (8L, "e spark program", 1.0),
      (9L, "a e c l", 0.0),
      (10L, "spark compile", 1.0),
      (11L, "hadoop software", 0.0)
    )).toDF("id", "text", "label")

    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // ParamGridBuilder創建參數grid,保存所有要做驗證的參數
    val paramGrid = new ParamGridBuilder()
      .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .build()

    // 這里將pipeline作為一個estimator傳遞給cv,這里默認的評估是ROC
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2) // Use 3+ in practice

    // 訓練模型,選擇最優參數
    val cvModel = cv.fit(training)

    val test = ss.createDataFrame(Seq(
      (4L, "spark i j k"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // cvModel將會用最優的參數進行預測
    cvModel.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
  }
}
  • ML中除了cv以外,還有一種指定樣本划分的驗證方式,TrainValidationSplit 類,默認是0.75,即3/4用於做訓練,1/4用於做測試。其他跟cv一樣
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
object logr {
  def main(args: Array[String]): Unit = {

    val ss = SparkSession.builder().master("local").appName("haha").getOrCreate()
    Logger.getRootLogger.setLevel(Level.WARN)
    
    val data = ss.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
    val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

    val lr = new LinearRegression()

    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .addGrid(lr.fitIntercept)
      .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
      .build()

    // 創建trainValidationSplit類
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(lr)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramGrid)
      // 指定划分百分比
      .setTrainRatio(0.8)

    val model = trainValidationSplit.fit(training)

    model.transform(test)
      .select("features", "label", "prediction")
      .show()
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM