Spark MLib
在Spark下進行機器學習,必然無法離開其提供的MLlib框架,所以接下來我們將以本框架為基礎進行實際的講解。首先我們需要了解其中最基本的結構類型,即轉換器、估計器、評估器和流水線。
首先歡迎大家Start本人關於機器學習的學習倉庫,不僅僅包含了Spark ML還包括python下的sklearn等主流庫。
一、基礎使用
接下來我們將以一個簡單的例子為基礎整體介紹在Spark下進行機器學習的使用方式,便於讀者大體熟悉完整的流程節點。當然在這其中對於部分不了解的情況下可以等在后續詳細學習的過程中進行補充即可。
1. 特征工程
這部分相關知識可以參考本人編寫的人工智能專題的開源教程,其中對該部分進行詳細的說明,下面我們將就框架提供的RFormula
進行具體的實戰操作(這里熟悉R語言的可能對此比較熟悉,本身就是借鑒了R語言,但是僅實現了其中的一個子集),對於我們需要進行特征化的數據首先我們需要定義對應的線性模型公式,具體如下。
Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml");
RFormula supervised = new RFormula().setFormula("lab ~ . + color: value1 + color: value2");
當然僅僅通過上述的方式還不能實現對數據的特征化,我們還需要通過數據對其進行訓練,從而得到我們所需的轉換器,為此我們需要使用其中的fit
方法進行轉換。
RFormulaModel model = supervised.fit(df);
完成轉換器的訓練后我們就可以利用其進行實際的轉換操作,從而生成特征features
與標簽label
列,當然讀者也可以通過supervised.setLabelCol
設置標簽列名,supervised.setFeaturesCol
設置特征列名。對於監督學習都需要將數據分為樣本數據與測試數據,為此我們需要通過以下方式將數據拆分。
Dataset<Row>[] data = preparedDF.randomSplit(new double[]{0.7, 0.3});
2. 模型訓練
在Spark MLib中為估計器,這里我們將采用邏輯回歸的算法做為演示,提供一個分類算法模型的訓練,首先我們實例化我們需要的模型類,通過其提供的方式對將訓練數據傳入其中進行模型的訓練。
LogisticRegression lr = new LogisticRegression();
LogisticRegressionModel lrModel = lr.fit(data[0]);
lrModel.transform(data[1]).select("label1", "prediction").show();
如果在對數據進行特征工程的時候將標簽以及特征列的名稱進行了修改,那么我們也需要通過lr.setLabelCol
以及lr.setFeaturesCol
進行同步修改調整。同時框架也提供了explainParams
方法打印模型中可供調整的參數。
3. 流水線
對於機器學習,后期工作基本就是對各種參數的調優,為此Spark提供了友好的流水線,並基於其本平台分布式計算集群的能力助力我們縮短對不同參數模型的訓練與評估,從而提供最佳的參數模型供我們使用,下面我們將一步一步介紹如何使用其提供的該特性。首先我們定義工作流中涉及到的階段步驟,具體如下所示。
Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml.json");
Dataset<Row>[] data = df.randomSplit(new double[] {0.7, 0.3});
RFormula rForm = new RFormula();
LogisticRegression lr = new LogisticRegression();
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { rForm, lr });
上述完成工作流水線各階段的任務后,接下來我們就需要指定各階段的參數列表,從而便於Spark形成不同的組合進行模型訓練。
Seq<String> formulaParam = JavaConverters.asScalaIteratorConverter(Arrays.asList("lab ~ . + color:value1", "lab ~ . + color:value1 + color:value2").iterator()).asScala().toSeq();
ParamMap[] params = new ParamGridBuilder()
.addGrid(rForm.formula(), formulaParam)
.addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
.addGrid(lr.regParam(), new double[]{0.1, 2.0})
.build();
有了以上其實我們就可以單純的進行模型訓練了,但是這樣訓練除的模型並無法評估出最好的一個模型。我們需要指定一個評估器用來評估實際效果是否符合最佳。這里我們主要采用了BinaryClassificationEvaluator
類。
BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("prediction")
.setLabelCol("label");
最后我們需要能夠自動調整超參數,並自動分配數據集的方式將上述的各部分組成從而形成最終有效的模型。
TrainValidationSplit tvs = new TrainValidationSplit()
.setTrainRatio(0.75)
.setEstimatorParamMaps(params)
.setEstimator(pipeline)
.setEvaluator(evaluator);
而具體的使用與之前邏輯回歸的方式如出一轍。
TrainValidationSplitModel model = tvs.fit(data[0]);
System.out.println(evaluator.evaluate(model.transform(data[1])));
如果讀者需要將該模型進行持久化可以采用model.write().overwrite().save("sparkdemo/data/model");
該方式進行實際的持久化,當然讀取時需要采用與寫入一致的類,否則將無法正確讀取。
二、特征工程
參考機器學習教程中對應章節的內容可彌補關於各類算法的基礎知識,接下來我們將僅從基於Spark的實戰角度出發進行列舉常用的方式對特定的數據預處理。
1. 通用
下面我們將介紹較通用的三種的針對數據進行處理的方式,其中一種上述的教程已經使用了,這里將僅做為介紹進行概述。
1) RFormula
其主要參考了基於R語言的formula設計思路,當然其中僅僅支持有限有限的操作符。並且其中對於字符串的處理是采用獨熱編碼(One-hot)的方式,具體的使用方式如下。
val supervised = new RFormula().setFormula("lab ~ . + color:value1 + color:value2")
supervised.fit(df).transform(df).show()
支持的操作符如下:
~ 分割標簽與特征
+ 將兩個特征相加,+ 0代表除去截距
- 減去一個特征, - 1代表除去截距
: 將多個特征相乘變成一個特征
. 選取所有特征
如果讀者不了解其中表達式的作用,接下來我們將舉一個例子,假設a,b為2個特征,y是應變量。利用上述的公式我們將可以寫出如下的語句。
y ~ a + b
: 對應到線性模型的公式為: y = w0 + w1 * a + w2 * b
,其中w0為截距
y ~ a + b + a:b - 1
: 對應的線性模型的公式為: y = w1 * a + w2 * b + w3 * a * b
,由於-1的存在所以沒有截距
如果讀者還是不能理解我們可以通過具體的例子來進行介紹,首先我們准備以下相關數據。
lab | value1 | value2 |
---|---|---|
good | 13 | 2.1 |
bad | 9 | 8.2 |
將上面的數據采用公式lab ~ value1 + value2
進行處理后,結果數據將如下所示。
lab | value1 | value2 | features | label |
---|---|---|---|---|
good | 13 | 2.1 | [13.0, 2.1] | 1 |
bad | 9 | 8.2 | [9.0, 8.2] | 0 |
上述我們可以看到針對字符串類型的標簽采用了字符串索引的方式進行映射,至此關於RFormula
介紹到此為止。
2) SQLTransformer
即利用Spark提供的眾多關鍵字對數據通過SQL語句的方式進行處理,這里需要注意的是不要直接
使用標名,如果需要引用本表則需要通過關鍵字__THIS__
來引用。
val basicTrans = new SQLTransformer()
.setStatement("""
SELECT sum(value1), count(*), color
FROM __THIS__
GROUP BY color
""")
basicTrans.transform(df).show()
3) VectorAssembler
如果需要將多個Boolean,Double或Vector類型做為輸入合並為一個大的向量則可以使用該函數
實現我們所需要的效果。
val va = new VectorAssembler().setInputCols(Array("value1", "value2"))
va.transform(df).show()
4) VectorIndexer
將一個向量列中的特征進行索引,一般用於決策樹。其對於離散特征的索引是基於0開始的,但是並不保證值每次對應的索引值一致,但是對於0必然是會映射到0。
val va = new VectorAssembler().setInputCols(Array("value1", "value2")).setOutputCol("features")
val vva = va.transform(df)
val vi = new VectorIndexer().setInputCol("features").setOutputCol("indexFeatures").fit(vva)
vi.transform(vva).show()
如果讀者希望控制最大的分類數,則可以通過setMaxCategories
繼續控制。
2. 連續特征
這部分我們主要使用分桶,縮放與歸一化。其中分桶對應前面所屬的分箱行為。以下僅僅介紹較為
簡單的分桶方式,如果讀者需要學習更高級的方式可以學習如局部敏感哈希(LSH)等算法。
1) 分桶(Bucketizer)
最好理解的分桶方式,即我們認為的將數值的區間限定好,讓對應的數據歸納到對應的桶內。對於
無限大與無限小則可以使用Double.PositiveInfinity
與Double.NegativeInfinity
填充。
同時為了處理null或NaN值,必須指定handlerInvalid參數,如果需要保留可以使用keep,報錯
則填入null或error,以及skip跳過。
val bucketBorders = Array(0.0, 10.0, 20.0, 30.0, 40.0, 50.0)
val bucketer = new Bucketizer().setSplits(bucketBorders).setInputCol("value1")
bucketer.transform(df).show()
2) 分桶(QuantileDiscretizer)
該方式不用用戶進行預設,其可以根據百分比進行拆分,讀者可以通過setRelativeError
設置
近似分位數計算的相對誤差。通過設置handleInvalid選擇保留還是刪除數據集中的NaN值。如果
選擇保留NaN值,則將對其進行特殊處理並將其放入自己的存儲桶中,如讀者設定4個桶,則其會被
放入一個特殊的桶[4]中。
val bucketer = new QuantileDiscretizer().setNumBuckets(5).setInputCol("value1").setOutputCol("val1category")
var bucketerModel = bucketer.fit(df)
bucketerModel.transform(df).show()
3) 歸一化(StandardScaler)
基於StandardScaler將一組特征值歸一化成平均值為0而標准偏差為1的一組新值。通過withStd標志設置單位標准差,而withMean標識將使數據在縮放之前進行中心化。稀疏向量中心化非常耗時,因為一般會將它們轉化為稠密向量。
val sScaler = new StandardScaler().setInputCol("features")
sScaler.fit(rdf).transform(rdf).show()
4) 縮放(MinMaxScaler)
將向量中的值基於給定的最小值到最大值按比例縮放。如最小值為0且最大值為1,則所有值將介於0和1之間。
val minMax = new MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
minMax.fit(rdf).transform(rdf).show()
5) 縮放(MaxAbsScaler)
基於MaxAbsScaler將每個值除以該特征的最大絕對值來縮放數據。計算后的值將在-1與1之間。
val maScaler = new MaxAbsScaler().setInputCol("features")
maScaler.fit(rdf).transform(rdf).show()
6) 縮放(ElementwiseProduct)
基於ElementwiseProduct將一個縮放向量對某向量中的每個值以不同的尺度進行縮放。
val scalingUp = new ElementwiseProduct()
.setScalingVec(Vectors.dense(10.0, 20.0))
.setInputCol("features")
scalingUp.transform(rdf).show()
7) 縮放(Normalizer)
用冪范數來縮放多維向量,Normalizer的作用范圍是每一行。其通過參數P設置幾范數,為1表示曼哈頓范數,2表示歐幾里德范數等。
val manhattanDistance = new Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(rdf).show()
3. 類型特征
針對數據的處理,往往會存在大量枚舉指標。有可能是字符串,也有可能是數字等形式。針對字符
串形式的類型數據,我們就需要將其進行轉換,從而便於進行后續的數據分析處理。
1) StringIndexer
最簡單的方式就是將對應的類型映射到對應ID形成關系。
val lblIndxr = new StringIndexer().setInputCol("lab").setOutputCol("labelInd")
val idxRes = lblIndxr.fit(rdf).transform(rdf)
idxRes.show()
基於字符串索引的方式還可以根據索引反推出對應的類型名稱。
val labelReverse = new IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show()
2) OneHotEncoder
基於One-hot(獨熱編碼)在類別變量索引之后進行轉換。其目的主要是因為經過字符串索引后就存在了一種大小關系,比如對應顏色這種類別是無意義的所以我們需要將其轉換為向量中的一個布爾值元素。
val ohe = new OneHotEncoder().setInputCol("labelInd")
ohe.transform(idxRes).show()
4. 文本數據特征
在實際的預處理過程中我們往往會遇到需要將文檔等由多種詞語構成的數據進行分析。而針對這類
文本數據的分析,我們需要借助以下幾個流程將這類文本數據轉換為具體的特征數據從而便於我們
進行數據的分析處理。
1) Tokenizer
首先我們需要將一段話進行分詞,分詞是將任意格式的文本轉變成一個“符號”列表或者一個單詞列表的過程。
val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")
val tokenized = tkn.transform(sales.select("Description"))
tokenized.show()
2) RegexTokenizer
不僅可以基於Tokenizer
通過空格分詞,也可以使用RegexTokenizer指定的正則表達式來分詞。
val rt = new RegexTokenizer().setInputCol("Description")
.setOutputCol("DescOut")
.setPattern(" ")
.setToLowercase(true)
rt.transform(sales.select("Description")).show()
3) StopWordsRemover
分詞后的一個常見任務是過濾停用詞,這些常用詞在許多分析中沒有什么意義,因此應被刪除。
val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
val stops = new StopWordsRemover().setStopWords(englishStopWords)
.setInputCol("DescOut")
.setOutputCol("StopWords")
stops.transform(tokenized).show()
4) NGram
字符串分詞和過濾停用詞之后,會得到可作為特征的一個詞集合。
val bigram = new NGram().setInputCol("DescOut").setN(2)
bigram.transform(tokenized.select("DescOut")).show()
5) CountVectorizer
一旦有了詞特征,就可以開始對單詞和單詞組合進行計數,以便在我們的模型中使用可以通過setMinTF來決定詞庫中是否包含某項,通過setVocabSize設置總的最大單詞量。
val cv = new CountVectorizer()
.setInputCol("DescOut")
.setOutputCol("countVec")
.setVocabSize(500)
.setMinTF(1)
.setMinDF(2)
val fittedCV = cv.fit(tokenized)
fittedCV.transform(tokenized).show()
實際上它是一個稀疏向量,包含總的詞匯量、詞庫中某單詞的索引,以及該單詞的計數。
6) TF-IDF
另一種將本文轉換為數值表示的方法是使用詞頻-逆文檔頻率(TF-IDF)。最簡單的情況是,TF-IDF度量一個單詞在每個文檔中出現的頻率,並根據該單詞出現過的文檔數進行加權,結果是在較少文檔中出現的單詞比在許多文檔中出現的單詞權重更大。如果讀者希望能夠更深入的單獨了解該技術可以閱讀本文章
val tf = new HashingTF().setInputCol("DescOut")
.setOutputCol("TFOut")
.setNumFeatures(10000)
var idf = new IDF().setInputCol("TFOut")
.setOutputCol("IDFOut")
.setMinDocFreq(2)
idf.fit(tf.transform(tokenized)).transform(tf.transform(tokenized)).show()
輸出顯示總的詞匯量、文檔中出現的每個單詞的哈希值,以及這些單詞的權重。
7) Word2Vec
因為機器學習只能接受數值形式,為此需要進行轉換,而Word2Vec就是詞嵌入的中的一種。而Spark內使用的是基於skip-gram
模型,而該模型主要是根據輸入的詞語推算出上下文可能與該詞組合的其他詞語。如果希望學習Word2Vec則可以參考本文檔
val word2Vec = new Word2Vec().setInputCol("DescOut").setOutputCol("result").setVectorSize(3).setMinCount(0)
val model = word2Vec.fit(tokenized)
model.transform(tokenized).show()
5. 特征操作
1) PCA
主成分(PCA)是一種數據方法, 用於找到我們的數據中最重要的成分。PCA使用參數k指定要創建的輸出特征的數量,這通常應該比輸入向量的尺寸小的多。
val pca = new PCA().setInputCol("features").setK(2)
pca.fit(featureDF).transform(featureDF).show()
6. 多項式擴展
1) PolynomialExpansion
多項式擴展基於所有輸入列生成交互變量。對於一個二階多項式,Spark把特征向量中的每個值乘以所有其他值,然后將結果存儲成特征。
val pe = new PolynomialExpansion().setInputCol("features").setDegree(2)
pe.transform(featureDF).show()
多項式擴展會增大特征空間,從而導致高計算成本和過擬合效果,所以請效性使用。
7. 特征選擇
1) ChiSqSelector
ChiSqSelector利用統計測試來確定與我們試圖預測的標簽無關的特征,並刪除不相關的特征。其提供了以下集中方法:
- numTopFea tures:基於p-value排序
- percentile:采用輸入特征的比例
- fpr:設置截斷p-value
val chisq = new ChiSqSelector().setFeaturesCol("features")
.setLabelCol("labelInd")
.setNumTopFeatures(1)
chisq.fit(featureDF).transform(featureDF).show()
三、 分類算法
1. 邏輯回歸
邏輯回歸是一種線性模型,為輸入的每個特征賦以權重之后將他們組合在一起,從而獲得該輸入屬於特定類的概率。
超參數
參數名 | 說明 |
---|---|
family | 可以設置為multinomial (多分類)或binary (二分類) |
elasticNetParam | 從0到1的浮點值。該參數依照彈性網絡正則化的方法將L1正則化和L2正則化混合(即兩者的線性組合) |
fitIntercept | 此超參數決定是否適應截距 |
regParam | 確定在目標函數中正則化項的權重,它的選擇和數據集的噪聲情況和數據維度有關 |
standardization | 可以為true或false,設置它用於決定在將輸入數據傳遞到模型之前是否要對其標准化 |
訓練參數
參數名 | 說明 |
---|---|
maxIter | 迭代次數 |
tol | 此值指定一個用於停止迭代的閾值 |
weightCol | 權重列的名稱,用於賦予某些行更大的權重 |
預測參數
參數名 | 說明 |
---|---|
threshold | 此參數事預測時的概率閾值,你可以根據需要調整此參數以平衡誤報和漏報 |
對於多項式分類模型(多分類) lrModel.coefficientMatrix和lrModel.interceptVector可以用來得到系數和截距值
val lr = new LogisticRegression()
val lrModel = lr.fit(bInput)
println(lrModel.coefficients) // 輸出 系數
println(lrModel.intercept) // 輸出 截距
2. 決策樹
決策樹是一種更友好和易於理解的分類方法,因為它類似人類經常使用的簡單決策模型。
超參數
參數名 | 說明 |
---|---|
maxDepth | 指定最大深度 |
maxBins | 確定應基於連續特征創建多少個槽,更多的槽提供更細的粒度級別 |
impurity | 不純度表示是否應該在某葉子結點拆分的度量(信息增益),此參數可以設置為entropy或者gini |
minInfoGain | 此參數確定可用於分割的最小信息增益 |
minInstancePerNode | 此參數確定需要在一個節點結束訓練的實例最小數目 |
訓練參數
參數名 | 說明 |
---|---|
checkpointInterval | 檢查點是一種在訓練過程中保存模型的方法,此方法可以保證當集群節點因某種原因奔潰時不影響整個訓練過程 |
val dt = new DecisionTreeClassifier()
val dtModel = dt.fit(bInput)
println(dtModel.explainParams())
3. 隨機森林與梯度提升
隨機森林,我們只訓練大量的樹,然后平均他們的結果做出預測。利用梯度提升樹,每棵樹進行加權預測。
隨機森林超參數
參數名 | 說明 |
---|---|
numTrees | 用於訓練的樹總數 |
featureSubsetStrategy | 此參數確定拆分時應考慮多少特征 |
梯度提升樹超參數
參數名 | 說明 |
---|---|
lossType | 損失函數,目前僅支持logistic loss損失 |
maxIter | 迭代次數 |
stepSize | 代表算法的學習速度 |
val rfClassifier = new RandomForestClassifier()
println(rfClassifier.explainParams())
val rfModel = rfClassifier.fit(bInput)
val gbtClassifier = new GBTClassifier()
println(gbtClassifier.explainParams())
val gbtModel = gbtClassifier.fit(bInput)
4. 朴素貝葉素
朴素貝葉素分類是基於貝葉斯定理的分類方法,通常用於文本或文檔分類,所有的輸入特征碧璽為非負數。
超參數
參數名 | 說明 |
---|---|
modelType | 可選bernoulli或multinomial |
weightCol | 允許對不同的數據點賦值不同的權重 |
訓練參數
參數名 | 說明 |
---|---|
smoothing | 它指定使用加法平滑時的正則化量,改設置有助於平滑分兩類數據 |
val nb = new NaiveBayes()
println(nb.explainParams())
val nbModel = nb.fit(bInput)
對於二分類,我們使用BinaryClassificationEvaluator
,它支持優化兩個不同的指標areaUnderRoc
和areaUnderPR
對於多分類,需要使用MulticlassClassificationEvaluator
,它支持優化f1
,weightedPrecision
,weightedRecall
,accuracy
四、 回歸算法
1. 線性回歸
線性回歸假定輸入特征的線性組合(每個特征乘以權重的綜合)將得到(有一定高斯誤差的)輸出結果,具體參數同分類中該算法。
val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
println(lr.explainParams())
val lrModel = lr.fit(df)
2. 廣義線性回歸
線性回歸的廣義形式使你可以更細粒度地控制使用各種回歸模型
超參數
參數名 | 說明 |
---|---|
family | 指定在模型中使用的誤差分布,支持Poisson、binomial、gamma、Caussian和tweedie。 |
link | 鏈接函數的名稱,指定線性預測器與分布函數平均值之間的關系,支持cloglog、probit、logit、reverse、sqrt、identity和log |
solver | 指定的優化算法。 |
variancePower | Tweedie分布方差函數中的冪。 |
linkPower | Tweedie分布的乘冪鏈接函數索引。 |
預測參數
參數名 | 說明 |
---|---|
linkPredictionCol | 指定一個列名,為每個預測保存我們的鏈接函數。 |
val glr = new GeneralizedLinearRegression().setFamily("gaussian").setLink("identity").setMaxIter(10).setRegParam(0.3).setLinkPredictionCol("linkOut")
println(glr.explainParams())
val glrModel = glr.fit(df)
3. 決策樹
用於回歸分析的決策樹不是在每個葉子節點上輸出離散的標簽,而是一個連續的數值,但是可解釋性和模型結構仍然適用。對應參數可以參考分類中該算法。
val dtr = new DecisionTreeRegressor()
println(dtr.explainParams())
val dtrModel = dtr.fit(df)
4. 隨機森林和梯度提升樹
隨機森林和梯度提升樹模型可應用於分類和回歸,它們與決策樹具有相同的基本概念,不是訓練一棵樹而是很多樹來做回歸分析。
val rf = new RandomForestRegressor()
println(rf.explainParams())
val rfModel = rf.fit(df)
val gbt = new GBTRegressor()
println(gbt.explainParams())
var gbtModel = gbt.fit(df)
5. 評估器和自動化模型校正
用於回歸任務的評估器稱為 RegressionEvaluator,支持許多常見的回歸度量標准。與分類評估器一樣,RegressionEvaluator 需要兩項輸入,一個表示預測值,另一個表示真是標簽的值。
val glr = new GeneralizedLinearRegression()
.setFamily("gaussian")
.setLink("identity")
val pipeline = new Pipeline().setStages(Array(glr))
val params = new ParamGridBuilder().addGrid(glr.regParam, Array(0, 0.5, 1)).build()
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setPredictionCol("prediction")
.setLabelCol("label")
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(params)
.setNumFolds(2)
val model = cv.fit(df)
五、 推薦系統
主要采用ALS(交替最小二乘法)為每個用戶和物品建立k維的特征向量,從而可以通過用戶和物品向量的點積來估算該用戶
對物品的評分值,所以只需要用戶-物品對的評分數據作為輸入數據集,其中有三列:用戶Id列、物品Id列和評分列。評分
可以是顯式的,即我們想要直接預測的數值登記;或隱式的,在這種情況下,每個分數表示用戶和物品之間的交互強度,它
衡量用戶對該物品的偏好程度。
超參數
參數名 | 說明 |
---|---|
rank | rank確定了用戶和物品特征向量的維度,這通常是通過實驗來調整,一個重要權衡是過高的秩導致過擬合,而過低的秩導致不能做出最好的預測 |
alpha | 在基於隱式反饋的數據上進行訓練時,alpha設置偏好的基線置信度,這個值越大則越認為用戶和他沒有評分的物品之間沒有關聯 |
regParam | 控制正則化參數來防止過擬合,需要測試不同的值來找到針對你的問題的最優的值 |
implicitPrefs | 此布爾值指定是在隱式數據還是顯式數據上進行訓練 |
nonnegative | 如果設置為true,則將非負約束置於最小二乘問題上,並且只返回非負特征向量,這可以提高某些應用程序的性能 |
訓練參數
其中最終主要的就是數據塊,通常的做法是每個數據塊大概分配一百萬到五百萬各評分值,如果每個數據塊的數據量少於這個數字,則太多的數據塊可能會影響性能。
參數名 | 說明 |
---|---|
numUserBlocks | 確定將用戶數據拆分成多少各數據塊 |
numItemBlocks | 確定將物品數據拆分為多少各數據塊 |
maxIter | 訓練的迭代次數 |
checkpointInterval | 設置檢查點可以在訓練過程中保存模型狀態 |
seed | 指定隨機種子幫助復現實驗結果 |
預測參數
主要的參數為冷啟動策略,該參數用來設置模型應為未出現過在訓練集中的用戶或物品推薦什么,可通過coldStartStrategy設置,可以選擇drop和nan設置。
val conf = new SparkConf().setAppName("ScalaSparkML").setMaster("local")
val spark = SparkSession.builder().config(conf).getOrCreate()
val ratings = spark.read.textFile("data/sample_movielens_ratings.txt")
.selectExpr("split(value, '::') as col")
.selectExpr("cast(col[0] as int) as userId",
"(col[1] as int) as movieId",
"(col[2] as float) as rating",
"(col[3] as long) as timestamp")
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
val als = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
println(als.explainParams())
val alsModel = als.fit(ratings)
val predictions = alsModel.transform(test)
模型的recommendForAllUsers方法返回對應某個userId的DataFrame,包含推薦電影的數組,以及每個影片的評分。recommendForAllItems返回對應某個
movieId的DataFrame以及最有可能給該影片打高分的前幾個用戶。
alsModel.recommendForAllUsers(10)
.selectExpr("userId", "explode(recommendations)").show()
alsModel.recommendForAllItems(10)
.selectExpr("movieId", "explode(recommendations)").show()
針對訓練的模型效果我們依然需要針對其進行評估,這里我們可以采用與回歸算法中相同的評估器進行評估。
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
對於度量指標我們通過回歸度量指標與排名指標進行度量,首先是回歸度量指標,我們可以簡單的查看每個用戶和項目的實際評級與預測值的接近程度。
val regComparison = predictions.select("rating", "prediction")
.rdd.map(x => (x.getFloat(0).toDouble, x.getFloat(1).toDouble))
val metrics = new RegressionMetrics(regComparison)
六、 無監督學習
1. k-means
k-means算法中,用戶在數據集中隨機選擇數量為k的數據點作為處理聚類的聚類中心,未分配的點基於他們與這些
聚類中心的相似度(歐氏距離)倍分配到離他們最近的聚類中。分配之后,再根據被分配到一個聚類的數據點再計算
聚類的新中心,並重復該過程,直到到達有限的迭代次數或直到收斂。
超參數
參數名 | 說明 |
---|---|
k | 指定你希望的聚類數量 |
訓練參數
參數名 | 說明 |
---|---|
initMode | 初始化模式是確定質心初始位置的算法,可支持 random(隨機初始化)與k-means |
initSteps | k-means模式初始化所需要的步數 |
maxIter | 迭代次數 |
tol | 該閾值指定質心改變到該程度后優化結束 |
val km = new KMeans().setK(3)
println(km.explainParams())
val kmModel = km.fit(sales)
val summary = kmModel.summary
summary.clusterSizes // 中心點數量
計算數據點與每個聚類中心點的距離,可以采用ClusterEvaluator評估器
2. 二分k-means
該算法為k-means變體,關鍵區別在於,它不是通過“自下而上”的聚類,而是自上而下的聚類方法。就是通過創建一個組
然后進行拆分直到拆分到k個組。
超參數
參數名 | 說明 |
---|---|
k | 指定你希望的聚類數量 |
訓練參數
參數名 | 說明 |
---|---|
minDivisibleClusterSize | 指定一個可分聚類中的最少數據點數 |
maxIter | 迭代次數 |
val bkm = new BisectingKMeans().setK(5).setMaxIter(5)
println(bkm.explainParams())
val bkmModel = bkm.fit(sales)
val summary = bkmModel.summary
summary.clusterSizes
3. 高斯混合模型
一種簡單理解高斯混合模型的方法是,它們就像k0means的軟聚類斑斑(軟聚類softclustering即每個數據點可以划分到多個聚類中),而
k-means創建硬聚合(即每個點僅在一個聚類中),高斯混合模型GMM依照概率而不是硬性邊界進行聚類。
超參數
參數名 | 說明 |
---|---|
k | 聚類數量 |
訓練參數
參數名 | 說明 |
---|---|
maxIter | 迭代次數 |
tol | 指定一個閾值來代表將模型優化到什么程度就夠了 |
val gmm = new GaussianMixture().setK(5)
println(gmm.explainParams())
val model = gmm.fit(sales)
model.gaussiansDF.show()
model.summary.probability.show()
4. LDA主題模型
隱含狄利克雷分布式一種通常用於對文本文檔執行主體建模的分層聚類模型。LDA試圖從與這些主題相關聯的一系列文檔和關鍵字
中提取高層次的主題,然后它將每個文檔解釋為多個輸入主題的組合。
超參數
參數名 | 說明 |
---|---|
k | 用於指定從數據中提取的主題數量 |
docConcentration | 文檔分布的濃度參數向量 |
topicConcentration | 主題分布的濃度參數向量 |
訓練參數
參數名 | 說明 |
---|---|
maxIter | 最大迭代次數 |
optimizer | 指定是使用EM還是在線訓練方法來優化DA模型 |
learningDecay | 學習率,即指數衰減率 |
learningOffset | 一個正數值的學習參數,在前幾次迭代中會遞減 |
optimizeDocConcentration | 指示docConcentration是否在訓練過程中進行優化 |
subsamplingRate | 在微型批量梯度下降的每次迭代中采樣的樣本比例 |
seed | 隨機種子 |
checkpointInterval | 檢查點 |
預測參數
參數名 | 說明 |
---|---|
topicDistributionCol | 將每個文檔的主題混合分布輸出作為一列保存起來 |
val tkn = new Tokenizer().setInputCol("Description").setOutputCol("DescOut")
val tokenized = tkn.transform(sales.drop("features"))
val cv = new CountVectorizer()
.setInputCol("DescOut")
.setOutputCol("features")
.setVocabSize(500)
.setMinTF(0)
.setMinDF(0)
.setBinary(true)
val cvFitted = cv.fit(tokenized)
val prepped = cvFitted.transform(tokenized)
val lda = new LDA().setK(10).setMaxIter(5)
println(lda.explainParams())
val model = lda.fit(prepped)
model.describeTopics(3).show()
七、 模型服務化
當我們訓練好模型后,往往需要將模型導出,便於實際應用服務導入從而實現具體的功能實現。為此下述我們將列舉多種
常用的方式進行介紹。從而便於讀者可以根據實際的場景便於選擇具體的方式方法。
1. PMML
下面我們將以Spark ML訓練模型,並將模型導出為PMML供Java應用進行調用,為此我們需要使用以下幾個類庫。
首先我們需要將訓練得到的模型持久化以便於實際服務的加載使用,由於苯本篇幅相關的模型由Spark ML訓練而得,所
以我們將在在訓練結束后進行。需要注意的是僅支持PipelineModel
類型持久化,如果是單一的模型如LogisticRegression
則需要將其填入到具體的Pipeline
對象中,以下為具體的持久化代碼。
val df = spark.read.json("data/simple-ml.json")
val iris = df.schema
var model = pipeline.fit(train)
val pmml = new PMMLBuilder(iris, model).build()
JAXBUtil.marshalPMML(pmml, new StreamResult(new File("data/model")))
其中PMMLBuilder
需要將數據模型的元數據,以及對應訓練好的模型放入構造函數中,借助於JAXBUtil
工具類將PMML持久化為具體的文件。完成上述的模型寫入后,我們就可以在具體需使用的應用中引用依賴,然后基於下述的方式進行讀取即可。
var eval = new LoadingModelEvaluatorBuilder().load(new File("data/model")).build()
eval.verify()
var inputFields = eval.getInputFields()
val arguments = new LinkedHashMap[FieldName, FieldValue]
for (inputField <- inputFields) {
var fieldName = inputField.getFieldName()
var value = data.get(fieldName.getValue())
val inputValue = inputField.prepare(value)
arguments.put(fieldName, inputValue)
}
var result = eval.evaluate(arguments)
var resultRecoard = EvaluatorUtil.decodeAll(result)
上述代碼中我們需要通過其提供的getInputFields
方法獲取算法模型所需要的入參,根據入參的name
匹配到實際業務場景中對應的數據,當然這里筆者實際數據名稱與模型名稱一致,所以直接采用data.get(fieldName.getValue())
獲取到對應的值,通過inputField.prepare(value)
轉換到要求的對象類型。最后將數據填入字典后通過eval.evaluate(arguments)
即可使用對應算法模型進行模型調用。當然返回的結果也是字典類型,我們需要根據實際需要從中讀取我們感興趣的值即可。