在Spark1.2之后,Spark自帶實現TF-IDF接口,只要直接調用就可以,但實際上,Spark自帶的詞典大小設置較於古板,如果設置小了,則導致無法計算,如果設置大了,Driver端回收數據的時候,容易發生OOM,所以更多時候都是自己根據實際情況手動實現TF-IDF。不過,在本篇文章中,兩種方式都會介紹。
數據准備:
val df = ss.sql("select * from bigdatas.news_seg") //如果hive表的數據沒有切詞,則先對數據進行切詞操作(hive里面每一行是用空格將各個詞連接的字符串,或者說是一篇文章,結尾使用##@@##標識),得到一個數組類型數據
val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg")
一、Spark自帶TF-IDF
1、Spark自帶TF實現
首先需要實例化HashingTF,這個類用於根據給傳入的各篇已經分好詞的文章,對里面的每個詞進行hashing計算,每個hashing值對應詞表的一個位置,以及對每個詞在每篇文章中的一個統計;
這個類有一個方法setBinaty()可以設置其統計時的計算方式:多項式分布計算和伯努利分布計算:
- setBinary(false):多項式分布計算,一個詞在一篇文章中出現多少次,計算多少次;
- setBinary(true):伯努利分布計算,一個詞在一篇文章中,不管多少次,只要出現了,就為1,否則為0
還有一個重要方法setNumFeatures(),用於設置詞表的大小,默認是2^18。
實例化HashingTF之后,使用transform就可以計算詞頻(TF)。
TF代碼實現:
// 多項式分布計算
val hashingTF = new HashingTF() .setBinary(false) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) // 伯努利分布計算
val hashingTF_BN = new HashingTF() .setBinary(true) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) /** * hashingTF.transform(df_seg):轉換之后會在原來基礎上增加一列,就是setOutputCol("feature_tf")設置的列 * 新增列的數據結構為:(詞表大小,[該行數據的每個詞對應詞表的hashCode],[該行數據的每個詞在該行數據出現的次數,即多項式統計詞頻]) */ val df_tf = hashingTF.transform(df_seg).select("feature_tf")
最后列“feature_tf”的數據結構為(詞典大小, [hashingCode], [term freq])。
2、Spark自帶實現TF-IDF
對word進行idf加權(完成tf計算的基礎上)
實現原理跟上一步的TF類似,但多出一步,這一步是用於掃描一次上一步計算出來的tf數據。
代碼如下:
val idf = new IDF() .setInputCol("feature_tf") .setOutputCol("feature_tfidf") .setMinDocFreq(2) //fit():內部對df_tf進行遍歷,統計doc Freq,這個操作是在tf完成后才做的
val idfModel = idf.fit(df_tf) val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf")
二、自己實現TF-IDF
手動實現,有計算的先后問題,必須先算DocFreq(DF),再算TermFreq(TF)。
1、doc Freq 文檔頻率計算 -> 同時可以得到所有文章的單詞集合(詞典)
df的數據是每一行代表一篇文章,那么在計算某個詞出現的文章次數,那么轉化為某個詞的統計。即將一篇文章切好詞之后,放在一個set集合里面,表示這個set集合的每個詞出現1次;那么將所有文章的詞都切好,放在set集合里面,每篇文章擁有一個set集合,然后再根據詞groupBy,Count,就可以得到每個詞的DocFreq。
val setUDF = udf((str:String)=>str.split(" ").distinct) // 1.1、對每篇文章的詞進行去重操作,即set集合
val df_set = df.withColumn("words_set",setUDF(df("sentence"))) // 1.2、行轉列,groupby、count,順帶可以求出詞典大小,以及每個詞對應在詞典的位置index
val docFreq_map = df_set.select(explode(df_set("words_set")).as("word")) .groupBy("word") .count() .rdd .map(x=>(x(0).toString,x(1).toString)) .collectAsMap() //collect有一個重要特性,就是會將數據回收到Driver端,方便分發
上面計算出來的數據格式為:Map(word->wordCount),或理解為:Map(word->DocFreq)
除了計算出DF外,還要順便計算詞典大小,因為詞典大小代表了向量的大小,以及每個詞對應詞典的位置。
val dictSize = docFreq_map.size // 對單詞進行編碼,得到索引,類型為int,每個詞對應於[0,dictSize-1]區間的一個位置
val wordEncode = docFreq_map.keys.zipWithIndex.toMap
2、term Freq 詞頻計算 -> 同時計算tf-idf
詞頻計算其實就是做wordCount,這里重點還需要順帶計算TF-IDF。
val docCount = df_seg.count() val mapUDF = udf { str: String =>
//每一行處理就是處理一篇文章 // wordCOunt
val tfMap = str.split("##@##")(0).split(" ") .map((_,1L)) .groupBy(_._1) .mapValues(_.length) // tfMap{term->termCount} // docFreq_map{term->termDocCount} // wordEncode{term->index} // 處理后的tfIDFMap數據結果為:[(index:int,tf_idf:Double)] //必須要處理成這種形式
val tfIDFMap = tfMap.map{x=> val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1)) (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v) } // 做成向量:第一個參數為向量大小(詞典大小);第二個參數用於給指定的index賦值為tf-idf
Vectors.sparse(dictSize,tfIDFMap.toSeq) } val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence")))
三、完整Demo代碼
package com.cjs import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.ml.feature.HashingTF import org.apache.spark.ml.feature.IDF import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ object TFIDFTransform { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf() .set("spark.some.config.option","some-value") val ss = SparkSession .builder() .config(conf) .enableHiveSupport() .appName("test_tf-idf") //.master("local[2]") //單機版配置,集群情況下,可以去掉
.getOrCreate() val df = ss.sql("select * from bigdatas.news_seg") //如果hive表的數據沒有切詞,則先對數據進行切詞操作(hive里面每一行是用空格將各個詞連接的字符串,或者說是一篇文章,結尾使用##@@##標識),得到一個數組類型數據
val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg") val docCount = df_seg.count() //一、spark自帶tf-idf實現 //詞典默認是2^20,先給一個詞的一個hashCode,對應於詞典的一個位置 //詞典空間過大,Driver進行數據回收時,容易出現OOM // 1、spark自帶TF實現
/** setBinary: false:多項式分布 -> 一個詞在一篇文章中出現多少次,計算多少次 true:伯努利分布 -> 一個詞在一篇文章中,不管多少次,只要出現了,就為1,否則為0 **/
/** * setInputCol("seg") :輸入參數(DF)的列名 * setOutputCol("feature_tf"):輸出結果(結果)的列名 * setNumFeatures(1<<18):設置詞表大小,默認是1<<18 */
// 多項式分布計算
val hashingTF = new HashingTF() .setBinary(false) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) // 伯努利分布計算
val hashingTF_BN = new HashingTF() .setBinary(true) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) /** * hashingTF.transform(df_seg):轉換之后會在原來基礎上增加一列,就是setOutputCol("feature_tf")設置的列 * 新增列的數據結構為:(詞表大小,[該行數據的每個詞對應詞表的hashCode],[該行數據的每個詞在該行數據出現的次數,即多項式統計詞頻]) */ val df_tf = hashingTF.transform(df_seg).select("feature_tf") // 2、spark自帶IDF實現, 對word進行idf加權(完成tf計算的基礎上)
val idf = new IDF() .setInputCol("feature_tf") .setOutputCol("feature_tfidf") .setMinDocFreq(2) //fit():內部對df_tf進行遍歷,統計doc Freq,這個操作是在tf完成后才做的
val idfModel = idf.fit(df_tf) val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf") //二、自己實現tf-idf,有順序的實現 //1、doc Freq 文檔頻率計算 -> 同時可以得到所有文章的單詞集合(詞典)
val setUDF = udf((str:String)=>str.split(" ").distinct) // 1.1、對每篇文章的詞進行去重操作,即set集合
val df_set = df.withColumn("words_set",setUDF(df("sentence"))) // 1.2、行轉列,groupby、count,順帶可以求出詞典大小,以及每個詞對應在詞典的位置index
val docFreq_map = df_set.select(explode(df_set("words_set")).as("word")) .groupBy("word") .count() .rdd // .map(x=>(x(0).toString,x(1).toString))
.map(x=>(x(0).toString,math.log10(docCount.toDouble/(x(1).toString.toDouble+1)))) //順帶計算idf
.collectAsMap() //collect有一個重要特性,就是會將數據回收到Driver端,方便分發
val dictSize = docFreq_map.size // 對單詞進行編碼,得到索引,類型為int,每個詞對應於[0,dictSize-1]區間的一個位置
val wordEncode = docFreq_map.keys.zipWithIndex.toMap //2、term Freq 詞頻計算 // 返回數據結構:
val mapUDF = udf { str: String =>
//每一行處理就是處理一篇文章 // wordCOunt
val tfMap = str.split("##@##")(0).split(" ") .map((_,1L)) .groupBy(_._1) .mapValues(_.length) // tfMap{term->termCount} // docFreq_map{term->termDocCount} // wordEncode{term->index} // 處理后的tfIDFMap數據結果為:[(index:int,tf_idf:Double)] //必須要處理成這種形式
val tfIDFMap = tfMap.map{x=>
// val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1)) // (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)
val idf_v = docFreq_map.getOrElse(x._1,0.0) (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v) //已經在第1步算出idf情況下使用
} // 做成向量:第一個參數為向量大小(詞典大小);第二個參數用於給指定的index賦值為tf-idf
Vectors.sparse(dictSize,tfIDFMap.toSeq) } val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence"))) } }