Spark MLib完整基礎入門教程


Spark MLib

在Spark下進行機器學習,必然無法離開其提供的MLlib框架,所以接下來我們將以本框架為基礎進行實際的講解。首先我們需要了解其中最基本的結構類型,即轉換器、估計器、評估器和流水線。

graph LR A[轉換器] --> B(估計器) B --> C(評估器) C --> D[模型]

首先歡迎大家Start本人關於機器學習的學習倉庫,不僅僅包含了Spark ML還包括python下的sklearn等主流庫。

一、基礎使用

接下來我們將以一個簡單的例子為基礎整體介紹在Spark下進行機器學習的使用方式,便於讀者大體熟悉完整的流程節點。當然在這其中對於部分不了解的情況下可以等在后續詳細學習的過程中進行補充即可。

點擊此處查看代碼示例

1. 特征工程

這部分相關知識可以參考本人編寫的人工智能專題的開源教程,其中對該部分進行詳細的說明,下面我們將就框架提供的RFormula進行具體的實戰操作(這里熟悉R語言的可能對此比較熟悉,本身就是借鑒了R語言,但是僅實現了其中的一個子集),對於我們需要進行特征化的數據首先我們需要定義對應的線性模型公式,具體如下。

Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml");
RFormula supervised = new RFormula().setFormula("lab ~ . + color: value1 + color: value2");

當然僅僅通過上述的方式還不能實現對數據的特征化,我們還需要通過數據對其進行訓練,從而得到我們所需的轉換器,為此我們需要使用其中的fit方法進行轉換。

RFormulaModel model = supervised.fit(df);

完成轉換器的訓練后我們就可以利用其進行實際的轉換操作,從而生成特征features與標簽label列,當然讀者也可以通過supervised.setLabelCol設置標簽列名,supervised.setFeaturesCol設置特征列名。對於監督學習都需要將數據分為樣本數據與測試數據,為此我們需要通過以下方式將數據拆分。

Dataset<Row>[] data = preparedDF.randomSplit(new double[]{0.7, 0.3});

2. 模型訓練

在Spark MLib中為估計器,這里我們將采用邏輯回歸的算法做為演示,提供一個分類算法模型的訓練,首先我們實例化我們需要的模型類,通過其提供的方式對將訓練數據傳入其中進行模型的訓練。

LogisticRegression lr = new LogisticRegression();
LogisticRegressionModel lrModel = lr.fit(data[0]);
lrModel.transform(data[1]).select("label1", "prediction").show();

如果在對數據進行特征工程的時候將標簽以及特征列的名稱進行了修改,那么我們也需要通過lr.setLabelCol以及lr.setFeaturesCol進行同步修改調整。同時框架也提供了explainParams方法打印模型中可供調整的參數。

3. 流水線

對於機器學習,后期工作基本就是對各種參數的調優,為此Spark提供了友好的流水線,並基於其本平台分布式計算集群的能力助力我們縮短對不同參數模型的訓練與評估,從而提供最佳的參數模型供我們使用,下面我們將一步一步介紹如何使用其提供的該特性。首先我們定義工作流中涉及到的階段步驟,具體如下所示。

Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml.json");
Dataset<Row>[] data = df.randomSplit(new double[] {0.7, 0.3});

RFormula rForm = new RFormula();
LogisticRegression lr = new LogisticRegression();
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { rForm, lr });

上述完成工作流水線各階段的任務后,接下來我們就需要指定各階段的參數列表,從而便於Spark形成不同的組合進行模型訓練。

Seq<String> formulaParam = JavaConverters.asScalaIteratorConverter(Arrays.asList("lab ~ . + color:value1", "lab ~ . + color:value1 + color:value2").iterator()).asScala().toSeq();

ParamMap[] params = new ParamGridBuilder()
    .addGrid(rForm.formula(), formulaParam)
    .addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
    .addGrid(lr.regParam(), new double[]{0.1, 2.0})
    .build();

有了以上其實我們就可以單純的進行模型訓練了,但是這樣訓練除的模型並無法評估出最好的一個模型。我們需要指定一個評估器用來評估實際效果是否符合最佳。這里我們主要采用了BinaryClassificationEvaluator類。

BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator()
    .setMetricName("areaUnderROC")
    .setRawPredictionCol("prediction")
    .setLabelCol("label");

最后我們需要能夠自動調整超參數,並自動分配數據集的方式將上述的各部分組成從而形成最終有效的模型。

TrainValidationSplit tvs = new TrainValidationSplit()
    .setTrainRatio(0.75)
    .setEstimatorParamMaps(params)
    .setEstimator(pipeline)
    .setEvaluator(evaluator);

而具體的使用與之前邏輯回歸的方式如出一轍。

TrainValidationSplitModel model = tvs.fit(data[0]);
System.out.println(evaluator.evaluate(model.transform(data[1])));

如果讀者需要將該模型進行持久化可以采用model.write().overwrite().save("sparkdemo/data/model");該方式進行實際的持久化,當然讀取時需要采用與寫入一致的類,否則將無法正確讀取。

二、特征工程

參考機器學習教程中對應章節的內容可彌補關於各類算法的基礎知識,接下來我們將僅從基於Spark的實戰角度出發進行列舉常用的方式對特定的數據預處理。

1. 通用

下面我們將介紹較通用的三種的針對數據進行處理的方式,其中一種上述的教程已經使用了,這里將僅做為介紹進行概述。

點擊此處查看代碼示例

1) RFormula

其主要參考了基於R語言的formula設計思路,當然其中僅僅支持有限有限的操作符。並且其中對於字符串的處理是采用獨熱編碼(One-hot)的方式,具體的使用方式如下。

val supervised = new RFormula().setFormula("lab ~ . + color:value1 + color:value2")
supervised.fit(df).transform(df).show()

支持的操作符如下:

~ 分割標簽與特征
+ 將兩個特征相加,+ 0代表除去截距
- 減去一個特征, - 1代表除去截距
: 將多個特征相乘變成一個特征
. 選取所有特征

如果讀者不了解其中表達式的作用,接下來我們將舉一個例子,假設a,b為2個特征,y是應變量。利用上述的公式我們將可以寫出如下的語句。

y ~ a + b: 對應到線性模型的公式為: y = w0 + w1 * a + w2 * b,其中w0為截距
y ~ a + b + a:b - 1: 對應的線性模型的公式為: y = w1 * a + w2 * b + w3 * a * b,由於-1的存在所以沒有截距

如果讀者還是不能理解我們可以通過具體的例子來進行介紹,首先我們准備以下相關數據。

lab value1 value2
good 13 2.1
bad 9 8.2

將上面的數據采用公式lab ~ value1 + value2進行處理后,結果數據將如下所示。

lab value1 value2 features label
good 13 2.1 [13.0, 2.1] 1
bad 9 8.2 [9.0, 8.2] 0

上述我們可以看到針對字符串類型的標簽采用了字符串索引的方式進行映射,至此關於RFormula介紹到此為止。

2) SQLTransformer

即利用Spark提供的眾多關鍵字對數據通過SQL語句的方式進行處理,這里需要注意的是不要直接
使用標名,如果需要引用本表則需要通過關鍵字__THIS__來引用。

val basicTrans = new SQLTransformer()
    .setStatement("""
        SELECT sum(value1), count(*), color
        FROM __THIS__
        GROUP BY color
    """)
basicTrans.transform(df).show()

3) VectorAssembler

如果需要將多個Boolean,Double或Vector類型做為輸入合並為一個大的向量則可以使用該函數
實現我們所需要的效果。

val va = new VectorAssembler().setInputCols(Array("value1", "value2"))
va.transform(df).show()

4) VectorIndexer

將一個向量列中的特征進行索引,一般用於決策樹。其對於離散特征的索引是基於0開始的,但是並不保證值每次對應的索引值一致,但是對於0必然是會映射到0。

val va = new VectorAssembler().setInputCols(Array("value1", "value2")).setOutputCol("features")
val vva = va.transform(df)

val vi = new VectorIndexer().setInputCol("features").setOutputCol("indexFeatures").fit(vva)
vi.transform(vva).show()

如果讀者希望控制最大的分類數,則可以通過setMaxCategories繼續控制。

2. 連續特征

這部分我們主要使用分桶,縮放與歸一化。其中分桶對應前面所屬的分箱行為。以下僅僅介紹較為
簡單的分桶方式,如果讀者需要學習更高級的方式可以學習如局部敏感哈希(LSH)等算法。

1) 分桶(Bucketizer)

最好理解的分桶方式,即我們認為的將數值的區間限定好,讓對應的數據歸納到對應的桶內。對於
無限大與無限小則可以使用Double.PositiveInfinityDouble.NegativeInfinity填充。
同時為了處理null或NaN值,必須指定handlerInvalid參數,如果需要保留可以使用keep,報錯
則填入null或error,以及skip跳過。

val bucketBorders = Array(0.0, 10.0, 20.0, 30.0, 40.0, 50.0)
val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("value1")
bucketer.transform(df).show()

2) 分桶(QuantileDiscretizer)

該方式不用用戶進行預設,其可以根據百分比進行拆分,讀者可以通過setRelativeError設置
近似分位數計算的相對誤差。通過設置handleInvalid選擇保留還是刪除數據集中的NaN值。如果
選擇保留NaN值,則將對其進行特殊處理並將其放入自己的存儲桶中,如讀者設定4個桶,則其會被
放入一個特殊的桶[4]中。

val bucketer = new QuantileDiscretizer().setNumBuckets(5).setInputCol("value1").setOutputCol("val1category")
var bucketerModel = bucketer.fit(df)
bucketerModel.transform(df).show()

3) 歸一化(StandardScaler)

基於StandardScaler將一組特征值歸一化成平均值為0而標准偏差為1的一組新值。通過withStd標志設置單位標准差,而withMean標識將使數據在縮放之前進行中心化。稀疏向量中心化非常耗時,因為一般會將它們轉化為稠密向量。

val sScaler = new StandardScaler().setInputCol("features")
sScaler.fit(rdf).transform(rdf).show()

4) 縮放(MinMaxScaler)

將向量中的值基於給定的最小值到最大值按比例縮放。如最小值為0且最大值為1,則所有值將介於0和1之間。

val minMax = new MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
minMax.fit(rdf).transform(rdf).show()

5) 縮放(MaxAbsScaler)

基於MaxAbsScaler將每個值除以該特征的最大絕對值來縮放數據。計算后的值將在-1與1之間。

val maScaler = new MaxAbsScaler().setInputCol("features")
maScaler.fit(rdf).transform(rdf).show()

6) 縮放(ElementwiseProduct)

基於ElementwiseProduct將一個縮放向量對某向量中的每個值以不同的尺度進行縮放。

val scalingUp = new ElementwiseProduct()
    .setScalingVec(Vectors.dense(10.0, 20.0))
    .setInputCol("features")
scalingUp.transform(rdf).show()

7) 縮放(Normalizer)

用冪范數來縮放多維向量,Normalizer的作用范圍是每一行。其通過參數P設置幾范數,為1表示曼哈頓范數,2表示歐幾里德范數等。

val manhattanDistance = new Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(rdf).show()

3. 類型特征

針對數據的處理,往往會存在大量枚舉指標。有可能是字符串,也有可能是數字等形式。針對字符
串形式的類型數據,我們就需要將其進行轉換,從而便於進行后續的數據分析處理。

1) StringIndexer

最簡單的方式就是將對應的類型映射到對應ID形成關系。

val lblIndxr = new StringIndexer().setInputCol("lab").setOutputCol("labelInd")
val idxRes = lblIndxr.fit(rdf).transform(rdf)
idxRes.show()

基於字符串索引的方式還可以根據索引反推出對應的類型名稱。

val labelReverse = new IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show()

2) OneHotEncoder

基於One-hot(獨熱編碼)在類別變量索引之后進行轉換。其目的主要是因為經過字符串索引后就存在了一種大小關系,比如對應顏色這種類別是無意義的所以我們需要將其轉換為向量中的一個布爾值元素。

val ohe = new OneHotEncoder().setInputCol("labelInd")
ohe.transform(idxRes).show()

4. 文本數據特征

在實際的預處理過程中我們往往會遇到需要將文檔等由多種詞語構成的數據進行分析。而針對這類
文本數據的分析,我們需要借助以下幾個流程將這類文本數據轉換為具體的特征數據從而便於我們
進行數據的分析處理。

