1、引言
最近,在做用戶畫像,利用文本分類方法挖掘用戶興趣模型。雖然文本分類不是很難,但是簡單的事情,細節卻是相當的重要。這篇文章我主要是想記錄一下,我在做分類的時候,使用到的特征選擇的方法,以及相關的是實現方法。
2、特征選擇的方法
(1)信息增益
信息增益這一詞來自通信領域,香濃提出的信息熵理論。信息熵的定義如下:
它的本質是衡量一個事件的不確定性的大小。而信息增益則是相對於某一個特定的特征而言的:例如,對與某個特征X,其對應的取值有n種(x1,x2,x3...xn),分別計算特征X在x1,x2,x3...xn的取值下的信息熵,並根據每個取值的概率,計算出所有取值信息熵的平均值:(如下所示)
而信息增益就可以表示為原信息熵-條件熵。如下所示:
(2)信息增益率
特征取值的個數對信息增益有較大的干擾,為了避免有些特征取值個數較多,有些特征取值取值較小,造成信息增益無法公正的衡量一個特征的好壞。於是,信息增益率是在原特征的基礎之上,相當與對信息增益做了歸一化。其表達式是:
IGR=IG(T)/H(C)
(3)相關系數
相關系數是判斷兩個變量之間的相關性,比較常用的是person相關系數。
(4)Gini指數
基尼指數是表示一個變量的重要程度,其值越小越重要。
(5)卡方檢驗
卡方檢驗是檢驗兩個變量之間是否相互獨立的,在特征選擇中的應用就是檢驗特征與目標變量之間的是否獨立。在這里原價假設是特征與目標變量是相互獨立的。統計計算特征的值與期望值之間的偏差,由於偏差不能取負值,所以計算特征的值與期望值之間的偏差的平方,並求和。最終結果作為判斷原假設是否成立的標准。其公式如下:(這里需要注意一點就是期望值是我們自己計算得到的)
但是,在特征選擇的過程中,我們需要選擇的是原假設不成立的特征,即偏差較大的特征。以下介紹一下卡方檢驗在文本分類中的應用的例子:
假設現在有N篇文檔,其中有M篇是關於體育的,我們想考察一個詞“籃球”與類別“體育”之間的相關性。我們有四個觀察值可以使用:
1. 包含“籃球”且屬於“體育”類別的文檔數,命名為A
2. 包含“籃球”但不屬於“體育”類別的文檔數,命名為B
3. 不包含“籃球”但卻屬於“體育”類別的文檔數,命名為C
4. 既不包含“籃球”也不屬於“體育”類別的文檔數,命名為D
那么如何計算特征“籃球”的期望值呢?因為A+B是包含“籃球”的文章數,除以總文檔數就是“籃球”出現的概率,當然,這里認為在一篇文章中出現即可,而不管出現了幾次,而屬於體育類的文章數為A+C,在這些個文檔中,應該有
此時,我們已經得到了“籃球”這個特征的期望值,接下來就是計算實際值與期望值之間的偏差了。如下所示:
代入上是公式,可以得到:
但是,我們在實際工程上是通過其大小來選擇特征詞的,對於所有的特征詞,A+C和B+D都是一樣的,所以,上面的公式只需要計算如下式子即可:
最后我們選擇其值較大的K個特征。
(6)通過建模的方式,來選擇特征
該方法主要是通過模型的方式來選擇特征,具體的做法:
1)首先根據原始數據,進行數據的預處理(如缺失值,異常值,歸一化)
2)對原始數據進行建模,一般采用邏輯回歸模型。
3)根據模型的評價指標(一般用正確率或AUC)來對模型進行預測調優。
4)若上述模型的評價指標較好,根據模型的權值的大小來確定特征的重要性。對應權值越大的特征,其重要性越大。
3、特征選擇方法的實現
(1)信息增益率和相關系數:

1 package com.welab.BDL.UserInterests 2 3 import org.apache.spark.mllib.linalg.Vectors 4 import org.apache.spark.mllib.linalg.Vector 5 import org.apache.spark.SparkContext 6 import org.apache.spark.rdd.RDD 7 import org.apache.spark.SparkConf 8 import org.apache.spark.mllib.stat.Statistics 9 import org.apache.spark.mllib.linalg.Matrix 10 11 /** 12 * @author LJY 13 */ 14 object InformationGain { 15 16 //計算某個屬性的信息熵 17 def inforEntropy(target_attribute: Array[Double]): Double = { 18 var temp = scala.collection.mutable.HashMap[Double, Int]() 19 for (item <- target_attribute) { 20 if (temp.contains(item)) { 21 temp(item) = temp(item) + 1 22 } else { 23 temp.put(item, 1) 24 } 25 } 26 var Entropy = 0.0 27 for (item <- temp) { 28 Entropy += (-1) * (item._2.toDouble / target_attribute.length) * log2(item._2.toDouble / target_attribute.length) 29 } 30 31 Entropy 32 } 33 34 def log2(x: Double): Double = scala.math.log(x) / scala.math.log(2) 35 36 //計算特征與目標特征之間的信息增益 37 def inforGain(sc: SparkContext, feature_attribute: RDD[(Double, Double)]): (Double, Double) = { 38 val target = feature_attribute.map { x => x._2 }.toArray() 39 val Entropy1 = inforEntropy(target) 40 41 val all_Entropy = sc.accumulator(0.0) 42 feature_attribute.groupBy(x => x._1).foreach { x => all_Entropy += (x._2.size.toDouble / target.length) * inforEntropy(x._2.map(x => x._2).toArray) 43 } 44 45 val X = feature_attribute.map { x => x._1 } 46 val Y = feature_attribute.map { x => x._2 } 47 48 val correlation: Double = Statistics.corr(X, Y, "pearson") 49 /* // calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method. 50 // If a method is not specified, Pearson's method will be used by default. 51 val correlMatrix: Matrix = Statistics.corr(data, "pearson")*/ 52 53 // println(Entropy1) 54 // println(all_Entropy.value) 55 ((Entropy1 - all_Entropy.value), correlation) 56 } 57 58 //計算特征與目標特征之間的信息增益率 59 def inforGainRate(sc: SparkContext, feature_attribute: RDD[(Double, Double)]): (Double, Double) = { 60 val target = feature_attribute.map { x => x._2 }.toArray() 61 val Entropy1 = inforEntropy(target) 62 63 val all_Entropy = sc.accumulator(0.0) 64 feature_attribute.groupBy(x => x._1).foreach { x => all_Entropy += (x._2.size.toDouble / target.length) * inforEntropy(x._2.map(x => x._2).toArray) 65 } 66 67 val X = feature_attribute.map { x => x._1 } 68 val Y = feature_attribute.map { x => x._2 } 69 70 val correlation: Double = Statistics.corr(X, Y, "pearson") 71 /* // calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method. 72 // If a method is not specified, Pearson's method will be used by default. 73 val correlMatrix: Matrix = Statistics.corr(data, "pearson")*/ 74 75 // println(Entropy1) 76 // println(all_Entropy.value) 77 ((Entropy1 - all_Entropy.value).toDouble/inforEntropy(X.toArray()), correlation) 78 } 79 80 def main(args: Array[String]): Unit = { 81 // Vectors.dense() 82 val conf = new SparkConf().setAppName("OneLevelClassification").setMaster("local") 83 val sc = new SparkContext(conf) 84 85 val data = sc.textFile("/user/hive/warehouse/bairong_summary") 86 87 val result = data.take(10).foreach { x => 88 val fields = x.split("\t") 89 fields.foreach { x => print(x + " ") } 90 } 91 92 } 93 }
(2)卡方檢驗自定義實現:

1 def my_chiSqTest3(sc: SparkContext, filename: String, p_thr: Double): Array[String] = { 2 val data = sc.textFile(filename, 10000).cache() 3 val allnum = data.count() 4 val result = data.map { x => 5 val fields = x.split("::") 6 val wds = fields(0).split(",") 7 val label = fields(1).split("#")(0) 8 var words = scala.collection.mutable.HashSet[String]() //准備對每個文檔中的關鍵詞進行去重 9 for (word <- wds) { 10 words.add(word) 11 } 12 var ss = "" 13 for (word <- words) { 14 if (ss.isEmpty()) { 15 ss = word + ":" + label 16 } else { 17 ss = ss + "," + word + ":" + label 18 } 19 } 20 ss 21 }.flatMap { x => x.split(",") }.map { x => 22 val fields = x.split(":") 23 (fields(0), fields(1)) //(word,label) 24 }.reduceByKey { (x, y) => x + "," + y }.cache() 25 26 println("所有的特征詞的總數為:" + result.count()) 27 28 /* 29 * A ——表示包含某個關鍵詞T,且屬於某類X的文檔數 30 * B ——表示包含某個關鍵詞T,但不屬於某類X的文檔數 31 * C ——表示不包含某個關鍵詞T,但屬於某類X的文檔數 32 * D ——表示即不包含某個關鍵詞T,也不屬於某類X的文檔數 33 * A+C ——表示屬於某類X的所有文檔數 34 * C+D ——表示不包含關鍵詞T的所有文檔數 35 */ 36 37 val result_A_B = result.map { x => 38 val word = x._1 39 var label_occur = scala.collection.mutable.HashMap[String, Int]() //(label,occur) 40 val labels = x._2.split(",") //包含該關鍵詞所有類別 41 var A = 0 42 var B = 0 43 //統計關鍵詞在每個類別中出現的次數 44 for (label <- labels) { 45 label_occur.get(label) match { 46 case Some(a) => label_occur.put(label, a + 1) 47 case None => label_occur.put(label, 1) 48 } 49 } 50 //相對每個類來說,對應的A和B分別如下 51 var word_A_B = "" 52 for (label_num <- label_occur) { 53 A = label_num._2 54 B = labels.length - label_num._2 55 if (word_A_B.isEmpty()) { 56 word_A_B = word + ":" + label_num._1 + "," + A + "#" + B 57 } else { 58 word_A_B = word_A_B + "@" + word + ":" + label_num._1 + "," + A + "#" + B 59 } 60 61 } 62 word_A_B 63 }.flatMap { x => x.split("@") }.map { x => 64 val fields = x.split(",") 65 val word_label = fields(0) 66 val A = fields(1).split("#")(0).toInt 67 val B = fields(1).split("#")(1).toInt 68 (word_label, (A, B)) //(word:label,(A,B)) 69 } 70 71 val result_CplusD = result.map { x => 72 val word = x._1 73 val C_and_D = allnum - x._2.split(",").length 74 (word, C_and_D) //相對於關鍵詞T來說,C+D 75 } 76 77 val result_Aplus_C = data.map { x => 78 val fields = x.split("::") 79 val label = fields(1).split("#")(0) 80 (label, 1) 81 }.reduceByKey(_ + _) 82 83 val res = result_A_B.map { x => 84 val word_label = x._1.split(":") 85 val word = word_label(0) 86 val label = word_label(1) 87 (word, (label, x._2)) //(word,(label,(A,B))) 88 }.leftOuterJoin(result_CplusD).map { x => 89 val word = x._1 90 val label_A_B = x._2._1 91 var C_plus_D = 0 92 x._2._2 match { 93 case Some(a) => C_plus_D = a.toInt 94 case None => 95 } 96 (label_A_B._1, (word, label_A_B._2, C_plus_D)) //(label,(word,(A,B),C+D)) 97 }.leftOuterJoin(result_Aplus_C).map { x => 98 val label = x._1 99 val word_A_B_CplusD = x._2._1 //(word,(A,B),C+D) 100 var AplusC = 0 101 x._2._2 match { 102 case Some(a) => AplusC = a 103 case None => 104 } 105 val A = word_A_B_CplusD._2._1 106 val B = word_A_B_CplusD._2._2 107 val C = AplusC - A 108 val D = word_A_B_CplusD._3 - C 109 (word_A_B_CplusD._1, ((A * D - B * C) * (A * D - B * C)).toDouble / ((A + B) * word_A_B_CplusD._3)) 110 }.reduceByKey { (x, y) => 111 var maxvalue = 0.0 112 if (x > y) { 113 maxvalue = x 114 } else { 115 maxvalue = y 116 } 117 maxvalue 118 }.map(x => (x._2, x._1)).sortByKey(false).map { x => (x._2, x._1) } 119 120 val res1 = res.take((res.count() * p_thr).toInt).map(x => x._1) 121 res1 122 }