基於mllib的spark中文文本分類(朴素貝葉斯)
本文參考博客 https://blog.csdn.net/github_36326955/article/details/54891204
使用spark中ml包進行中文文本分類參見 https://www.cnblogs.com/DismalSnail/p/11802281.html
首先介紹一下文本分類的大致流程
- 預處理
- 中文分詞
- 構建詞向量空間
- 訓練模型
- 用訓練好的模型進行預測
- 通過預測結果對模型進行評估
預處理
- 語料庫
- 文本格式轉換
語料庫
要進行文本分類,首先要有文本,復旦中文文本語料庫
百度雲盤鏈接:https://pan.baidu.com/s/1nKAmM8EuF54sgtMGhZN9tw
密碼 ns8e
文本格式轉換
由於下載的語料庫是GBK格式的,為了處理方便,需要轉成UTF-8的格式,轉換代碼如下
package com.classification.text
import java.io.File
import org.apache.commons.io.FileUtils //Java的文件處理工具包
object GBK2UTF {
def GBK2UTF8(GBKCorpusPath: String, UTF8CorpusPath: String): Unit = {
//打開根目錄
val GBKCorpusDir: Array[File] = new File(GBKCorpusPath).listFiles()
//對應的UTF-8格式的目錄是否存在,不存在新建
val UTFCorpusDir: File = new File(UTF8CorpusPath);
if (!UTFCorpusDir.exists()) {
UTFCorpusDir.mkdir()
}
//打開類別目錄
for (gbkClassDir: File <- GBKCorpusDir) {
//記錄目錄路徑,為創建UTF-8格式的文件夾和文件提供路徑
val UTFClassDirPath: String = UTF8CorpusPath + gbkClassDir.getName
//UTF-8格式的類別目錄是否存在,不存在新建
val UTFClassDir: File = new File(UTFClassDirPath)
if (!UTFClassDir.exists()) {
UTFClassDir.mkdir()
}
for (gbkText: File <- gbkClassDir.listFiles()) {
//將文件以GBK格式讀取為字符串,轉為UTF-8格式后寫入新文件
FileUtils.write(new File(UTFClassDirPath + "/" + gbkText),
FileUtils.readFileToString(gbkText, "GBK"), "UTF-8")
}
}
}
def main(args: Array[String]): Unit = {
GBK2UTF8("./train_corpus/", "./utf_train_corpus/")
GBK2UTF8("./test_corpus/", "./utf_test_corpus/")
}
}
中文分詞
- 分詞工具介紹
- 選擇Ansj作為分詞工具,以及注意事項
- Ansj中文分詞實現
分詞工具介紹
中文分詞的理論部分很多博客都有介紹,這里主要介紹代碼實現(理論咱現在也不會,就會調用API)。如果用Python,分詞一般選擇jieba分詞,jieba分詞也有Java版的,但是用起來不是很方便。如果用Java或Scala,就要選擇Java版的中文分詞工具,我用的主要是Ansj和HanLP,兩個分詞工具可以百度,感覺HanLP比較強大。
選擇Ansj作為分詞工具,以及注意事項
本次實驗選擇Ansj作為中文文本分類的工具。注意:如果需要添加自定義詞典,詞典內的空白都必須是tab,但是如果使用Idea編輯詞典文件,tab鍵默認為4個空格(這種細節注意不到會讓人崩潰)
Ansj中文分詞的實現
package com.classification.text
import java.io.File
import java.util
import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.commons.io.FileUtils
//scala集合與java集合轉換的包,按住Ctrl點進源碼,可以查看轉換規則
import scala.collection.JavaConversions._
object WordSplit {
//分詞函數
def corpusSegment(utfCorpusPath: String, utfSegmentPath: String, trainLabelListPath: String, trainSegmentPath: String): Unit = {
//計數用,統計樣本個數
var count = 0
//存放標簽的Java數組,這里使用java數組是為了方便寫入文件
val labelList = new util.ArrayList[String]()
//存放分詞后字符串的數組,同樣為了方便寫入文件
val contextList = new util.ArrayList[String]()
//打開根目錄
val corpusDir: Array[File] = new File(utfCorpusPath).listFiles()
//類別目錄
for (corpusClassDir: File <- corpusDir) {
//每一個文件
for (utfText <- corpusClassDir.listFiles()) {
count = count + 1
//調用分詞方法
val textSeg: Result = ToAnalysis.parse(FileUtils.readFileToString(utfText)
.replace("\r\n", "") //去除換行和回車
.replace("\r", "") //去除單獨的回車
.replace("\n", "") //去除單獨的換行
.replace(" ", "") //去除空格
.replace("\u3000", "") //去除全角空格(中文空格)
.replace("\t", "") //去除制表符
.replaceAll(s"\\pP|\\pS|\\pC|\\pN|\\pZ", "") //通過設置Unicode類別的相關正則去除符號
.trim
)
//讀取停用詞,就是一些對分類沒有作用的詞,去除可以對特征向量降維
val stopWordList: Seq[String] = FileUtils.readFileToString(new File("./stopWords/stop_word_chinese.txt"))
.split("\r\n").toSeq
//新建停用詞對象
val filter = new StopRecognition()
//加載停用詞列表
filter.insertStopWords(stopWordList)
//去除停用詞
val result: Result = textSeg.recognition(filter)
/**
*這里如果將每篇文章的分詞單獨寫入一個文件,則在構建詞向量時,spark
* 就要分別讀取每篇文章的分詞,而spark每讀一個文件,就會就會產生一個RDD,
* 這樣讀取所有文本的分詞就會產生巨量的RDD,這時把這些分詞合並到一個集合中(巨量的RDD
* 合並成一個RDD)時,spark在構建DAG時就會爆掉(親身經歷,當時用的時RDD的union方法)
*/
//將分詞內容加入列表
contextList.add(result.toStringWithOutNature)
//將標簽加入列表,標簽的順序和文本分詞后的順序是對應的
labelList.add(corpusClassDir.getName)
}
}
println(count)
//將分詞寫入文件
FileUtils.writeLines(new File(trainSegmentPath), "UTF-8", contextList)
//將文本標簽寫入文件
FileUtils.writeLines(new File(trainLabelListPath), "UTF-8", labelList)
}
def main(args: Array[String]): Unit = {
//這里該了一些目錄結構,對代碼的功能沒有影響
corpusSegment("./train/utf_train_corpus/", "./train/utf_train_segment/",
"./train/train_label_list.txt", "./train/train_seg.txt")
corpusSegment("./test/utf_test_corpus/", "./test/utf_test_segment/",
"./test/test_label_list.txt", "./test/test_seg.txt")
}
}
構建詞向量空間、訓練、預測、評估
分詞完成后就可以構建詞向量、訓練、預測、評估了
package com.classification.text
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Classification {
//讀取分詞文件和標簽文件,兩個文件讀取后都是RDD形式,元組的形式返回
def getDocumentsAndLabels(sc: SparkContext, segPath: String, labelListPath: String): (RDD[Seq[String]], Iterator[String]) = {
(sc.textFile(segPath).map(_.split(",").toSeq), sc.textFile(labelListPath).collect().toSeq.toIterator)
}
//訓練函數
def train(sc: SparkContext, trainSegPath: String, trainLabelListPath: String): NaiveBayesModel = {
//讀取訓練集的分詞和標簽
val (documents, labelList) = getDocumentsAndLabels(sc, trainSegPath, trainLabelListPath)
//新建HashingTF類
val hashingTF: HashingTF = new HashingTF()
//計算TF值
val tf: RDD[Vector] = hashingTF.transform(documents)
//緩存,為了計算快,對功能沒有影響
tf.cache()
//計算IDF值
val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
//計算TF-IDF值
val tfIdf: RDD[Vector] = idf.transform(tf)
//將TFIDF數據,結合標簽,轉為LabelPoint數據,LabelPoint是訓練函數NaiveBayes.train()的輸入數據格式
val training: RDD[LabeledPoint] = tfIdf.map {
vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
}
//訓練函數訓練,
NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
}
//測試函數,參數model為訓練集訓練后的模型
def test(sc: SparkContext, testSegPath: String, testLabelListPath: String, model: NaiveBayesModel): Double = {
//讀取測試數據集分詞和標簽數據
val (documents, labelList) = getDocumentsAndLabels(sc, testSegPath, testLabelListPath)
//和訓練的步驟差不多
val hashingTF: HashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
tf.cache()
val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
val tfIdf: RDD[Vector] = idf.transform(tf)
val test: RDD[LabeledPoint] = tfIdf.map {
vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
}
//預測
val predictionAndLabel: RDD[(Double, Double)] = test.map((p: LabeledPoint) => (model.predict(p.features), p.label))
//計算准確率
1.0 * predictionAndLabel.filter((x: (Double, Double)) => x._1 == x._2).count() / test.count()
}
//獲取標簽對應的Double數值,將標簽中的數組作為標簽對應的數值
//C11Space -> 11.0
def getDoubleOfLabel(label: String): Double = {
label.split("-")(0).tail.toDouble
}
def main(args: Array[String]): Unit = {
//新建spark上下文
val conf: SparkConf = new SparkConf().setAppName("Classification").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
//調用處理函數
println(test(sc, "./test/test_seg.txt",
"./test/test_label_list.txt",
train(sc,
"./train/train_seg.txt",
"./train/train_label_list.txt"
)
)
)
}
}
到此分詞的步驟就結束了,要想提高分詞的准確率可以嘗試不同的分詞工具和文本分類算法