基於ml的spark中文文本分類(朴素貝葉斯)


基於ml的spark中文文本分類(朴素貝葉斯)

中文分詞的流程和語料庫的獲取可以參考 https://www.cnblogs.com/DismalSnail/p/11801742.html
這里展示一下spark新的機器學習包ml的使用,分詞工具為HanLP(詳見 https://github.com/hankcs/HanLP )詞語權重為TF-IDF,分類器為朴素貝葉斯分類器,本次實驗將復旦中文語料庫的訓練集與測試集合並為一個。


package com.teligen.subject.ML

import java.io.File

import com.hankcs.hanlp.HanLP
import org.apache.commons.io.FileUtils
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
 * 朴素貝葉斯訓練示例
 */
object NBClassDemo {

  //將分詞后的詞語轉為間隔為空格的字符串
  def toStringList(termString: String): String = {
    termString.replace("[", "").replace("]", "").replace(",", "")

  }

  //存儲代表標簽的Double值和分詞后的字符串
  //注意這里的Double必須從0.0開始,順序增長 0.0 1.0 2.0 ... ,不然即使預測正確,標簽的Double值也對不上,正確率的計算會
  //出錯
  val labelAndSentenceSeq: ListBuffer[(Double, String)] = ListBuffer[(Double, String)]()

  //分詞函數
  def segment(corpusPath: String): Unit = {
    //代表標簽的Double,從0.0開始
    var count: Double = 0.0
    //設置hanLP分詞結果不帶詞性,這樣toString后就不會有 詞性字符了,方便構建詞向量

    HanLP.Config.ShowTermNature = false
    //打開根目錄
    val corpusDir: File = new File(corpusPath)
    //類別目錄
    for (classDir: File <- corpusDir.listFiles()) {
      //文件
      for (text <- classDir.listFiles()) {
        //將標簽Double,和分詞后的字符串存入labelAndSentenceSeq
        labelAndSentenceSeq.append(Tuple2(count,
          //對HanLP.segment().toString修改,使兩個詞之間為空格
          toStringList(
            //分詞
            HanLP.segment(
              //以字符串的形式讀取文本
              FileUtils.readFileToString(text)
                .replace("\r\n", "")//去換行、回車
                .replace("\r", "")//去回車
                .replace("\n", "")//去換行
                .replace(" ", "")//去空格
                .replace("\u3000", "")//去全角空格(中文空格)
                .replace("\t", "")//去制表符
                .replaceAll(s"\\pP|\\pS|\\pC|\\pN|\\pZ", "")//通過Unicode的類別相關正則,去掉各種符號
                .trim
              //分類器的toSting,單詞之間使逗號+空格,需要進一步處理
            ).toString)))
      }
      //改變標簽Label
      count = count + 1.0
    }
  }

  //構建以TF-IDF為權重的詞向量
  def tfIdf(spark: SparkSession): DataFrame = {
    //將標簽Double和分詞后的字符串轉為DataFrame
    val sentenceData: DataFrame = spark.createDataFrame(labelAndSentenceSeq.toSeq).toDF("label", "sentence")
    
    //將字分詞后的字符串分割為一個個詞語,Tokenizer()只能分割以空格間隔的字符串,
    // RegexTokenizer功能更強大,詳情可以點進Tokenizer()源碼查看
    
    //新建sentence --> words分割器
    val tokenizer: Tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    //進行分割
    //這里如果不select(),則每一步的計算結果都存儲在DataFrame,導致DataFrame很大,很容易造成 java heap space 異常
    val wordsData: DataFrame = tokenizer.transform(sentenceData).select("label", "words")
    
    //新建 words --> rawFeatures HasingTF類
    val hashingTF: HashingTF = new HashingTF()
      .setInputCol("words").setOutputCol("rawFeatures")
    
    //執行計算,獲得每個語句中每詞語的詞頻即 TF(Term Frequency)
    val featurizedData: DataFrame = hashingTF.transform(wordsData).select("label", "rawFeatures")
    
    //新建rawFeatures --> features IDF類
    val idf: IDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    //計算IDF (Inverse Document Frequency)
    val idfModel: IDFModel = idf.fit(featurizedData)
    //計算TF-IDF
    idfModel.transform(featurizedData).select("label", "features")
  }

  //訓練和預測函數
  def trainAndPredict(ifIdfData: DataFrame) = {
    //按比例選取測試集和訓練集
    val Array(trainingData, testData) = ifIdfData.randomSplit(Array(0.7, 0.3), seed = 1234L)
    //訓練朴素貝葉斯分類器
    val model = new NaiveBayes().fit(trainingData)
    //預測
    val predictions = model.transform(testData)
    //展示測試結果,50條
    predictions.show(50)

    //測試結果評估
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    //測試結果准確率
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test set accuracy = $accuracy")
  }

  def main(args: Array[String]): Unit = {
    //新建spark上下文
    val spark = SparkSession.builder().master("local[2]").appName("NBC").getOrCreate()
    //分詞
    segment("./corpus/all_corpus/")
    //訓練和預測
    trainAndPredict(tfIdf(spark))
  }
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM