Spark Machine Learning 8: Text Processing (spark-shell)




Natural Language Processing (NLP)

  • Feature extraction
  • Modeling
  • Machine learning

TF-IDF (term frequency - inverse document frequency)

  • Term weighting: assign each word a weight based on how often it occurs (a toy sketch follows this list)
  • Feature hashing: use a hash function to map each feature to a vector index
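
The following is a minimal sketch of the weighting idea only, computed by hand on an invented toy corpus; the real pipeline below uses HashingTF and IDF, whose smoothed IDF is log((numDocs + 1) / (docFreq + 1)).

// Toy illustration: compute TF-IDF by hand for an invented three-document corpus.
val toyDocs = Seq(
  Seq("spark", "is", "fast"),
  Seq("spark", "mllib", "text"),
  Seq("text", "mining", "is", "fun")
)
val numDocs = toyDocs.size
// document frequency: in how many documents does each term occur?
val docFreq = toyDocs.flatMap(_.distinct).groupBy(identity).mapValues(_.size)
// TF-IDF weights for the terms of the first toy document
val toyTfIdf = toyDocs.head
  .groupBy(identity).mapValues(_.size)            // term frequency
  .map { case (term, tf) =>
    (term, tf * math.log((numDocs + 1.0) / (docFreq(term) + 1.0)))
  }
println(toyTfIdf)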

0 Environment

tar xfvz 20news-bydate.tar.gz
export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
cd $SPARK_HOME
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT \
  --driver-memory 4G --executor-memory 4G --driver-cores 2

1 Extracting the Features

val PATH = "/Users/erichan/sourcecode/book/Spark機器學習/20news-bydate"
val path = PATH + "/20news-bydate-train/*"
val rdd = sc.wholeTextFiles(path)
println(rdd.count)

11314

Look at the newsgroup topics:

val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")
println(countByGroup)

(rec.sport.hockey,600)
(soc.religion.christian,599)
(rec.motorcycles,598)
(rec.sport.baseball,597)
(sci.crypt,595)
(rec.autos,594)
(sci.med,594)
(comp.windows.x,593)
(sci.space,593)
(sci.electronics,591)
(comp.os.ms-windows.misc,591)
(comp.sys.ibm.pc.hardware,590)
(misc.forsale,585)
(comp.graphics,584)
(comp.sys.mac.hardware,578)
(talk.politics.mideast,564)
(talk.politics.guns,546)
(alt.atheism,480)
(talk.politics.misc,465)
(talk.religion.misc,377)

2 Modeling

2.1 Tokenization

val text = rdd.map { case (file, text) => text }
val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
println(whiteSpaceSplit.distinct.count)
println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))

402978
from:,mathew,mathew,faq:,faq:,atheist,resources
summary:,music,--,fiction,,mantis,consultants,,uk.
supersedes:,290

archive-name:,1.0

,,,,,,,,,,,,,,,,,,,organizations

,organizations

,,,,,,,,,,,,,,,,stickers,and,and,the,from,from,in,to:,to:,ffrf,,256-8900

evolution,designs

evolution,a,stick,cars,,written
inside.,fish,us.

write,evolution,,,,,,,bay,can,get,get,,to,the
price,is,of,the,the,so,on.,and,foote.,,atheist,pp.,0-910309-26-4,,,atrocities,,foote:,aap.,,the

2.2 Improved Tokenization

val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
println(nonWordSplit.distinct.count)
println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))

val regex = """[^0-9]*""".r
val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
println(filterNumbers.distinct.count)
println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))

2.3 Removing Stop Words

val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val oreringDesc = Ordering.by[(String, Int), Int](_._2)
//println(tokenCounts.top(20)(oreringDesc).mkString("\n"))
val stopwords = Set(
  "the", "a", "an", "of", "or", "in", "for", "by", "on", "but", "is", "not",
  "with", "as", "was", "if", "they", "are", "this", "and", "it", "have",
  "from", "at", "my", "be", "that", "to"
)
val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
//println(tokenCountsFilteredStopwords.top(20)(oreringDesc).mkString("\n"))
val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
println(tokenCountsFilteredSize.top(20)(oreringDesc).mkString("\n"))

2.4 Removing Low-Frequency Terms

val oreringAsc = Ordering.by[(String, Int), Int](-_._2)
//println(tokenCountsFilteredSize.top(20)(oreringAsc).mkString("\n"))
val rareTokens = tokenCounts.filter { case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
println(tokenCountsFilteredAll.top(20)(oreringAsc).mkString("\n"))

def tokenize(line: String): Seq[String] = {
  line.split("""\W+""")
    .map(_.toLowerCase)
    .filter(token => regex.pattern.matcher(token).matches)
    .filterNot(token => stopwords.contains(token))
    .filterNot(token => rareTokens.contains(token))
    .filter(token => token.size >= 2)
    .toSeq
}

//println(text.flatMap(doc => tokenize(doc)).distinct.count)
val tokens = text.map(doc => tokenize(doc))
println(tokens.first.take(20))

2.5 Stemming

Stemming reduces words to their base form (for example, walking and walked to walk). It is not applied in this chapter, but it is readily available elsewhere (a toy suffix-stripping sketch follows this list):

  • A standard NLP technique, widely used in search engines
  • Provided by toolkits such as:
    • NLTK
    • OpenNLP
    • Lucene
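
A minimal, purely illustrative suffix-stripper (not a real stemmer; a production pipeline would use a Porter or Snowball stemmer from one of the libraries above) might look like this:

def naiveStem(token: String): String = {
  // Illustrative only: strip a few common English suffixes, keeping a stem of
  // at least three characters. A real stemmer handles far more cases correctly.
  val suffixes = Seq("ing", "ed", "ly", "es", "s")
  suffixes.find(s => token.endsWith(s) && token.length > s.length + 2) match {
    case Some(s) => token.dropRight(s.length)
    case None    => token
  }
}

println(Seq("walking", "walked", "walks", "walk").map(naiveStem))
// List(walk, walk, walk, walk)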

3 Training the Model

3.1 Feature Hashing with HashingTF

import org.apache.spark.mllib.linalg.{ SparseVector => SV }
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF

// set the dimensionality of TF-IDF vectors to 2^18
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)
val tf = hashingTF.transform(tokens)
tf.cache

val v = tf.first.asInstanceOf[SV]
println(v.size)
println(v.values.size)
println(v.values.take(10).toSeq)
println(v.indices.take(10).toSeq)

262144
706
WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

Fit the IDF model and transform the TF vectors:

val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)

706
WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, 8.932700215224111, 5.750365619611528, 2.1871123786150006, 5.520408782213984, 3.4312512246662714, 1.7430324343790569)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

3.2 Analyzing the Weightings

val minMaxVals = tfidf.map { v =>
  val sv = v.asInstanceOf[SV]
  (sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
  (math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)

globalMinMax: (Double, Double) = (0.0,66155.39470409753)

Common words:

val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)

WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)

Uncommon words:

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)

WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)

4 Using the Model

4.1 Cosine Similarity

import breeze.linalg._

val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }
val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))

val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)
val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]
val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)

val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
println(cosineSim)

cosineSim: Double = 0.060250114361164626

val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }
val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))
val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)
val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))
println(cosineSim2)

cosineSim2: Double = 0.004664850323792852

val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }
val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val baseballTfIdf = idf.transform(baseballTF.map(_._2))
val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)
val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))
println(cosineSim3)

0.05047395039466008
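
The three comparisons above repeat the same Breeze pattern; as a small refactoring sketch (assuming the SV alias and breeze.linalg import from earlier), the computation can be wrapped in a helper:

// Sketch: cosine similarity between two MLlib sparse vectors via Breeze.
def cosineSimilarity(a: SV, b: SV): Double = {
  val ba = new SparseVector(a.indices, a.values, a.size)
  val bb = new SparseVector(b.indices, b.values, b.size)
  ba.dot(bb) / (norm(ba) * norm(bb))
}
// e.g. cosineSimilarity(hockey1, hockey2) reproduces cosineSim above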

4.2 Learning a Mapping from Words to Topics

Build the multi-class label mapping:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.MulticlassMetrics

val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
val zipped = newsgroups.zip(tfidf)
val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
train.cache
Train the naive Bayes model:
val model = NaiveBayes.train(train, lambda = 0.1) 
Load the test dataset:
val testPath = PATH + "/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
  val topic = file.split("/").takeRight(2).head
  newsgroupsMap(topic)
}
val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }
val testTfIdf = idf.transform(testTf)
val zippedTest = testLabels.zip(testTfIdf)
val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
Compute accuracy and the multi-class weighted F-measure:
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)

0.7915560276155071

val metrics = new MulticlassMetrics(predictionAndLabel)
println(metrics.weightedFMeasure)

0.7810675969031116
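
As a usage sketch (the document string below is invented, not taken from the dataset), a single unseen piece of text can be classified and its numeric label mapped back to a topic name by inverting newsgroupsMap:

// Classify one raw document with the trained model and recover its topic name.
val sampleDoc = "the goalie made a great save late in the third period of the playoff game"  // invented example
val sampleVector = idf.transform(hashingTF.transform(tokenize(sampleDoc)))
val predictedLabel = model.predict(sampleVector)
val labelToTopic = newsgroupsMap.map(_.swap)   // invert topic -> index into index -> topic
println(labelToTopic(predictedLabel.toInt))    // prints the predicted newsgroup name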

5 Evaluation

To gauge how much the text-processing pipeline helps, retrain naive Bayes on raw whitespace-split tokens and compare the metrics with the results above.

val rawTokens = rdd.map { case (file, text) => text.split(" ") }
val rawTF = rawTokens.map(doc => hashingTF.transform(doc))
val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)

val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }
val rawZippedTest = testLabels.zip(rawTestTF)
val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }

val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))
val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()
println(rawAccuracy)

0.7648698884758365

val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)
println(rawMetrics.weightedFMeasure)

0.7653320418573546

6 The Word2Vec Model

The Word2Vec model learns distributed vector representations: each word is represented as a vector. MLlib's implementation uses the skip-gram model.

6.1 Training

import org.apache.spark.mllib.feature.Word2Vec

val word2vec = new Word2Vec()
word2vec.setSeed(42) // we do this to generate the same results each time
val word2vecModel = word2vec.fit(tokens)

6.2 Usage

The 20 words most similar to "hockey":
word2vecModel.findSynonyms("hockey", 20).foreach(println) 

(sport,1.4818968962277133)
(ecac,1.467546566194254)
(hispanic,1.4166835301985194)
(glens,1.4061103042432825)
(woofers,1.3810090447028116)
(tournament,1.3148823031671586)
(champs,1.3133863003013941)
(boxscores,1.307735040384543)
(aargh,1.274986851270267)
(ahl,1.265165428167253)
(playoff,1.2645991118770572)
(ncaa,1.2383382015648046)
(pool,1.2261154635870224)
(champion,1.2119919989539134)
(filinuk,1.2062208620660915)
(olympic,1.2026738930160243)
(motorcycles,1.2008032355579679)
(yankees,1.1989755767973371)
(calder,1.194001886835493)
(homeruns,1.1800625883573932)

word2vecModel.findSynonyms("legislation", 20).foreach(println) 

(accommodates,0.9918184454068688)
(briefed,0.9256758135452989)
(amended,0.9105987267173344)
(telephony,0.8679173760123956)
(pitted,0.8609974033962533)
(aclu,0.8605885863332372)
(licensee,0.8493930472487975)
(agency,0.836706135804648)
(policies,0.8337986602365566)
(senate,0.8327312936220903)
(businesses,0.8291191155630467)
(permit,0.8266658804181389)
(cpsr,0.8231228090944367)
(cooperation,0.8195562469006543)
(surveillance,0.8134342524628756)
(congress,0.8132899468772855)
(restricted,0.8115013134507126)
(procure,0.8096839595766356)
(inquiry,0.8086297702914405)
(industry,0.8077900093754752)

Most of the neighbours of "legislation" are semantically related terms: aclu (the American Civil Liberties Union), senate, surveillance, inquiry, and so on.
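
Beyond findSynonyms, the fitted model also exposes the learned vectors themselves. A short sketch (assuming the word2vecModel trained above; transform throws an exception for words outside the vocabulary):

// Look up the learned embedding for a single in-vocabulary token.
val hockeyVec = word2vecModel.transform("hockey")
println(hockeyVec.size)                      // embedding dimensionality (MLlib's default is 100)
println(hockeyVec.toArray.take(5).toSeq)     // first few components

// The full word -> vector map is also available for export or further analysis.
val wordVectors = word2vecModel.getVectors
println(wordVectors.size)                    // vocabulary size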

