文章導讀:
1. Naive Bayes算法
2. Adaboost算法
3. Spark ML的使用
4. 自定義擴展Spark ML
1. Naive Bayes算法
朴素貝葉斯算法算是生成模型中一個最經典的分類算法之一了,常用的有Bernoulli和Multinomial兩種。在文本分類上經常會用到這兩種方法。在詞袋模型中,對於一篇文檔$d$中出現的詞$w_0,w_1,...,w_n$, 這篇文章被分類為$c$的概率為$$p(c|w_0,w_1,...,w_n) = \frac{p(c,w_0,w_1,...,w_n)}{p(w_0,w_1,...,w_n)} = \frac{p(w_0,w_1,...,w_n|c)*p(c)}{p(w_0,w_1,...,w_n)}$$ 對於一篇給定文章,分母為常數,基於朴素貝葉斯的各詞在一篇文章中出現獨立性假設,最后我們需要比較的就是在不同類別$c$下$p(w_0|c)*p(w_1|c)*...*p(w_n|c)*p(c)$的大小。
naive bayes模型的參數就是在每個類別$c$下各詞出現的概率的$p(w_0|c),p(w_1|c),...,p(w_n|c))$和該類別出現的概率$p(c)$,參數的估計通常就是根據訓練樣本進行詞頻的統計並計算相應概率,其中$$p(c) = \frac{count(c)}{count(doc)}$$,即為訓練數據中c類別文章的總數量除以訓練集中文章的總數量。針對$p(w_i|c)$的估計,Bernoulli和Multinomial略有不同。
- Bernoulli
文章中某詞$ w_i$出現過,則記為1,所以$$p(w_i|c) = \frac{count(w_i,c)}{count(c)}$$ 即為在類別為c的訓練集文章中出現詞$w_i$的文章數量除以訓練集中為別為c的文章總數量
- Multinomial
這種情況下文章的詞並不是非0即1的one hot特征,而是帶有權重的數值特征,通常可以使用tf或者tf-idf值。$$p(w_i|c) = \frac{T_{ci}}{\sum_{t}{T_{ct}}}$$ 其中$T_{ci}$為類別c的訓練文章中詞$w_i$的所有權重和,$\sum_{t}{T_{ct}}$為類別c的文章中所有詞的權重之和。預測的時候對於詞$w_i$計算該詞在該文章中的權重$T_i$,使用$p(w_i|c)^{T_i}$作為連乘部分的概率。不過實際上經常使用對數概率,所以可以將指數運算變為乘法運算,在代碼中就可以利用矩陣相乘直接計算。
還有一些細節問題,例如數據稀疏,平滑處理等因為不是本文的重點,這里就不詳細解釋了。
2. Adaboost算法
作為一種boosting方法,adaboost在很多算法上都有着不俗的表現。不過在基於naive bayes的文檔分類領域,貌似實際效果很一般。在stack overflow上也看到有人討論,說adaboost對於多個弱分類器的提升效果很不錯,但是naive bayes的文檔分類通常已經有很不錯的表現了,提升效果一般。不過不管效果提升怎么樣,實現一下試試也沒什么壞處,順便還可以熟悉一下spark的相關操作。經典的adaboost算法適用於二分類的情況,但是我們的文本是多分類的情況,依靠多個二分類器表決不失為一種方法,但是比較麻煩,好在找到了介紹多分類adaboost算法的論文,照着論文依葫蘆畫瓢也不難。下面先分別多分類和二分類的adaboost
2.1 二分類adaboost
對於給定的二類分類的訓練數據集$$T = {(x_1, y_2),(x_2, y_2)...,(x_N, y_N)}$$ 其中每個$x$是一個樣本的特征向量,$y\in\{-1, +1\}$,算法流程如下:
- 初始化各個樣本的權重為$$D_1 = (w_{11}, w_{12}, ... , w_{1i}, ... , w_{1N}), w_{1i} = \frac{1}{N}, i = 1, 2, ... , N$$
- 對於第m次迭代,$m = 1, 2, ..., M$:
- 每次迭代使用帶有當前權重$D_m$的樣本進行訓練,得到一個基本分類器$G_m(x)$
- 計算在分類器$D_m$下,訓練樣本分類結果的誤差率$$e_m = \sum^{N}_{i = 1}{w_{mi}I(G_m{x_i} \neq{y_i})}$$,因為每一步權重都做了歸一化,所以分母不用再除以樣本權重之和
- 根據誤差率$e_m$計算分類器$D_m$的系數 $$\alpha_m = log\frac{1-e_m}{e_m}$$
- 根據系數$\alpha_m$更新各樣本的權重$$D_{m+1} = {w_{m+1, 1}, w_{m+1, 2}, ... , w_{m+1, N}}$$ $$w_{m+1, i} = w_{m, i} * exp(\alpha_m * I(G_m{x_i} \neq{y_i}))$$
- 對$D_{m+1}$做歸一化處理,使$\sum_{i = 1}^{N}{w_{m+1, i}} = 1$
- 最后對多個分類器$D_m$的結果進行加權表決,$$c(x) = argmax_k\sum_{m = 1}^{M}{\alpha_m*I(D_m(x) = k)}$$
注意到對於二分類的adaboost需要每次的分類誤差率$e_m \leq{\frac{1}{2}}$,否則的話將會導致$\alpha_m < 0$,然后樣本權重的更新將會朝着反方向進行。
2.2 多分類adaboost
對於K分類的情況,算法基本與二分類的情況一致。但是要求每次的分類誤差率$e_m \leq{\frac{1}{2}}$是非常困難的,聯系到二分類誤差率閾值選擇$\frac{1}{2}$,K分類的情況選擇誤差率為$\frac{K-1}{K}$,然后$\alpha_m$的計算改為 $$\alpha_m = log(\frac{1-e_m}{e_m}) + log(K-1)$$ 容易驗證只要$e_m \leq{\frac{K-1}{K}}$,則有$\alpha_m \geq{log(\frac{1-\frac{K-1}{K}}{\frac{K-1}{K}}) + log(K-1)} = log(\frac{1}{K-1}) + log(K-1) = 0$,這種情況下,多分類adaboost對於被誤分的樣本的側重加大了,因為$\alpha_m$因為添加了正項$log(K-1)$而增大了。
adaboost的一種解釋是模型為加法模型,損失函數為指數函數,學習算法為前向分步算法的分類算法,這個以后再另外寫一篇。這里給出一個比較直觀好懂的解釋:
- 迭代過程中誤差率小的模型具有大的模型系數,也就是說表現好的子模型在最后加權的時候具有更大的“話語權”
- 迭代過程中上一次被誤分的樣本在下一次訓練時將會具有更大的權重,更容易被分類正確
3. Spark ML的使用
提到Spark ml就不得不提Spark mllib,兩者的區別主要在於ml面向的數據是Dataset,而mllib面向的是rdd,Dataset相當於在底層rdd的基礎上做了進一步的優化。而且ml中一系列算法更適合創建包含從數據清洗到特征工程再到模型訓練等一系列工作的ML pipelines,這個類似於sklearn中的pipeline,非常簡潔好用。
pipeline中的Transformer,Estimator,Stage等概念官方文檔上寫的很清楚,而且還有事例,就不在這里解釋了。這里以naive bayes為例簡單介紹一下怎么利用spark ml的pipelines進行機器學習模型的訓練和預測。
首先是pipelines的創建:
1 // pipeline for train 2 def createPipiline(dataset: Dataset[_]): Pipeline = { 3 // step 1 sentence 拆成 words 4 val tokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern(",") 5 // step 2 label 轉化為以0開始的labelIndex 為了適應spark.ml 6 val indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndex").fit(dataset) 7 // step3 統計tf詞頻 8 val countModel = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures") 9 // step4 tf-idf 10 val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("features") 11 // step5 normalize tf-idf vector 12 val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures") 13 // step6 naive bayes model 14 val naiveBayes = new NaiveBayes().setFeaturesCol("normalizedFeatures").setLabelCol("labelIndex").setWeightCol("obsWeights").setPredictionCol("prediction").setModelType("multinomial").setSmoothing(1.0) 15 // step7 predict label to real label 16 val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(indexer.labels) 17 18 new Pipeline().setStages(Array(tokenizer, indexer, countModel, idfModel, normalizer, naiveBayes, labelConverter)) 19 }
這里注意到我在創建這個pipeline的時候還傳入了訓練數據,但是一般情況下訓練數據是在擬合模型而不是在模型建立的時候就提前傳入的。這里是因為最后面那個labelConverter的transformer需要使用indexer.labels這個參數,而indexer要獲取這個參數就要提前擬合訓練數據,也就是indexer的創建發生在整個pipeline的擬合之前,所以我就先穿入了訓練數據集。注意到這里訓練數據就相當於被訓練了兩次,所以可以先cache()操作一下。
pipeline創建好以后的使用就相對簡單多了,傳入數據就可以了。
1 val pipeline = ModelUsage.createPipeline(dataRDDTrain) 2 // train and test 3 val combinedModel = pipeline.fit(dataRDDTrain) 4 val predictResult = combinedModel.transform(dataRDDTest).select("predictedLabel", "label").rdd.map(row => (row.getDouble(0), row.getDouble(1))) 5 val evaluator = new MulticlassMetrics(predictResult) 6 7 println("confusionMatrix:") 8 println(evaluator.confusionMatrix) 9 println(evaluator.accuracy)
注意到ML擬合的結果都是Double類型的,比如說我一個label是55但是輸出是55.0,評估模型准確度的時候注意一下就好,影響不大。
Spark ML的一個好處是數據dataset像水一樣通過預先創建好的pipeline,可以指定每一個stage處理的column名,再添加生成的數據到新的一列。自始至終,這些中間數據都在結果的dataset里,想要哪些數據指定列名就可以了。這樣的話就避免了每次都要處理數據使它們符合中間模型的輸入結構,而且最后還要自己再整合需要的字段到一起。
由於我們的文章數據特點比較鮮明,沒有任何參數調優,在4w(80% train 20% test)的四分類數據上就已經有了95%的正確率了。
4. 自定義擴展Spark ML
既然直接用現有naive bayes模型就已經有了95%的正確率,那要是加上adaboost呢?
直接實現adaboost算法很簡單,但是畢竟spark ml的pipeline這么好用,而dataset這么好的封裝加上這么多現有的類似StringIndxer等工具類transformer總不能全部重寫吧。所以就想到怎么去自定義一個跟Spark ML兼容的model,上網查了查發現了以下幾篇比較有用的文章。
Extend Spark ML for your own model/transformer types
結合這兩篇文章的內容,在已有的naivebayes模型基礎上進行了改進實現了與Spark ML兼容的adaboost naivebayes model。
注意:
1. 由於我們的模型需要先擬合訓練數據得到模型,隨后才能使用模型,這里面分別涉及到estimator和transformer,因此我們需要分別實現這兩個部分。
2. 我要實現的adaboost+naivebayes模型是一個概率模型,因此我的Estimator和Transformer分別繼承自ProbabilisticClassifier和ProbabilisticClassificationModel,而不是最原始的Estimator和Transformer,這樣就減少了很多不必要的代碼重寫,但是如果是想玩玩整整自己實現一個模型的話就要從最基本的一點點開始寫了,可以參照上面第一篇文章所講,這里就不多細說了。
當然可能會有疑問,既然可以繼承ProbabilisticClassifier,那為什么不直接集成NaiveBayes不是更簡單么?我一開始也是這樣想的,但是發現Spark ML里NaiveBayes里大部分方法和屬性都是私有或者受保護的,我要改就得修改Spark源碼,但是我的Spark程序是在公司服務器運行的,總不能每次都讓公司用我改過之后的Spark包吧。。。
4.1 模型參數
首先,對於任何一個模型模型的訓練,我們一般都會需要傳遞一些參數,這里利用scala的trait實現一個參數接口。
1 trait AdaboostNaiveBayesParams extends Params { 2 // 進行adaboost時的最高迭代次數 3 final val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations") 4 def getMaxIter: Int = $(maxIter) 5 // 進行adaboost時准確率變化小於某個閾值時迭代提前終止 6 final val threshold: DoubleParam = new DoubleParam(this, "threshold", "improvement threshold among iterations") 7 def getThreshold: Double = $(threshold) 8 // 朴素Bayes的平滑系數 9 final val smoothing : DoubleParam = new DoubleParam(this, "smoothing", "naive bayes smooth") 10 def getSmoothing : Double = $(smoothing) 11 // 朴素Bayes類型"multinomial"(default) and "bernoulli" 12 final val modelType : Param[String] = new Param[String](this, "modelType", "naive bayes model type") 13 def getModelType : String = $(modelType) 14 }
這一部分沒什么解釋的,都是一些模型常用參數。
4.2 模型Estimator
這一部分可以說是最重要的部分,Estimator擬合好了,Transformer基本屬於調用一下就好了。先貼代碼,再一行行解釋。
1 class AdaboostNaiveBayes(override val uid: String) 2 extends ProbabilisticClassifier[Vector, AdaboostNaiveBayes, AdaboostNaiveBayesModel] 3 with AdaboostNaiveBayesParams { 4 5 def this() = this(Identifiable.randomUID("AdaboostNaiveBayes")) 6 7 // model parameters assignment 8 def setMaxIter(value: Int): this.type = set(maxIter, value) 9 def setThreshold(value: Double): this.type = set(threshold, value) 10 def setSmoothing(value: Double): this.type = set(smoothing, value) 11 def setModelType(value: String): this.type = set(modelType, value) 12 13 setMaxIter(20) 14 setThreshold(0.02) 15 setSmoothing(1.0) 16 setModelType("multinomial") 17 18 // method used by fit() 19 override protected def train(dataset: Dataset[_]): AdaboostNaiveBayesModel = { 20 21 val datasetSize = dataset.count().toInt 22 val labelSize = dataset.select("label").distinct().count() 23 24 // 各子模型及其權重 25 val modelWeights = new Array[Double]($(maxIter)) 26 val modelArray = new Array[NaiveBayesModel]($(maxIter)) 27 28 var alpha = 0.0 29 // 初始化各樣本等權重 30 val dataWeight: (Double, Double, Double) => Double = (obsWeight: Double, labelIndex: Double, prediction: Double) => { 31 if (labelIndex == prediction) { 32 obsWeight 33 } 34 else { 35 obsWeight * math.exp(alpha) 36 } 37 } 38 val sqlfunc = udf(dataWeight) 39 // 初始化還沒有prediction 40 var temp = dataset.withColumn("obsWeights", lit(1.0)) 41 var i = 0 42 var error1 = 2.0 43 var error2 = 1.0// && (error1 - error2) > $(threshold) 44 var weightSum = datasetSize.toDouble*datasetSize 45 46 while (i < $(maxIter)) { 47 val naiveBayes = new NaiveBayes().setFeaturesCol($(featuresCol)).setLabelCol($(labelCol)).setWeightCol("obsWeights") 48 .setPredictionCol($(predictionCol)).setModelType($(modelType)).setSmoothing($(smoothing)).fit(temp) 49 temp = naiveBayes.transform(temp).cache() 50 51 var error = temp.select("labelIndex", "prediction", "obsWeights").rdd.map(row => { 52 if (row(0) != row(1)) 53 row.getDouble(2) 54 else 55 0.0 56 } 57 ).sum()/(datasetSize) 58 val t5 = System.nanoTime() 59 error1 = error2 60 error2 = error 61 alpha = Math.log((labelSize - 1) * (1 - error) / error) 62 63 modelWeights(i) = alpha 64 modelArray(i) = naiveBayes 65 // 更新權重 66 temp = temp.withColumn("obsWeights", sqlfunc(col("obsWeights"), col($(labelCol)), col($(predictionCol)))); 67 weightSum = temp.select("obsWeights").rdd.map(row => (row.getDouble(0))).sum() 68 temp = temp.drop($(predictionCol), $(rawPredictionCol), $(probabilityCol)) 69 temp = temp.withColumn("obsWeights", col("obsWeights")/(weightSum/datasetSize)) 70 71 i += 1 72 } 73 74 new AdaboostNaiveBayesModel(uid, i, modelWeights, modelArray) 75 } 76 77 override def copy(extra: ParamMap): AdaboostNaiveBayes = defaultCopy(extra) 78 }
1-3行是繼承ProbabilisticClassifer和實現前面我們自己定義的AdaboostNaiveBayesParam參數接口,ProbabilisticClassifer的繼承使用看看源碼里NaiveBayes是怎么做的就可以照着學了。
5行是一個最基本的構造函數,分配給對象一個id值
77行是一個拷貝構造函數,這個必須要實現,最簡單的可以直接像這里一樣調用defaultCopy函數就好了。這個函數用來在引入新的參數的時候復制當前stage返回加入新參數后的一個新模型
8-11行是給模型設定初始參數用的,這幾個函數沒有定義在AdaboostNaiveBayesParam里是因為這些參數的傳入只發生在模型擬合前,在預測的時候是不能設定的,所以對后面的Transformer應該是不可見的,因此只在這里定義。注意到這些函數的返回類型和模型類型一致,其實就是每一步都返回一個加入的參數的新的模型,這里就利用了之前的拷貝構造函數。
13-16行是給模型設定默認參數。
19行開始的train函數就是我們在對模型調用fit方法時使用的函數。返回的是一個AdaboostNaiveBayesModel,是我們隨后需要定義的跟AdaboostNaiveBayes這個Estimator對應的Transformer。
21-22行分別獲取數據集的數量和其中label的數量
25-26是初始化所有子模型及其權重,因為adaboost每一次迭代都會生成一個新的模型並計算該模型在最終結果投票時的權重。
30-38是一個自定義udf函數,對每個樣本計算預測的label和真實label,並根據該樣本的現有權重obsWeight進行更新,可以理解為如果分類正確,其權重不變,否則增大其權重。
40行 初始化所有樣本為等權重,如果樣本數據非常不平衡的話,可以嘗試在這一步就引入偏差權重,我由於使用的數據各個類之間數量是一樣的,所以全部初始話為1
41-44行初始化一些錯誤率等參數
46行開始進行adaboost迭代過程。
47-49行是在當前樣本權重情況下調用普通的NaiveBayes進行訓練的到當前迭代下的子模型
49行這個cache一定不能少,否則迭代的速度只能呵呵了,畢竟temp用到了非常多次的action。
51-57行是計算該模型的錯誤率
59-61行是更新誤差,並計算該模型的權重alpha
63-64行是保存當前子模型和權重
66-69行是利用之前定義的udf函數更新所有樣本的權重並對其進行歸一化
74 行是利用計算得到的參數去構建一個AdaboostNaiveBayesModel,這里傳入所有的子模型及其權重,i表示的是總迭代次數,就是子模型的數量。
4.3 模型Transformer
這里要實現的AdaboostNaiveBayesModel是從ProbabilisticClassificationModel,因此要手動實現對應的必須要的幾個方法。
代碼如下:
1 class AdaboostNaiveBayesModel(override val uid: String, val iternums: Int, val modelWeights: Array[Double], val modelArray: Array[NaiveBayesModel]) 2 extends ProbabilisticClassificationModel[Vector, AdaboostNaiveBayesModel] 3 with AdaboostNaiveBayesParams { 4 5 override val numClasses = modelArray(0).pi.size 6 7 private def multinomialCalculation(features: Vector): Vector = { 8 val result: Vector = new DenseVector(new Array(numClasses)) 9 for (i <- 0 until iternums) { 10 val prob: Vector = modelArray(i).theta.multiply(features) 11 prob.foreachActive { (index, value) => { 12 prob.toArray(index) = value + modelArray(i).pi(index) 13 } 14 } 15 result.toArray(prob.argmax) = result(prob.argmax) + modelWeights(i) 16 } 17 result 18 } 19 20 override def predictRaw(features: Vector): Vector = { 21 multinomialCalculation(features) 22 } 23 24 override def raw2probabilityInPlace(rawPrediction: Vector): Vector = { 25 rawPrediction match { 26 case dv: DenseVector => 27 var i = 0 28 val size = dv.size 29 val maxLog = dv.values.max 30 for (i <- 0 until size) { 31 dv.values(i) = math.exp(dv.values(i) - maxLog) 32 } 33 val probSum = dv.values.sum 34 35 for (i <- 0 until size) { 36 dv.values(i) = dv.values(i) / probSum 37 } 38 dv 39 case sv: SparseVector => 40 throw new RuntimeException("Unexpected error in AdaboostNaiveBayesModel:" + 41 " raw2probabilityInPlace encountered SparseVector") 42 } 43 }
44 override def copy(extra: ParamMap) = {
45 defaultCopy(extra)
46 }
47 }
第5行是讀取一下總的標簽個數以供后面使用
44-46行是拷貝構造函數
20-22行是對一個輸入計算它在各個label下的得分,這個得分的大小表示的是判斷到該標簽概率的大小,但是並不是概率值,因為我們的BayesModel模型參數是做了log變換的
24-43行是怎么講結果向量轉化為和為1的概率值,30-32行是個小技巧,我一開始好奇為什么一定要減掉maxLog,因為這個按理說並不會影響到后面的計算結果,后來發現這樣能避免浮點數的問題,因為不減的話,會出現求完math.exp后值約為零的情況,導致后面的計算出現問題
這樣就完成了概率模型需要的幾個方法了,可以對一個輸入給出一個概率向量,每個維度代表在這個類的概率。
5. 寫在最后
利用自定義的adaboost+naivebayes模型,測試准確率從95%增加到了96.5%左右。由於訓練數據比較好,95%已經很不錯了,這里主要是通過寫一個自定義模型學習一下Spark ML方面的知識。之前都只是聽說過,從來沒用過,學習一下還是很有必要的,畢竟不能總指望着單機就能搞定所有問題。
不過注意到這里我並不是從0開始造輪子,我是從ProbabilisticClassification繼承過來加以修改的,如果想要做其他模型的修改還是推薦看上面的兩篇文章,然后多看看Spark ML源碼里類似的模型並根據自己的需要進行修改。
然后scala也是為了用Spark ML現學的,代碼可以優化的地方估計很多。
這是本人第一篇博客,希望以后可以堅持寫,作為對自己工作學習的總結筆記。