基於mllib的spark中文文本分類(朴素貝葉斯)


基於mllib的spark中文文本分類(朴素貝葉斯)

本文參考博客 https://blog.csdn.net/github_36326955/article/details/54891204
使用spark中ml包進行中文文本分類參見 https://www.cnblogs.com/DismalSnail/p/11802281.html

首先介紹一下文本分類的大致流程

  • 預處理
  • 中文分詞
  • 構建詞向量空間
  • 訓練模型
  • 用訓練好的模型進行預測
  • 通過預測結果對模型進行評估

預處理

  • 語料庫
  • 文本格式轉換

語料庫

要進行文本分類,首先要有文本,復旦中文文本語料庫

百度雲盤鏈接:https://pan.baidu.com/s/1nKAmM8EuF54sgtMGhZN9tw

密碼 ns8e

文本格式轉換

由於下載的語料庫是GBK格式的,為了處理方便,需要轉成UTF-8的格式,轉換代碼如下


package com.classification.text

import java.io.File

import org.apache.commons.io.FileUtils //Java的文件處理工具包

object GBK2UTF {

  def GBK2UTF8(GBKCorpusPath: String, UTF8CorpusPath: String): Unit = {
    //打開根目錄
    val GBKCorpusDir: Array[File] = new File(GBKCorpusPath).listFiles()
    //對應的UTF-8格式的目錄是否存在,不存在新建
    val UTFCorpusDir: File = new File(UTF8CorpusPath);
    if (!UTFCorpusDir.exists()) {
      UTFCorpusDir.mkdir()
    }

    //打開類別目錄
    for (gbkClassDir: File <- GBKCorpusDir) {
      //記錄目錄路徑,為創建UTF-8格式的文件夾和文件提供路徑
      val UTFClassDirPath: String = UTF8CorpusPath + gbkClassDir.getName
      //UTF-8格式的類別目錄是否存在,不存在新建
      val UTFClassDir: File = new File(UTFClassDirPath)
      if (!UTFClassDir.exists()) {
        UTFClassDir.mkdir()
      }

      for (gbkText: File <- gbkClassDir.listFiles()) {
        //將文件以GBK格式讀取為字符串,轉為UTF-8格式后寫入新文件
        FileUtils.write(new File(UTFClassDirPath + "/" + gbkText),
          FileUtils.readFileToString(gbkText, "GBK"), "UTF-8")
      }
    }

  }


  def main(args: Array[String]): Unit = {
    GBK2UTF8("./train_corpus/", "./utf_train_corpus/")
    GBK2UTF8("./test_corpus/", "./utf_test_corpus/")
  }

}

中文分詞

  • 分詞工具介紹
  • 選擇Ansj作為分詞工具,以及注意事項
  • Ansj中文分詞實現

分詞工具介紹

中文分詞的理論部分很多博客都有介紹,這里主要介紹代碼實現(理論咱現在也不會,就會調用API)。如果用Python,分詞一般選擇jieba分詞,jieba分詞也有Java版的,但是用起來不是很方便。如果用Java或Scala,就要選擇Java版的中文分詞工具,我用的主要是Ansj和HanLP,兩個分詞工具可以百度,感覺HanLP比較強大。

選擇Ansj作為分詞工具,以及注意事項

本次實驗選擇Ansj作為中文文本分類的工具。注意:如果需要添加自定義詞典,詞典內的空白都必須是tab,但是如果使用Idea編輯詞典文件,tab鍵默認為4個空格(這種細節注意不到會讓人崩潰)

Ansj中文分詞的實現


package com.classification.text

import java.io.File
import java.util

import org.ansj.domain.Result
import org.ansj.recognition.impl.StopRecognition
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.commons.io.FileUtils

//scala集合與java集合轉換的包,按住Ctrl點進源碼,可以查看轉換規則
import scala.collection.JavaConversions._


object WordSplit {

