Naive Bayes: Source-Code Analysis and Hands-On Code [Python sklearn / Spark ML]


1. Introduction
  Bayes' theorem is a theorem about the conditional probabilities of random events A and B. In general, the probability that B occurs given that A has occurred differs from the probability that A occurs given that B has occurred; the two are nevertheless linked by a definite relationship, and Bayes' theorem is the statement of that relationship. Throughout, p(A|B) denotes the probability that A occurs given that B has occurred.
  Key terms:
    1. p(A), p(B): the probabilities that A and B occur, also called the prior (or marginal) probabilities.
    2. p(B|A): the probability that B occurs given that A has occurred, also called the posterior probability.
  Basic formula: p(A|B) = p(AB)/p(B)
  Note: p(AB) = p(BA); both denote the probability that A and B occur together, which gives Bayes' formula: p(B|A) = p(AB)/p(A) = p(A|B)p(B)/p(A). Plugging in the values from the (omitted) diagram, p(A|B) = 0.5, p(B) = 0.4, p(A) = 0.8, yields 0.5 * 0.4 / 0.8 = 0.25.
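The arithmetic above is easy to check in a few lines of Python (the 0.5, 0.4, 0.8 values are the sample numbers from the text):

```python
# Bayes' formula: p(B|A) = p(A|B) * p(B) / p(A)
p_a_given_b = 0.5  # p(A|B)
p_b = 0.4          # prior p(B)
p_a = 0.8          # prior p(A)

p_b_given_a = p_a_given_b * p_b / p_a
print(p_b_given_a)  # 0.25
```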
  Naive Bayes is a very simple classification algorithm: given an item to classify, compute the probability of each class conditioned on that item, and assign the item to whichever class has the largest probability.
  Implementation steps: (diagram omitted)
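As a minimal sketch of those steps (my own illustrative code, not from the original post): count documents and feature totals per class, turn the counts into log priors and smoothed log conditional probabilities, then classify by the largest log-posterior score.

```python
import math
from collections import defaultdict

def train_nb(samples, num_features, lam=1.0):
    """Train a multinomial naive Bayes model from (label, feature-count) pairs.
    lam is the Laplace smoothing term."""
    n_docs = len(samples)
    class_n = defaultdict(int)                        # documents per class
    sums = defaultdict(lambda: [0.0] * num_features)  # feature sums per class
    for label, counts in samples:
        class_n[label] += 1
        for j, c in enumerate(counts):
            sums[label][j] += c
    labels = sorted(class_n)
    # log prior of each class
    pi = {y: math.log((class_n[y] + lam) / (n_docs + lam * len(labels))) for y in labels}
    # log conditional probability of each feature given the class
    theta = {}
    for y in labels:
        total = sum(sums[y]) + lam * num_features
        theta[y] = [math.log((sums[y][j] + lam) / total) for j in range(num_features)]
    return labels, pi, theta

def predict_nb(model, counts):
    """Return the class with the largest log posterior score."""
    labels, pi, theta = model
    return max(labels, key=lambda y: pi[y] + sum(c * theta[y][j] for j, c in enumerate(counts)))

model = train_nb([(0, [3, 0]), (0, [2, 1]), (1, [0, 3]), (1, [1, 2])], num_features=2)
print(predict_nb(model, [4, 0]))  # 0
```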

2. Code Implementation [Python]

# -*- coding: utf-8 -*-
"""
Created on Tue Oct 28 14:40:38 2018

@author: zhen
"""
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report

# Fetch the data
news = fetch_20newsgroups(subset='all')

# Preprocessing: split into training and test sets
x_train, x_test, y_train, y_test = train_test_split(news.data, news.target, test_size=0.25, random_state=33)

# Vectorize the text features
vec = CountVectorizer()
x_train = vec.fit_transform(x_train)
x_test = vec.transform(x_test)

# Train a naive Bayes classifier (multinomial model)
mnb = MultinomialNB()
mnb.fit(x_train, y_train)
y_predict = mnb.predict(x_test)

# Report the predictions
print(classification_report(y_test, y_predict, target_names=news.target_names))
print("the accuracy of MultinomialNB is:", mnb.score(x_test, y_test))
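The same fit/transform handoff works on a tiny toy corpus, which makes it easier to see than the full 20newsgroups run (the sentences and labels below are my own illustrative data, not from the original post):

```python
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

corpus = ["the match ended in a draw",
          "the team won the game",
          "stocks fell sharply today",
          "the market closed lower"]
labels = ["sports", "sports", "finance", "finance"]

vec = CountVectorizer()
x = vec.fit_transform(corpus)        # fit the vocabulary on training text only
clf = MultinomialNB().fit(x, labels)

# transform (not fit_transform) reuses the fitted vocabulary;
# out-of-vocabulary words like "lost" are simply ignored
test = vec.transform(["the team lost the match"])
print(clf.predict(test)[0])  # sports
```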

3. Results [Python]

(results screenshot omitted)

4. Code Implementation [Spark]

package big.data.analyse.ml

import org.apache.log4j.{Level, Logger}
import org.apache.spark.NaiveBayes
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.sql.SparkSession

/**
  * Created by zhen on 2019/9/9.
  */
object NaiveBayesAnalyse {
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("NaiveBayes").master("local[2]").getOrCreate()

    /**
      * Load the data
      */
    val test_data_array = Array("0,1.2-0.5-0.2", "0,2.1-0.3-0.2", "0,3.6-0.1-1.0", "0,4.6-0.3-0.2",
          "1,0.4-1.5-0.2", "1,0.2-2.6-0.8", "1,0.6-3.3-0.6", "1,0.1-4.3-0.4",
          "2,0.1-0.4-1.8", "2,0.2-0.4-2.1", "2,0.3-0.1-3.3", "2,0.5-0.2-4.1")

    val sc = spark.sparkContext
    val test_data = sc.parallelize(test_data_array).map(row => {
      val array = row.split(",")
      LabeledPoint(array(0).toDouble, Vectors.dense(array(1).split("-").map(_.toDouble)))
    })

    /**
      * Split the data into training and test sets
      */
    val splits = test_data.randomSplit(Array(0.8, 0.2), seed = 11L)
    val train = splits(0)
    val test = splits(1)

    /**
      * Create and train a naive Bayes model
      * (multinomial model)
      */
    val model = NaiveBayes.train(train, lambda = 1.0, modelType = "multinomial")

    /**
      * Predict
      */
    val predict = test.map(row => (row.label, model.predict(row.features)))
    val predict_show = predict.take(20)
    val test_take = test.take(20)
    println("Predictions:")
    println("label" + "\t" + "features" + "\t" + "predict")
    for (i <- 0 until predict_show.length) {
      println(predict_show(i)._1 + "\t" + test_take(i).features + "\t" + predict_show(i)._2)
    }

    val acc = 1.0 * predict.filter(row => row._1 == row._2).count() / test.count()
    println("Prediction accuracy: " + acc)
  }
}

5. Mock Source-Code Implementation [Spark]

The NaiveBayes class:
package org.apache.spark

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector}
import org.apache.spark.rdd.RDD

/**
  * Created by zhen on 2019/9/11.
  */
object NaiveBayes {
  /**
    * Multinomial model type
    */
  val Multinomial : String = "multinomial"

  /**
    * Bernoulli model type
    */
  val Bernoulli : String = "bernoulli"

  /**
    * The model types that are supported
    */
  val supportedModelTypes = Set(Multinomial, Bernoulli)

  /**
    * Train a naive Bayes model
    * @param input sample RDD
    * @return
    */
  def train(input : RDD[LabeledPoint]) : NaiveBayesModel = {
    new NaiveBayes().run(input)
  }

  /**
    * Train a naive Bayes model
    * @param input sample RDD
    * @param lambda smoothing parameter
    * @return
    */
  def train(input : RDD[LabeledPoint], lambda : Double) : NaiveBayesModel = {
    new NaiveBayes(lambda, Multinomial).run(input)
  }

  /**
    * Train a naive Bayes model
    * @param input sample RDD
    * @param lambda smoothing parameter
    * @param modelType model type
    * @return
    */
  def train(input : RDD[LabeledPoint], lambda : Double, modelType : String) : NaiveBayesModel = {
    require(supportedModelTypes.contains(modelType), s"NaiveBayes was created with an unknown modelType: $modelType.")
    new NaiveBayes(lambda, modelType).run(input)
  }
}

/**
  * Naive Bayes classifier
  * @param lambda smoothing parameter
  * @param modelType model type
  */
class NaiveBayes private(private var lambda : Double,
                         private var modelType : String) extends Serializable {

  import NaiveBayes.{Bernoulli, Multinomial}

  def this(lambda : Double) = this(lambda, NaiveBayes.Multinomial)

  def this() = this(1.0, NaiveBayes.Multinomial)

  /**
    * Set the smoothing parameter
    * @param lambda
    * @return
    */
  def setLambda(lambda : Double) : NaiveBayes = {
    this.lambda = lambda
    this
  }

  /**
    * Get the smoothing parameter
    * @return
    */
  def getLambda : Double = this.lambda

  /**
    * Set the model type
    * @param modelType
    * @return
    */
  def setModelType(modelType : String) : NaiveBayes = {
    require(NaiveBayes.supportedModelTypes.contains(modelType), s"NaiveBayes was created with an unknown modelType: $modelType.")
    this.modelType = modelType
    this
  }

  /**
    * Get the model type
    * @return
    */
  def getModelType : String = this.modelType

  /**
    * Run the algorithm
    * @param data
    * @return
    */
  def run(data : RDD[LabeledPoint]) : NaiveBayesModel = {
    val requireNonnegativeValues : Vector => Unit = (v : Vector) => {
      val values = v match {
        case sv : SparseVector => sv.values
        case dv : DenseVector => dv.values
      }
      if (!values.forall(_ >= 0.0)) {
        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
      }
    }

    val requireZeroOneBernoulliValues : Vector => Unit = (v : Vector) => {
      val values = v match {
        case sv : SparseVector => sv.values
        case dv : DenseVector => dv.values
      }
      if (!values.forall(v => v == 0.0 || v == 1.0)) {
        throw new SparkException(s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
      }
    }

    /**
      * Aggregate per label to get the feature frequencies for each label:
      * key by label, combine the features of each label, and return (label, (count, sum of features))
      */
    println("Training data:")
    data.foreach(println)
    val aggregated = data.map(row => (row.label, row.features))
      .combineByKey[(Long, DenseVector)](
        createCombiner = (v : Vector) => { // turn a sample from v into c: (v: Vector) -> (c: (Long, DenseVector))
          if (modelType == Bernoulli) {
            requireZeroOneBernoulliValues(v)
          } else {
            requireNonnegativeValues(v)
          }
          (1L, v.copy.toDense)
        },
        mergeValue = (c : (Long, DenseVector), v : Vector) => { // merge a value into a combiner
          requireNonnegativeValues(v)
          BLAS.axpy(1.0, v, c._2)
          (c._1 + 1L, c._2)
        },
        mergeCombiners = (c1 : (Long, DenseVector), c2 : (Long, DenseVector)) => {
          BLAS.axpy(1.0, c2._2, c1._2)
          (c1._1 + c2._1, c1._2)
        }
      ).collect()

    val numLabels = aggregated.length // number of class labels

    var numDocuments = 0L // number of documents
    aggregated.foreach { case (_, (n, _)) =>
      numDocuments += n
    }

    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } // number of features

    val labels = new Array[Double](numLabels) // list of labels

    val pi = new Array[Double](numLabels) // pi: prior probability of each class

    val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) // theta: conditional probability of each feature given each class

    val piLogDenom = math.log(numDocuments + numLabels * lambda) // shared denominator for the priors

    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
      labels(i) = label
      pi(i) = math.log(n + lambda) - piLogDenom // prior probability, in log space
      val thetaLogDenom = modelType match {
        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) // multinomial model
        case Bernoulli => math.log(n + 2.0 * lambda) // Bernoulli model
        case _ => throw new UnknownError(s"Invalid modelType: $modelType.")
      }
      var j = 0
      while (j < numFeatures) {
        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom // conditional probability of each feature in each class, in log space
        j += 1
      }
      i += 1
    }
    new NaiveBayesModel(labels, pi, theta, modelType)
  }
}
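The combineByKey step above is just a per-label (count, feature-sum) aggregation. A small Python sketch of the same reduction (my own illustration, not Spark code):

```python
def aggregate(samples):
    """samples: iterable of (label, features) pairs.
    Returns {label: (count, summed_features)}, mirroring the
    (label, (count, sum of features)) pairs that combineByKey produces."""
    agg = {}
    for label, feats in samples:
        if label not in agg:
            agg[label] = (1, list(feats))      # createCombiner: copy the first vector
        else:
            n, acc = agg[label]
            for j, v in enumerate(feats):      # mergeValue: axpy-style accumulation
                acc[j] += v
            agg[label] = (n + 1, acc)
    return agg

result = aggregate([(0.0, [1, 2]), (0.0, [2, 0]), (1.0, [0, 3])])
print(result)  # {0.0: (2, [3, 2]), 1.0: (1, [0, 3])}
```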
The NaiveBayesModel class:
package org.apache.spark

import org.apache.spark.ml.linalg.{BLAS, Vector, DenseMatrix, DenseVector}
import org.apache.spark.mllib.util.Saveable
import org.apache.spark.rdd.RDD

/**
  * Created by zhen on 2019/9/12.
  */
class NaiveBayesModel private[spark](
                                      val labels : Array[Double],
                                      val pi : Array[Double],
                                      val theta : Array[Array[Double]],
                                      val modelType : String
                                    ) extends Serializable with Saveable {

  import NaiveBayes.{Bernoulli, Multinomial, supportedModelTypes}

  private val piVector = new DenseVector(pi) // prior probability of each class

  private val thetaMatrix = new DenseMatrix(labels.length, theta(0).length, theta.flatten, true) // conditional probability of each feature in each class

  private[spark] def this(labels : Array[Double], pi : Array[Double], theta : Array[Array[Double]]) = this(labels, pi, theta, NaiveBayes.Multinomial)

  /**
    * Constructor for the Java API
    */
  private[spark] def this(
                           labels : Iterable[Double],
                           pi : Iterable[Double],
                           theta : Iterable[Iterable[Double]]
                         ) = this(labels.toArray, pi.toArray, theta.toArray.map(_.toArray))

  require(supportedModelTypes.contains(modelType), s"Invalid modelType $modelType. Supported modelTypes are $supportedModelTypes.")

  /**
    * Extra preprocessing for the Bernoulli model
    */
  private val (thetaMinusNegTheta, negThetaSum) = modelType match {
    case Multinomial => (None, None)
    case Bernoulli =>
      val negTheta = thetaMatrix.map(value => math.log(1.0 - math.exp(value)))
      val ones = new DenseVector(Array.fill(thetaMatrix.numCols){1.0})
      val thetaMinusNegTheta = thetaMatrix.map { value => value - math.log(1.0 - math.exp(value)) }
      (Option(thetaMinusNegTheta), Option(negTheta.multiply(ones)))
    case _ => throw new UnknownError(s"Invalid modelType: $modelType.")
  }

  /**
    * Predict for an RDD of samples
    */
  def predict(testData : RDD[Vector]) : RDD[Double] = {
    val bcModel = testData.context.broadcast(this)
    testData.mapPartitions { iter =>
      val model = bcModel.value
      iter.map(model.predict) // call the single-sample predict
    }
  }

  /**
    * Predict from the feature vector of a single sample
    */
  def predict(testData : Vector) : Double = {
    modelType match {
      case Multinomial =>
        val prob = thetaMatrix.multiply(testData)
        BLAS.axpy(1.0, piVector, prob)
        labels(prob.argmax)
      case Bernoulli =>
        testData.foreachActive { (index, value) =>
          if (value != 0.0 && value != 1.0) {
            throw new SparkException(s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
          }
        }
        val prob = thetaMinusNegTheta.get.multiply(testData)
        BLAS.axpy(1.0, piVector, prob)
        BLAS.axpy(1.0, negThetaSum.get, prob)
        labels(prob.argmax)
      case _ =>
        throw new UnknownError(s"Invalid modelType: $modelType.")
    }
  }

  /**
    * Save the model
    */
  def save(sc : SparkContext, path : String) : Unit = {
    //val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
    //NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)
  }

  override protected def formatVersion : String = "2.0"
}
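The multinomial branch of predict reduces to a matrix-vector product in log space: score each class as log-prior plus the dot product of the features with that class's log conditional probabilities, then take the argmax. A Python transcription of just that step (my own sketch, with made-up log-space parameters):

```python
import math

def predict_multinomial(labels, pi, theta, x):
    """Score each class as pi[i] + sum_j x[j] * theta[i][j]
    (the thetaMatrix.multiply + BLAS.axpy + argmax sequence above)
    and return the best-scoring label."""
    best, best_score = None, -math.inf
    for i, label in enumerate(labels):
        score = pi[i] + sum(xj * theta[i][j] for j, xj in enumerate(x))
        if score > best_score:
            best, best_score = label, score
    return best

# Illustrative (made-up) parameters: two classes over two features
labels = [0.0, 1.0]
pi = [math.log(0.5), math.log(0.5)]
theta = [[math.log(0.8), math.log(0.2)],
         [math.log(0.2), math.log(0.8)]]
print(predict_multinomial(labels, pi, theta, [3.0, 1.0]))  # 0.0
```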

6. Results [Spark]

(results screenshots omitted)