1) Tokenizer

首先我們需要將一段話進行分詞,分詞是將任意格式的文本轉變成一個“符號”列表或者一個單詞列表的過程。

val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")
val tokenized = tkn.transform(sales.select("Description"))
tokenized.show()

2) RegexTokenizer

不僅可以基於Tokenizer通過空格分詞,也可以使用RegexTokenizer指定的正則表達式來分詞。

val rt = new RegexTokenizer().setInputCol("Description")
    .setOutputCol("DescOut")
    .setPattern(" ")
    .setToLowercase(true)
rt.transform(sales.select("Description")).show()

3) StopWordsRemover

分詞后的一個常見任務是過濾停用詞,這些常用詞在許多分析中沒有什么意義,因此應被刪除。

val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
val stops = new StopWordsRemover().setStopWords(englishStopWords)
    .setInputCol("DescOut")
    .setOutputCol("StopWords")
stops.transform(tokenized).show()

4) NGram

字符串分詞和過濾停用詞之后,會得到可作為特征的一個詞集合。

val bigram = new NGram().setInputCol("DescOut").setN(2)
bigram.transform(tokenized.select("DescOut")).show()

5) CountVectorizer

一旦有了詞特征,就可以開始對單詞和單詞組合進行計數,以便在我們的模型中使用可以通過setMinTF來決定詞庫中是否包含某項,通過setVocabSize設置總的最大單詞量。

val cv = new CountVectorizer()
    .setInputCol("DescOut")
    .setOutputCol("countVec")
    .setVocabSize(500)
    .setMinTF(1)
    .setMinDF(2)
val fittedCV = cv.fit(tokenized)
fittedCV.transform(tokenized).show()

實際上它是一個稀疏向量,包含總的詞匯量、詞庫中某單詞的索引,以及該單詞的計數。

6) TF-IDF

另一種將本文轉換為數值表示的方法是使用詞頻-逆文檔頻率(TF-IDF)。最簡單的情況是,TF-IDF度量一個單詞在每個文檔中出現的頻率,並根據該單詞出現過的文檔數進行加權,結果是在較少文檔中出現的單詞比在許多文檔中出現的單詞權重更大。如果讀者希望能夠更深入的單獨了解該技術可以閱讀本文章

val tf = new HashingTF().setInputCol("DescOut")
    .setOutputCol("TFOut")
    .setNumFeatures(10000)
var idf = new IDF().setInputCol("TFOut")
    .setOutputCol("IDFOut")
    .setMinDocFreq(2)
idf.fit(tf.transform(tokenized)).transform(tf.transform(tokenized)).show()

輸出顯示總的詞匯量、文檔中出現的每個單詞的哈希值,以及這些單詞的權重。

7) Word2Vec

因為機器學習只能接受數值形式,為此需要進行轉換,而Word2Vec就是詞嵌入的中的一種。而Spark內使用的是基於skip-gram模型,而該模型主要是根據輸入的詞語推算出上下文可能與該詞組合的其他詞語。如果希望學習Word2Vec則可以參考本文檔

val word2Vec = new Word2Vec().setInputCol("DescOut").setOutputCol("result").setVectorSize(3).setMinCount(0)
val model = word2Vec.fit(tokenized)
model.transform(tokenized).show()

5. 特征操作

1) PCA

主成分(PCA)是一種數據方法, 用於找到我們的數據中最重要的成分。PCA使用參數k指定要創建的輸出特征的數量,這通常應該比輸入向量的尺寸小的多。

val pca = new PCA().setInputCol("features").setK(2)
pca.fit(featureDF).transform(featureDF).show()

6. 多項式擴展

1) PolynomialExpansion

多項式擴展基於所有輸入列生成交互變量。對於一個二階多項式,Spark把特征向量中的每個值乘以所有其他值,然后將結果存儲成特征。

val pe = new PolynomialExpansion().setInputCol("features").setDegree(2)
pe.transform(featureDF).show()

多項式擴展會增大特征空間,從而導致高計算成本和過擬合效果,所以請效性使用。

7. 特征選擇

1) ChiSqSelector

ChiSqSelector利用統計測試來確定與我們試圖預測的標簽無關的特征,並刪除不相關的特征。其提供了以下集中方法:

  1. numTopFea tures:基於p-value排序
  2. percentile:采用輸入特征的比例
  3. fpr:設置截斷p-value
val chisq = new ChiSqSelector().setFeaturesCol("features")
    .setLabelCol("labelInd")
    .setNumTopFeatures(1)
chisq.fit(featureDF).transform(featureDF).show()

三、 分類算法

1. 邏輯回歸

邏輯回歸是一種線性模型,為輸入的每個特征賦以權重之后將他們組合在一起,從而獲得該輸入屬於特定類的概率。

超參數

參數名 說明
family 可以設置為multinomial(多分類)或binary(二分類)
elasticNetParam 從0到1的浮點值。該參數依照彈性網絡正則化的方法將L1正則化和L2正則化混合(即兩者的線性組合)
fitIntercept 此超參數決定是否適應截距
regParam 確定在目標函數中正則化項的權重,它的選擇和數據集的噪聲情況和數據維度有關
standardization 可以為true或false,設置它用於決定在將輸入數據傳遞到模型之前是否要對其標准化

訓練參數

參數名 說明
maxIter 迭代次數
tol 此值指定一個用於停止迭代的閾值
weightCol 權重列的名稱,用於賦予某些行更大的權重

預測參數

參數名 說明
threshold 此參數事預測時的概率閾值,你可以根據需要調整此參數以平衡誤報和漏報

對於多項式分類模型(多分類) lrModel.coefficientMatrix和lrModel.interceptVector可以用來得到系數和截距值

val lr = new LogisticRegression()
val lrModel = lr.fit(bInput)

println(lrModel.coefficients) // 輸出 系數
println(lrModel.intercept) // 輸出 截距

2. 決策樹

決策樹是一種更友好和易於理解的分類方法,因為它類似人類經常使用的簡單決策模型。

超參數

參數名 說明
maxDepth 指定最大深度
maxBins 確定應基於連續特征創建多少個槽,更多的槽提供更細的粒度級別
impurity 不純度表示是否應該在某葉子結點拆分的度量(信息增益),此參數可以設置為entropy或者gini
minInfoGain 此參數確定可用於分割的最小信息增益
minInstancePerNode 此參數確定需要在一個節點結束訓練的實例最小數目

訓練參數

參數名 說明
checkpointInterval 檢查點是一種在訓練過程中保存模型的方法,此方法可以保證當集群節點因某種原因奔潰時不影響整個訓練過程
val dt = new DecisionTreeClassifier()
val dtModel = dt.fit(bInput)
println(dtModel.explainParams())

3. 隨機森林與梯度提升

隨機森林,我們只訓練大量的樹,然后平均他們的結果做出預測。利用梯度提升樹,每棵樹進行加權預測。

隨機森林超參數

參數名 說明
numTrees 用於訓練的樹總數
featureSubsetStrategy 此參數確定拆分時應考慮多少特征

梯度提升樹超參數

參數名 說明
lossType 損失函數,目前僅支持logistic loss損失
maxIter 迭代次數
stepSize 代表算法的學習速度
val rfClassifier = new RandomForestClassifier()
println(rfClassifier.explainParams())
val rfModel = rfClassifier.fit(bInput)

val gbtClassifier = new GBTClassifier()
println(gbtClassifier.explainParams())
val gbtModel = gbtClassifier.fit(bInput)

4. 朴素貝葉素

朴素貝葉素分類是基於貝葉斯定理的分類方法,通常用於文本或文檔分類,所有的輸入特征碧璽為非負數。

超參數

參數名 說明
modelType 可選bernoulli或multinomial
weightCol 允許對不同的數據點賦值不同的權重

訓練參數

參數名 說明
smoothing 它指定使用加法平滑時的正則化量,改設置有助於平滑分兩類數據
val nb = new NaiveBayes()
println(nb.explainParams())
val nbModel = nb.fit(bInput)

對於二分類,我們使用BinaryClassificationEvaluator,它支持優化兩個不同的指標areaUnderRocareaUnderPR
對於多分類,需要使用MulticlassClassificationEvaluator,它支持優化f1,weightedPrecision,weightedRecall,accuracy

四、 回歸算法

1. 線性回歸

線性回歸假定輸入特征的線性組合(每個特征乘以權重的綜合)將得到(有一定高斯誤差的)輸出結果,具體參數同分類中該算法。

val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
println(lr.explainParams())
val lrModel = lr.fit(df)

2. 廣義線性回歸

線性回歸的廣義形式使你可以更細粒度地控制使用各種回歸模型

超參數

參數名 說明
family 指定在模型中使用的誤差分布,支持Poisson、binomial、gamma、Caussian和tweedie。
link 鏈接函數的名稱,指定線性預測器與分布函數平均值之間的關系,支持cloglog、probit、logit、reverse、sqrt、identity和log
solver 指定的優化算法。
variancePower Tweedie分布方差函數中的冪。
linkPower Tweedie分布的乘冪鏈接函數索引。

預測參數

參數名 說明
linkPredictionCol 指定一個列名,為每個預測保存我們的鏈接函數。
val glr = new GeneralizedLinearRegression().setFamily("gaussian").setLink("identity").setMaxIter(10).setRegParam(0.3).setLinkPredictionCol("linkOut")
println(glr.explainParams())
val glrModel = glr.fit(df)

3. 決策樹

用於回歸分析的決策樹不是在每個葉子節點上輸出離散的標簽,而是一個連續的數值,但是可解釋性和模型結構仍然適用。對應參數可以參考分類中該算法。

val dtr = new DecisionTreeRegressor()
println(dtr.explainParams())
val dtrModel = dtr.fit(df)

4. 隨機森林和梯度提升樹

隨機森林和梯度提升樹模型可應用於分類和回歸,它們與決策樹具有相同的基本概念,不是訓練一棵樹而是很多樹來做回歸分析。

val rf = new RandomForestRegressor()
println(rf.explainParams())
val rfModel = rf.fit(df)

val gbt = new GBTRegressor()
println(gbt.explainParams())
var gbtModel = gbt.fit(df)

5. 評估器和自動化模型校正

用於回歸任務的評估器稱為 RegressionEvaluator,支持許多常見的回歸度量標准。與分類評估器一樣,RegressionEvaluator 需要兩項輸入,一個表示預測值,另一個表示真是標簽的值。

val glr = new GeneralizedLinearRegression()
    .setFamily("gaussian")
    .setLink("identity")

val pipeline = new Pipeline().setStages(Array(glr))
val params = new ParamGridBuilder().addGrid(glr.regParam, Array(0, 0.5, 1)).build()
val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setPredictionCol("prediction")
    .setLabelCol("label")
val cv = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(evaluator)
    .setEstimatorParamMaps(params)
    .setNumFolds(2)
val model = cv.fit(df)

五、 推薦系統

主要采用ALS(交替最小二乘法)為每個用戶和物品建立k維的特征向量,從而可以通過用戶和物品向量的點積來估算該用戶
對物品的評分值,所以只需要用戶-物品對的評分數據作為輸入數據集,其中有三列:用戶Id列、物品Id列和評分列。評分
可以是顯式的,即我們想要直接預測的數值登記;或隱式的,在這種情況下,每個分數表示用戶和物品之間的交互強度,它
衡量用戶對該物品的偏好程度。

超參數

參數名 說明
rank rank確定了用戶和物品特征向量的維度,這通常是通過實驗來調整,一個重要權衡是過高的秩導致過擬合,而過低的秩導致不能做出最好的預測
alpha 在基於隱式反饋的數據上進行訓練時,alpha設置偏好的基線置信度,這個值越大則越認為用戶和他沒有評分的物品之間沒有關聯
regParam 控制正則化參數來防止過擬合,需要測試不同的值來找到針對你的問題的最優的值
implicitPrefs 此布爾值指定是在隱式數據還是顯式數據上進行訓練
nonnegative 如果設置為true,則將非負約束置於最小二乘問題上,並且只返回非負特征向量,這可以提高某些應用程序的性能

訓練參數

其中最終主要的就是數據塊,通常的做法是每個數據塊大概分配一百萬到五百萬各評分值,如果每個數據塊的數據量少於這個數字,則太多的數據塊可能會影響性能。

參數名 說明
numUserBlocks 確定將用戶數據拆分成多少各數據塊
numItemBlocks 確定將物品數據拆分為多少各數據塊
maxIter 訓練的迭代次數
checkpointInterval 設置檢查點可以在訓練過程中保存模型狀態
seed 指定隨機種子幫助復現實驗結果

預測參數

主要的參數為冷啟動策略,該參數用來設置模型應為未出現過在訓練集中的用戶或物品推薦什么,可通過coldStartStrategy設置,可以選擇drop和nan設置。

val conf = new SparkConf().setAppName("ScalaSparkML").setMaster("local")
val spark = SparkSession.builder().config(conf).getOrCreate()

val ratings = spark.read.textFile("data/sample_movielens_ratings.txt")
            .selectExpr("split(value, '::') as col")
            .selectExpr("cast(col[0] as int) as userId",
            "(col[1] as int) as movieId",
            "(col[2] as float) as rating",
            "(col[3] as long) as timestamp")
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
val als = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
println(als.explainParams())

val alsModel = als.fit(ratings)
val predictions = alsModel.transform(test)

模型的recommendForAllUsers方法返回對應某個userId的DataFrame,包含推薦電影的數組,以及每個影片的評分。recommendForAllItems返回對應某個
movieId的DataFrame以及最有可能給該影片打高分的前幾個用戶。

alsModel.recommendForAllUsers(10)
    .selectExpr("userId", "explode(recommendations)").show()
alsModel.recommendForAllItems(10)
    .selectExpr("movieId", "explode(recommendations)").show()

針對訓練的模型效果我們依然需要針對其進行評估,這里我們可以采用與回歸算法中相同的評估器進行評估。

val evaluator = new RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("rating")
    .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

對於度量指標我們通過回歸度量指標與排名指標進行度量,首先是回歸度量指標,我們可以簡單的查看每個用戶和項目的實際評級與預測值的接近程度。

val regComparison = predictions.select("rating", "prediction")
                    .rdd.map(x => (x.getFloat(0).toDouble, x.getFloat(1).toDouble))
val metrics = new RegressionMetrics(regComparison)

六、 無監督學習

1. k-means

k-means算法中,用戶在數據集中隨機選擇數量為k的數據點作為處理聚類的聚類中心,未分配的點基於他們與這些
聚類中心的相似度(歐氏距離)倍分配到離他們最近的聚類中。分配之后,再根據被分配到一個聚類的數據點再計算
聚類的新中心,並重復該過程,直到到達有限的迭代次數或直到收斂。

超參數

參數名 說明
k 指定你希望的聚類數量

訓練參數

參數名 說明
initMode 初始化模式是確定質心初始位置的算法,可支持 random(隨機初始化)與k-means
initSteps k-means模式初始化所需要的步數
maxIter 迭代次數
tol 該閾值指定質心改變到該程度后優化結束
val km = new KMeans().setK(3)
println(km.explainParams())
val kmModel = km.fit(sales)

val summary = kmModel.summary
summary.clusterSizes // 中心點數量

計算數據點與每個聚類中心點的距離,可以采用ClusterEvaluator評估器

2. 二分k-means

該算法為k-means變體,關鍵區別在於,它不是通過“自下而上”的聚類,而是自上而下的聚類方法。就是通過創建一個組
然后進行拆分直到拆分到k個組。

超參數

參數名 說明
k 指定你希望的聚類數量

訓練參數

參數名 說明
minDivisibleClusterSize 指定一個可分聚類中的最少數據點數
maxIter 迭代次數
val bkm = new BisectingKMeans().setK(5).setMaxIter(5)
println(bkm.explainParams())
val bkmModel = bkm.fit(sales)

val summary = bkmModel.summary
summary.clusterSizes

3. 高斯混合模型

一種簡單理解高斯混合模型的方法是,它們就像k0means的軟聚類斑斑(軟聚類softclustering即每個數據點可以划分到多個聚類中),而
k-means創建硬聚合(即每個點僅在一個聚類中),高斯混合模型GMM依照概率而不是硬性邊界進行聚類。

超參數

參數名 說明
k 聚類數量

訓練參數