  //分詞函數
  def corpusSegment(utfCorpusPath: String, utfSegmentPath: String, trainLabelListPath: String, trainSegmentPath: String): Unit = {
    //計數用,統計樣本個數
    var count = 0
    //存放標簽的Java數組,這里使用java數組是為了方便寫入文件
    val labelList = new util.ArrayList[String]()
    //存放分詞后字符串的數組,同樣為了方便寫入文件
    val contextList = new util.ArrayList[String]()
    //打開根目錄
    val corpusDir: Array[File] = new File(utfCorpusPath).listFiles()
    //類別目錄
    for (corpusClassDir: File <- corpusDir) {
      //每一個文件
      for (utfText <- corpusClassDir.listFiles()) {

        count = count + 1
        //調用分詞方法
        val textSeg: Result = ToAnalysis.parse(FileUtils.readFileToString(utfText)
          .replace("\r\n", "") //去除換行和回車
          .replace("\r", "") //去除單獨的回車
          .replace("\n", "") //去除單獨的換行
          .replace(" ", "") //去除空格
          .replace("\u3000", "") //去除全角空格(中文空格)
          .replace("\t", "") //去除制表符
          .replaceAll(s"\\pP|\\pS|\\pC|\\pN|\\pZ", "") //通過設置Unicode類別的相關正則去除符號
          .trim
        )
        //讀取停用詞,就是一些對分類沒有作用的詞,去除可以對特征向量降維
        val stopWordList: Seq[String] = FileUtils.readFileToString(new File("./stopWords/stop_word_chinese.txt"))
          .split("\r\n").toSeq
        //新建停用詞對象
        val filter = new StopRecognition()
        //加載停用詞列表
        filter.insertStopWords(stopWordList)
        //去除停用詞
        val result: Result = textSeg.recognition(filter)

        /**
         *這里如果將每篇文章的分詞單獨寫入一個文件,則在構建詞向量時,spark
         * 就要分別讀取每篇文章的分詞,而spark每讀一個文件,就會就會產生一個RDD,
         * 這樣讀取所有文本的分詞就會產生巨量的RDD,這時把這些分詞合並到一個集合中(巨量的RDD
         * 合並成一個RDD)時,spark在構建DAG時就會爆掉(親身經歷,當時用的時RDD的union方法)
         */
        
        //將分詞內容加入列表
        contextList.add(result.toStringWithOutNature)
        //將標簽加入列表,標簽的順序和文本分詞后的順序是對應的
        labelList.add(corpusClassDir.getName)

      }
    }
    println(count)
    //將分詞寫入文件
    FileUtils.writeLines(new File(trainSegmentPath), "UTF-8", contextList)
    //將文本標簽寫入文件
    FileUtils.writeLines(new File(trainLabelListPath), "UTF-8", labelList)

  }

  def main(args: Array[String]): Unit = {
    //這里該了一些目錄結構,對代碼的功能沒有影響
    corpusSegment("./train/utf_train_corpus/", "./train/utf_train_segment/",
      "./train/train_label_list.txt", "./train/train_seg.txt")

    corpusSegment("./test/utf_test_corpus/", "./test/utf_test_segment/",
      "./test/test_label_list.txt", "./test/test_seg.txt")
  }
}


構建詞向量空間、訓練、預測、評估

分詞完成后就可以構建詞向量、訓練、預測、評估了


package com.classification.text

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Classification {
  //讀取分詞文件和標簽文件,兩個文件讀取后都是RDD形式,元組的形式返回
  def getDocumentsAndLabels(sc: SparkContext, segPath: String, labelListPath: String): (RDD[Seq[String]], Iterator[String]) = {
    (sc.textFile(segPath).map(_.split(",").toSeq), sc.textFile(labelListPath).collect().toSeq.toIterator)
  }

  //訓練函數
  def train(sc: SparkContext, trainSegPath: String, trainLabelListPath: String): NaiveBayesModel = {
    //讀取訓練集的分詞和標簽
    val (documents, labelList) = getDocumentsAndLabels(sc, trainSegPath, trainLabelListPath)
    //新建HashingTF類
    val hashingTF: HashingTF = new HashingTF()
    //計算TF值
    val tf: RDD[Vector] = hashingTF.transform(documents)

    //緩存,為了計算快,對功能沒有影響
    tf.cache()
    //計算IDF值
    val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
    //計算TF-IDF值
    val tfIdf: RDD[Vector] = idf.transform(tf)
    //將TFIDF數據,結合標簽,轉為LabelPoint數據,LabelPoint是訓練函數NaiveBayes.train()的輸入數據格式
    val training: RDD[LabeledPoint] = tfIdf.map {
      vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
    }
    //訓練函數訓練,
    NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
  }

  //測試函數,參數model為訓練集訓練后的模型
  def test(sc: SparkContext, testSegPath: String, testLabelListPath: String, model: NaiveBayesModel): Double = {

    //讀取測試數據集分詞和標簽數據
    val (documents, labelList) = getDocumentsAndLabels(sc, testSegPath, testLabelListPath)

    //和訓練的步驟差不多
    val hashingTF: HashingTF = new HashingTF()
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()
    val idf: IDFModel = new IDF(minDocFreq = 3).fit(tf)
    val tfIdf: RDD[Vector] = idf.transform(tf)
    val test: RDD[LabeledPoint] = tfIdf.map {
      vector: Vector => LabeledPoint(getDoubleOfLabel(labelList.next()), vector)
    }
    //預測
    val predictionAndLabel: RDD[(Double, Double)] = test.map((p: LabeledPoint) => (model.predict(p.features), p.label))
    //計算准確率
    1.0 * predictionAndLabel.filter((x: (Double, Double)) => x._1 == x._2).count() / test.count()
  }

  //獲取標簽對應的Double數值,將標簽中的數組作為標簽對應的數值
  //C11Space -> 11.0
  def getDoubleOfLabel(label: String): Double = {
    label.split("-")(0).tail.toDouble
  }

  def main(args: Array[String]): Unit = {
    //新建spark上下文
    val conf: SparkConf = new SparkConf().setAppName("Classification").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    //調用處理函數
    println(test(sc, "./test/test_seg.txt",
      "./test/test_label_list.txt",
      train(sc,
        "./train/train_seg.txt",
        "./train/train_label_list.txt"
      )
    )
    )
  }
}


到此分詞的步驟就結束了,要想提高分詞的准確率可以嘗試不同的分詞工具和文本分類算法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM