Chinese text classification in Spark with the ml package (Naive Bayes)
The Chinese word-segmentation workflow and how to obtain the corpus are described at https://www.cnblogs.com/DismalSnail/p/11801742.html
This post demonstrates Spark's newer machine-learning package, ml. Word segmentation is done with HanLP (see https://github.com/hankcs/HanLP for details), term weights are TF-IDF, and the classifier is Naive Bayes. For this experiment the training set and the test set of the Fudan Chinese corpus are merged into a single collection.
package com.teligen.subject.ML
import java.io.File
import com.hankcs.hanlp.HanLP
import org.apache.commons.io.FileUtils
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
/**
* Naive Bayes training example
*/
object NBClassDemo {
// Convert the segmented words into a single space-separated string
def toStringList(termString: String): String = {
termString.replace("[", "").replace("]", "").replace(",", "")
}
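// For illustration: HanLP.segment(...).toString produces something like "[w1, w2, w3]";
// toStringList strips the brackets and commas, leaving "w1 w2 w3", which is the
// space-separated form that Spark's Tokenizer expects further down.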
// Holds the Double label value and the segmented string for each document
// Note: the Double labels must start at 0.0 and increase consecutively (0.0, 1.0, 2.0, ...); otherwise, even when a
// prediction is correct, the label values will not line up and the accuracy calculation will be wrong
val labelAndSentenceSeq: ListBuffer[(Double, String)] = ListBuffer[(Double, String)]()
// Segmentation function
def segment(corpusPath: String): Unit = {
// Double value representing the class label, starting from 0.0
var count: Double = 0.0
// Tell HanLP not to include part-of-speech tags in the result, so toString contains no POS characters,
// which makes building the word vectors easier
HanLP.Config.ShowTermNature = false
// Open the corpus root directory
val corpusDir: File = new File(corpusPath)
// One subdirectory per class
for (classDir: File <- corpusDir.listFiles()) {
// Each text file in the class directory
for (text <- classDir.listFiles()) {
// Append the Double label and the segmented string to labelAndSentenceSeq
labelAndSentenceSeq.append(Tuple2(count,
// Post-process HanLP.segment().toString so that words are separated by spaces
toStringList(
// Segment the text
HanLP.segment(
// Read the file contents as a string
FileUtils.readFileToString(text)
.replace("\r\n", "") // remove CRLF line breaks
.replace("\r", "") // remove carriage returns
.replace("\n", "") // remove line feeds
.replace(" ", "") // remove spaces
.replace("\u3000", "") // remove full-width (Chinese) spaces
.replace("\t", "") // remove tabs
.replaceAll(s"\\pP|\\pS|\\pC|\\pN|\\pZ", "") // remove punctuation, symbols, control characters, numbers and separators via Unicode category classes
.trim
// The toString of the segmentation result separates words with a comma and a space, so it still needs the toStringList post-processing above
).toString)))
}
// Move on to the next label value
count = count + 1.0
}
}
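// segment() assumes a corpus layout with one subdirectory per class under the corpus root,
// for example (hypothetical names):
//   corpusPath/
//     class-art/    art_001.txt  art_002.txt ...
//     class-sports/ sports_001.txt ...
// Each subdirectory becomes one label value (0.0, 1.0, 2.0, ...) in the order listFiles() returns it.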
// Build word vectors weighted by TF-IDF
def tfIdf(spark: SparkSession): DataFrame = {
// Turn the (Double label, segmented string) pairs into a DataFrame
val sentenceData: DataFrame = spark.createDataFrame(labelAndSentenceSeq.toSeq).toDF("label", "sentence")
// Split each segmented string into individual words. Tokenizer() can only split strings separated by spaces;
// RegexTokenizer is more powerful, see the Tokenizer() source for details
// Create the sentence --> words tokenizer
val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
// Run the tokenizer
// Without select() here, the intermediate result of every step stays in the DataFrame, which makes it very large and easily causes a java heap space error
val wordsData: DataFrame = tokenizer.transform(sentenceData).select("label", "words")
// Create the words --> rawFeatures HashingTF transformer
val hashingTF: HashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures")
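// HashingTF hashes each term into a fixed-size feature space (2^18 buckets by default);
// .setNumFeatures(...) can change that size if hash collisions or memory become a concern.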
// Run the transformation to obtain the term frequency (TF) of every word in each sentence
val featurizedData: DataFrame = hashingTF.transform(wordsData).select("label", "rawFeatures")
// Create the rawFeatures --> features IDF estimator
val idf: IDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// Fit the IDF (Inverse Document Frequency) model
val idfModel: IDFModel = idf.fit(featurizedData)
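// The idf estimator above also supports .setMinDocFreq(n), which ignores terms that appear in fewer than n documents.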
// Compute the TF-IDF features
idfModel.transform(featurizedData).select("label", "features")
}
// Training and prediction function
def trainAndPredict(tfIdfData: DataFrame): Unit = {
// Split into training and test sets by ratio
val Array(trainingData, testData) = tfIdfData.randomSplit(Array(0.7, 0.3), seed = 1234L)
// Train the Naive Bayes classifier
val model = new NaiveBayes().fit(trainingData)
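// Spark ML's NaiveBayes defaults to the multinomial model with smoothing 1.0;
// .setModelType(...) and .setSmoothing(...) are available if tuning is needed.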
// Run predictions on the test set
val predictions = model.transform(testData)
// Show the first 50 prediction rows
predictions.show(50)
// Evaluate the prediction results
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
// Accuracy on the test set
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
}
def main(args: Array[String]): Unit = {
// Create the SparkSession
val spark = SparkSession.builder().master("local[2]").appName("NBC").getOrCreate()
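// master("local[2]") runs Spark locally with two worker threads; change or drop it when submitting to a cluster.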
// Segment the corpus
segment("./corpus/all_corpus/")
// Train and predict
trainAndPredict(tfIdf(spark))
}
}
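To run this locally, the project needs Spark SQL, Spark MLlib, HanLP and commons-io on the classpath. A minimal build.sbt sketch follows (the version numbers are my assumptions, not taken from the original environment):

// build.sbt (sketch; versions are assumptions)
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.4.4",
  "org.apache.spark" %% "spark-mllib" % "2.4.4",
  "com.hankcs"       %  "hanlp"       % "portable-1.7.8",
  "commons-io"       %  "commons-io"  % "2.6"
)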