參數名 說明
maxIter 迭代次數
tol 指定一個閾值來代表將模型優化到什么程度就夠了
val gmm = new GaussianMixture().setK(5)
println(gmm.explainParams())
val model = gmm.fit(sales)
model.gaussiansDF.show()
model.summary.probability.show()

4. LDA主題模型

隱含狄利克雷分布式一種通常用於對文本文檔執行主體建模的分層聚類模型。LDA試圖從與這些主題相關聯的一系列文檔和關鍵字
中提取高層次的主題,然后它將每個文檔解釋為多個輸入主題的組合。

超參數

參數名 說明
k 用於指定從數據中提取的主題數量
docConcentration 文檔分布的濃度參數向量
topicConcentration 主題分布的濃度參數向量

訓練參數

參數名 說明
maxIter 最大迭代次數
optimizer 指定是使用EM還是在線訓練方法來優化DA模型
learningDecay 學習率,即指數衰減率
learningOffset 一個正數值的學習參數,在前幾次迭代中會遞減
optimizeDocConcentration 指示docConcentration是否在訓練過程中進行優化
subsamplingRate 在微型批量梯度下降的每次迭代中采樣的樣本比例
seed 隨機種子
checkpointInterval 檢查點

預測參數

參數名 說明
topicDistributionCol 將每個文檔的主題混合分布輸出作為一列保存起來
val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")
val tokenized = tkn.transform(sales.drop("features"))
val cv = new CountVectorizer()
    .setInputCol("DescOut")
    .setOutputCol("features")
    .setVocabSize(500)
    .setMinTF(0)
    .setMinDF(0)
    .setBinary(true)
val cvFitted = cv.fit(tokenized)
val prepped = cvFitted.transform(tokenized)

val lda = new LDA().setK(10).setMaxIter(5)
println(lda.explainParams())
val model = lda.fit(prepped)

model.describeTopics(3).show()

七、 模型服務化

當我們訓練好模型后,往往需要將模型導出,便於實際應用服務導入從而實現具體的功能實現。為此下述我們將列舉多種
常用的方式進行介紹。從而便於讀者可以根據實際的場景便於選擇具體的方式方法。

1. PMML

點擊此處查看代碼示例

下面我們將以Spark ML訓練模型,並將模型導出為PMML供Java應用進行調用,為此我們需要使用以下幾個類庫。

首先我們需要將訓練得到的模型持久化以便於實際服務的加載使用,由於苯本篇幅相關的模型由Spark ML訓練而得,所
以我們將在在訓練結束后進行。需要注意的是僅支持PipelineModel類型持久化,如果是單一的模型如LogisticRegression則需要將其填入到具體的Pipeline對象中,以下為具體的持久化代碼。

val df = spark.read.json("data/simple-ml.json")
val iris = df.schema
var model = pipeline.fit(train)

val pmml = new PMMLBuilder(iris, model).build()
JAXBUtil.marshalPMML(pmml, new StreamResult(new File("data/model")))

其中PMMLBuilder需要將數據模型的元數據,以及對應訓練好的模型放入構造函數中,借助於JAXBUtil工具類將PMML持久化為具體的文件。完成上述的模型寫入后,我們就可以在具體需使用的應用中引用依賴,然后基於下述的方式進行讀取即可。

var eval = new LoadingModelEvaluatorBuilder().load(new File("data/model")).build()
eval.verify()

var inputFields = eval.getInputFields()
val arguments = new LinkedHashMap[FieldName, FieldValue]

for (inputField <- inputFields) {
    var fieldName = inputField.getFieldName()
    var value = data.get(fieldName.getValue())
    val inputValue = inputField.prepare(value)

    arguments.put(fieldName, inputValue)
}

var result = eval.evaluate(arguments)
var resultRecoard = EvaluatorUtil.decodeAll(result)

上述代碼中我們需要通過其提供的getInputFields方法獲取算法模型所需要的入參,根據入參的name匹配到實際業務場景中對應的數據,當然這里筆者實際數據名稱與模型名稱一致,所以直接采用data.get(fieldName.getValue())獲取到對應的值,通過inputField.prepare(value)轉換到要求的對象類型。最后將數據填入字典后通過eval.evaluate(arguments)即可使用對應算法模型進行模型調用。當然返回的結果也是字典類型,我們需要根據實際需要從中讀取我們感興趣的值即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM